#! /usr/bin/env python # encoding: utf-8 # Thomas Nagy, 2011-2015 (ita) """ A client for the network cache (playground/netcache/). Launch the server with: ./netcache_server, then use it for the builds by adding the following: def build(bld): bld.load('netcache_client') The parameters should be present in the environment in the form: NETCACHE=host:port waf configure build Or in a more detailed way: NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build where: host: host where the server resides, by default localhost port: by default push on 11001 and pull on 12001 Use the server provided in playground/netcache/Netcache.java """ import os, socket, time, atexit, sys from waflib import Task, Logs, Utils, Build, Runner from waflib.Configure import conf BUF = 8192 * 16 HEADER_SIZE = 128 MODES = ['PUSH', 'PULL', 'PUSH_PULL'] STALE_TIME = 30 # seconds GET = 'GET' PUT = 'PUT' LST = 'LST' BYE = 'BYE' all_sigs_in_cache = (0.0, []) def put_data(conn, data): if sys.hexversion > 0x3000000: data = data.encode('latin-1') cnt = 0 while cnt < len(data): sent = conn.send(data[cnt:]) if sent == 0: raise RuntimeError('connection ended') cnt += sent push_connections = Runner.Queue(0) pull_connections = Runner.Queue(0) def get_connection(push=False): # return a new connection... do not forget to release it! try: if push: ret = push_connections.get(block=False) else: ret = pull_connections.get(block=False) except Exception: ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if push: ret.connect(Task.push_addr) else: ret.connect(Task.pull_addr) return ret def release_connection(conn, msg='', push=False): if conn: if push: push_connections.put(conn) else: pull_connections.put(conn) def close_connection(conn, msg=''): if conn: data = '%s,%s' % (BYE, msg) try: put_data(conn, data.ljust(HEADER_SIZE)) except: pass try: conn.close() except: pass def close_all(): for q in (push_connections, pull_connections): while q.qsize(): conn = q.get() try: close_connection(conn) except: # ignore errors when cleaning up pass atexit.register(close_all) def read_header(conn): cnt = 0 buf = [] while cnt < HEADER_SIZE: data = conn.recv(HEADER_SIZE - cnt) if not data: #import traceback #traceback.print_stack() raise ValueError('connection ended when reading a header %r' % buf) buf.append(data) cnt += len(data) if sys.hexversion > 0x3000000: ret = ''.encode('latin-1').join(buf) ret = ret.decode('latin-1') else: ret = ''.join(buf) return ret def check_cache(conn, ssig): """ List the files on the server, this is an optimization because it assumes that concurrent builds are rare """ global all_sigs_in_cache if not STALE_TIME: return if time.time() - all_sigs_in_cache[0] > STALE_TIME: params = (LST,'') put_data(conn, ','.join(params).ljust(HEADER_SIZE)) # read what is coming back ret = read_header(conn) size = int(ret.split(',')[0]) buf = [] cnt = 0 while cnt < size: data = conn.recv(min(BUF, size-cnt)) if not data: raise ValueError('connection ended %r %r' % (cnt, size)) buf.append(data) cnt += len(data) if sys.hexversion > 0x3000000: ret = ''.encode('latin-1').join(buf) ret = ret.decode('latin-1') else: ret = ''.join(buf) all_sigs_in_cache = (time.time(), ret.splitlines()) Logs.debug('netcache: server cache has %r entries', len(all_sigs_in_cache[1])) if not ssig in all_sigs_in_cache[1]: raise ValueError('no file %s in cache' % ssig) class MissingFile(Exception): pass def recv_file(conn, ssig, count, p): check_cache(conn, ssig) params = (GET, ssig, str(count)) put_data(conn, ','.join(params).ljust(HEADER_SIZE)) data = read_header(conn) size = int(data.split(',')[0]) if size == -1: raise MissingFile('no file %s - %s in cache' % (ssig, count)) # get the file, writing immediately # TODO a tmp file would be better f = open(p, 'wb') cnt = 0 while cnt < size: data = conn.recv(min(BUF, size-cnt)) if not data: raise ValueError('connection ended %r %r' % (cnt, size)) f.write(data) cnt += len(data) f.close() def sock_send(conn, ssig, cnt, p): #print "pushing %r %r %r" % (ssig, cnt, p) size = os.stat(p).st_size params = (PUT, ssig, str(cnt), str(size)) put_data(conn, ','.join(params).ljust(HEADER_SIZE)) f = open(p, 'rb') cnt = 0 while cnt < size: r = f.read(min(BUF, size-cnt)) while r: k = conn.send(r) if not k: raise ValueError('connection ended') cnt += k r = r[k:] def can_retrieve_cache(self): if not Task.pull_addr: return False if not self.outputs: return False self.cached = False cnt = 0 sig = self.signature() ssig = Utils.to_hex(self.uid() + sig) conn = None err = False try: try: conn = get_connection() for node in self.outputs: p = node.abspath() recv_file(conn, ssig, cnt, p) cnt += 1 except MissingFile as e: Logs.debug('netcache: file is not in the cache %r', e) err = True except Exception as e: Logs.debug('netcache: could not get the files %r', self.outputs) if Logs.verbose > 1: Logs.debug('netcache: exception %r', e) err = True # broken connection? remove this one close_connection(conn) conn = None else: Logs.debug('netcache: obtained %r from cache', self.outputs) finally: release_connection(conn) if err: return False self.cached = True return True @Utils.run_once def put_files_cache(self): if not Task.push_addr: return if not self.outputs: return if getattr(self, 'cached', None): return #print "called put_files_cache", id(self) bld = self.generator.bld sig = self.signature() ssig = Utils.to_hex(self.uid() + sig) conn = None cnt = 0 try: for node in self.outputs: # We could re-create the signature of the task with the signature of the outputs # in practice, this means hashing the output files # this is unnecessary try: if not conn: conn = get_connection(push=True) sock_send(conn, ssig, cnt, node.abspath()) Logs.debug('netcache: sent %r', node) except Exception as e: Logs.debug('netcache: could not push the files %r', e) # broken connection? remove this one close_connection(conn) conn = None cnt += 1 finally: release_connection(conn, push=True) bld.task_sigs[self.uid()] = self.cache_sig def hash_env_vars(self, env, vars_lst): # reimplement so that the resulting hash does not depend on local paths if not env.table: env = env.parent if not env: return Utils.SIG_NIL idx = str(id(env)) + str(vars_lst) try: cache = self.cache_env except AttributeError: cache = self.cache_env = {} else: try: return self.cache_env[idx] except KeyError: pass v = str([env[a] for a in vars_lst]) v = v.replace(self.srcnode.abspath().__repr__()[:-1], '') m = Utils.md5() m.update(v.encode()) ret = m.digest() Logs.debug('envhash: %r %r', ret, v) cache[idx] = ret return ret def uid(self): # reimplement so that the signature does not depend on local paths try: return self.uid_ except AttributeError: m = Utils.md5() src = self.generator.bld.srcnode up = m.update up(self.__class__.__name__.encode()) for x in self.inputs + self.outputs: up(x.path_from(src).encode()) self.uid_ = m.digest() return self.uid_ def make_cached(cls): if getattr(cls, 'nocache', None): return m1 = cls.run def run(self): if getattr(self, 'nocache', False): return m1(self) if self.can_retrieve_cache(): return 0 return m1(self) cls.run = run m2 = cls.post_run def post_run(self): if getattr(self, 'nocache', False): return m2(self) bld = self.generator.bld ret = m2(self) if bld.cache_global: self.put_files_cache() if hasattr(self, 'chmod'): for node in self.outputs: os.chmod(node.abspath(), self.chmod) return ret cls.post_run = post_run @conf def setup_netcache(ctx, push_addr, pull_addr): Task.Task.can_retrieve_cache = can_retrieve_cache Task.Task.put_files_cache = put_files_cache Task.Task.uid = uid Task.push_addr = push_addr Task.pull_addr = pull_addr Build.BuildContext.hash_env_vars = hash_env_vars ctx.cache_global = True for x in Task.classes.values(): make_cached(x) def build(bld): if not 'NETCACHE' in os.environ and not 'NETCACHE_PULL' in os.environ and not 'NETCACHE_PUSH' in os.environ: Logs.warn('Setting NETCACHE_PULL=127.0.0.1:11001 and NETCACHE_PUSH=127.0.0.1:12001') os.environ['NETCACHE_PULL'] = '127.0.0.1:12001' os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001' if 'NETCACHE' in os.environ: if not 'NETCACHE_PUSH' in os.environ: os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE'] if not 'NETCACHE_PULL' in os.environ: os.environ['NETCACHE_PULL'] = os.environ['NETCACHE'] v = os.environ['NETCACHE_PULL'] if v: h, p = v.split(':') pull_addr = (h, int(p)) else: pull_addr = None v = os.environ['NETCACHE_PUSH'] if v: h, p = v.split(':') push_addr = (h, int(p)) else: push_addr = None setup_netcache(bld, push_addr, pull_addr)