summaryrefslogtreecommitdiffstats
path: root/extras/netcache_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'extras/netcache_client.py')
-rw-r--r--extras/netcache_client.py390
1 files changed, 0 insertions, 390 deletions
diff --git a/extras/netcache_client.py b/extras/netcache_client.py
deleted file mode 100644
index dc49048..0000000
--- a/extras/netcache_client.py
+++ /dev/null
@@ -1,390 +0,0 @@
-#! /usr/bin/env python
-# encoding: utf-8
-# Thomas Nagy, 2011-2015 (ita)
-
-"""
-A client for the network cache (playground/netcache/). Launch the server with:
-./netcache_server, then use it for the builds by adding the following:
-
- def build(bld):
- bld.load('netcache_client')
-
-The parameters should be present in the environment in the form:
- NETCACHE=host:port waf configure build
-
-Or in a more detailed way:
- NETCACHE_PUSH=host:port NETCACHE_PULL=host:port waf configure build
-
-where:
- host: host where the server resides, by default localhost
- port: by default push on 11001 and pull on 12001
-
-Use the server provided in playground/netcache/Netcache.java
-"""
-
-import os, socket, time, atexit, sys
-from waflib import Task, Logs, Utils, Build, Runner
-from waflib.Configure import conf
-
-BUF = 8192 * 16
-HEADER_SIZE = 128
-MODES = ['PUSH', 'PULL', 'PUSH_PULL']
-STALE_TIME = 30 # seconds
-
-GET = 'GET'
-PUT = 'PUT'
-LST = 'LST'
-BYE = 'BYE'
-
-all_sigs_in_cache = (0.0, [])
-
-def put_data(conn, data):
- if sys.hexversion > 0x3000000:
- data = data.encode('latin-1')
- cnt = 0
- while cnt < len(data):
- sent = conn.send(data[cnt:])
- if sent == 0:
- raise RuntimeError('connection ended')
- cnt += sent
-
-push_connections = Runner.Queue(0)
-pull_connections = Runner.Queue(0)
-def get_connection(push=False):
- # return a new connection... do not forget to release it!
- try:
- if push:
- ret = push_connections.get(block=False)
- else:
- ret = pull_connections.get(block=False)
- except Exception:
- ret = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if push:
- ret.connect(Task.push_addr)
- else:
- ret.connect(Task.pull_addr)
- return ret
-
-def release_connection(conn, msg='', push=False):
- if conn:
- if push:
- push_connections.put(conn)
- else:
- pull_connections.put(conn)
-
-def close_connection(conn, msg=''):
- if conn:
- data = '%s,%s' % (BYE, msg)
- try:
- put_data(conn, data.ljust(HEADER_SIZE))
- except:
- pass
- try:
- conn.close()
- except:
- pass
-
-def close_all():
- for q in (push_connections, pull_connections):
- while q.qsize():
- conn = q.get()
- try:
- close_connection(conn)
- except:
- # ignore errors when cleaning up
- pass
-atexit.register(close_all)
-
-def read_header(conn):
- cnt = 0
- buf = []
- while cnt < HEADER_SIZE:
- data = conn.recv(HEADER_SIZE - cnt)
- if not data:
- #import traceback
- #traceback.print_stack()
- raise ValueError('connection ended when reading a header %r' % buf)
- buf.append(data)
- cnt += len(data)
- if sys.hexversion > 0x3000000:
- ret = ''.encode('latin-1').join(buf)
- ret = ret.decode('latin-1')
- else:
- ret = ''.join(buf)
- return ret
-
-def check_cache(conn, ssig):
- """
- List the files on the server, this is an optimization because it assumes that
- concurrent builds are rare
- """
- global all_sigs_in_cache
- if not STALE_TIME:
- return
- if time.time() - all_sigs_in_cache[0] > STALE_TIME:
-
- params = (LST,'')
- put_data(conn, ','.join(params).ljust(HEADER_SIZE))
-
- # read what is coming back
- ret = read_header(conn)
- size = int(ret.split(',')[0])
-
- buf = []
- cnt = 0
- while cnt < size:
- data = conn.recv(min(BUF, size-cnt))
- if not data:
- raise ValueError('connection ended %r %r' % (cnt, size))
- buf.append(data)
- cnt += len(data)
-
- if sys.hexversion > 0x3000000:
- ret = ''.encode('latin-1').join(buf)
- ret = ret.decode('latin-1')
- else:
- ret = ''.join(buf)
-
- all_sigs_in_cache = (time.time(), ret.splitlines())
- Logs.debug('netcache: server cache has %r entries', len(all_sigs_in_cache[1]))
-
- if not ssig in all_sigs_in_cache[1]:
- raise ValueError('no file %s in cache' % ssig)
-
-class MissingFile(Exception):
- pass
-
-def recv_file(conn, ssig, count, p):
- check_cache(conn, ssig)
-
- params = (GET, ssig, str(count))
- put_data(conn, ','.join(params).ljust(HEADER_SIZE))
- data = read_header(conn)
-
- size = int(data.split(',')[0])
-
- if size == -1:
- raise MissingFile('no file %s - %s in cache' % (ssig, count))
-
- # get the file, writing immediately
- # TODO a tmp file would be better
- f = open(p, 'wb')
- cnt = 0
- while cnt < size:
- data = conn.recv(min(BUF, size-cnt))
- if not data:
- raise ValueError('connection ended %r %r' % (cnt, size))
- f.write(data)
- cnt += len(data)
- f.close()
-
-def sock_send(conn, ssig, cnt, p):
- #print "pushing %r %r %r" % (ssig, cnt, p)
- size = os.stat(p).st_size
- params = (PUT, ssig, str(cnt), str(size))
- put_data(conn, ','.join(params).ljust(HEADER_SIZE))
- f = open(p, 'rb')
- cnt = 0
- while cnt < size:
- r = f.read(min(BUF, size-cnt))
- while r:
- k = conn.send(r)
- if not k:
- raise ValueError('connection ended')
- cnt += k
- r = r[k:]
-
-def can_retrieve_cache(self):
- if not Task.pull_addr:
- return False
- if not self.outputs:
- return False
- self.cached = False
-
- cnt = 0
- sig = self.signature()
- ssig = Utils.to_hex(self.uid() + sig)
-
- conn = None
- err = False
- try:
- try:
- conn = get_connection()
- for node in self.outputs:
- p = node.abspath()
- recv_file(conn, ssig, cnt, p)
- cnt += 1
- except MissingFile as e:
- Logs.debug('netcache: file is not in the cache %r', e)
- err = True
- except Exception as e:
- Logs.debug('netcache: could not get the files %r', self.outputs)
- if Logs.verbose > 1:
- Logs.debug('netcache: exception %r', e)
- err = True
-
- # broken connection? remove this one
- close_connection(conn)
- conn = None
- else:
- Logs.debug('netcache: obtained %r from cache', self.outputs)
-
- finally:
- release_connection(conn)
- if err:
- return False
-
- self.cached = True
- return True
-
-@Utils.run_once
-def put_files_cache(self):
- if not Task.push_addr:
- return
- if not self.outputs:
- return
- if getattr(self, 'cached', None):
- return
-
- #print "called put_files_cache", id(self)
- bld = self.generator.bld
- sig = self.signature()
- ssig = Utils.to_hex(self.uid() + sig)
-
- conn = None
- cnt = 0
- try:
- for node in self.outputs:
- # We could re-create the signature of the task with the signature of the outputs
- # in practice, this means hashing the output files
- # this is unnecessary
- try:
- if not conn:
- conn = get_connection(push=True)
- sock_send(conn, ssig, cnt, node.abspath())
- Logs.debug('netcache: sent %r', node)
- except Exception as e:
- Logs.debug('netcache: could not push the files %r', e)
-
- # broken connection? remove this one
- close_connection(conn)
- conn = None
- cnt += 1
- finally:
- release_connection(conn, push=True)
-
- bld.task_sigs[self.uid()] = self.cache_sig
-
-def hash_env_vars(self, env, vars_lst):
- # reimplement so that the resulting hash does not depend on local paths
- if not env.table:
- env = env.parent
- if not env:
- return Utils.SIG_NIL
-
- idx = str(id(env)) + str(vars_lst)
- try:
- cache = self.cache_env
- except AttributeError:
- cache = self.cache_env = {}
- else:
- try:
- return self.cache_env[idx]
- except KeyError:
- pass
-
- v = str([env[a] for a in vars_lst])
- v = v.replace(self.srcnode.abspath().__repr__()[:-1], '')
- m = Utils.md5()
- m.update(v.encode())
- ret = m.digest()
-
- Logs.debug('envhash: %r %r', ret, v)
-
- cache[idx] = ret
-
- return ret
-
-def uid(self):
- # reimplement so that the signature does not depend on local paths
- try:
- return self.uid_
- except AttributeError:
- m = Utils.md5()
- src = self.generator.bld.srcnode
- up = m.update
- up(self.__class__.__name__.encode())
- for x in self.inputs + self.outputs:
- up(x.path_from(src).encode())
- self.uid_ = m.digest()
- return self.uid_
-
-
-def make_cached(cls):
- if getattr(cls, 'nocache', None):
- return
-
- m1 = cls.run
- def run(self):
- if getattr(self, 'nocache', False):
- return m1(self)
- if self.can_retrieve_cache():
- return 0
- return m1(self)
- cls.run = run
-
- m2 = cls.post_run
- def post_run(self):
- if getattr(self, 'nocache', False):
- return m2(self)
- bld = self.generator.bld
- ret = m2(self)
- if bld.cache_global:
- self.put_files_cache()
- if hasattr(self, 'chmod'):
- for node in self.outputs:
- os.chmod(node.abspath(), self.chmod)
- return ret
- cls.post_run = post_run
-
-@conf
-def setup_netcache(ctx, push_addr, pull_addr):
- Task.Task.can_retrieve_cache = can_retrieve_cache
- Task.Task.put_files_cache = put_files_cache
- Task.Task.uid = uid
- Task.push_addr = push_addr
- Task.pull_addr = pull_addr
- Build.BuildContext.hash_env_vars = hash_env_vars
- ctx.cache_global = True
-
- for x in Task.classes.values():
- make_cached(x)
-
-def build(bld):
- if not 'NETCACHE' in os.environ and not 'NETCACHE_PULL' in os.environ and not 'NETCACHE_PUSH' in os.environ:
- Logs.warn('Setting NETCACHE_PULL=127.0.0.1:11001 and NETCACHE_PUSH=127.0.0.1:12001')
- os.environ['NETCACHE_PULL'] = '127.0.0.1:12001'
- os.environ['NETCACHE_PUSH'] = '127.0.0.1:11001'
-
- if 'NETCACHE' in os.environ:
- if not 'NETCACHE_PUSH' in os.environ:
- os.environ['NETCACHE_PUSH'] = os.environ['NETCACHE']
- if not 'NETCACHE_PULL' in os.environ:
- os.environ['NETCACHE_PULL'] = os.environ['NETCACHE']
-
- v = os.environ['NETCACHE_PULL']
- if v:
- h, p = v.split(':')
- pull_addr = (h, int(p))
- else:
- pull_addr = None
-
- v = os.environ['NETCACHE_PUSH']
- if v:
- h, p = v.split(':')
- push_addr = (h, int(p))
- else:
- push_addr = None
-
- setup_netcache(bld, push_addr, pull_addr)
-