summaryrefslogtreecommitdiffstats
path: root/bindings/python/lv2_apply.py
diff options
context:
space:
mode:
Diffstat (limited to 'bindings/python/lv2_apply.py')
-rwxr-xr-xbindings/python/lv2_apply.py159
1 files changed, 159 insertions, 0 deletions
diff --git a/bindings/python/lv2_apply.py b/bindings/python/lv2_apply.py
new file mode 100755
index 0000000..4c7d9b4
--- /dev/null
+++ b/bindings/python/lv2_apply.py
@@ -0,0 +1,159 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import math
+import lilv
+import sys
+import wave
+import numpy
+
+class WavFile(object):
+ """Helper class for accessing wav file data. Should work on the most common
+ formats (8 bit unsigned, 16 bit signed, 32 bit signed). Audio data is
+ converted to float32."""
+
+ # (struct format code, is_signedtype) for each sample width:
+ WAV_SPECS = {
+ 1: ("B", False),
+ 2: ("h", True),
+ 4: ("l", True),
+ }
+
+ def __init__(self, wav_in_path):
+ self.wav_in = wave.open(wav_in_path, 'r')
+ self.framerate = self.wav_in.getframerate()
+ self.nframes = self.wav_in.getnframes()
+ self.nchannels = self.wav_in.getnchannels()
+ self.sampwidth = self.wav_in.getsampwidth()
+ wav_spec = self.WAV_SPECS[self.sampwidth]
+ self.struct_fmt_code, self.signed = wav_spec
+ self.range = 2 ** (8*self.sampwidth)
+
+ def read(self):
+ """Read data from an open wav file. Return a list of channels, where each
+ channel is a list of floats."""
+ raw_bytes = self.wav_in.readframes(self.nframes)
+ struct_fmt = "%u%s" % (len(raw_bytes) / self.sampwidth, self.struct_fmt_code)
+ data = wave.struct.unpack(struct_fmt, raw_bytes)
+ if self.signed:
+ data = [i / float(self.range/2) for i in data]
+ else:
+ data = [(i - float(range/2)) / float(range/2) for i in data]
+
+ channels = []
+ for i in range(self.nchannels):
+ channels.append([data[j] for j in range(0, len(data), self.nchannels) ])
+
+ return channels
+
+ def close(self):
+ self.wav_in.close()
+
+def main():
+ # Read command line arguments
+ if len(sys.argv) != 4:
+ print('USAGE: lv2_apply.py PLUGIN_URI INPUT_WAV OUTPUT_WAV')
+ sys.exit(1)
+
+ # Initialise Lilv
+ world = lilv.World()
+ ns = world.ns
+ world.load_all()
+
+ plugin_uri = sys.argv[1]
+ wav_in_path = sys.argv[2]
+ wav_out_path = sys.argv[3]
+
+ # Find plugin
+ plugin_uri_node = world.new_uri(plugin_uri)
+ plugins = world.get_all_plugins()
+ if plugin_uri_node not in plugins:
+ print("Unknown plugin `%s'" % plugin_uri)
+ sys.exit(1)
+
+ plugin = plugins[plugin_uri_node]
+ n_audio_in = plugin.get_num_ports_of_class(ns.lv2.InputPort, ns.lv2.AudioPort)
+ n_audio_out = plugin.get_num_ports_of_class(ns.lv2.OutputPort, ns.lv2.AudioPort)
+ if n_audio_out == 0:
+ print("Plugin has no audio outputs\n")
+ sys.exit(1)
+
+ # Open input file
+ try:
+ wav_in = WavFile(wav_in_path)
+ except:
+ print("Failed to open input `%s'\n" % wav_in_path)
+ sys.exit(1)
+
+ if wav_in.nchannels != n_audio_in:
+ print("Input has %d channels, but plugin has %d audio inputs\n" % (
+ wav_in.nchannels, n_audio_in))
+ sys.exit(1)
+
+ # Open output file
+ wav_out = wave.open(wav_out_path, 'w')
+ if not wav_out:
+ print("Failed to open output `%s'\n" % wav_out_path)
+ sys.exit(1)
+
+ # Set output file to same format as input (except possibly nchannels)
+ wav_out.setparams(wav_in.wav_in.getparams())
+ wav_out.setnchannels(n_audio_out)
+
+ print('%s => %s => %s @ %d Hz'
+ % (wav_in_path, plugin.get_name(), wav_out_path, wav_in.framerate))
+
+ instance = lilv.Instance(plugin, wav_in.framerate)
+
+ channels = wav_in.read()
+ wav_in.close()
+
+ # Connect all ports to buffers. NB if we fail to connect any buffer, lilv
+ # will segfault.
+ audio_input_buffers = []
+ audio_output_buffers = []
+ control_input_buffers = []
+ control_output_buffers = []
+ for index in range(plugin.get_num_ports()):
+ port = plugin.get_port_by_index(index)
+ if port.is_a(ns.lv2.InputPort):
+ if port.is_a(ns.lv2.AudioPort):
+ audio_input_buffers.append(numpy.array(channels[len(audio_input_buffers)], numpy.float32))
+ instance.connect_port(index, audio_input_buffers[-1])
+ elif port.is_a(ns.lv2.ControlPort):
+ default = float(port.get(ns.lv2.default))
+ control_input_buffers.append(numpy.array([default], numpy.float32))
+ instance.connect_port(index, control_input_buffers[-1])
+ else:
+ raise ValueError("Unhandled port type")
+ elif port.is_a(ns.lv2.OutputPort):
+ if port.is_a(ns.lv2.AudioPort):
+ audio_output_buffers.append(numpy.array([0] * wav_in.nframes, numpy.float32))
+ instance.connect_port(index, audio_output_buffers[-1])
+ elif port.is_a(ns.lv2.ControlPort):
+ control_output_buffers.append(numpy.array([0], numpy.float32))
+ instance.connect_port(index, control_output_buffers[-1])
+ else:
+ raise ValueError("Unhandled port type")
+
+ # Run the plugin:
+ instance.run(wav_in.nframes)
+
+ # Interleave output buffers:
+ data = numpy.dstack(audio_output_buffers).flatten()
+
+ # Return to original int range:
+ if wav_in.signed:
+ data = data * float(wav_in.range / 2)
+ else:
+ data = (data + 1) * float(wav_in.range/2)
+
+ # Write output file in chunks to stop memory usage getting out of hand:
+ CHUNK_SIZE = 8192
+ for chunk in numpy.array_split(data, CHUNK_SIZE):
+ wav_out.writeframes(wave.struct.pack("%u%s" % (len(chunk), wav_in.struct_fmt_code), *chunk.astype(int)))
+ wav_out.close()
+
+
+if __name__ == "__main__":
+ main()