changeset 1319:96e0f7de22b0

Store data without deferreds in preprocessing pool.
author Marcel Keller <mkeller@cs.au.dk>
date Fri, 02 Oct 2009 11:27:08 +0200
parents 70c995d31d1a
children fb09cb799cc8
files viff/runtime.py
diffstat 1 files changed, 14 insertions(+), 19 deletions(-) [+]
line wrap: on
line diff
--- a/viff/runtime.py	Fri Sep 25 20:30:05 2009 +0200
+++ b/viff/runtime.py	Fri Oct 02 11:27:08 2009 +0200
@@ -446,7 +446,7 @@
         def preprocess_wrapper(self, *args, **kwargs):
             pc = tuple(self.program_counter)
             try:
-                return self._pool.pop(pc)
+                return succeed(self._pool.pop(pc))
             except KeyError:
                 key = (generator, args)
                 pcs = self._needed_data.setdefault(key, [])
@@ -750,30 +750,25 @@
         """
 
         def update(results, program_counters):
-            # We concatenate the sub-lists in results.
-            results = sum(results, [])
-
-            # The pool must map program counters to Deferreds to
-            # present a uniform interface for the functions we
-            # pre-process.
-            results = map(succeed, results)
-
             # Update the pool with pairs of program counter and data.
             self._pool.update(zip(program_counters, results))
 
-            # Return a Deferred that waits on the individual results.
-            # This is important to make it possible for the players to
-            # avoid starting before the pre-processing is complete.
-            return deep_wait(results)
-
+        wait_list = []
         for ((generator, args), program_counters) in program.iteritems():
             print "Preprocessing %s (%d items)" % (generator, len(program_counters))
             func = getattr(self, generator)
-            results = []
-            while len(results) < len(program_counters):
-                results += func(*args)
-            self._pool.update(zip(program_counters, results))
-        return DeferredList(results).addCallback(lambda _: None)
+
+            block_size = 20
+            while program_counters:
+                results = []
+                while len(results) < len(program_counters) and \
+                          len(results) < block_size:
+                    results += func(*args)
+                ready = gatherResults(results)
+                ready.addCallback(update, program_counters[:len(results)])
+                del program_counters[:len(results)]
+                wait_list.append(ready)
+        return gatherResults(wait_list)
 
     def input(self, inputters, field, number=None):
         """Input *number* to the computation.