changeset 1191:8ec45943c12a

Merged with Marcel.
author Martin Geisler <mg@cs.au.dk>
date Wed, 27 May 2009 22:29:31 +0200
parents b5eea8738968 b1ee0d5e116f
children c1259ceebc55
files viff/test/util.py
diffstat 30 files changed, 229 insertions(+), 6 deletions(-) [+]
line wrap: on
line diff
--- a/apps/aes.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/aes.py	Wed May 27 22:29:31 2009 +0200
@@ -23,6 +23,8 @@
 import time
 from optparse import OptionParser
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF256
@@ -81,7 +83,7 @@
         rt.shutdown()
 
     g = gather_shares(opened_ciphertext)
-    rt.schedule_callback(g, fin)
+    rt.schedule_complex_callback(g, fin)
 
 def share_key(rt):
     key =  []
@@ -94,7 +96,7 @@
             key.append(rt.input([inputter], GF256))
 
     s = rt.synchronize()
-    s.addCallback(encrypt, rt, key)
+    rt.schedule_complex_callback(s, encrypt, rt, key)
 
 rt = create_runtime(id, players, 1, options)
 rt.addCallback(share_key)
--- a/apps/aes_inversion.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/aes_inversion.py	Wed May 27 22:29:31 2009 +0200
@@ -24,6 +24,8 @@
 import time
 from optparse import OptionParser
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF256
--- a/apps/beginner.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/beginner.py	Wed May 27 22:29:31 2009 +0200
@@ -37,6 +37,8 @@
 # Some useful imports.
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/benchmark.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/benchmark.py	Wed May 27 22:29:31 2009 +0200
@@ -60,6 +60,8 @@
 import operator
 from pprint import pformat
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF, GF256, FakeGF
@@ -176,7 +178,7 @@
             # run with no preprocessing. So they are quite brittle.
             if self.operation == operator.mul:
                 key = ("generate_triples", (Zp,))
-                desc = [(i, 1, 0) for i in range(2, 2 + count)]
+                desc = [(i, 1, 0) for i in range(3, 3 + count)]
                 program_desc.setdefault(key, []).extend(desc)
             elif isinstance(self.rt, ComparisonToft05Mixin):
                 key = ("generate_triples", (GF256,))
--- a/apps/compare.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/compare.py	Wed May 27 22:29:31 2009 +0200
@@ -20,6 +20,8 @@
 import sys
 import random
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 from twisted.internet.defer import gatherResults
 
--- a/apps/divide.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/divide.py	Wed May 27 22:29:31 2009 +0200
@@ -32,6 +32,8 @@
 """
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/double-auction/double-auction.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/double-auction/double-auction.py	Wed May 27 22:29:31 2009 +0200
@@ -38,6 +38,8 @@
 import time, random
 from optparse import OptionParser
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff import shamir
--- a/apps/equality.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/equality.py	Wed May 27 22:29:31 2009 +0200
@@ -31,6 +31,8 @@
 """
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/eval-poly.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/eval-poly.py	Wed May 27 22:29:31 2009 +0200
@@ -26,6 +26,8 @@
 from time import time
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/gc-test.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/gc-test.py	Wed May 27 22:29:31 2009 +0200
@@ -26,6 +26,8 @@
 import sys
 from time import time
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/int-bit-conversion.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/int-bit-conversion.py	Wed May 27 22:29:31 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF, GF256
--- a/apps/millionaires.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/millionaires.py	Wed May 27 22:29:31 2009 +0200
@@ -32,6 +32,8 @@
 # the example with '--help' for help with the command line options.
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/multiply.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/multiply.py	Wed May 27 22:29:31 2009 +0200
@@ -18,6 +18,8 @@
 # License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/online-comparison-benchmark.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/online-comparison-benchmark.py	Wed May 27 22:29:31 2009 +0200
