#!/usr/bin/env python
# Load an AlsaModularSynth patch file into Ingen
# Copyright 2012-2015 David Robillard
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THIS SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

import ingen
import rdflib
import rdflib.namespace
import sys

ams_prefix = 'http://github.com/blablack/ams-lv2/'
fomp_prefix = 'http://drobilla.net/plugins/fomp/'
note_uri = 'http://drobilla.net/ns/ingen-internals#Note'


class World:
    """Connection to an Ingen server plus the state needed to build a patch.

    Blocks are created on the server immediately; arcs are queued in
    ``pending_arcs`` and only connected when ``create_arcs`` is called.
    """

    def __init__(self, server_uri):
        """Open an ``ingen.Remote`` for ``server_uri`` and issue a get for '/'."""
        self.server_uri = server_uri
        self.server = ingen.Remote(server_uri)
        self.pending_arcs = []
        self.server.get('/')
        # Maps block symbol ('modN') to the plugin URI it was created with
        self.mod_prototypes = {}

    def mod_sym(self, mod_id):
        """Return the block symbol ('mod<N>') used for module ``mod_id``."""
        return 'mod%d' % int(mod_id)

    def add_block(self, mod_id, plugin_uri, x, y):
        """Put a block for module ``mod_id`` on the server.

        The block uses ``plugin_uri`` as its prototype and ``x``/``y`` as its
        canvas coordinates.  The prototype is also remembered locally in
        ``mod_prototypes`` so port indices can be adapted later.
        """
        sym = self.mod_sym(mod_id)
        self.mod_prototypes[sym] = plugin_uri
        body = ('\t\ta ingen:Block ;\n' +
                'lv2:prototype <%s> ;\n' % plugin_uri +
                'ingen:canvasX %f ;\n' % x +
                'ingen:canvasY %f' % y).replace('\n', '\n\t\t')
        self.server.put('/' + sym, body)

    def add_arc(self, head_port_id, tail_port_id, head_mod_id, tail_mod_id,
                jack_color, cable_color):
        """Queue an arc description; it is connected later by create_arcs."""
        self.pending_arcs.append((head_port_id, tail_port_id,
                                  head_mod_id, tail_mod_id,
                                  jack_color, cable_color))

    def get_ports(self, mod_uri, port_type):
        """Return ``[index, port_uri]`` pairs for ports under ``mod_uri``.

        Only ports of RDF type ``port_type`` that are *not* also typed as
        lv2:ControlPort in the server model are included.  The result is
        unsorted.
        """
        model = self.server.model
        ports = []
        for triple in model.triples([None, ingen.NS.rdf.type, port_type]):
            port = triple[0]
            if not str(port).startswith(mod_uri + '/'):
                continue
            if [port, ingen.NS.rdf.type, ingen.NS.lv2.ControlPort] in model:
                continue
            # Unfortunately ingen.NS.lv2.index is a method
            index = model.value(port, ingen.NS.lv2['index'], None)
            ports.append([int(index), port])
        return ports

    def input_by_id(self, mod_id, port_id):
        """Return the ``port_id``'th input port of ``mod_id``, or None.

        Ports are those returned by ``get_ports`` for lv2:InputPort, ordered
        by their lv2:index.
        """
        mod_uri = rdflib.URIRef(self.server.server_base + self.mod_sym(mod_id))

        # Get all input ports on this module sorted by index
        inputs = sorted(self.get_ports(mod_uri, ingen.NS.lv2.InputPort))

        # Return the port_id'th port in the list
        port_index = int(port_id)
        if 0 <= port_index < len(inputs):
            return inputs[port_index][1]
        return None

    def output_by_id(self, mod_id, port_id):
        """Return the ``port_id``'th output port of ``mod_id``, or None.

        For blocks created with the Note prototype, ``port_id`` is first
        translated from the MCV/ADVMCV port numbering to the Note port
        numbering; unsupported ports produce a warning and None.
        """
        mod_uri = rdflib.URIRef(self.server.server_base + self.mod_sym(mod_id))

        # Get all output ports on this module sorted by index
        outputs = sorted(self.get_ports(mod_uri, ingen.NS.lv2.OutputPort))

        port_index = int(port_id)
        # Use this instance's table rather than the module-level `world`
        if self.mod_prototypes[self.mod_sym(mod_id)] == note_uri:
            # Adapt MCV/ADVMCV port index to Note port index
            port_mapping = [3, 0, 2, 4, 6, 5, -1, -1, -1, -1]
            if 0 <= port_index < len(port_mapping):
                port_index = port_mapping[port_index]
            else:
                port_index = -1
            if port_index == -1:
                sys.stderr.write('warning: unsupported MCV port %d\n'
                                 % int(port_id))
                return None

        # Return the port_id'th port in the list
        if 0 <= port_index < len(outputs):
            return outputs[port_index][1]
        return None

    def create_arcs(self):
        """Connect every queued arc whose tail and head ports can be resolved.

        This is best-effort: a failure on one arc is reported on stderr and
        the remaining arcs are still attempted.
        """
        for (head_port_id, tail_port_id,
             head_mod_id, tail_mod_id,
             jack_color, cable_color) in self.pending_arcs:
            print('%s:%s => %s:%s' % (tail_mod_id, tail_port_id,
                                      head_mod_id, head_port_id))
            try:
                tail = self.output_by_id(tail_mod_id, tail_port_id)
                head = self.input_by_id(head_mod_id, head_port_id)
                if tail and head:
                    self.server.connect(self.server.uri_to_path(tail),
                                        self.server.uri_to_path(head))
            except Exception as e:
                # Keep going, but do not hide the failure
                sys.stderr.write(
                    'warning: failed to connect %s:%s => %s:%s (%s)\n'
                    % (tail_mod_id, tail_port_id,
                       head_mod_id, head_port_id, e))


