summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Robillard <d@drobilla.net>2014-07-13 02:45:41 +0000
committerDavid Robillard <d@drobilla.net>2014-07-13 02:45:41 +0000
commitc067725cf5c187bca3c7b9a41c8b90a8ce50d95d (patch)
tree90d39f196c6b4ba2351c46c8f4cab7ce3f39c9e6
parentdf38c9eddd81db74174647fd5cca89f7a6de1257 (diff)
downloadlilv-c067725cf5c187bca3c7b9a41c8b90a8ce50d95d.tar.gz
lilv-c067725cf5c187bca3c7b9a41c8b90a8ce50d95d.tar.bz2
lilv-c067725cf5c187bca3c7b9a41c8b90a8ce50d95d.zip
Improved/working lv2_apply.py to apply plugin to a .wav (fix #969).
git-svn-id: http://svn.drobilla.net/lad/trunk/lilv@5409 a436a847-0d15-0410-975c-d299462d15a1
-rw-r--r--NEWS4
-rwxr-xr-xbindings/python/lv2_apply.py235
2 files changed, 154 insertions, 85 deletions
diff --git a/NEWS b/NEWS
index a00db45..792c1dd 100644
--- a/NEWS
+++ b/NEWS
@@ -10,10 +10,12 @@ lilv (0.19.0) unstable;
(useful for porting plugins bridges with NASPRO)
* Fix issues with lilv_plugin_get_author_name and friends
(thanks Filipe Coelho)
+ * Improved/working lv2_apply.py to apply plugin to a .wav
+ (thanks Joe Button)
* Fix several minor memory leaks
* Improve test coverage
- -- David Robillard <d@drobilla.net> Sat, 12 Jul 2014 15:45:00 -0400
+ -- David Robillard <d@drobilla.net> Sat, 12 Jul 2014 22:42:56 -0400
lilv (0.18.0) stable;
diff --git a/bindings/python/lv2_apply.py b/bindings/python/lv2_apply.py
index c3e5370..1cd906d 100755
--- a/bindings/python/lv2_apply.py
+++ b/bindings/python/lv2_apply.py
@@ -7,91 +7,158 @@ import sys
import wave
import numpy
-# Read command line arguments
-if len(sys.argv) != 4:
- print('USAGE: lv2_apply.py PLUGIN_URI INPUT_WAV OUTPUT_WAV')
- sys.exit(1)
-
-# Initialise Lilv
-world = lilv.World()
-world.load_all()
-
-plugin_uri = world.new_uri(sys.argv[1])
-wav_in_path = sys.argv[2]
-wav_out_path = sys.argv[3]
-
-
-# Find plugin
-plugin = world.get_all_plugins().get_by_uri(plugin_uri)
-if not plugin:
- print("Unknown plugin `%s'\n" % plugin_uri)
- sys.exit(1)
-
-lv2_InputPort = world.new_uri(lilv.LILV_URI_INPUT_PORT)
-lv2_OutputPort = world.new_uri(lilv.LILV_URI_OUTPUT_PORT)
-lv2_AudioPort = world.new_uri(lilv.LILV_URI_AUDIO_PORT)
-
-n_audio_in = plugin.get_num_ports_of_class(lv2_InputPort, lv2_AudioPort)
-n_audio_out = plugin.get_num_ports_of_class(lv2_OutputPort, lv2_AudioPort)
-if n_audio_out == 0:
- print("Plugin has no audio outputs\n")
- sys.exit(1)
-
-# Open input file
-wav_in = wave.open(wav_in_path, 'r')
-if not wav_in:
- print("Failed to open input `%s'\n" % wav_in_path)
- sys.exit(1)
-if wav_in.getnchannels() != n_audio_in:
- print("Input has %d channels, but plugin has %d audio inputs\n" % (
- wav_in.getnchannels(), n_audio_in))
- sys.exit(1)
-
-# Open output file
-wav_out = wave.open(wav_out_path, 'w')
-if not wav_out:
- print("Failed to open output `%s'\n" % wav_out_path)
- sys.exit(1)
-
-# Set output file to same format as input (except possibly nchannels)
-wav_out.setparams(wav_in.getparams())
-wav_out.setnchannels(n_audio_out)
-
-rate = wav_in.getframerate()
-nframes = wav_in.getnframes()
-
-# Instantiate plugin
-instance = lilv.Instance(plugin, rate)
-
-def read_float(wf, nframes):
- wav = wf.readframes(nframes)
- if wf.getsampwidth() == 4:
- wav = wave.struct.unpack("<%ul" % (len(wav) / 4), wav)
- wav = [ i / float(math.pow(2, 32)) for i in wav ]
- elif wf.getsampwidth() == 2:
- wav = wave.struct.unpack("<%uh" % (len(wav) / 2), wav)
- wav = [ i / float(math.pow(2, 16)) for i in wav ]
+class WavFile(object):
+ """Helper class for accessing wav file data. Should work on the most common
+ formats (8 bit unsigned, 16 bit signed, 32 bit signed). Audio data is
+ converted to float32."""
+
+ # (struct format code, is_signedtype) for each sample width:
+ WAV_SPECS = {
+ 1: ("B", False),
+ 2: ("h", True),
+ 4: ("l", True),
+ }
+
+ def __init__(self, wav_in_path):
+ self.wav_in = wave.open(wav_in_path, 'r')
+ self.framerate = self.wav_in.getframerate()
+ self.nframes = self.wav_in.getnframes()
+ self.nchannels = self.wav_in.getnchannels()
+ self.sampwidth = self.wav_in.getsampwidth()
+ wav_spec = self.WAV_SPECS[self.sampwidth]
+ self.struct_fmt_code, self.signed = wav_spec
+ self.range = 2 ** (8*self.sampwidth)
+
+ def read(self):
+ """Read data from an open wav file. Return a list of channels, where each
+ channel is a list of floats."""
+ raw_bytes = self.wav_in.readframes(self.nframes)
+ struct_fmt = "%u%s" % (len(raw_bytes) / self.sampwidth, self.struct_fmt_code)
+ data = wave.struct.unpack(struct_fmt, raw_bytes)
+ if self.signed:
+ data = [i / float(self.range/2) for i in data]
+ else:
+ data = [(i - float(range/2)) / float(range/2) for i in data]
+
+ channels = []
+ for i in xrange(self.nchannels):
+ channels.append([data[j] for j in xrange(0, len(data), self.nchannels) ])
+
+ return channels
+
+ def close(self):
+ self.wav_in.close()
+
+def main():
+ # Read command line arguments
+ if len(sys.argv) != 4:
+ print('USAGE: lv2_apply.py PLUGIN_URI INPUT_WAV OUTPUT_WAV')
+ sys.exit(1)
+
+ # Initialise Lilv
+ world = lilv.World()
+ world.load_all()
+
+ plugin_uri = sys.argv[1]
+ wav_in_path = sys.argv[2]
+ wav_out_path = sys.argv[3]
+
+ # Find plugin
+ plugin_uri_node = world.new_uri(plugin_uri)
+ plugin = world.get_all_plugins().get_by_uri(plugin_uri_node)
+ if not plugin:
+ print("Unknown plugin `%s'\n" % plugin_uri)
+ sys.exit(1)
+
+ lv2_InputPort = world.new_uri(lilv.LILV_URI_INPUT_PORT)
+ lv2_OutputPort = world.new_uri(lilv.LILV_URI_OUTPUT_PORT)
+ lv2_AudioPort = world.new_uri(lilv.LILV_URI_AUDIO_PORT)
+ lv2_ControlPort = world.new_uri(lilv.LILV_URI_CONTROL_PORT)
+ lv2_default = world.new_uri("http://lv2plug.in/ns/lv2core#default")
+
+ n_audio_in = plugin.get_num_ports_of_class(lv2_InputPort, lv2_AudioPort)
+ n_audio_out = plugin.get_num_ports_of_class(lv2_OutputPort, lv2_AudioPort)
+ if n_audio_out == 0:
+ print("Plugin has no audio outputs\n")
+ sys.exit(1)
+
+ # Open input file
+ try:
+ wav_in = WavFile(wav_in_path)
+ except:
+ print("Failed to open input `%s'\n" % wav_in_path)
+ sys.exit(1)
+
+ if wav_in.nchannels != n_audio_in:
+ print("Input has %d channels, but plugin has %d audio inputs\n" % (
+ wav_in.nchannels, n_audio_in))
+ sys.exit(1)
+
+ # Open output file
+ wav_out = wave.open(wav_out_path, 'w')
+ if not wav_out:
+ print("Failed to open output `%s'\n" % wav_out_path)
+ sys.exit(1)
+
+ # Set output file to same format as input (except possibly nchannels)
+ wav_out.setparams(wav_in.wav_in.getparams())
+ wav_out.setnchannels(n_audio_out)
+
+ print('%s => %s => %s @ %d Hz'
+ % (wav_in_path, plugin.get_name(), wav_out_path, wav_in.framerate))
+
+ instance = lilv.Instance(plugin, wav_in.framerate)
+
+ channels = wav_in.read()
+ wav_in.close()
+
+ # Connect all ports to buffers. NB if we fail to connect any buffer, lilv
+ # will segfault.
+ audio_input_buffers = []
+ audio_output_buffers = []
+ control_input_buffers = []
+ control_output_buffers = []
+ for index in range(plugin.get_num_ports()):
+ port = plugin.get_port_by_index(index)
+ if port.is_a(lv2_InputPort):
+ if port.is_a(lv2_AudioPort):
+ audio_input_buffers.append(numpy.array(channels[len(audio_input_buffers)], numpy.float32))
+ instance.connect_port(index, audio_input_buffers[-1])
+ elif port.is_a(lv2_ControlPort):
+ #if port.has_property(lv2_default): # Doesn't seem to work
+ default = lilv.lilv_node_as_float(lilv.lilv_nodes_get_first(port.get_value(lv2_default)))
+ control_input_buffers.append(numpy.array([default], numpy.float32))
+ instance.connect_port(index, control_input_buffers[-1])
+ else:
+ raise ValueError("Unhandled port type")
+ elif port.is_a(lv2_OutputPort):
+ if port.is_a(lv2_AudioPort):
+ audio_output_buffers.append(numpy.array([0] * wav_in.nframes, numpy.float32))
+ instance.connect_port(index, audio_output_buffers[-1])
+ elif port.is_a(lv2_ControlPort):
+ control_output_buffers.append(numpy.array([0], numpy.float32))
+ instance.connect_port(index, control_output_buffers[-1])
+ else:
+ raise ValueError("Unhandled port type")
+
+ # Run the plugin:
+ instance.run(wav_in.nframes)
+
+ # Interleave output buffers:
+ data = numpy.dstack(audio_output_buffers).flatten()
+
+ # Return to original int range:
+ if wav_in.signed:
+ data = data * float(wav_in.range / 2)
else:
- wav = wave.struct.unpack("%uB" % (len(wav)), wav)
- wav = [ s - 128 for s in wav ]
- wav = [ i / float(math.pow(2, 8)) for i in wav ]
-
- n_channels = wf.getnchannels()
- wavs = []
- if n_channels > 1:
- for i in xrange(n_channels):
- wavs.append([ wav[j] for j in xrange(0, len(wav), n_channels) ])
- else:
- wavs = [ wav ]
-
- return wavs
-
-in_buf = read_float(wav_in, nframes)
+ data = (data + 1) * float(wav_in.range/2)
-# TODO: buffer marshaling
-#instance.connect_port(3, in_buf)
+ # Write output file in chunks to stop memory usage getting out of hand:
+ CHUNK_SIZE = 8192
+ for chunk in numpy.array_split(data, CHUNK_SIZE):
+ wav_out.writeframes(wave.struct.pack("%u%s" % (len(chunk), wav_in.struct_fmt_code), *chunk))
+ wav_out.close()
-print('%s => %s => %s @ %d Hz'
- % (wav_in_path, plugin.get_name(), wav_out_path, rate))
-instance.connect_port(3, in_buf)
+if __name__ == "__main__":
+ main()