changeset 1185:89b93bc75476

Introduced complex callbacks. These are only executed when process_deferred_queue() is not called recursively.
author Marcel Keller <mkeller@cs.au.dk>
date Wed, 20 May 2009 19:34:03 +0200
parents 6cd5ceb87542
children 0abdcb6c392a
files apps/aes.py viff/aes.py viff/runtime.py
diffstat 3 files changed, 38 insertions(+), 7 deletions(-) [+]
line wrap: on
line diff
--- a/apps/aes.py	Tue May 19 17:04:07 2009 +0200
+++ b/apps/aes.py	Wed May 20 19:34:03 2009 +0200
@@ -83,7 +83,7 @@
         rt.shutdown()
 
     g = gather_shares(opened_ciphertext)
-    rt.schedule_callback(g, fin)
+    rt.schedule_complex_callback(g, fin)
 
 def share_key(rt):
     key =  []
@@ -96,7 +96,7 @@
             key.append(rt.input([inputter], GF256))
 
     s = rt.synchronize()
-    s.addCallback(encrypt, rt, key)
+    rt.schedule_complex_callback(s, encrypt, rt, key)
 
 rt = create_runtime(id, players, 1, options)
 rt.addCallback(share_key)
--- a/viff/aes.py	Tue May 19 17:04:07 2009 +0200
+++ b/viff/aes.py	Wed May 20 19:34:03 2009 +0200
@@ -377,9 +377,9 @@
                 trigger.addCallback(progress, i, time.time())
 
                 if (i < self.rounds - 1):
-                    self.runtime.schedule_callback(trigger, round, state, i + 1)
+                    self.runtime.schedule_complex_callback(trigger, round, state, i + 1)
                 else:
-                    self.runtime.schedule_callback(trigger, final_round, state)
+                    self.runtime.schedule_complex_callback(trigger, final_round, state)
 
             prep_progress(i, start_round)
 
--- a/viff/runtime.py	Tue May 19 17:04:07 2009 +0200
+++ b/viff/runtime.py	Wed May 20 19:34:03 2009 +0200
@@ -538,6 +538,7 @@
 
         #: Queue of deferreds and data.
         self.deferred_queue = []
+        self.complex_deferred_queue = []
         #: Counter for calls of activate_reactor().
         self.activation_counter = 0
         #: Record the recursion depth.
@@ -628,6 +629,25 @@
 
         return deferred.addCallback(callback_wrapper, *args, **kwargs)
 
+    def schedule_complex_callback(self, deferred, func, *args, **kwargs):
+        """Schedule a complex callback, i.e. a callback which blocks a
+        long time.
+
+        Consider that the deferred is forked, i.e. if the callback returns
+        something to be used afterwards, add further callbacks to the returned
+        deferred."""
+
+        if isinstance(deferred, Share):
+            fork = Share(deferred.runtime, deferred.field)
+        else:
+            fork = Deferred()
+
+        def queue_callback(result, runtime, fork):
+            runtime.complex_deferred_queue.append((fork, result))
+
+        deferred.addCallback(queue_callback, self, fork)
+        return self.schedule_callback(fork, func, *args, **kwargs)
+
     @increment_pc
     def synchronize(self):
         """Introduce a synchronization point.
@@ -778,10 +798,21 @@
         self.deferred_queue.append((deferred, data))
 
     def process_deferred_queue(self):
-        """Execute the callbacks of the deferreds in the queue."""
+        """Execute the callbacks of the deferreds in the queue.
 
-        while(self.deferred_queue):
-            deferred, data = self.deferred_queue.pop(0)
+        If this function is not called via activate_reactor(), also
+        complex callbacks are executed."""
+
+        self.process_queue(self.deferred_queue)
+
+        if self.depth_counter == 0:
+            self.process_queue(self.complex_deferred_queue)
+
+    def process_queue(self, queue):
+        """Execute the callbacks of the deferreds in *queue*."""
+
+        while(queue):
+            deferred, data = queue.pop(0)
             deferred.callback(data)
 
     def activate_reactor(self):