viff

changeset 1319:96e0f7de22b0

Store data without deferreds in preprocessing pool.
author Marcel Keller <mkeller@cs.au.dk>
date Fri, 02 Oct 2009 11:27:08 +0200
parents 70c995d31d1a
children fb09cb799cc8
files viff/runtime.py
diffstat 1 files changed, 14 insertions(+), 19 deletions(-) [+]
line diff
     1.1 --- a/viff/runtime.py	Fri Sep 25 20:30:05 2009 +0200
     1.2 +++ b/viff/runtime.py	Fri Oct 02 11:27:08 2009 +0200
     1.3 @@ -446,7 +446,7 @@
     1.4          def preprocess_wrapper(self, *args, **kwargs):
     1.5              pc = tuple(self.program_counter)
     1.6              try:
     1.7 -                return self._pool.pop(pc)
     1.8 +                return succeed(self._pool.pop(pc))
     1.9              except KeyError:
    1.10                  key = (generator, args)
    1.11                  pcs = self._needed_data.setdefault(key, [])
    1.12 @@ -750,30 +750,25 @@
    1.13          """
    1.14  
    1.15          def update(results, program_counters):
    1.16 -            # We concatenate the sub-lists in results.
    1.17 -            results = sum(results, [])
    1.18 -
    1.19 -            # The pool must map program counters to Deferreds to
    1.20 -            # present a uniform interface for the functions we
    1.21 -            # pre-process.
    1.22 -            results = map(succeed, results)
    1.23 -
    1.24              # Update the pool with pairs of program counter and data.
    1.25              self._pool.update(zip(program_counters, results))
    1.26  
    1.27 -            # Return a Deferred that waits on the individual results.
    1.28 -            # This is important to make it possible for the players to
    1.29 -            # avoid starting before the pre-processing is complete.
    1.30 -            return deep_wait(results)
    1.31 -
    1.32 +        wait_list = []
    1.33          for ((generator, args), program_counters) in program.iteritems():
    1.34              print "Preprocessing %s (%d items)" % (generator, len(program_counters))
    1.35              func = getattr(self, generator)
    1.36 -            results = []
    1.37 -            while len(results) < len(program_counters):
    1.38 -                results += func(*args)
    1.39 -            self._pool.update(zip(program_counters, results))
    1.40 -        return DeferredList(results).addCallback(lambda _: None)
    1.41 +
    1.42 +            block_size = 20
    1.43 +            while program_counters:
    1.44 +                results = []
    1.45 +                while len(results) < len(program_counters) and \
    1.46 +                          len(results) < block_size:
    1.47 +                    results += func(*args)
    1.48 +                ready = gatherResults(results)
    1.49 +                ready.addCallback(update, program_counters[:len(results)])
    1.50 +                del program_counters[:len(results)]
    1.51 +                wait_list.append(ready)
    1.52 +        return gatherResults(wait_list)
    1.53  
    1.54      def input(self, inputters, field, number=None):
    1.55          """Input *number* to the computation.