Mercurial > viff
changeset 1319:96e0f7de22b0
Store data without deferreds in preprocessing pool.
author | Marcel Keller <mkeller@cs.au.dk> |
---|---|
date | Fri, 02 Oct 2009 11:27:08 +0200 |
parents | 70c995d31d1a |
children | fb09cb799cc8 |
files | viff/runtime.py |
diffstat | 1 files changed, 14 insertions(+), 19 deletions(-) [+] |
line wrap: on
line diff
--- a/viff/runtime.py Fri Sep 25 20:30:05 2009 +0200 +++ b/viff/runtime.py Fri Oct 02 11:27:08 2009 +0200 @@ -446,7 +446,7 @@ def preprocess_wrapper(self, *args, **kwargs): pc = tuple(self.program_counter) try: - return self._pool.pop(pc) + return succeed(self._pool.pop(pc)) except KeyError: key = (generator, args) pcs = self._needed_data.setdefault(key, []) @@ -750,30 +750,25 @@ """ def update(results, program_counters): - # We concatenate the sub-lists in results. - results = sum(results, []) - - # The pool must map program counters to Deferreds to - # present a uniform interface for the functions we - # pre-process. - results = map(succeed, results) - # Update the pool with pairs of program counter and data. self._pool.update(zip(program_counters, results)) - # Return a Deferred that waits on the individual results. - # This is important to make it possible for the players to - # avoid starting before the pre-processing is complete. - return deep_wait(results) - + wait_list = [] for ((generator, args), program_counters) in program.iteritems(): print "Preprocessing %s (%d items)" % (generator, len(program_counters)) func = getattr(self, generator) - results = [] - while len(results) < len(program_counters): - results += func(*args) - self._pool.update(zip(program_counters, results)) - return DeferredList(results).addCallback(lambda _: None) + + block_size = 20 + while program_counters: + results = [] + while len(results) < len(program_counters) and \ + len(results) < block_size: + results += func(*args) + ready = gatherResults(results) + ready.addCallback(update, program_counters[:len(results)]) + del program_counters[:len(results)] + wait_list.append(ready) + return gatherResults(wait_list) def input(self, inputters, field, number=None): """Input *number* to the computation.