diff options
Diffstat (limited to 'scripts')
-rw-r--r-- | scripts/ingen.py | 308 | ||||
-rwxr-xr-x | scripts/ingenams | 283 | ||||
-rwxr-xr-x | scripts/ingenish | 121 |
3 files changed, 712 insertions, 0 deletions
diff --git a/scripts/ingen.py b/scripts/ingen.py new file mode 100644 index 00000000..594a7c01 --- /dev/null +++ b/scripts/ingen.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python +# Ingen Python Interface +# Copyright 2012-2015 David Robillard <http://drobilla.net> +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THIS SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +import os +import rdflib +import re +import socket +import sys + +try: + import StringIO.StringIO as StringIO +except ImportError: + from io import StringIO as StringIO + +class NS: + atom = rdflib.Namespace('http://lv2plug.in/ns/ext/atom#') + ingen = rdflib.Namespace('http://drobilla.net/ns/ingen#') + ingerr = rdflib.Namespace('http://drobilla.net/ns/ingen/errors#') + lv2 = rdflib.Namespace('http://lv2plug.in/ns/lv2core#') + patch = rdflib.Namespace('http://lv2plug.in/ns/ext/patch#') + rdf = rdflib.Namespace('http://www.w3.org/1999/02/22-rdf-syntax-ns#') + rsz = rdflib.Namespace('http://lv2plug.in/ns/ext/resize-port#') + xsd = rdflib.Namespace('http://www.w3.org/2001/XMLSchema#') + +class Interface: + 'The core Ingen interface' + def put(self, subject, body): + pass + + def patch(self, subject, remove, add): + pass + + def get(self, subject): + pass + + def set(self, subject, key, value): + pass + + def connect(self, tail, head): + pass + + def disconnect(self, tail, head): + pass + + def delete(self, subject): + pass + +class Error(Exception): + def __init__(self, msg, cause): + Exception.__init__(self, '%s; cause: %s' % (msg, cause)) + +def lv2_path(): + path = os.getenv('LV2_PATH') + if path: + return path + elif sys.platform == 'darwin': + return os.pathsep.join(['~/Library/Audio/Plug-Ins/LV2', + '~/.lv2', + '/usr/local/lib/lv2', + '/usr/lib/lv2', + '/Library/Audio/Plug-Ins/LV2']) + elif sys.platform == 'haiku': + return os.pathsep.join(['~/.lv2', + '/boot/common/add-ons/lv2']) + elif sys.platform == 'win32': + return os.pathsep.join([ + os.path.join(os.getenv('APPDATA'), 'LV2'), + os.path.join(os.getenv('COMMONPROGRAMFILES'), 'LV2')]) + else: + return os.pathsep.join(['~/.lv2', + '/usr/lib/lv2', + '/usr/local/lib/lv2']) + +def ingen_bundle_path(): + for d in lv2_path().split(os.pathsep): + bundle = os.path.abspath(os.path.join(d, 'ingen.lv2')) + if os.path.exists(bundle): + return bundle + return None + +class Remote(Interface): + def __init__(self, uri='unix:///tmp/ingen.sock'): + self.msg_id = 1 + self.server_base = uri + '/' + self.model = rdflib.Graph() + self.ns_manager = rdflib.namespace.NamespaceManager(self.model) + self.ns_manager.bind('server', self.server_base) + for (k, v) in NS.__dict__.items(): + self.ns_manager.bind(k, v) + if uri.startswith('unix://'): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.connect(uri[len('unix://'):]) + elif uri.startswith('tcp://'): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + parsed = re.split('[:/]', uri[len('tcp://'):]) + addr = (parsed[0], int(parsed[1])) + self.sock.connect(addr) + else: + raise Exception('Unsupported server URI `%s' % uri) + + # Parse error description from Ingen bundle for pretty printing + bundle = ingen_bundle_path() + if bundle: + self.model.parse(os.path.join(bundle, 'errors.ttl'), format='n3') + + def __del__(self): + self.sock.close() + + def _get_prefixes_string(self): + s = '' + for k, v in self.ns_manager.namespaces(): + s += '@prefix %s: <%s> .\n' % (k, v) + return s + + def msgencode(self, msg): + if sys.version_info[0] == 3: + return bytes(msg, 'utf-8') + else: + return msg + + def update_model(self, update): + for i in update.triples([None, NS.rdf.type, NS.patch.Put]): + put = i[0] + subject = update.value(put, NS.patch.subject, None) + body = update.value(put, NS.patch.body, None) + desc = {} + for i in update.triples([body, None, None]): + self.model.add([subject, i[1], i[2]]) + return update + + def uri_to_path(self, uri): + path = uri + if uri.startswith(self.server_base): + return uri[len(self.server_base)-1:] + return uri + + def recv(self): + 'Read from socket until a null terminator is received' + msg = u'' + while True: + c = self.sock.recv(1, 0).decode('utf-8') + if not c or ord(c[0]) == 0: # End of transmission + break + else: + msg += c[0] + return msg + + def blank_closure(self, graph, node): + def blank_walk(node, g): + for i in g.triples([node, None, None]): + if type(i[2]) == rdflib.BNode and i[2] != node: + yield i[2] + blank_walk(i[2], g) + + closure = [node] + for b in graph.transitiveClosure(blank_walk, node): + closure += [b] + + return closure + + def raise_error(self, code, cause): + klass = self.model.value(None, NS.ingerr.errorCode, rdflib.Literal(code)) + if not klass: + raise Error('error %d' % code, cause) + + fmt = self.model.value(klass, NS.ingerr.formatString, None) + if not fmt: + raise Error('%s' % klass, cause) + + raise Error(fmt, cause) + + def send(self, msg): + # Send message to server + payload = msg + if sys.version_info[0] == 3: + payload = bytes(msg, 'utf-8') + self.sock.send(self.msgencode(msg)) + + # Receive response and parse into a model + response_str = self._get_prefixes_string() + self.recv() + response_model = rdflib.Graph(namespace_manager=self.ns_manager) + + # Because rdflib has embarrassingly broken base URI resolution that + # just drops path components from the base URI entirely (seriously), + # unfortunate the real server base URI can not be used here. Use + # <ingen:/> instead to at least avoid complete nonsense + response_model.parse(StringIO(response_str), 'ingen:/', format='n3') + + # Add new prefixes to prepend to future responses because rdflib sucks + for line in response_str.split('\n'): + if line.startswith('@prefix'): + match = re.search('@prefix ([^:]*): <(.*)> *\.', line) + if match: + name = match.group(1) + uri = match.group(2) + self.ns_manager.bind(match.group(1), match.group(2)) + + # Handle response (though there should be only one) + blanks = [] + response_desc = [] + for i in response_model.triples([None, NS.rdf.type, NS.patch.Response]): + response = i[0] + subject = response_model.value(response, NS.patch.subject, None) + body = response_model.value(response, NS.patch.body, None) + + response_desc += [i] + blanks += [response] + if body != 0: + self.raise_error(int(body), msg) # Raise exception on server error + + # Find the blank node closure of all responses + blank_closure = [] + for b in blanks: + blank_closure += self.blank_closure(response_model, b) + + # Remove response descriptions from model + for b in blank_closure: + for t in response_model.triples([b, None, None]): + response_model.remove(t) + + # Remove triples describing responses from response model + for i in response_desc: + response_model.remove(i) + + # Update model with remaining information, e.g. patch:Put updates + return self.update_model(response_model) + + def get(self, subject): + return self.send(''' +[] + a patch:Get ; + patch:subject <%s> . +''' % subject) + + def put(self, subject, body): + return self.send(''' +[] + a patch:Put ; + patch:subject <%s> ; + patch:body [ +%s + ] . +''' % (subject, body)) + + def patch(self, subject, remove, add): + return self.send(''' +[] + a patch:Patch ; + patch:subject <%s> ; + patch:remove [ +%s + ] ; + patch:add [ +%s + ] . +''' % (subject, remove, add)) + + def set(self, subject, key, value): + return self.send(''' +[] + a patch:Set ; + patch:subject <%s> ; + patch:property <%s> ; + patch:value %s . +''' % (subject, key, value)) + + def connect(self, tail, head): + return self.send(''' +[] + a patch:Put ; + patch:subject <%s> ; + patch:body [ + a ingen:Arc ; + ingen:tail <%s> ; + ingen:head <%s> ; + ] . +''' % (os.path.commonprefix([tail, head]), tail, head)) + + def disconnect(self, tail, head): + return self.send(''' +[] + a patch:Delete ; + patch:body [ + a ingen:Arc ; + ingen:tail <%s> ; + ingen:head <%s> ; + ] . +''' % (tail, head)) + + def delete(self, subject): + return self.send(''' +[] + a patch:Delete ; + patch:subject <%s> . +''' % subject) diff --git a/scripts/ingenams b/scripts/ingenams new file mode 100755 index 00000000..a183586a --- /dev/null +++ b/scripts/ingenams @@ -0,0 +1,283 @@ +#!/usr/bin/env python +# Load an AlsaModularSynth patch file into Ingen +# Copyright 2012-2015 David Robillard <http://drobilla.net> +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THIS SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +import ingen +import rdflib +import rdflib.namespace +import sys + +ams_prefix = 'http://github.com/blablack/ams-lv2/' +fomp_prefix = 'http://drobilla.net/plugins/fomp/' +note_uri = 'http://drobilla.net/ns/ingen-internals#Note' + +class World: + def __init__(self, server_uri): + self.server_uri = server_uri + self.server = ingen.Remote(server_uri) + self.pending_arcs = [] + self.server.get('/') + self.mod_prototypes = {} + + def mod_sym(self, mod_id): + return 'mod%d' % int(mod_id) + + def add_block(self, mod_id, plugin_uri, x, y): + self.mod_prototypes[self.mod_sym(mod_id)] = plugin_uri + self.server.put('/' + self.mod_sym(mod_id), + ('\t\ta ingen:Block ;\n' + + 'lv2:prototype <%s> ;\n' % plugin_uri + + 'ingen:canvasX %f ;\n' % x + + 'ingen:canvasY %f' % y).replace('\n', '\n\t\t')) + + def add_arc(self, + head_port_id, tail_port_id, + head_mod_id, tail_mod_id, + jack_color, cable_color): + self.pending_arcs += [(head_port_id, tail_port_id, + head_mod_id, tail_mod_id, + jack_color, cable_color)] + + def get_ports(self, mod_uri, port_type): + ports = [] + for i in self.server.model.triples([None, ingen.NS.rdf.type, port_type]): + if str(i[0]).startswith(mod_uri + '/'): + if not [i[0], ingen.NS.rdf.type, ingen.NS.lv2.ControlPort] in self.server.model: + # Unfortunately ingen.NS.lv2.index is a method + index = self.server.model.value(i[0], ingen.NS.lv2['index'], None) + ports += [[int(index), i[0]]] + return ports + + def input_by_id(self, mod_id, port_id): + mod_uri = rdflib.URIRef(self.server.server_base + self.mod_sym(mod_id)) + + # Get all input ports on this module sorted by index + inputs = sorted(self.get_ports(mod_uri, ingen.NS.lv2.InputPort)) + + # Return the port_id'th port in the list + index = 0 + for i in inputs: + if index == int(port_id): + return i[1] + index += 1 + + return None + + def output_by_id(self, mod_id, port_id): + mod_uri = rdflib.URIRef(self.server.server_base + self.mod_sym(mod_id)) + + # Get all output ports on this module sorted by index + outputs = sorted(self.get_ports(mod_uri, ingen.NS.lv2.OutputPort)) + + port_index = int(port_id) + if world.mod_prototypes[self.mod_sym(mod_id)] == note_uri: + # Adapt MCV/ADVMCV port index to Note port index + port_mapping = [ 3, 0, 2, 4, 6, 5, -1, -1, -1, -1 ] + port_index = port_mapping[port_index] + if port_index == -1: + sys.stderr.write('warning: unsupported MCV port %d\n' % int(port_id)) + return + + # Return the port_id'th port in the list + if port_index < len(outputs): + return outputs[port_index][1] + + return None + + def create_arcs(self): + for (head_port_id, tail_port_id, + head_mod_id, tail_mod_id, + jack_color, cable_color) in self.pending_arcs: + print('%s:%s => %s:%s' % (tail_mod_id, tail_port_id, head_mod_id, head_port_id)) + try: + tail = self.output_by_id(tail_mod_id, tail_port_id) + head = self.input_by_id(head_mod_id, head_port_id) + if tail and head: + self.server.connect(self.server.uri_to_path(tail), + self.server.uri_to_path(head)) + except: + pass + +# Static enumeration of special module type IDs +class Special: + CUSTOM = 0 + LADSPA = 6 + MCV = 10 + SCMCV = 30 + SCQUANTIZER = 31 + ADVMCV = 35 + +# Module types list, indexed by numeric ID in file +# Except where otherwise commented, these correspond to internal modules, +# and the string is the suffix of the corresponding AMS LV2 plugin URI +module_types = [ + "custom", # 0 = custom (unsupported) + "vco", + "vca", + "lfo", + "delay", + "ringmod", + "ladspa", # 6 = LADSPA plugin + "pcmout", + "mix", + "vcf", + "mcv", + "env", + "seq", + "inv", + "noise", + "slew", + "quantizer", + "pcmin", + "cvs", + "sh", + "vcorgan", + "dynamicwaves", + "advenv", + "wavout", + "scope", + "spectrum", + "vcswitch", + "jackin", + "jackout", + "midiout", + "scmcv", # Scala module (different line format) + "scquantizer", # Scala module (different line format) + "stereomix", + "conv", + "vcenv", + "advmcv", + "function", + "vcpanning", + "vcenv2", + "vcdoubledecay", + "vquant", + "amp", + "ad", + "mphlfo", + "noise2", + "vco2" +] + +class Module: + def __init__(self, num, plugin_uri, properties={}): + self.num = num + self.plugin_uri = plugin_uri + self.properties = properties + self.ports = [] + +class Patch: + def __init__(self): + self.modules = [] + +def ladspa_module(world, mod_id, x, y, poly, lib, label): + lv2_uri = '' + # Kludge LADSPA library and label to LV2 URIs where applicable + if lib == 'blvco': + lv2_uri = fomp_prefix + label.lower().replace('-', '_') + elif lib == 'mvclpf24' or lib == 'mvchpf24': + lv2_uri = fomp_prefix + label.lower().replace('-', '') + elif lib == 'cs_chorus' or lib == 'cs_phaser': + lv2_uri = fomp_prefix + 'cs_' + label.lower().replace('+', '_') + + if lv2_uri: + world.add_block(mod_id, lv2_uri, x, y) + else: + print('MOD %3d LADSPA %s %s %s' % (mod_id, poly, lib, label)) + +def scala_module(world, mod_id, scala_name): + sys.stderr.write('warning: scala module %3d (%s) unsupported\n' % (d, scala_name)) + +def standard_module(world, mod_id, x, y, name, arg): + if name == 'vca': + if int(arg) > 0: + name += 'exp' + else: + name += 'lin' + elif name == 'mix': + name += 'er_%dch' % int(arg) + + lv2_uri = ams_prefix + name + world.add_block(mod_id, lv2_uri, x, y) + +def float_control(world, mod_id, port_index, value, + logarithmic, minimum, maximum, midi_sign): + #print('FLOAT CONTROL %s:%s = %s' % (mod_id, port_index, value)) + pass + +def control(world, mod_id, port_index, value, midi_sign): + #print('CONTROL %s:%s = %s' % (mod_id, port_index, value)) + pass + +if len(sys.argv) != 2 and len(sys.argv) != 3: + sys.stderr.write('Usage: %s AMS_PATCH_FILE [SERVER_URI]\n' % sys.argv[0]) + sys.exit(1) + +in_path = sys.argv[1] +server_uri = 'unix:///tmp/ingen.sock' +if len(sys.argv) == 3: + server_uri = sys.argv[2] + +world = World(server_uri) +in_file = open(in_path, 'r') + +in_comment = False +for l in in_file: + try: + expr = l.split() + if not expr: + continue + elif expr[0] == '#PARA#': + in_comment = True + elif in_comment and expr[0] == '#ARAP#': + in_comment = False + elif expr[0] == 'Module': + mod_type = int(expr[1]) + mod_id = int(expr[2]) + mod_x = int(expr[3]) + mod_y = int(expr[4]) + if mod_type > len(module_types): + sys.stderr.write('warning: unknown module type %d\n', mod_type) + elif mod_type == Special.CUSTOM: + sys.stderr.write('warning: custom module %d unsupported\n' % mod_id) + if mod_type == Special.LADSPA: + ladspa_module(world, mod_id, mod_x, mod_y, int(expr[5]), expr[6], expr[7]) + elif mod_type == Special.SCMCV or mod_type == Special.SCQUANTIZER: + scala_name = expr[5] + scala_module(world, mod_id, scala_name) + elif mod_type == Special.MCV or mod_type == Special.ADVMCV: + world.add_block(mod_id, note_uri, mod_x, mod_y) + else: + standard_module(world, mod_id, mod_x, mod_y, module_types[mod_type], expr[5]) + elif expr[0] == 'ColorP': + world.add_arc(expr[1], expr[2], expr[3], expr[4], + (expr[5], expr[6], expr[7]), + (expr[8], expr[9], expr[10])) + elif expr[0] == 'FSlider': + float_control(world, mod_id, + expr[2], expr[3], expr[4], expr[5], expr[6], expr[7]) + elif expr[0] == 'ISlider' or expr[0] == 'LSlider': + control(world, mod_id, expr[2], expr[3], expr[4]) + #else: + # sys.stderr.write('warning: unsupported form %s\n' % expr[0]) + except ingen.Error: + e = sys.exc_info()[1] + sys.stderr.write('ingen error: %s\n' % e.message) + +world.create_arcs() + +#print(world.server.model.serialize(format='n3')) + +in_file.close() diff --git a/scripts/ingenish b/scripts/ingenish new file mode 100755 index 00000000..97640645 --- /dev/null +++ b/scripts/ingenish @@ -0,0 +1,121 @@ +#!/usr/bin/env python +# Ingen Interactive Shell +# Copyright 2011-2015 David Robillard <http://drobilla.net> +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THIS SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +import ingen +import os.path +import re +import shlex +import sys +import time +try: + import readline +except: + pass + +# Python 2 compatibility +try: + input = raw_input +except NameError: + pass + +def print_usage(): + print('''Usage: ingenish [OPTION]... [COMMAND [ARGUMENT]...] + +A command line interface to an Ingen server. A command can be given directly +on the command line, or when run with no arguments an interactive shell is +launched. + +Options: + -s ADDRESS The address of the Ingen server. Default is the local server + at unix:///tmp/ingen.sock but remote servers can be used with + an address like tcp:///my-ingen-server-host:16180 + +Commands: + put SUBJECT BODY + set SUBJECT KEY VALUE + connect TAIL HEAD + disconnect TAIL HEAD + patch SUBJECT REMOVE ADD + delete SUBJECT + help + exit + +Subjects are specified by URI, relative to the engine. The top level audio +graph has the path /main, so for example, a port on a block might have the +(relative) URI /main/osc/freq. + +Bodies are specified in fragments of Turtle, just as written in Ingen graph files. + +Example: + put /main/left_in 'a lv2:InputPort ; a lv2:AudioPort' + put /main/left_out 'a lv2:OutputPort ; a lv2:AudioPort' + put /main/tone 'a ingen:Block ; lv2:prototype <http://drobilla.net/plugins/mda/Shepard>' + put /main/combo 'a ingen:Block ; lv2:prototype <http://drobilla.net/plugins/mda/Combo>' + connect /main/left_in /main/tone/left_in + connect /main/tone/left_out /main/combo/left_in + connect /main/combo/left_out /main/left_out + set /main/tone/output ingen:value 0.7 +''') + +def run(cmd): + if cmd[0] == 'help': + print_usage() + elif cmd[0] == 'exit': + sys.exit(0) + elif cmd[0] == 'get' and len(cmd) == 2: + print(ingen.get(cmd[1]).serialize(format='n3')) + elif cmd[0] == 'put' and len(cmd) == 3: + return ingen.put(cmd[1], cmd[2]) + elif cmd[0] == 'patch' and len(cmd) == 4: + return ingen.patch(cmd[1], cmd[2], cmd[3]) + elif cmd[0] == 'set' and len(cmd) == 4: + return ingen.set(cmd[1], cmd[2], cmd[3]) + elif cmd[0] == 'connect' and len(cmd) == 3: + return ingen.connect(cmd[1], cmd[2]) + elif cmd[0] == 'disconnect' and len(cmd) == 3: + return ingen.disconnect(cmd[1], cmd[2]) + elif cmd[0] == 'delete' and len(cmd) == 2: + return ingen.delete(cmd[1]) + return False + +a = 1 +server = 'unix:///tmp/ingen.sock' +if len(sys.argv) > 1: + if sys.argv[a] == '-s': + server = sys.argv[a + 1] + a = a + 2 + elif sys.argv[a][0] == '-': + print_usage() + sys.exit(1) + +ingen = ingen.Remote(server) + +if len(sys.argv) - a == 0: + print('Ingen server at %s' % server) + while True: + try: + run(shlex.split(input('> '))) + except (EOFError, KeyboardInterrupt, SystemExit): + break + except: + print('error: %s' % sys.exc_info()[1]) +else: + try: + update = run(sys.argv[a:]) + if update: + print(update.serialize(format='n3')) + except: + print('error: %s' % sys.exc_info()[1]) |