viff

changeset 1185:89b93bc75476

Introduced complex callbacks. These are only executed when process_deferred_queue() is not called recursively.
author Marcel Keller <mkeller@cs.au.dk>
date Wed, 20 May 2009 19:34:03 +0200
parents 6cd5ceb87542
children 0abdcb6c392a
files apps/aes.py viff/aes.py viff/runtime.py
diffstat 3 files changed, 38 insertions(+), 7 deletions(-) [+]
line diff
     1.1 --- a/apps/aes.py	Tue May 19 17:04:07 2009 +0200
     1.2 +++ b/apps/aes.py	Wed May 20 19:34:03 2009 +0200
     1.3 @@ -83,7 +83,7 @@
     1.4          rt.shutdown()
     1.5  
     1.6      g = gather_shares(opened_ciphertext)
     1.7 -    rt.schedule_callback(g, fin)
     1.8 +    rt.schedule_complex_callback(g, fin)
     1.9  
    1.10  def share_key(rt):
    1.11      key =  []
    1.12 @@ -96,7 +96,7 @@
    1.13              key.append(rt.input([inputter], GF256))
    1.14  
    1.15      s = rt.synchronize()
    1.16 -    s.addCallback(encrypt, rt, key)
    1.17 +    rt.schedule_complex_callback(s, encrypt, rt, key)
    1.18  
    1.19  rt = create_runtime(id, players, 1, options)
    1.20  rt.addCallback(share_key)
     2.1 --- a/viff/aes.py	Tue May 19 17:04:07 2009 +0200
     2.2 +++ b/viff/aes.py	Wed May 20 19:34:03 2009 +0200
     2.3 @@ -377,9 +377,9 @@
     2.4                  trigger.addCallback(progress, i, time.time())
     2.5  
     2.6                  if (i < self.rounds - 1):
     2.7 -                    self.runtime.schedule_callback(trigger, round, state, i + 1)
     2.8 +                    self.runtime.schedule_complex_callback(trigger, round, state, i + 1)
     2.9                  else:
    2.10 -                    self.runtime.schedule_callback(trigger, final_round, state)
    2.11 +                    self.runtime.schedule_complex_callback(trigger, final_round, state)
    2.12  
    2.13              prep_progress(i, start_round)
    2.14  
     3.1 --- a/viff/runtime.py	Tue May 19 17:04:07 2009 +0200
     3.2 +++ b/viff/runtime.py	Wed May 20 19:34:03 2009 +0200
     3.3 @@ -538,6 +538,7 @@
     3.4  
     3.5          #: Queue of deferreds and data.
     3.6          self.deferred_queue = []
     3.7 +        self.complex_deferred_queue = []
     3.8          #: Counter for calls of activate_reactor().
     3.9          self.activation_counter = 0
    3.10          #: Record the recursion depth.
    3.11 @@ -628,6 +629,25 @@
    3.12  
    3.13          return deferred.addCallback(callback_wrapper, *args, **kwargs)
    3.14  
    3.15 +    def schedule_complex_callback(self, deferred, func, *args, **kwargs):
    3.16 +        """Schedule a complex callback, i.e. a callback which blocks a
    3.17 +        long time.
    3.18 +
    3.19 +        Consider that the deferred is forked, i.e. if the callback returns
    3.20 +        something to be used afterwards, add further callbacks to the returned
    3.21 +        deferred."""
    3.22 +
    3.23 +        if isinstance(deferred, Share):
    3.24 +            fork = Share(deferred.runtime, deferred.field)
    3.25 +        else:
    3.26 +            fork = Deferred()
    3.27 +
    3.28 +        def queue_callback(result, runtime, fork):
    3.29 +            runtime.complex_deferred_queue.append((fork, result))
    3.30 +
    3.31 +        deferred.addCallback(queue_callback, self, fork)
    3.32 +        return self.schedule_callback(fork, func, *args, **kwargs)
    3.33 +
    3.34      @increment_pc
    3.35      def synchronize(self):
    3.36          """Introduce a synchronization point.
    3.37 @@ -778,10 +798,21 @@
    3.38          self.deferred_queue.append((deferred, data))
    3.39  
    3.40      def process_deferred_queue(self):
    3.41 -        """Execute the callbacks of the deferreds in the queue."""
    3.42 +        """Execute the callbacks of the deferreds in the queue.
    3.43  
    3.44 -        while(self.deferred_queue):
    3.45 -            deferred, data = self.deferred_queue.pop(0)
    3.46 +        If this function is not called via activate_reactor(), also
    3.47 +        complex callbacks are executed."""
    3.48 +
    3.49 +        self.process_queue(self.deferred_queue)
    3.50 +
    3.51 +        if self.depth_counter == 0:
    3.52 +            self.process_queue(self.complex_deferred_queue)
    3.53 +
    3.54 +    def process_queue(self, queue):
    3.55 +        """Execute the callbacks of the deferreds in *queue*."""
    3.56 +
    3.57 +        while(queue):
    3.58 +            deferred, data = queue.pop(0)
    3.59              deferred.callback(data)
    3.60  
    3.61      def activate_reactor(self):