changeset 1176:e86a007bddbd

Merged with Marcel.
author Martin Geisler <mg@cs.au.dk>
date Thu, 07 May 2009 11:09:47 +0200
parents 38e4007378df d6ff18bd643c
children a1304b0072d4 e1e0c107c40b
files viff/runtime.py
diffstat 8 files changed, 17 insertions(+), 21 deletions(-) [+]
line wrap: on
line diff
--- a/apps/aes.py	Wed Apr 29 09:39:21 2009 +0200
+++ b/apps/aes.py	Thu May 07 11:09:47 2009 +0200
@@ -81,7 +81,7 @@
         rt.shutdown()
 
     g = gather_shares(opened_ciphertext)
-    g.addCallback(fin)
+    rt.schedule_callback(g, fin)
 
 def share_key(rt):
     key =  []
--- a/apps/millionaires.py	Wed Apr 29 09:39:21 2009 +0200
+++ b/apps/millionaires.py	Thu May 07 11:09:47 2009 +0200
@@ -97,10 +97,10 @@
         # the argument (which is None since self.results_ready does
         # not return anything), so we throw it away using a lambda
         # expressions which ignores its first argument.
-        results.addCallback(lambda _: runtime.synchronize())
+        runtime.schedule_callback(results, lambda _: runtime.synchronize())
         # The next callback shuts the runtime down, killing the
         # connections between the players.
-        results.addCallback(lambda _: runtime.shutdown())
+        runtime.schedule_callback(results, lambda _: runtime.shutdown())
 
     def results_ready(self, results):
         # Since this method is called as a callback above, the results
--- a/viff/active.py	Wed Apr 29 09:39:21 2009 +0200
+++ b/viff/active.py	Thu May 07 11:09:47 2009 +0200
@@ -502,6 +502,7 @@
         result = Share(self, share_x.field)
         # This is the Deferred we will do processing on.
         triple = self.get_triple(share_x.field)
+        triple.addCallback(gather_shares)
         self.schedule_callback(triple, finish_mul)
         # We add the result to the chains in triple.
         triple.chainDeferred(result)
--- a/viff/equality.py	Wed Apr 29 09:39:21 2009 +0200
+++ b/viff/equality.py	Thu May 07 11:09:47 2009 +0200
@@ -74,7 +74,7 @@
                 xj = (-1) * (1/Zp(2)) * (bj - 1)
             return xj
 
-        x = [cj.addCallback(finish, bj) for cj, bj in zip(c, b)]
+        x = [self.schedule_callback(cj, finish, bj) for cj, bj in zip(c, b)]
 
         # Take the product (this is here the same as the "and") of all
         # the x'es
--- a/viff/passive.py	Wed Apr 29 09:39:21 2009 +0200
+++ b/viff/passive.py	Thu May 07 11:09:47 2009 +0200
@@ -98,7 +98,7 @@
                         d = Share(self, share.field, (share.field(peer_id), share))
                     else:
                         d = self._expect_share(peer_id, share.field)
-                        self.schedule_callback(d, lambda s, peer_id: (s.field(peer_id), s), peer_id)
+                        d.addCallback(lambda s, peer_id: (s.field(peer_id), s), peer_id)
                     deferreds.append(d)
                 return recombine(deferreds)
 
--- a/viff/runtime.py	Wed Apr 29 09:39:21 2009 +0200
+++ b/viff/runtime.py	Thu May 07 11:09:47 2009 +0200
@@ -585,8 +585,9 @@
         The runtime is shut down when all variables are calculated.
         """
         dl = DeferredList(vars)
-        dl.addCallback(lambda _: self.shutdown())
+        self.schedule_callback(dl, lambda _: self.shutdown())
 
+    @increment_pc
     def schedule_callback(self, deferred, func, *args, **kwargs):
         """Schedule a callback on a deferred with the correct program
         counter.
@@ -617,7 +618,7 @@
             finally:
                 self.program_counter[:] = current_pc
 
-        deferred.addCallback(callback_wrapper, *args, **kwargs)
+        return deferred.addCallback(callback_wrapper, *args, **kwargs)
 
     @increment_pc
     def synchronize(self):
@@ -832,19 +833,15 @@
         # profiler here and stop it upon shutdown, but this triggers
         # http://bugs.python.org/issue1375 since the start and stop
         # calls are in different stack frames.
-        import hotshot
-        prof = hotshot.Profile("player-%d.prof" % id)
+        import cProfile
+        prof = cProfile.Profile()
         old_run = reactor.run
         def new_run(*args, **kwargs):
             print "Starting reactor with profiling"
             prof.runcall(old_run, *args, **kwargs)
 
-            import sys
-            import hotshot.stats
-            print "Loading profiling statistics...",
-            sys.stdout.flush()
-            stats = hotshot.stats.load("player-%d.prof" % id)
-            print "done."
+            import pstats
+            stats = pstats.Stats(prof)
             print
             stats.strip_dirs()
             stats.sort_stats("time", "calls")
--- a/viff/test/test_active_runtime.py	Wed Apr 29 09:39:21 2009 +0200
+++ b/viff/test/test_active_runtime.py	Thu May 07 11:09:47 2009 +0200
@@ -88,7 +88,7 @@
 
         double = runtime.double_share_random(T, runtime.threshold,
                                              2*runtime.threshold, self.Zp)
-        double.addCallback(check)
+        runtime.schedule_callback(double, check)
         return double
 
     @protocol
@@ -116,7 +116,7 @@
         count, triples = runtime.generate_triples(self.Zp)
         self.assertEquals(count, runtime.num_players - 2*runtime.threshold)
 
-        triples.addCallback(check)
+        runtime.schedule_callback(triples, check)
         return triples
 
 
--- a/viff/test/test_basic_runtime.py	Wed Apr 29 09:39:21 2009 +0200
+++ b/viff/test/test_basic_runtime.py	Thu May 07 11:09:47 2009 +0200
@@ -62,13 +62,13 @@
         """
 
         def verify_program_counter(_):
-            self.assertEquals(runtime.program_counter, [0])
+            self.assertEquals(runtime.program_counter, [1, 0])
 
         d = Deferred()
         runtime.schedule_callback(d, verify_program_counter)
 
         runtime.synchronize()
-        self.assertEquals(runtime.program_counter, [1])
+        self.assertEquals(runtime.program_counter, [2])
 
         # Now trigger verify_program_counter.
         d.callback(None)
@@ -129,8 +129,6 @@
         d2.callback(None)
 
         return gatherResults([d1, d2])
-    test_multiple_callbacks.skip = ("TODO: Scheduling callbacks fails to "
-                                    "increment program counter!")
 
     @protocol
     def test_multi_send(self, runtime):