@@ -21,6 +21,8 @@
 import signal
 from optparse import OptionParser
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 from twisted.internet.defer import DeferredList, gatherResults
 
--- a/apps/paillier.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/paillier.py	Wed May 27 22:29:31 2009 +0200
@@ -26,6 +26,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/prss-and-open.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/prss-and-open.py	Wed May 27 22:29:31 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 from twisted.internet.defer import gatherResults
 
--- a/apps/seq-mul.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/seq-mul.py	Wed May 27 22:29:31 2009 +0200
@@ -18,6 +18,8 @@
 # License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/shamir-share-open.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/shamir-share-open.py	Wed May 27 22:29:31 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/share-open.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/share-open.py	Wed May 27 22:29:31 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF256
--- a/apps/sort.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/sort.py	Wed May 27 22:29:31 2009 +0200
@@ -51,6 +51,8 @@
 
 from math import log, floor
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from progressbar import ProgressBar, Percentage, Bar, ETA, ProgressBarWidget
--- a/apps/sum.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/sum.py	Wed May 27 22:29:31 2009 +0200
@@ -18,6 +18,8 @@
 # License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/two-fields.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/two-fields.py	Wed May 27 22:29:31 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/xor-all.py	Wed May 20 09:34:42 2009 +0200
+++ b/apps/xor-all.py	Wed May 27 22:29:31 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF256
--- a/viff/active.py	Wed May 20 09:34:42 2009 +0200
+++ b/viff/active.py	Wed May 27 22:29:31 2009 +0200
@@ -70,6 +70,9 @@
             for protocol in self.protocols.itervalues():
                 protocol.sendData(pc, data_type, message)
 
+            # do actual communication
+            self.activate_reactor()
+
         def echo_received(message, peer_id):
             # This is called when we receive an echo message. It
             # updates the echo count for the message and enters the
@@ -133,6 +136,8 @@
             d_send = Deferred().addCallback(send_received)
             self._expect_data(sender, SEND, d_send)
 
+        # do actual communication
+        self.activate_reactor()
 
         return result
 
@@ -261,6 +266,9 @@
                 # first T shares.
                 return rvec[:T]
 
+            # do actual communication
+            self.activate_reactor()
+
         result = gather_shares(svec[T:])
         self.schedule_callback(result, exchange)
         return result
@@ -361,6 +369,9 @@
                 # first T shares.
                 return (rvec1[:T], rvec2[:T])
 
+            # do actual communication
+            self.activate_reactor()
+
         result = gather_shares([gather_shares(svec1[T:]), gather_shares(svec2[T:])])
         self.schedule_callback(result, exchange)
         return result
--- a/viff/aes.py	Wed May 20 09:34:42 2009 +0200
+++ b/viff/aes.py	Wed May 27 22:29:31 2009 +0200
@@ -377,9 +377,9 @@
                 trigger.addCallback(progress, i, time.time())
 
                 if (i < self.rounds - 1):
-                    self.runtime.schedule_callback(trigger, round, state, i + 1)
+                    self.runtime.schedule_complex_callback(trigger, round, state, i + 1)
                 else:
-                    self.runtime.schedule_callback(trigger, final_round, state)
+                    self.runtime.schedule_complex_callback(trigger, final_round, state)
 
             prep_progress(i, start_round)
 
--- a/viff/passive.py	Wed May 20 09:34:42 2009 +0200
+++ b/viff/passive.py	Wed May 27 22:29:31 2009 +0200
@@ -104,6 +104,10 @@
 
         result = share.clone()
         self.schedule_callback(result, exchange)
+
+        # do actual communication
+        self.activate_reactor()
+
         if self.id in receivers:
             return result
 
@@ -204,6 +208,10 @@
         result = gather_shares([share_a, share_b])
         result.addCallback(lambda (a, b): a * b)
         self.schedule_callback(result, share_recombine)
+
+        # do actual communication
+        self.activate_reactor()
+
         return result
 
     def pow(self, share, exponent):
@@ -501,6 +509,9 @@
             else:
                 results.append(self._expect_share(peer_id, field))
 
+        # do actual communication
+        self.activate_reactor()
+
         # Unpack a singleton list.
         if len(results) == 1:
             return results[0]
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/viff/reactor.py	Wed May 27 22:29:31 2009 +0200
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2009 VIFF Development Team.
+#
+# This file is part of VIFF, the Virtual Ideal Functionality Framework.
+#
+# VIFF is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License (LGPL) as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# VIFF is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+# Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
+
+"""VIFF reactor to have control over the scheduling."""
+
+__docformat__ = "restructuredtext"
+
+from twisted.internet.selectreactor import SelectReactor
+
+
+class ViffReactor(SelectReactor):
+    """VIFF reactor.
+
+    The only difference to the SelectReactor is the loop call.
+    From there, doIteration() can be called recursively."""
+
+    def __init__(self):
+        SelectReactor.__init__(self)
+        self.loopCall = lambda: None
+   
+    def setLoopCall(self, f):
+        self.loopCall = f
+
+    def doIteration(self, t):
+        # Do the same as in mainLoop() first.
+        self.runUntilCurrent()
+        t2 = self.timeout()
+
+        if (t2 is not None):
+            t = min(t, self.running and t2)
+
+        SelectReactor.doIteration(self, t)
+        self.loopCall()
+
+def install():
+    """Use the VIFF reactor."""
+    reactor = ViffReactor()
+    from twisted.internet.main import installReactor
+    installReactor(reactor)
--- a/viff/runtime.py	Wed May 20 09:34:42 2009 +0200
+++ b/viff/runtime.py	Wed May 27 22:29:31 2009 +0200
@@ -40,6 +40,7 @@
 
 from viff.field import GF256, FieldElement
 from viff.util import wrapper, rand, deep_wait, track_memory_usage
+import viff.reactor
 
 from twisted.internet import reactor
 from twisted.internet.task import LoopingCall
@@ -318,7 +319,7 @@
                     deferred = deq.popleft()
                     if not deq:
                         del self.waiting_deferreds[key]
-                    deferred.callback(data)
+                    self.factory.runtime.handle_deferred_data(deferred, data)
                 else:
                     deq = self.incoming_data.setdefault(key, deque())
                     deq.append(data)
@@ -533,6 +534,17 @@
         # communicating with ourselves.
         self.add_player(player, None)
 
+        #: Queue of deferreds and data.
+        self.deferred_queue = deque()
+        self.complex_deferred_queue = deque()
+        #: Counter for calls of activate_reactor().
+        self.activation_counter = 0
+        #: Record the recursion depth.
+        self.depth_counter = 0
+        self.max_depth = 0
+        #: Use deferred queues only if the ViffReactor is running.
+        self.using_viff_reactor = isinstance(reactor, viff.reactor.ViffReactor)
+
     def add_player(self, player, protocol):
         self.players[player.id] = player
         self.num_players = len(self.players)
@@ -617,6 +629,25 @@
 
         return deferred.addCallback(callback_wrapper, *args, **kwargs)
 
+    def schedule_complex_callback(self, deferred, func, *args, **kwargs):
+        """Schedule a complex callback, i.e. a callback which blocks a
+        long time.
+
+        Consider that the deferred is forked, i.e. if the callback returns
+        something to be used afterwards, add further callbacks to the returned
+        deferred."""
+
+        if isinstance(deferred, Share):
+            fork = Share(deferred.runtime, deferred.field)
+        else:
+            fork = Deferred()
+
+        def queue_callback(result, runtime, fork):
+            runtime.complex_deferred_queue.append((fork, result))
+
+        deferred.addCallback(queue_callback, self, fork)
+        return self.schedule_callback(fork, func, *args, **kwargs)
+
     @increment_pc
     def synchronize(self):
         """Introduce a synchronization point.
@@ -761,6 +792,54 @@
         Python integer."""
         raise NotImplemented("Override this abstract method in a subclass.")
 
