diff options
author | David Robillard <d@drobilla.net> | 2006-06-10 02:43:54 +0000 |
---|---|---|
committer | David Robillard <d@drobilla.net> | 2006-06-10 02:43:54 +0000 |
commit | 47246db7e9d71ba694b719001033fba1766a58c4 (patch) | |
tree | 25614ea4f2c92033a3cd37f6413df742f09a5369 /src/clients/python/OSC.py | |
parent | 98fe0e7056e6697396249531785d3899f94d79be (diff) | |
download | ingen-47246db7e9d71ba694b719001033fba1766a58c4.tar.gz ingen-47246db7e9d71ba694b719001033fba1766a58c4.tar.bz2 ingen-47246db7e9d71ba694b719001033fba1766a58c4.zip |
More juggling
git-svn-id: http://svn.drobilla.net/lad/grauph@16 a436a847-0d15-0410-975c-d299462d15a1
Diffstat (limited to 'src/clients/python/OSC.py')
-rwxr-xr-x | src/clients/python/OSC.py | 374 |
1 files changed, 0 insertions, 374 deletions
diff --git a/src/clients/python/OSC.py b/src/clients/python/OSC.py deleted file mode 100755 index 74eb5880..00000000 --- a/src/clients/python/OSC.py +++ /dev/null @@ -1,374 +0,0 @@ -#!/usr/bin/python -# -# Open SoundControl for Python -# Copyright (C) 2002 Daniel Holth, Clinton McChesney -# Modified by Leonard Ritter -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# For questions regarding this module contact -# Daniel Holth <dholth@stetson.edu> or visit -# http://www.stetson.edu/~ProctoLogic/ - -import socket -import struct -import math -import sys -import string - - -def hexDump(bytes): - """Useful utility; prints the string in hexadecimal""" - for i in range(len(bytes)): - sys.stdout.write("%2x " % (ord(bytes[i]))) - if (i+1) % 8 == 0: - print repr(bytes[i-7:i+1]) - - if(len(bytes) % 8 != 0): - print string.rjust("", 11), repr(bytes[i-7:i+1]) - - -class OSCMessage: - """Builds typetagged OSC messages.""" - def __init__(self): - self.address = "" - self.typetags = "," - self.message = "" - - def setAddress(self, address): - self.address = address - - def setMessage(self, message): - self.message = message - - def setTypetags(self, typetags): - self.typetags = typetags - - def clear(self): - self.address = "" - self.clearData() - - def clearData(self): - self.typetags = "," - self.message = "" - - def append(self, argument, typehint = None): - """Appends data to the message, - updating the typetags based on - the argument's type. - If the argument is a blob (counted string) - pass in 'b' as typehint.""" - - if typehint == 'b': - binary = OSCBlob(argument) - else: - binary = OSCArgument(argument) - - self.typetags = self.typetags + binary[0] - self.rawAppend(binary[1]) - - def rawAppend(self, data): - """Appends raw data to the message. Use append().""" - self.message = self.message + data - - def getBinary(self): - """Returns the binary message (so far) with typetags.""" - address = OSCArgument(self.address)[1] - typetags = OSCArgument(self.typetags)[1] - return address + typetags + self.message - - def __repr__(self): - return self.getBinary() - -def readString(data): - length = string.find(data,"\0") - nextData = int(math.ceil((length+1) / 4.0) * 4) - return (data[0:length], data[nextData:]) - - -def readBlob(data): - length = struct.unpack(">i", data[0:4])[0] - nextData = int(math.ceil((length) / 4.0) * 4) + 4 - return (data[4:length+4], data[nextData:]) - - -def readInt(data): - if(len(data)<4): - print "Error: too few bytes for int", data, len(data) - rest = data - integer = 0 - else: - integer = struct.unpack(">i", data[0:4])[0] - rest = data[4:] - - return (integer, rest) - - - -def readLong(data): - """Tries to interpret the next 8 bytes of the data - as a 64-bit signed integer.""" - high, low = struct.unpack(">ll", data[0:8]) - big = (long(high) << 32) + low - rest = data[8:] - return (big, rest) - - - -def readFloat(data): - if(len(data)<4): - print "Error: too few bytes for float", data, len(data) - rest = data - float = 0 - else: - float = struct.unpack(">f", data[0:4])[0] - rest = data[4:] - - return (float, rest) - - -def OSCBlob(next): - """Convert a string into an OSC Blob, - returning a (typetag, data) tuple.""" - - if type(next) == type(""): - length = len(next) - padded = math.ceil((len(next)) / 4.0) * 4 - binary = struct.pack(">i%ds" % (padded), length, next) - tag = 'b' - else: - tag = '' - binary = '' - - return (tag, binary) - - -def OSCArgument(next): - """Convert some Python types to their - OSC binary representations, returning a - (typetag, data) tuple.""" - - if type(next) == type(""): - OSCstringLength = math.ceil((len(next)+1) / 4.0) * 4 - binary = struct.pack(">%ds" % (OSCstringLength), next) - tag = "s" - elif type(next) == type(42.5): - binary = struct.pack(">f", next) - tag = "f" - elif type(next) == type(13): - binary = struct.pack(">i", next) - tag = "i" - else: - binary = "" - tag = "" - - return (tag, binary) - - -def parseArgs(args): - """Given a list of strings, produces a list - where those strings have been parsed (where - possible) as floats or integers.""" - parsed = [] - for arg in args: - print arg - arg = arg.strip() - interpretation = None - try: - interpretation = float(arg) - if string.find(arg, ".") == -1: - interpretation = int(interpretation) - except: - # Oh - it was a string. - interpretation = arg - pass - parsed.append(interpretation) - return parsed - - - -def decodeOSC(data): - """Converts a typetagged OSC message to a Python list.""" - table = {"i":readInt, "f":readFloat, "s":readString, "b":readBlob} - decoded = [] - address, rest = readString(data) - typetags = "" - - if address == "#bundle": - time, rest = readLong(rest) - decoded.append(address) - decoded.append(time) - while len(rest)>0: - length, rest = readInt(rest) - decoded.append(decodeOSC(rest[:length])) - rest = rest[length:] - - elif len(rest)>0: - typetags, rest = readString(rest) - decoded.append(address) - decoded.append(typetags) - if(typetags[0] == ","): - for tag in typetags[1:]: - value, rest = table[tag](rest) - decoded.append(value) - else: - print "Oops, typetag lacks the magic ," - - # return only the data - return address,decoded[2:] - - -class CallbackManager: - """This utility class maps OSC addresses to callables. - - The CallbackManager calls its callbacks with a list - of decoded OSC arguments, including the address and - the typetags as the first two arguments.""" - - def __init__(self): - self.callbacks = {} - self.addCallback(self.unbundler, "#bundle") - - def handle(self, data, source = None): - """Given OSC data, tries to call the callback with the - right address.""" - decoded = decodeOSC(data) - self.dispatch(decoded) - - def dispatch(self, message): - """Sends decoded OSC data to an appropriate calback""" - try: - address = message[0] - self.callbacks[address](message) - except KeyError, e: - # address not found - print "no handler for '%s'" % address - pass - except None, e: - print "Exception in", address, "callback :", e - - return - - def addCallback(self, callback, name): - """Adds a callback to our set of callbacks, - or removes the callback with name if callback - is None.""" - if callback == None: - del self.callbacks[name] - else: - self.callbacks[name] = callback - - def unbundler(self, messages): - """Dispatch the messages in a decoded bundle.""" - # first two elements are #bundle and the time tag, rest are messages. - for message in messages[2:]: - self.dispatch(message) - - -if __name__ == "__main__": - hexDump("Welcome to the OSC testing program.") - print - message = OSCMessage() - message.setAddress("/foo/play") - message.append(44) - message.append(11) - message.append(4.5) - message.append("the white cliffs of dover") - hexDump(message.getBinary()) - - print "Making and unmaking a message.." - - strings = OSCMessage() - strings.append("Mary had a little lamb") - strings.append("its fleece was white as snow") - strings.append("and everywhere that Mary went,") - strings.append("the lamb was sure to go.") - strings.append(14.5) - strings.append(14.5) - strings.append(-400) - - raw = strings.getBinary() - - hexDump(raw) - - print "Retrieving arguments..." - data = raw - for i in range(6): - text, data = readString(data) - print text - - number, data = readFloat(data) - print number - - number, data = readFloat(data) - print number - - number, data = readInt(data) - print number - - hexDump(raw) - print decodeOSC(raw) - print decodeOSC(message.getBinary()) - - print "Testing Blob types." - - blob = OSCMessage() - blob.append("","b") - blob.append("b","b") - blob.append("bl","b") - blob.append("blo","b") - blob.append("blob","b") - blob.append("blobs","b") - blob.append(42) - - hexDump(blob.getBinary()) - - print decodeOSC(blob.getBinary()) - - def printingCallback(stuff): - sys.stdout.write("Got: ") - for i in stuff: - sys.stdout.write(str(i) + " ") - sys.stdout.write("\n") - - print "Testing the callback manager." - - c = CallbackManager() - c.add(printingCallback, "/print") - - c.handle(message.getBinary()) - message.setAddress("/print") - c.handle(message.getBinary()) - - print1 = OSCMessage() - print1.setAddress("/print") - print1.append("Hey man, that's cool.") - print1.append(42) - print1.append(3.1415926) - - c.handle(print1.getBinary()) - - bundle = OSCMessage() - bundle.setAddress("") - bundle.append("#bundle") - bundle.append(0) - bundle.append(0) - bundle.append(print1.getBinary(), 'b') - bundle.append(print1.getBinary(), 'b') - - bundlebinary = bundle.message - - print "sending a bundle to the callback manager" - c.handle(bundlebinary) |