changeset 613:7cd68e62366a

Revert to just a single program counter. It turned out that multiple program counters did not really help solve the problem of how to send several things in the same method, at least not without making it much more difficult to write correct code; see Issue 22 for an example of what might go wrong. A simpler and better solution seems to be to allow the players to exchange multiple shares with the *same* program counter. There is no chance of them being mixed up, since we use a reliable transport for the connection between players, so the shares arrive in the same order as they were sent (see the sketch after the changeset header). The only requirement on the code is then that it makes the calls to sendShare in the same order as the calls to _expect_share. That requirement should be easy to fulfill.
author Martin Geisler <mg@daimi.au.dk>
date Fri, 28 Mar 2008 15:48:12 +0100
parents 57f877cd60f8 c0251598d6a3
children b550d4457d60
files viff/runtime.py viff/test/test_basic_runtime.py
diffstat 2 files changed, 88 insertions(+), 101 deletions(-)
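
To make the ordering argument in the description concrete, here is a minimal standalone sketch. It is toy code, not part of this changeset: the ReliableChannel class and its method names are invented for illustration, with a deque standing in for an order-preserving connection such as TCP.

from collections import deque

class ReliableChannel:
    """Stands in for the reliable, order-preserving transport (e.g. TCP)."""

    def __init__(self):
        self.queue = deque()

    def send_share(self, program_counter, value):
        # Delivery happens in exactly the order things were sent.
        self.queue.append((program_counter, value))

    def expect_share(self, program_counter):
        # Match the oldest undelivered share; order alone pairs sends
        # with expectations, even when the program counters are equal.
        pc, value = self.queue.popleft()
        assert pc == program_counter, "send/expect issued out of order"
        return value

channel = ReliableChannel()
# Sender: two shares under the *same* program counter, in a fixed order.
channel.send_share((1,), 17)
channel.send_share((1,), 42)
# Receiver: expects them in the same order, so the values cannot be mixed up.
assert channel.expect_share((1,)) == 17
assert channel.expect_share((1,)) == 42

If either side issued its calls in a different order, the assertion would fire; that is exactly the contract the description places on sendShare and _expect_share.
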
--- a/viff/runtime.py	Fri Mar 28 15:30:28 2008 +0100
+++ b/viff/runtime.py	Fri Mar 28 15:48:12 2008 +0100
@@ -222,52 +222,6 @@
         #: deferred data.
         self.incoming_data = {}
 
-        #: Program counter for this connection.
-        #:
-        #: Whenever a share is sent over the network, it must be
-        #: uniquely identified so that the receiving player known what
-        #: operation the share is a result of. This is done by
-        #: associating a X{program counter} with each operation.
-        #:
-        #: Keeping the program counter synchronized between all
-        #: players ought to be easy, but because of the asynchronous
-        #: nature of network protocols, all players might not reach
-        #: the same parts of the program at the same time.
-        #:
-        #: Consider two players M{A} and M{B} who are both waiting on
-        #: the variables C{a} and C{b}. Callbacks have been added to
-        #: C{a} and C{b}, and the question is what program counter the
-        #: callbacks should use when sending data out over the
-        #: network.
-        #:
-        #: Let M{A} receive input for C{a} and then for C{b} a little
-        #: later, and let M{B} receive the inputs in reversed order so
-        #: that the input for C{b} arrives first. The goal is to keep
-        #: the program counters synchronized so that program counter
-        #: M{x} refers to the same operation on all players. Because
-        #: the inputs arrive in different order at different players,
-        #: incrementing a simple global counter is not enough.
-        #:
-        #: Instead, a I{tree} is made, which follows the tree of
-        #: execution. At the top level the program counter starts at
-        #: C{[0]}. At the next operation it becomes C{[1]}, and so on.
-        #: If a callback is scheduled (see
-        #: L{Runtime.schedule_callback}) at program counter C{[x, y,
-        #: z]}, any calls it makes will be numbered C{[x, y, z, 1]},
-        #: then C{[x, y, z, 2]}, and so on.
-        #:
-        #: Maintaining such a tree of program counters ensures that
-        #: different parts of the program execution never reuses the
-        #: same program counter for different variables.
-        #:
-        #: The L{increment_pc} decorator is responsible for
-        #: dynamically building the tree as the execution unfolds and
-        #: L{Runtime.schedule_callback} is responsible for scheduling
-        #: callbacks with the correct program counter.
-        #:
-        #: @type: C{list} of integers.
-        self.program_counter = [0]
-
     def connectionMade(self):
         self.sendString(str(self.factory.runtime.id))
         try:
@@ -312,18 +266,20 @@
             # TODO: marshal.loads can raise EOFError, ValueError, and
             # TypeError. They should be handled somehow.
 
-    def sendData(self, data_type, data):
-        pc = tuple(self.program_counter)
-        send_data = (pc, data_type, data)
+    def sendData(self, program_counter, data_type, data):
+        send_data = (program_counter, data_type, data)
         self.sendString(marshal.dumps(send_data))
 
-    def sendShare(self, share):
+    def sendShare(self, program_counter, share):
         """Send a share.
 
-        The share is marshalled and sent to the peer along with the
-        current program counter for this connection.
+        The program counter and the share are marshalled and sent to
+        the peer.
+
+        @param program_counter: the program counter associated with
+        the share.
         """
-        self.sendData("share", share.value)
+        self.sendData(program_counter, "share", share.value)
 
     def loseConnection(self):
         """Disconnect this protocol instance."""
@@ -362,13 +318,11 @@
     @wrapper(method)
     def inc_pc_wrapper(self, *args, **kwargs):
         try:
-            for protocol in self.protocols.itervalues():
-                protocol.program_counter[-1] += 1
-                protocol.program_counter.append(0)
+            self.program_counter[-1] += 1
+            self.program_counter.append(0)
             return method(self, *args, **kwargs)
         finally:
-            for protocol in self.protocols.itervalues():
-                protocol.program_counter.pop()
+            self.program_counter.pop()
     return inc_pc_wrapper
 
 
@@ -428,6 +382,51 @@
             from twisted.internet import defer
             defer.setDebugging(True)
 
+        #: Current program counter.
+        #:
+        #: Whenever a share is sent over the network, it must be
+        #: uniquely identified so that the receiving player knows what
+        #: operation the share is a result of. This is done by
+        #: associating a X{program counter} with each operation.
+        #:
+        #: Keeping the program counter synchronized between all
+        #: players ought to be easy, but because of the asynchronous
+        #: nature of network protocols, all players might not reach
+        #: the same parts of the program at the same time.
+        #:
+        #: Consider two players M{A} and M{B} who are both waiting on
+        #: the variables C{a} and C{b}. Callbacks have been added to
+        #: C{a} and C{b}, and the question is what program counter the
+        #: callbacks should use when sending data out over the
+        #: network.
+        #:
+        #: Let M{A} receive input for C{a} and then for C{b} a little
+        #: later, and let M{B} receive the inputs in reversed order so
+        #: that the input for C{b} arrives first. The goal is to keep
+        #: the program counters synchronized so that program counter
+        #: M{x} refers to the same operation on all players. Because
+        #: the inputs arrive in different order at different players,
+        #: incrementing a simple global counter is not enough.
+        #:
+        #: Instead, a I{tree} is made, which follows the tree of
+        #: execution. At the top level the program counter starts at
+        #: C{[0]}. At the next operation it becomes C{[1]}, and so on.
+        #: If a callback is scheduled (see L{callback}) at program
+        #: counter C{[x, y, z]}, any calls it makes will be numbered
+        #: C{[x, y, z, 1]}, then C{[x, y, z, 2]}, and so on.
+        #:
+        #: Maintaining such a tree of program counters ensures that
+        #: different parts of the program execution never reuse the
+        #: same program counter for different variables.
+        #:
+        #: The L{increment_pc} decorator is responsible for
+        #: dynamically building the tree as the execution unfolds and
+        #: L{callback} is responsible for scheduling callbacks with
+        #: the correct program counter.
+        #:
+        #: @type: C{list} of integers.
+        self.program_counter = [0]
+
         #: Connections to the other players.
         #:
         #: @type: C{dict} from Player ID to L{ShareExchanger} objects.
@@ -504,25 +503,17 @@
         # program counter. Simply decorating callback with increase_pc
         # does not seem to work (the multiplication benchmark hangs).
         # This should be fixed.
-
-        def get_pcs():
-            return [(protocol, protocol.program_counter[:]) for protocol
-                    in self.protocols.itervalues()]
-        def set_pcs(pcs):
-            for protocol, pc in pcs:
-                protocol.program_counter = pc
-
-        saved_pcs = get_pcs()
+        saved_pc = self.program_counter[:]
 
         @wrapper(func)
         def callback_wrapper(*args, **kwargs):
             """Wrapper for a callback which ensures a correct PC."""
             try:
-                current_pcs = get_pcs()
-                set_pcs(saved_pcs)
+                current_pc = self.program_counter
+                self.program_counter = saved_pc
                 return func(*args, **kwargs)
             finally:
-                set_pcs(current_pcs)
+                self.program_counter = current_pc
 
         deferred.addCallback(callback_wrapper, *args, **kwargs)
 
@@ -538,7 +529,7 @@
         assert peer_id != self.id, "Do not expect data from yourself!"
         # Convert self.program_counter to a hashable value in order to
         # use it as a key in self.protocols[peer_id].incoming_data.
-        pc = tuple(self.protocols[peer_id].program_counter)
+        pc = tuple(self.program_counter)
         key = (pc, data_type)
 
         data = self.protocols[peer_id].incoming_data.pop(key, None)
@@ -561,7 +552,8 @@
             return Share(self, field_element.field, field_element)
         else:
             share = self._expect_share(peer_id, field_element.field)
-            self.protocols[peer_id].sendShare(field_element)
+            pc = tuple(self.program_counter)
+            self.protocols[peer_id].sendShare(pc, field_element)
             return share
 
     def _expect_share(self, peer_id, field):
@@ -635,7 +627,8 @@
             # Send share to all receivers.
             for peer_id in receivers:
                 if peer_id != self.id:
-                    self.protocols[peer_id].sendShare(share)
+                    pc = tuple(self.program_counter)
+                    self.protocols[peer_id].sendShare(pc, share)
             # Receive and recombine shares if this player is a receiver.
             if self.id in receivers:
                 deferreds = []
@@ -762,8 +755,7 @@
         n = self.num_players
 
         # Key used for PRSS.
-        key = tuple([tuple(p.program_counter) for p
-                     in self.protocols.itervalues()])
+        key = tuple(self.program_counter)
 
         # The shares for which we have all the keys.
         all_shares = []
@@ -782,9 +774,10 @@
             correction = element - shared
             # if this player is inputter then broadcast correction value
             # TODO: more efficient broadcast?
+            pc = tuple(self.program_counter)
             for peer_id in self.players:
                 if self.id != peer_id:
-                    self.protocols[peer_id].sendShare(correction)
+                    self.protocols[peer_id].sendShare(pc, correction)
 
         # Receive correction value from inputters and compute share.
         result = []
@@ -818,8 +811,7 @@
             modulus = field.modulus
 
         # Key used for PRSS.
-        prss_key = tuple([tuple(p.program_counter) for p
-                          in self.protocols.itervalues()])
+        prss_key = tuple(self.program_counter)
         prfs = self.players[self.id].prfs(modulus)
         share = prss(self.num_players, self.id, field, prfs, prss_key)
 
@@ -872,13 +864,14 @@
         results = []
         for peer_id in inputters:
             if peer_id == self.id:
+                pc = tuple(self.program_counter)
                 shares = shamir.share(field(number), self.threshold,
                                       self.num_players)
                 for other_id, share in shares:
                     if other_id.value == self.id:
                         results.append(Share(self, share.field, share))
                     else:
-                        self.protocols[other_id.value].sendShare(share)
+                        self.protocols[other_id.value].sendShare(pc, share)
             else:
                 results.append(self._expect_share(peer_id, field))
 
@@ -903,8 +896,7 @@
         """
 
         result = Deferred()
-        pc = tuple([tuple(p.program_counter) for p
-                    in self.protocols.itervalues()])
+        pc = tuple(self.program_counter)
         n = self.num_players
         t = self.threshold
 
--- a/viff/test/test_basic_runtime.py	Fri Mar 28 15:30:28 2008 +0100
+++ b/viff/test/test_basic_runtime.py	Fri Mar 28 15:48:12 2008 +0100
@@ -26,14 +26,9 @@
 class ProgramCounterTest(RuntimeTestCase):
     """Program counter tests."""
 
-    def assert_pc(self, runtime, pc):
-        """Assert that all protocols has a given program counter."""
-        for p in runtime.protocols.itervalues():
-            self.assertEquals(p.program_counter, pc)
-
     @protocol
     def test_initial_value(self, runtime):
-        self.assert_pc(runtime, [0])
+        self.assertEquals(runtime.program_counter, [0])
 
     @protocol
     def test_simple_operation(self, runtime):
@@ -42,9 +37,9 @@
         Each call should increment the program counter by one.
         """
         runtime.synchronize()
-        self.assert_pc(runtime, [1])
+        self.assertEquals(runtime.program_counter, [1])
         runtime.synchronize()
-        self.assert_pc(runtime, [2])
+        self.assertEquals(runtime.program_counter, [2])
 
     @protocol
     def test_complex_operation(self, runtime):
@@ -56,9 +51,9 @@
         # Exclusive-or is calculated as x + y - 2 * x * y, so add,
         # sub, and mul are called.
         runtime.xor(self.Zp(0), self.Zp(1))
-        self.assert_pc(runtime, [1])
+        self.assertEquals(runtime.program_counter, [1])
         runtime.xor(self.Zp(0), self.Zp(1))
-        self.assert_pc(runtime, [2])
+        self.assertEquals(runtime.program_counter, [2])
 
     @protocol
     def test_callback(self, runtime):
@@ -69,13 +64,13 @@
         """
 
         def verify_program_counter(_):
-            self.assert_pc(runtime, [0])
+            self.assertEquals(runtime.program_counter, [0])
 
         d = Deferred()
         runtime.schedule_callback(d, verify_program_counter)
 
         runtime.synchronize()
-        self.assert_pc(runtime, [1])
+        self.assertEquals(runtime.program_counter, [1])
 
         # Now trigger verify_program_counter.
         d.callback(None)
@@ -92,26 +87,26 @@
             # First top-level call, so first entry is 1. No calls to
             # other methods decorated with increment_pc has been made,
             # so the second entry is 0.
-            self.assert_pc(runtime, [1, 0])
+            self.assertEquals(runtime.program_counter, [1, 0])
             method_b(runtime, 1)
 
-            self.assert_pc(runtime, [1, 1])
+            self.assertEquals(runtime.program_counter, [1, 1])
             method_b(runtime, 2)
 
             # At this point two sub-calls has been made:
-            self.assert_pc(runtime, [1, 2])
+            self.assertEquals(runtime.program_counter, [1, 2])
 
         @increment_pc
         def method_b(runtime, count):
             # This method is called twice from method_a:
-            self.assert_pc(runtime, [1, count, 0])
+            self.assertEquals(runtime.program_counter, [1, count, 0])
 
         # Zero top-level calls:
-        self.assert_pc(runtime, [0])
+        self.assertEquals(runtime.program_counter, [0])
         method_a(runtime)
 
         # One top-level call:
-        self.assert_pc(runtime, [1])
+        self.assertEquals(runtime.program_counter, [1])
 
     @protocol
     def test_multiple_callbacks(self, runtime):
@@ -120,11 +115,11 @@
         d2 = Deferred()
 
         def verify_program_counter(_, count):
-            self.assert_pc(runtime, [1, count, 0])
+            self.assertEquals(runtime.program_counter, [1, count, 0])
 
         @increment_pc
         def method_a(runtime):
-            self.assert_pc(runtime, [1, 0])
+            self.assertEquals(runtime.program_counter, [1, 0])
 
             runtime.schedule_callback(d1, verify_program_counter, 1)
             runtime.schedule_callback(d2, verify_program_counter, 2)
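
For reference, the tree-shaped program counter documented in the new docstring, the simplified increment_pc decorator, and the save/restore logic in schedule_callback can be boiled down to the following standalone sketch. This is toy code under stated assumptions, not VIFF's actual Runtime: ToyRuntime and operation are invented names, and functools.wraps stands in for the wrapper helper used in the real code; Twisted's Deferred is used as in VIFF.

from functools import wraps
from twisted.internet.defer import Deferred

def increment_pc(method):
    """Give each decorated call its own branch of the program counter tree."""
    @wraps(method)
    def inc_pc_wrapper(self, *args, **kwargs):
        try:
            self.program_counter[-1] += 1   # next sibling at this level
            self.program_counter.append(0)  # descend one level for sub-calls
            return method(self, *args, **kwargs)
        finally:
            self.program_counter.pop()      # back to the caller's level
    return inc_pc_wrapper

class ToyRuntime:
    def __init__(self):
        self.program_counter = [0]

    def schedule_callback(self, deferred, func, *args, **kwargs):
        """Arrange for func to run under the program counter saved now."""
        saved_pc = self.program_counter[:]

        def callback_wrapper(result):
            current_pc = self.program_counter
            self.program_counter = saved_pc
            try:
                return func(result, *args, **kwargs)
            finally:
                self.program_counter = current_pc

        deferred.addCallback(callback_wrapper)

    @increment_pc
    def operation(self):
        return tuple(self.program_counter)

rt = ToyRuntime()
d = Deferred()
rt.schedule_callback(d, lambda _: rt.program_counter[:])

assert rt.operation() == (1, 0)   # first top-level call claims branch [1]
assert rt.operation() == (2, 0)   # second top-level call claims branch [2]
assert rt.program_counter == [2]  # back at the top level afterwards

d.callback(None)                  # fire the callback only now...
assert d.result == [0]            # ...yet it ran under the counter saved earlier
assert rt.program_counter == [2]  # and the current counter is restored

The asserted values mirror test_simple_operation and test_callback above: each top-level call claims a fresh branch of the counter, and a scheduled callback runs under the counter that was current when it was scheduled, not when it fired.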