changeset 1184:6cd5ceb87542

Added reentrant reactor. Data now is sent faster and incoming data is processed faster.
author Marcel Keller <mkeller@cs.au.dk>
date Tue, 19 May 2009 17:04:07 +0200
parents e89fb02c5e3d
children 89b93bc75476
files apps/aes.py apps/aes_inversion.py apps/beginner.py apps/benchmark.py apps/compare.py apps/divide.py apps/equality.py apps/eval-poly.py apps/gc-test.py apps/int-bit-conversion.py apps/millionaires.py apps/multiply.py apps/online-comparison-benchmark.py apps/paillier.py apps/prss-and-open.py apps/seq-mul.py apps/shamir-share-open.py apps/share-open.py apps/sort.py apps/sum.py apps/two-fields.py apps/xor-all.py viff/active.py viff/passive.py viff/reactor.py viff/runtime.py viff/test/__init__.py viff/test/util.py
diffstat 28 files changed, 186 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/apps/aes.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/aes.py	Tue May 19 17:04:07 2009 +0200
@@ -23,6 +23,8 @@
 import time
 from optparse import OptionParser
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF256
--- a/apps/aes_inversion.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/aes_inversion.py	Tue May 19 17:04:07 2009 +0200
@@ -24,6 +24,8 @@
 import time
 from optparse import OptionParser
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF256
--- a/apps/beginner.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/beginner.py	Tue May 19 17:04:07 2009 +0200
@@ -37,6 +37,8 @@
 # Some useful imports.
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/benchmark.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/benchmark.py	Tue May 19 17:04:07 2009 +0200
@@ -60,6 +60,8 @@
 import operator
 from pprint import pformat
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF, GF256, FakeGF
--- a/apps/compare.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/compare.py	Tue May 19 17:04:07 2009 +0200
@@ -20,6 +20,8 @@
 import sys
 import random
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 from twisted.internet.defer import gatherResults
 
--- a/apps/divide.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/divide.py	Tue May 19 17:04:07 2009 +0200
@@ -32,6 +32,8 @@
 """
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/equality.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/equality.py	Tue May 19 17:04:07 2009 +0200
@@ -31,6 +31,8 @@
 """
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/eval-poly.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/eval-poly.py	Tue May 19 17:04:07 2009 +0200
@@ -26,6 +26,8 @@
 from time import time
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/gc-test.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/gc-test.py	Tue May 19 17:04:07 2009 +0200
@@ -26,6 +26,8 @@
 import sys
 from time import time
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/int-bit-conversion.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/int-bit-conversion.py	Tue May 19 17:04:07 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF, GF256
--- a/apps/millionaires.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/millionaires.py	Tue May 19 17:04:07 2009 +0200
@@ -32,6 +32,8 @@
 # the example with '--help' for help with the command line options.
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/multiply.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/multiply.py	Tue May 19 17:04:07 2009 +0200
@@ -18,6 +18,8 @@
 # License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/online-comparison-benchmark.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/online-comparison-benchmark.py	Tue May 19 17:04:07 2009 +0200
@@ -21,6 +21,8 @@
 import signal
 from optparse import OptionParser
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 from twisted.internet.defer import DeferredList, gatherResults
 
--- a/apps/paillier.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/paillier.py	Tue May 19 17:04:07 2009 +0200
@@ -26,6 +26,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/prss-and-open.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/prss-and-open.py	Tue May 19 17:04:07 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 from twisted.internet.defer import gatherResults
 
--- a/apps/seq-mul.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/seq-mul.py	Tue May 19 17:04:07 2009 +0200
@@ -18,6 +18,8 @@
 # License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/shamir-share-open.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/shamir-share-open.py	Tue May 19 17:04:07 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/share-open.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/share-open.py	Tue May 19 17:04:07 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF256
--- a/apps/sort.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/sort.py	Tue May 19 17:04:07 2009 +0200
@@ -51,6 +51,8 @@
 
 from math import log, floor
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from progressbar import ProgressBar, Percentage, Bar, ETA, ProgressBarWidget
--- a/apps/sum.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/sum.py	Tue May 19 17:04:07 2009 +0200
@@ -18,6 +18,8 @@
 # License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
 
 from optparse import OptionParser
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/two-fields.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/two-fields.py	Tue May 19 17:04:07 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF
--- a/apps/xor-all.py	Thu May 14 12:01:54 2009 +0200
+++ b/apps/xor-all.py	Tue May 19 17:04:07 2009 +0200
@@ -19,6 +19,8 @@
 
 import sys
 
+import viff.reactor
+viff.reactor.install()
 from twisted.internet import reactor
 
 from viff.field import GF256
--- a/viff/active.py	Thu May 14 12:01:54 2009 +0200
+++ b/viff/active.py	Tue May 19 17:04:07 2009 +0200
@@ -70,6 +70,9 @@
             for protocol in self.protocols.itervalues():
                 protocol.sendData(pc, data_type, message)
 
+            # do actual communication
+            self.activate_reactor()
+
         def echo_received(message, peer_id):
             # This is called when we receive an echo message. It
             # updates the echo count for the message and enters the
@@ -133,6 +136,8 @@
             d_send = Deferred().addCallback(send_received)
             self._expect_data(sender, SEND, d_send)
 
+        # do actual communication
+        self.activate_reactor()
 
         return result
 
@@ -261,6 +266,9 @@
                 # first T shares.
                 return rvec[:T]
 
+            # do actual communication
+            self.activate_reactor()
+
         result = gather_shares(svec[T:])
         self.schedule_callback(result, exchange)
         return result
@@ -361,6 +369,9 @@
                 # first T shares.
                 return (rvec1[:T], rvec2[:T])
 
+            # do actual communication
+            self.activate_reactor()
+
         result = gather_shares([gather_shares(svec1[T:]), gather_shares(svec2[T:])])
         self.schedule_callback(result, exchange)
         return result
--- a/viff/passive.py	Thu May 14 12:01:54 2009 +0200
+++ b/viff/passive.py	Tue May 19 17:04:07 2009 +0200
@@ -104,6 +104,10 @@
 
         result = share.clone()
         self.schedule_callback(result, exchange)
+
+        # do actual communication
+        self.activate_reactor()
+
         if self.id in receivers:
             return result
 
@@ -204,6 +208,10 @@
         result = gather_shares([share_a, share_b])
         result.addCallback(lambda (a, b): a * b)
         self.schedule_callback(result, share_recombine)
+
+        # do actual communication
+        self.activate_reactor()
+
         return result
 
     def pow(self, share, exponent):
@@ -501,6 +509,9 @@
             else:
                 results.append(self._expect_share(peer_id, field))
 
+        # do actual communication
+        self.activate_reactor()
+
         # Unpack a singleton list.
         if len(results) == 1:
             return results[0]
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/viff/reactor.py	Tue May 19 17:04:07 2009 +0200
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2009 VIFF Development Team.
+#
+# This file is part of VIFF, the Virtual Ideal Functionality Framework.
+#
+# VIFF is free software: you can redistribute it and/or modify it
+# under the terms of the GNU Lesser General Public License (LGPL) as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# VIFF is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+# Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
+
+"""VIFF reactor to have control over the scheduling."""
+
+__docformat__ = "restructuredtext"
+
+from twisted.internet.selectreactor import SelectReactor
+
+
+class ViffReactor(SelectReactor):
+    """VIFF reactor.
+
+    The only difference to the SelectReactor is the loop call.
+    From there, doIteration() can be called recursively."""
+
+    def __init__(self):
+        SelectReactor.__init__(self)
+        self.loopCall = lambda: None
+   
+    def setLoopCall(self, f):
+        self.loopCall = f
+
+    def doIteration(self, t):
+        # Do the same as in mainLoop() first.
+        self.runUntilCurrent()
+        t2 = self.timeout()
+
+        if (t2 is not None):
+            t = min(t, self.running and t2)
+
+        SelectReactor.doIteration(self, t)
+        self.loopCall()
+
+def install():
+    """Use the VIFF reactor."""
+    reactor = ViffReactor()
+    from twisted.internet.main import installReactor
+    installReactor(reactor)
--- a/viff/runtime.py	Thu May 14 12:01:54 2009 +0200
+++ b/viff/runtime.py	Tue May 19 17:04:07 2009 +0200
@@ -318,7 +318,10 @@
                     deferred = deq.popleft()
                     if not deq:
                         del self.waiting_deferreds[key]
-                    deferred.callback(data)
+
+                    # Just queue, callbacks will be executed
+                    # in process_deferred_queue().
+                    self.factory.runtime.queue_deferred(deferred, data)
                 else:
                     deq = self.incoming_data.setdefault(key, deque())
                     deq.append(data)
@@ -533,6 +536,14 @@
         # communicating with ourselves.
         self.add_player(player, None)
 
+        #: Queue of deferreds and data.
+        self.deferred_queue = []
+        #: Counter for calls of activate_reactor().
+        self.activation_counter = 0
+        #: Record the recursion depth.
+        self.depth_counter = 0
+        self.max_depth = 0
+
     def add_player(self, player, protocol):
         self.players[player.id] = player
         self.num_players = len(self.players)
@@ -761,6 +772,39 @@
         Python integer."""
         raise NotImplemented("Override this abstract method in a subclass.")
 
+    def queue_deferred(self, deferred, data):
+        """Put deferred and data into the queue."""
+
+        self.deferred_queue.append((deferred, data))
+
+    def process_deferred_queue(self):
+        """Execute the callbacks of the deferreds in the queue."""
+
+        while(self.deferred_queue):
+            deferred, data = self.deferred_queue.pop(0)
+            deferred.callback(data)
+
+    def activate_reactor(self):
+        """Activate the reactor to do actual communcation.
+
+        This is where the recursion happens."""
+
+        self.activation_counter += 1
+
+        # setting the number to n makes the reactor called 
+        # only every n-th time
+        if (self.activation_counter >= 2):
+            self.depth_counter += 1
+
+            if (self.depth_counter > self.max_depth):
+                # Record the maximal depth reached.
+                self.max_depth = self.depth_counter
+
+            reactor.doIteration(0)
+
+            self.depth_counter -= 1
+            self.activation_counter = 0
+
 
 def make_runtime_class(runtime_class=None, mixins=None):
     """Creates a new runtime class with *runtime_class* as a base
@@ -912,6 +956,9 @@
             print "Will connect to %s" % player
             connect(player.host, player.port)
 
+    # Process the deferred queue after every reactor iteration.
+    reactor.setLoopCall(runtime.process_deferred_queue)
+
     return result
 
 if __name__ == "__main__":
--- a/viff/test/__init__.py	Thu May 14 12:01:54 2009 +0200
+++ b/viff/test/__init__.py	Tue May 19 17:04:07 2009 +0200
@@ -14,3 +14,6 @@
 #
 # You should have received a copy of the GNU Lesser General Public
 # License along with VIFF. If not, see <http://www.gnu.org/licenses/>.
+
+import viff.reactor
+viff.reactor.install()
--- a/viff/test/util.py	Thu May 14 12:01:54 2009 +0200
+++ b/viff/test/util.py	Tue May 19 17:04:07 2009 +0200
@@ -19,6 +19,7 @@
 
 from twisted.internet.defer import Deferred, gatherResults, maybeDeferred
 from twisted.trial.unittest import TestCase
+from twisted.internet import reactor
 
 from viff.passive import PassiveRuntime
 from viff.runtime import Share, ShareExchanger, ShareExchangerFactory
@@ -127,6 +128,7 @@
         self.close_sentinels = []
 
         self.runtimes = []
+        self.real_runtimes = []
         for id in reversed(range(1, self.num_players+1)):
             _, players = load_config(configs[id])
             self.create_loopback_runtime(id, players)
@@ -164,6 +166,18 @@
         # the Runtime, since we want everybody to wait until all
         # runtimes are ready.
         self.runtimes.append(result)
+        self.real_runtimes.append(runtime)
+        self.i = 0
+
+        # This loop call should ensure the queues of the parties are
+        # processed in a more or less fair manner. This is necessary
+        # because we have only one reactor for all parties here.
+        def loop_call():
+            for runtime in self.real_runtimes[self.i:] + self.real_runtimes[:self.i]:
+                runtime.process_deferred_queue()
+                self.i = (self.i + 1) % len(self.real_runtimes)
+
+        reactor.setLoopCall(loop_call)
 
         for peer_id in players:
             if peer_id != id: