From c067725cf5c187bca3c7b9a41c8b90a8ce50d95d Mon Sep 17 00:00:00 2001 From: David Robillard Date: Sun, 13 Jul 2014 02:45:41 +0000 Subject: Improved/working lv2_apply.py to apply plugin to a .wav (fix #969). git-svn-id: http://svn.drobilla.net/lad/trunk/lilv@5409 a436a847-0d15-0410-975c-d299462d15a1 --- NEWS | 4 +- bindings/python/lv2_apply.py | 235 +++++++++++++++++++++++++++---------------- 2 files changed, 154 insertions(+), 85 deletions(-) diff --git a/NEWS b/NEWS index a00db45..792c1dd 100644 --- a/NEWS +++ b/NEWS @@ -10,10 +10,12 @@ lilv (0.19.0) unstable; (useful for porting plugins bridges with NASPRO) * Fix issues with lilv_plugin_get_author_name and friends (thanks Filipe Coelho) + * Improved/working lv2_apply.py to apply plugin to a .wav + (thanks Joe Button) * Fix several minor memory leaks * Improve test coverage - -- David Robillard Sat, 12 Jul 2014 15:45:00 -0400 + -- David Robillard Sat, 12 Jul 2014 22:42:56 -0400 lilv (0.18.0) stable; diff --git a/bindings/python/lv2_apply.py b/bindings/python/lv2_apply.py index c3e5370..1cd906d 100755 --- a/bindings/python/lv2_apply.py +++ b/bindings/python/lv2_apply.py @@ -7,91 +7,158 @@ import sys import wave import numpy -# Read command line arguments -if len(sys.argv) != 4: - print('USAGE: lv2_apply.py PLUGIN_URI INPUT_WAV OUTPUT_WAV') - sys.exit(1) - -# Initialise Lilv -world = lilv.World() -world.load_all() - -plugin_uri = world.new_uri(sys.argv[1]) -wav_in_path = sys.argv[2] -wav_out_path = sys.argv[3] - - -# Find plugin -plugin = world.get_all_plugins().get_by_uri(plugin_uri) -if not plugin: - print("Unknown plugin `%s'\n" % plugin_uri) - sys.exit(1) - -lv2_InputPort = world.new_uri(lilv.LILV_URI_INPUT_PORT) -lv2_OutputPort = world.new_uri(lilv.LILV_URI_OUTPUT_PORT) -lv2_AudioPort = world.new_uri(lilv.LILV_URI_AUDIO_PORT) - -n_audio_in = plugin.get_num_ports_of_class(lv2_InputPort, lv2_AudioPort) -n_audio_out = plugin.get_num_ports_of_class(lv2_OutputPort, lv2_AudioPort) -if n_audio_out == 0: - print("Plugin has no audio outputs\n") - sys.exit(1) - -# Open input file -wav_in = wave.open(wav_in_path, 'r') -if not wav_in: - print("Failed to open input `%s'\n" % wav_in_path) - sys.exit(1) -if wav_in.getnchannels() != n_audio_in: - print("Input has %d channels, but plugin has %d audio inputs\n" % ( - wav_in.getnchannels(), n_audio_in)) - sys.exit(1) - -# Open output file -wav_out = wave.open(wav_out_path, 'w') -if not wav_out: - print("Failed to open output `%s'\n" % wav_out_path) - sys.exit(1) - -# Set output file to same format as input (except possibly nchannels) -wav_out.setparams(wav_in.getparams()) -wav_out.setnchannels(n_audio_out) - -rate = wav_in.getframerate() -nframes = wav_in.getnframes() - -# Instantiate plugin -instance = lilv.Instance(plugin, rate) - -def read_float(wf, nframes): - wav = wf.readframes(nframes) - if wf.getsampwidth() == 4: - wav = wave.struct.unpack("<%ul" % (len(wav) / 4), wav) - wav = [ i / float(math.pow(2, 32)) for i in wav ] - elif wf.getsampwidth() == 2: - wav = wave.struct.unpack("<%uh" % (len(wav) / 2), wav) - wav = [ i / float(math.pow(2, 16)) for i in wav ] +class WavFile(object): + """Helper class for accessing wav file data. Should work on the most common + formats (8 bit unsigned, 16 bit signed, 32 bit signed). Audio data is + converted to float32.""" + + # (struct format code, is_signedtype) for each sample width: + WAV_SPECS = { + 1: ("B", False), + 2: ("h", True), + 4: ("l", True), + } + + def __init__(self, wav_in_path): + self.wav_in = wave.open(wav_in_path, 'r') + self.framerate = self.wav_in.getframerate() + self.nframes = self.wav_in.getnframes() + self.nchannels = self.wav_in.getnchannels() + self.sampwidth = self.wav_in.getsampwidth() + wav_spec = self.WAV_SPECS[self.sampwidth] + self.struct_fmt_code, self.signed = wav_spec + self.range = 2 ** (8*self.sampwidth) + + def read(self): + """Read data from an open wav file. Return a list of channels, where each + channel is a list of floats.""" + raw_bytes = self.wav_in.readframes(self.nframes) + struct_fmt = "%u%s" % (len(raw_bytes) / self.sampwidth, self.struct_fmt_code) + data = wave.struct.unpack(struct_fmt, raw_bytes) + if self.signed: + data = [i / float(self.range/2) for i in data] + else: + data = [(i - float(range/2)) / float(range/2) for i in data] + + channels = [] + for i in xrange(self.nchannels): + channels.append([data[j] for j in xrange(0, len(data), self.nchannels) ]) + + return channels + + def close(self): + self.wav_in.close() + +def main(): + # Read command line arguments + if len(sys.argv) != 4: + print('USAGE: lv2_apply.py PLUGIN_URI INPUT_WAV OUTPUT_WAV') + sys.exit(1) + + # Initialise Lilv + world = lilv.World() + world.load_all() + + plugin_uri = sys.argv[1] + wav_in_path = sys.argv[2] + wav_out_path = sys.argv[3] + + # Find plugin + plugin_uri_node = world.new_uri(plugin_uri) + plugin = world.get_all_plugins().get_by_uri(plugin_uri_node) + if not plugin: + print("Unknown plugin `%s'\n" % plugin_uri) + sys.exit(1) + + lv2_InputPort = world.new_uri(lilv.LILV_URI_INPUT_PORT) + lv2_OutputPort = world.new_uri(lilv.LILV_URI_OUTPUT_PORT) + lv2_AudioPort = world.new_uri(lilv.LILV_URI_AUDIO_PORT) + lv2_ControlPort = world.new_uri(lilv.LILV_URI_CONTROL_PORT) + lv2_default = world.new_uri("http://lv2plug.in/ns/lv2core#default") + + n_audio_in = plugin.get_num_ports_of_class(lv2_InputPort, lv2_AudioPort) + n_audio_out = plugin.get_num_ports_of_class(lv2_OutputPort, lv2_AudioPort) + if n_audio_out == 0: + print("Plugin has no audio outputs\n") + sys.exit(1) + + # Open input file + try: + wav_in = WavFile(wav_in_path) + except: + print("Failed to open input `%s'\n" % wav_in_path) + sys.exit(1) + + if wav_in.nchannels != n_audio_in: + print("Input has %d channels, but plugin has %d audio inputs\n" % ( + wav_in.nchannels, n_audio_in)) + sys.exit(1) + + # Open output file + wav_out = wave.open(wav_out_path, 'w') + if not wav_out: + print("Failed to open output `%s'\n" % wav_out_path) + sys.exit(1) + + # Set output file to same format as input (except possibly nchannels) + wav_out.setparams(wav_in.wav_in.getparams()) + wav_out.setnchannels(n_audio_out) + + print('%s => %s => %s @ %d Hz' + % (wav_in_path, plugin.get_name(), wav_out_path, wav_in.framerate)) + + instance = lilv.Instance(plugin, wav_in.framerate) + + channels = wav_in.read() + wav_in.close() + + # Connect all ports to buffers. NB if we fail to connect any buffer, lilv + # will segfault. + audio_input_buffers = [] + audio_output_buffers = [] + control_input_buffers = [] + control_output_buffers = [] + for index in range(plugin.get_num_ports()): + port = plugin.get_port_by_index(index) + if port.is_a(lv2_InputPort): + if port.is_a(lv2_AudioPort): + audio_input_buffers.append(numpy.array(channels[len(audio_input_buffers)], numpy.float32)) + instance.connect_port(index, audio_input_buffers[-1]) + elif port.is_a(lv2_ControlPort): + #if port.has_property(lv2_default): # Doesn't seem to work + default = lilv.lilv_node_as_float(lilv.lilv_nodes_get_first(port.get_value(lv2_default))) + control_input_buffers.append(numpy.array([default], numpy.float32)) + instance.connect_port(index, control_input_buffers[-1]) + else: + raise ValueError("Unhandled port type") + elif port.is_a(lv2_OutputPort): + if port.is_a(lv2_AudioPort): + audio_output_buffers.append(numpy.array([0] * wav_in.nframes, numpy.float32)) + instance.connect_port(index, audio_output_buffers[-1]) + elif port.is_a(lv2_ControlPort): + control_output_buffers.append(numpy.array([0], numpy.float32)) + instance.connect_port(index, control_output_buffers[-1]) + else: + raise ValueError("Unhandled port type") + + # Run the plugin: + instance.run(wav_in.nframes) + + # Interleave output buffers: + data = numpy.dstack(audio_output_buffers).flatten() + + # Return to original int range: + if wav_in.signed: + data = data * float(wav_in.range / 2) else: - wav = wave.struct.unpack("%uB" % (len(wav)), wav) - wav = [ s - 128 for s in wav ] - wav = [ i / float(math.pow(2, 8)) for i in wav ] - - n_channels = wf.getnchannels() - wavs = [] - if n_channels > 1: - for i in xrange(n_channels): - wavs.append([ wav[j] for j in xrange(0, len(wav), n_channels) ]) - else: - wavs = [ wav ] - - return wavs - -in_buf = read_float(wav_in, nframes) + data = (data + 1) * float(wav_in.range/2) -# TODO: buffer marshaling -#instance.connect_port(3, in_buf) + # Write output file in chunks to stop memory usage getting out of hand: + CHUNK_SIZE = 8192 + for chunk in numpy.array_split(data, CHUNK_SIZE): + wav_out.writeframes(wave.struct.pack("%u%s" % (len(chunk), wav_in.struct_fmt_code), *chunk)) + wav_out.close() -print('%s => %s => %s @ %d Hz' - % (wav_in_path, plugin.get_name(), wav_out_path, rate)) -instance.connect_port(3, in_buf) +if __name__ == "__main__": + main() -- cgit v1.2.1