# Static enumeration of special module type IDs
class Special:
    CUSTOM = 0
    LADSPA = 6
    MCV = 10
    SCMCV = 30
    SCQUANTIZER = 31
    ADVMCV = 35


# Module types list, indexed by numeric ID in file
# Except where otherwise commented, these correspond to internal modules,
# and the string is the suffix of the corresponding AMS LV2 plugin URI
module_types = [
    "custom",  # 0 = custom (unsupported)
    "vco",
    "vca",
    "lfo",
    "delay",
    "ringmod",
    "ladspa",  # 6 = LADSPA plugin
    "pcmout",
    "mix",
    "vcf",
    "mcv",
    "env",
    "seq",
    "inv",
    "noise",
    "slew",
    "quantizer",
    "pcmin",
    "cvs",
    "sh",
    "vcorgan",
    "dynamicwaves",
    "advenv",
    "wavout",
    "scope",
    "spectrum",
    "vcswitch",
    "jackin",
    "jackout",
    "midiout",
    "scmcv",  # Scala module (different line format)
    "scquantizer",  # Scala module (different line format)
    "stereomix",
    "conv",
    "vcenv",
    "advmcv",
    "function",
    "vcpanning",
    "vcenv2",
    "vcdoubledecay",
    "vquant",
    "amp",
    "ad",
    "mphlfo",
    "noise2",
    "vco2"
]


class Module:
    """Plain record describing one module: number, plugin URI, properties."""

    def __init__(self, num, plugin_uri, properties=None):
        self.num = num
        self.plugin_uri = plugin_uri
        # A fresh dict per instance; a shared `{}` default would leak state
        self.properties = {} if properties is None else properties
        self.ports = []


class Patch:
    """Container holding a list of modules."""

    def __init__(self):
        self.modules = []


def ladspa_module(world, mod_id, x, y, poly, lib, label):
    """Add a block for a LADSPA module if an LV2 equivalent is known.

    Libraries without a known mapping are only printed to stdout.
    """
    lv2_uri = ''

    # Kludge LADSPA library and label to LV2 URIs where applicable
    if lib == 'blvco':
        lv2_uri = fomp_prefix + label.lower().replace('-', '_')
    elif lib in ('mvclpf24', 'mvchpf24'):
        lv2_uri = fomp_prefix + label.lower().replace('-', '')
    elif lib in ('cs_chorus', 'cs_phaser'):
        lv2_uri = fomp_prefix + 'cs_' + label.lower().replace('+', '_')

    if lv2_uri:
        world.add_block(mod_id, lv2_uri, x, y)
    else:
        print('MOD %3d LADSPA %s %s %s' % (mod_id, poly, lib, label))


def scala_module(world, mod_id, scala_name):
    """Warn on stderr that Scala modules are unsupported."""
    sys.stderr.write('warning: scala module %3d (%s) unsupported\n'
                     % (mod_id, scala_name))


def standard_module(world, mod_id, x, y, name, arg):
    """Add a block for an internal AMS module named ``name``.

    ``arg`` selects the plugin variant for 'vca' (exp if > 0, else lin) and
    the channel count for 'mix'; it is ignored for other names.
    """
    if name == 'vca':
        if int(arg) > 0:
            name += 'exp'
        else:
            name += 'lin'
    elif name == 'mix':
        name += 'er_%dch' % int(arg)

    lv2_uri = ams_prefix + name
    world.add_block(mod_id, lv2_uri, x, y)


def float_control(world, mod_id, port_index, value,
                  logarithmic, minimum, maximum, midi_sign):
    """Placeholder for float slider values; currently does nothing."""
    # print('FLOAT CONTROL %s:%s = %s' % (mod_id, port_index, value))
    pass


def control(world, mod_id, port_index, value, midi_sign):
    """Placeholder for integer/list slider values; currently does nothing."""
    # print('CONTROL %s:%s = %s' % (mod_id, port_index, value))
    pass


if len(sys.argv) != 2 and len(sys.argv) != 3:
    sys.stderr.write('Usage: %s AMS_PATCH_FILE [SERVER_URI]\n' % sys.argv[0])
    sys.exit(1)

in_path = sys.argv[1]
server_uri = 'unix:///tmp/ingen.sock'
if len(sys.argv) == 3:
    server_uri = sys.argv[2]

world = World(server_uri)

in_comment = False
with open(in_path, 'r') as in_file:
    for line in in_file:
        try:
            expr = line.split()
            if not expr:
                continue
            elif expr[0] == '#PARA#':
                # NOTE(review): lines between #PARA# and #ARAP# are still
                # parsed like any other line -- confirm this is intended
                in_comment = True
            elif in_comment and expr[0] == '#ARAP#':
                in_comment = False
            elif expr[0] == 'Module':
                mod_type = int(expr[1])
                mod_id = int(expr[2])
                mod_x = int(expr[3])
                mod_y = int(expr[4])
                if mod_type < 0 or mod_type >= len(module_types):
                    # Not a valid index into module_types: skip the module
                    sys.stderr.write('warning: unknown module type %d\n'
                                     % mod_type)
                elif mod_type == Special.CUSTOM:
                    sys.stderr.write('warning: custom module %d unsupported\n'
                                     % mod_id)
                elif mod_type == Special.LADSPA:
                    ladspa_module(world, mod_id, mod_x, mod_y,
                                  int(expr[5]), expr[6], expr[7])
                elif mod_type in (Special.SCMCV, Special.SCQUANTIZER):
                    scala_name = expr[5]
                    scala_module(world, mod_id, scala_name)
                elif mod_type in (Special.MCV, Special.ADVMCV):
                    world.add_block(mod_id, note_uri, mod_x, mod_y)
                else:
                    standard_module(world, mod_id, mod_x, mod_y,
                                    module_types[mod_type], expr[5])
            elif expr[0] == 'ColorP':
                world.add_arc(expr[1], expr[2], expr[3], expr[4],
                              (expr[5], expr[6], expr[7]),
                              (expr[8], expr[9], expr[10]))
            elif expr[0] == 'FSlider':
                # NOTE(review): mod_id is the id from the most recent Module
                # line, not a field of this line -- confirm intended
                float_control(world, mod_id, expr[2], expr[3], expr[4],
                              expr[5], expr[6], expr[7])
            elif expr[0] == 'ISlider' or expr[0] == 'LSlider':
                control(world, mod_id, expr[2], expr[3], expr[4])
            # else:
            #     sys.stderr.write('warning: unsupported form %s\n' % expr[0])
        except ingen.Error as e:
            # Exceptions have no `.message` on Python 3 unless the class
            # defines one, so fall back to the exception itself
            sys.stderr.write('ingen error: %s\n' % getattr(e, 'message', e))

world.create_arcs()

# print(world.server.model.serialize(format='n3'))