diff options
Diffstat (limited to 'src/progs/python/OSC.py')
-rwxr-xr-x | src/progs/python/OSC.py | 374 |
1 files changed, 374 insertions, 0 deletions
diff --git a/src/progs/python/OSC.py b/src/progs/python/OSC.py new file mode 100755 index 00000000..74eb5880 --- /dev/null +++ b/src/progs/python/OSC.py @@ -0,0 +1,374 @@ +#!/usr/bin/python +# +# Open SoundControl for Python +# Copyright (C) 2002 Daniel Holth, Clinton McChesney +# Modified by Leonard Ritter +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# For questions regarding this module contact +# Daniel Holth <dholth@stetson.edu> or visit +# http://www.stetson.edu/~ProctoLogic/ + +import socket +import struct +import math +import sys +import string + + +def hexDump(bytes): + """Useful utility; prints the string in hexadecimal""" + for i in range(len(bytes)): + sys.stdout.write("%2x " % (ord(bytes[i]))) + if (i+1) % 8 == 0: + print repr(bytes[i-7:i+1]) + + if(len(bytes) % 8 != 0): + print string.rjust("", 11), repr(bytes[i-7:i+1]) + + +class OSCMessage: + """Builds typetagged OSC messages.""" + def __init__(self): + self.address = "" + self.typetags = "," + self.message = "" + + def setAddress(self, address): + self.address = address + + def setMessage(self, message): + self.message = message + + def setTypetags(self, typetags): + self.typetags = typetags + + def clear(self): + self.address = "" + self.clearData() + + def clearData(self): + self.typetags = "," + self.message = "" + + def append(self, argument, typehint = None): + """Appends data to the message, + updating the typetags based on + the argument's type. + If the argument is a blob (counted string) + pass in 'b' as typehint.""" + + if typehint == 'b': + binary = OSCBlob(argument) + else: + binary = OSCArgument(argument) + + self.typetags = self.typetags + binary[0] + self.rawAppend(binary[1]) + + def rawAppend(self, data): + """Appends raw data to the message. Use append().""" + self.message = self.message + data + + def getBinary(self): + """Returns the binary message (so far) with typetags.""" + address = OSCArgument(self.address)[1] + typetags = OSCArgument(self.typetags)[1] + return address + typetags + self.message + + def __repr__(self): + return self.getBinary() + +def readString(data): + length = string.find(data,"\0") + nextData = int(math.ceil((length+1) / 4.0) * 4) + return (data[0:length], data[nextData:]) + + +def readBlob(data): + length = struct.unpack(">i", data[0:4])[0] + nextData = int(math.ceil((length) / 4.0) * 4) + 4 + return (data[4:length+4], data[nextData:]) + + +def readInt(data): + if(len(data)<4): + print "Error: too few bytes for int", data, len(data) + rest = data + integer = 0 + else: + integer = struct.unpack(">i", data[0:4])[0] + rest = data[4:] + + return (integer, rest) + + + +def readLong(data): + """Tries to interpret the next 8 bytes of the data + as a 64-bit signed integer.""" + high, low = struct.unpack(">ll", data[0:8]) + big = (long(high) << 32) + low + rest = data[8:] + return (big, rest) + + + +def readFloat(data): + if(len(data)<4): + print "Error: too few bytes for float", data, len(data) + rest = data + float = 0 + else: + float = struct.unpack(">f", data[0:4])[0] + rest = data[4:] + + return (float, rest) + + +def OSCBlob(next): + """Convert a string into an OSC Blob, + returning a (typetag, data) tuple.""" + + if type(next) == type(""): + length = len(next) + padded = math.ceil((len(next)) / 4.0) * 4 + binary = struct.pack(">i%ds" % (padded), length, next) + tag = 'b' + else: + tag = '' + binary = '' + + return (tag, binary) + + +def OSCArgument(next): + """Convert some Python types to their + OSC binary representations, returning a + (typetag, data) tuple.""" + + if type(next) == type(""): + OSCstringLength = math.ceil((len(next)+1) / 4.0) * 4 + binary = struct.pack(">%ds" % (OSCstringLength), next) + tag = "s" + elif type(next) == type(42.5): + binary = struct.pack(">f", next) + tag = "f" + elif type(next) == type(13): + binary = struct.pack(">i", next) + tag = "i" + else: + binary = "" + tag = "" + + return (tag, binary) + + +def parseArgs(args): + """Given a list of strings, produces a list + where those strings have been parsed (where + possible) as floats or integers.""" + parsed = [] + for arg in args: + print arg + arg = arg.strip() + interpretation = None + try: + interpretation = float(arg) + if string.find(arg, ".") == -1: + interpretation = int(interpretation) + except: + # Oh - it was a string. + interpretation = arg + pass + parsed.append(interpretation) + return parsed + + + +def decodeOSC(data): + """Converts a typetagged OSC message to a Python list.""" + table = {"i":readInt, "f":readFloat, "s":readString, "b":readBlob} + decoded = [] + address, rest = readString(data) + typetags = "" + + if address == "#bundle": + time, rest = readLong(rest) + decoded.append(address) + decoded.append(time) + while len(rest)>0: + length, rest = readInt(rest) + decoded.append(decodeOSC(rest[:length])) + rest = rest[length:] + + elif len(rest)>0: + typetags, rest = readString(rest) + decoded.append(address) + decoded.append(typetags) + if(typetags[0] == ","): + for tag in typetags[1:]: + value, rest = table[tag](rest) + decoded.append(value) + else: + print "Oops, typetag lacks the magic ," + + # return only the data + return address,decoded[2:] + + +class CallbackManager: + """This utility class maps OSC addresses to callables. + + The CallbackManager calls its callbacks with a list + of decoded OSC arguments, including the address and + the typetags as the first two arguments.""" + + def __init__(self): + self.callbacks = {} + self.addCallback(self.unbundler, "#bundle") + + def handle(self, data, source = None): + """Given OSC data, tries to call the callback with the + right address.""" + decoded = decodeOSC(data) + self.dispatch(decoded) + + def dispatch(self, message): + """Sends decoded OSC data to an appropriate calback""" + try: + address = message[0] + self.callbacks[address](message) + except KeyError, e: + # address not found + print "no handler for '%s'" % address + pass + except None, e: + print "Exception in", address, "callback :", e + + return + + def addCallback(self, callback, name): + """Adds a callback to our set of callbacks, + or removes the callback with name if callback + is None.""" + if callback == None: + del self.callbacks[name] + else: + self.callbacks[name] = callback + + def unbundler(self, messages): + """Dispatch the messages in a decoded bundle.""" + # first two elements are #bundle and the time tag, rest are messages. + for message in messages[2:]: + self.dispatch(message) + + +if __name__ == "__main__": + hexDump("Welcome to the OSC testing program.") + print + message = OSCMessage() + message.setAddress("/foo/play") + message.append(44) + message.append(11) + message.append(4.5) + message.append("the white cliffs of dover") + hexDump(message.getBinary()) + + print "Making and unmaking a message.." + + strings = OSCMessage() + strings.append("Mary had a little lamb") + strings.append("its fleece was white as snow") + strings.append("and everywhere that Mary went,") + strings.append("the lamb was sure to go.") + strings.append(14.5) + strings.append(14.5) + strings.append(-400) + + raw = strings.getBinary() + + hexDump(raw) + + print "Retrieving arguments..." + data = raw + for i in range(6): + text, data = readString(data) + print text + + number, data = readFloat(data) + print number + + number, data = readFloat(data) + print number + + number, data = readInt(data) + print number + + hexDump(raw) + print decodeOSC(raw) + print decodeOSC(message.getBinary()) + + print "Testing Blob types." + + blob = OSCMessage() + blob.append("","b") + blob.append("b","b") + blob.append("bl","b") + blob.append("blo","b") + blob.append("blob","b") + blob.append("blobs","b") + blob.append(42) + + hexDump(blob.getBinary()) + + print decodeOSC(blob.getBinary()) + + def printingCallback(stuff): + sys.stdout.write("Got: ") + for i in stuff: + sys.stdout.write(str(i) + " ") + sys.stdout.write("\n") + + print "Testing the callback manager." + + c = CallbackManager() + c.add(printingCallback, "/print") + + c.handle(message.getBinary()) + message.setAddress("/print") + c.handle(message.getBinary()) + + print1 = OSCMessage() + print1.setAddress("/print") + print1.append("Hey man, that's cool.") + print1.append(42) + print1.append(3.1415926) + + c.handle(print1.getBinary()) + + bundle = OSCMessage() + bundle.setAddress("") + bundle.append("#bundle") + bundle.append(0) + bundle.append(0) + bundle.append(print1.getBinary(), 'b') + bundle.append(print1.getBinary(), 'b') + + bundlebinary = bundle.message + + print "sending a bundle to the callback manager" + c.handle(bundlebinary) |