+    def handle_deferred_data(self, deferred, data):
+        """Put deferred and data into the queue if the ViffReactor is running. 
+        Otherwise, just execute the callback."""
+
+        if (self.using_viff_reactor):
+            self.deferred_queue.append((deferred, data))
+        else:
+            deferred.callback(data)
+
+    def process_deferred_queue(self):
+        """Execute the callbacks of the deferreds in the queue.
+
+        If this function is not called via activate_reactor(), also
+        complex callbacks are executed."""
+
+        self.process_queue(self.deferred_queue)
+
+        if self.depth_counter == 0:
+            self.process_queue(self.complex_deferred_queue)
+
+    def process_queue(self, queue):
+        """Execute the callbacks of the deferreds in *queue*."""
+
+        while(queue):
+            deferred, data = queue.popleft()
+            deferred.callback(data)
+
+    def activate_reactor(self):
+        """Activate the reactor to do actual communcation.
+
+        This is where the recursion happens."""
+
+        self.activation_counter += 1
+
+        # setting the number to n makes the reactor called 
+        # only every n-th time
+        if (self.activation_counter >= 2):
+            self.depth_counter += 1
+
+            if (self.depth_counter > self.max_depth):
+                # Record the maximal depth reached.
+                self.max_depth = self.depth_counter
+
+            reactor.doIteration(0)
+
+            self.depth_counter -= 1
+            self.activation_counter = 0
+
 
 def make_runtime_class(runtime_class=None, mixins=None):
     """Creates a new runtime class with *runtime_class* as a base
@@ -912,6 +991,10 @@
             print "Will connect to %s" % player
             connect(player.host, player.port)
 
+    if runtime.using_viff_reactor:
+        # Process the deferred queue after every reactor iteration.
+        reactor.setLoopCall(runtime.process_deferred_queue)
+
     return result
 
 if __name__ == "__main__":
--- a/viff/test/__init__.py	Wed May 20 09:34:42 2009 +0200
+++ b/viff/test/__init__.py	Wed May 27 22:29:31 2009 +0200
@@ -14,3 +14,6 @@
 #
 # You should have received a copy of the GNU Lesser General Public
 # License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
+
+import viff.reactor
+viff.reactor.install()
--- a/viff/test/util.py	Wed May 20 09:34:42 2009 +0200
+++ b/viff/test/util.py	Wed May 27 22:29:31 2009 +0200
@@ -19,6 +19,7 @@
 
 from twisted.internet.defer import Deferred, gatherResults, maybeDeferred
 from twisted.trial.unittest import TestCase
+from twisted.internet import reactor
 
 from viff.passive import PassiveRuntime
 from viff.runtime import Share, ShareExchanger, ShareExchangerFactory
@@ -134,6 +135,7 @@
         self.close_sentinels = []
 
         self.runtimes = []
+        self.real_runtimes = []
         for id in reversed(range(1, self.num_players+1)):
             _, players = load_config(configs[id])
             self.create_loopback_runtime(id, players)
@@ -177,6 +179,18 @@
         # the Runtime, since we want everybody to wait until all
         # runtimes are ready.
         self.runtimes.append(result)
+        self.real_runtimes.append(runtime)
+        self.i = 0
+
+        # This loop call should ensure the queues of the parties are
+        # processed in a more or less fair manner. This is necessary
+        # because we have only one reactor for all parties here.
+        def loop_call():
+            for runtime in self.real_runtimes[self.i:] + self.real_runtimes[:self.i]:
+                runtime.process_deferred_queue()
+                self.i = (self.i + 1) % len(self.real_runtimes)
+
+        reactor.setLoopCall(loop_call)
 
         for peer_id in players:
             if peer_id != id: