viff

changeset 613:7cd68e62366a

Revert to just a single program counter. It turned out that multiple program counters did not really help solve the problem of how to send several things in the same method. At least not without making it much more difficult to write correct code, see Issue 22 for an example of what might go wrong. A simpler and better solution seems to be to allow the players to exchange multiple shares with the *same* program counter. There is no chance of them being mixed up since we use a reliable transport for the connection between players, and so the shares will arrive in the same order as they were sent. The only requirement of the code is then that it makes the calls to sendShare in the same order as the calls to _expect_share. That requirement should be easy to fulfill.
author Martin Geisler <mg@daimi.au.dk>
date Fri, 28 Mar 2008 15:48:12 +0100
parents 57f877cd60f8 c0251598d6a3
children b550d4457d60
files viff/runtime.py
diffstat 2 files changed, 88 insertions(+), 101 deletions(-) [+]
line diff
     1.1 --- a/viff/runtime.py	Fri Mar 28 15:30:28 2008 +0100
     1.2 +++ b/viff/runtime.py	Fri Mar 28 15:48:12 2008 +0100
     1.3 @@ -222,52 +222,6 @@
     1.4          #: deferred data.
     1.5          self.incoming_data = {}
     1.6  
     1.7 -        #: Program counter for this connection.
     1.8 -        #:
     1.9 -        #: Whenever a share is sent over the network, it must be
    1.10 -        #: uniquely identified so that the receiving player known what
    1.11 -        #: operation the share is a result of. This is done by
    1.12 -        #: associating a X{program counter} with each operation.
    1.13 -        #:
    1.14 -        #: Keeping the program counter synchronized between all
    1.15 -        #: players ought to be easy, but because of the asynchronous
    1.16 -        #: nature of network protocols, all players might not reach
    1.17 -        #: the same parts of the program at the same time.
    1.18 -        #:
    1.19 -        #: Consider two players M{A} and M{B} who are both waiting on
    1.20 -        #: the variables C{a} and C{b}. Callbacks have been added to
    1.21 -        #: C{a} and C{b}, and the question is what program counter the
    1.22 -        #: callbacks should use when sending data out over the
    1.23 -        #: network.
    1.24 -        #:
    1.25 -        #: Let M{A} receive input for C{a} and then for C{b} a little
    1.26 -        #: later, and let M{B} receive the inputs in reversed order so
    1.27 -        #: that the input for C{b} arrives first. The goal is to keep
    1.28 -        #: the program counters synchronized so that program counter
    1.29 -        #: M{x} refers to the same operation on all players. Because
    1.30 -        #: the inputs arrive in different order at different players,
    1.31 -        #: incrementing a simple global counter is not enough.
    1.32 -        #:
    1.33 -        #: Instead, a I{tree} is made, which follows the tree of
    1.34 -        #: execution. At the top level the program counter starts at
    1.35 -        #: C{[0]}. At the next operation it becomes C{[1]}, and so on.
    1.36 -        #: If a callback is scheduled (see
    1.37 -        #: L{Runtime.schedule_callback}) at program counter C{[x, y,
    1.38 -        #: z]}, any calls it makes will be numbered C{[x, y, z, 1]},
    1.39 -        #: then C{[x, y, z, 2]}, and so on.
    1.40 -        #:
    1.41 -        #: Maintaining such a tree of program counters ensures that
    1.42 -        #: different parts of the program execution never reuses the
    1.43 -        #: same program counter for different variables.
    1.44 -        #:
    1.45 -        #: The L{increment_pc} decorator is responsible for
    1.46 -        #: dynamically building the tree as the execution unfolds and
    1.47 -        #: L{Runtime.schedule_callback} is responsible for scheduling
    1.48 -        #: callbacks with the correct program counter.
    1.49 -        #:
    1.50 -        #: @type: C{list} of integers.
    1.51 -        self.program_counter = [0]
    1.52 -
    1.53      def connectionMade(self):
    1.54          self.sendString(str(self.factory.runtime.id))
    1.55          try:
    1.56 @@ -312,18 +266,20 @@
    1.57              # TODO: marshal.loads can raise EOFError, ValueError, and
    1.58              # TypeError. They should be handled somehow.
    1.59  
    1.60 -    def sendData(self, data_type, data):
    1.61 -        pc = tuple(self.program_counter)
    1.62 -        send_data = (pc, data_type, data)
    1.63 +    def sendData(self, program_counter, data_type, data):
    1.64 +        send_data = (program_counter, data_type, data)
    1.65          self.sendString(marshal.dumps(send_data))
    1.66  
    1.67 -    def sendShare(self, share):
    1.68 +    def sendShare(self, program_counter, share):
    1.69          """Send a share.
    1.70  
    1.71 -        The share is marshalled and sent to the peer along with the
    1.72 -        current program counter for this connection.
    1.73 +        The program counter and the share are marshalled and sent to
    1.74 +        the peer.
    1.75 +
    1.76 +        @param program_counter: the program counter associated with
    1.77 +        the share.
    1.78          """
    1.79 -        self.sendData("share", share.value)
    1.80 +        self.sendData(program_counter, "share", share.value)
    1.81  
    1.82      def loseConnection(self):
    1.83          """Disconnect this protocol instance."""
    1.84 @@ -362,13 +318,11 @@
    1.85      @wrapper(method)
    1.86      def inc_pc_wrapper(self, *args, **kwargs):
    1.87          try:
    1.88 -            for protocol in self.protocols.itervalues():
    1.89 -                protocol.program_counter[-1] += 1
    1.90 -                protocol.program_counter.append(0)
    1.91 +            self.program_counter[-1] += 1
    1.92 +            self.program_counter.append(0)
    1.93              return method(self, *args, **kwargs)
    1.94          finally:
    1.95 -            for protocol in self.protocols.itervalues():
    1.96 -                protocol.program_counter.pop()
    1.97 +            self.program_counter.pop()
    1.98      return inc_pc_wrapper
    1.99  
   1.100  
   1.101 @@ -428,6 +382,51 @@
   1.102              from twisted.internet import defer
   1.103              defer.setDebugging(True)
   1.104  
   1.105 +        #: Current program counter.
   1.106 +        #:
   1.107 +        #: Whenever a share is sent over the network, it must be
    1.108 +        #: uniquely identified so that the receiving player knows what
   1.109 +        #: operation the share is a result of. This is done by
   1.110 +        #: associating a X{program counter} with each operation.
   1.111 +        #:
   1.112 +        #: Keeping the program counter synchronized between all
   1.113 +        #: players ought to be easy, but because of the asynchronous
   1.114 +        #: nature of network protocols, all players might not reach
   1.115 +        #: the same parts of the program at the same time.
   1.116 +        #:
   1.117 +        #: Consider two players M{A} and M{B} who are both waiting on
   1.118 +        #: the variables C{a} and C{b}. Callbacks have been added to
   1.119 +        #: C{a} and C{b}, and the question is what program counter the
   1.120 +        #: callbacks should use when sending data out over the
   1.121 +        #: network.
   1.122 +        #:
   1.123 +        #: Let M{A} receive input for C{a} and then for C{b} a little
   1.124 +        #: later, and let M{B} receive the inputs in reversed order so
   1.125 +        #: that the input for C{b} arrives first. The goal is to keep
   1.126 +        #: the program counters synchronized so that program counter
   1.127 +        #: M{x} refers to the same operation on all players. Because
   1.128 +        #: the inputs arrive in different order at different players,
   1.129 +        #: incrementing a simple global counter is not enough.
   1.130 +        #:
   1.131 +        #: Instead, a I{tree} is made, which follows the tree of
   1.132 +        #: execution. At the top level the program counter starts at
   1.133 +        #: C{[0]}. At the next operation it becomes C{[1]}, and so on.
   1.134 +        #: If a callback is scheduled (see L{callback}) at program
   1.135 +        #: counter C{[x, y, z]}, any calls it makes will be numbered
   1.136 +        #: C{[x, y, z, 1]}, then C{[x, y, z, 2]}, and so on.
   1.137 +        #:
   1.138 +        #: Maintaining such a tree of program counters ensures that
    1.139 +        #: different parts of the program execution never reuse the
   1.140 +        #: same program counter for different variables.
   1.141 +        #:
   1.142 +        #: The L{increment_pc} decorator is responsible for
   1.143 +        #: dynamically building the tree as the execution unfolds and
   1.144 +        #: L{callback} is responsible for scheduling callbacks with
   1.145 +        #: the correct program counter.
   1.146 +        #:
   1.147 +        #: @type: C{list} of integers.
   1.148 +        self.program_counter = [0]
   1.149 +
   1.150          #: Connections to the other players.
   1.151          #:
   1.152          #: @type: C{dict} from Player ID to L{ShareExchanger} objects.
   1.153 @@ -504,25 +503,17 @@
   1.154          # program counter. Simply decorating callback with increase_pc
   1.155          # does not seem to work (the multiplication benchmark hangs).
   1.156          # This should be fixed.
   1.157 -
   1.158 -        def get_pcs():
   1.159 -            return [(protocol, protocol.program_counter[:]) for protocol
   1.160 -                    in self.protocols.itervalues()]
   1.161 -        def set_pcs(pcs):
   1.162 -            for protocol, pc in pcs:
   1.163 -                protocol.program_counter = pc
   1.164 -
   1.165 -        saved_pcs = get_pcs()
   1.166 +        saved_pc = self.program_counter[:]
   1.167  
   1.168          @wrapper(func)
   1.169          def callback_wrapper(*args, **kwargs):
   1.170              """Wrapper for a callback which ensures a correct PC."""
   1.171              try:
   1.172 -                current_pcs = get_pcs()
   1.173 -                set_pcs(saved_pcs)
   1.174 +                current_pc = self.program_counter
   1.175 +                self.program_counter = saved_pc
   1.176                  return func(*args, **kwargs)
   1.177              finally:
   1.178 -                set_pcs(current_pcs)
   1.179 +                self.program_counter = current_pc
   1.180  
   1.181          deferred.addCallback(callback_wrapper, *args, **kwargs)
   1.182  
   1.183 @@ -538,7 +529,7 @@
   1.184          assert peer_id != self.id, "Do not expect data from yourself!"
   1.185          # Convert self.program_counter to a hashable value in order to
   1.186          # use it as a key in self.protocols[peer_id].incoming_data.
   1.187 -        pc = tuple(self.protocols[peer_id].program_counter)
   1.188 +        pc = tuple(self.program_counter)
   1.189          key = (pc, data_type)
   1.190  
   1.191          data = self.protocols[peer_id].incoming_data.pop(key, None)
   1.192 @@ -561,7 +552,8 @@
   1.193              return Share(self, field_element.field, field_element)
   1.194          else:
   1.195              share = self._expect_share(peer_id, field_element.field)
   1.196 -            self.protocols[peer_id].sendShare(field_element)
   1.197 +            pc = tuple(self.program_counter)
   1.198 +            self.protocols[peer_id].sendShare(pc, field_element)
   1.199              return share
   1.200  
   1.201      def _expect_share(self, peer_id, field):
   1.202 @@ -635,7 +627,8 @@
   1.203              # Send share to all receivers.
   1.204              for peer_id in receivers:
   1.205                  if peer_id != self.id:
   1.206 -                    self.protocols[peer_id].sendShare(share)
   1.207 +                    pc = tuple(self.program_counter)
   1.208 +                    self.protocols[peer_id].sendShare(pc, share)
   1.209              # Receive and recombine shares if this player is a receiver.
   1.210              if self.id in receivers:
   1.211                  deferreds = []
   1.212 @@ -762,8 +755,7 @@
   1.213          n = self.num_players
   1.214  
   1.215          # Key used for PRSS.
   1.216 -        key = tuple([tuple(p.program_counter) for p
   1.217 -                     in self.protocols.itervalues()])
   1.218 +        key = tuple(self.program_counter)
   1.219  
   1.220          # The shares for which we have all the keys.
   1.221          all_shares = []
   1.222 @@ -782,9 +774,10 @@
   1.223              correction = element - shared
   1.224              # if this player is inputter then broadcast correction value
   1.225              # TODO: more efficient broadcast?
   1.226 +            pc = tuple(self.program_counter)
   1.227              for peer_id in self.players:
   1.228                  if self.id != peer_id:
   1.229 -                    self.protocols[peer_id].sendShare(correction)
   1.230 +                    self.protocols[peer_id].sendShare(pc, correction)
   1.231  
   1.232          # Receive correction value from inputters and compute share.
   1.233          result = []
   1.234 @@ -818,8 +811,7 @@
   1.235              modulus = field.modulus
   1.236  
   1.237          # Key used for PRSS.
   1.238 -        prss_key = tuple([tuple(p.program_counter) for p
   1.239 -                          in self.protocols.itervalues()])
   1.240 +        prss_key = tuple(self.program_counter)
   1.241          prfs = self.players[self.id].prfs(modulus)
   1.242          share = prss(self.num_players, self.id, field, prfs, prss_key)
   1.243  
   1.244 @@ -872,13 +864,14 @@
   1.245          results = []
   1.246          for peer_id in inputters:
   1.247              if peer_id == self.id:
   1.248 +                pc = tuple(self.program_counter)
   1.249                  shares = shamir.share(field(number), self.threshold,
   1.250                                        self.num_players)
   1.251                  for other_id, share in shares:
   1.252                      if other_id.value == self.id:
   1.253                          results.append(Share(self, share.field, share))
   1.254                      else:
   1.255 -                        self.protocols[other_id.value].sendShare(share)
   1.256 +                        self.protocols[other_id.value].sendShare(pc, share)
   1.257              else:
   1.258                  results.append(self._expect_share(peer_id, field))
   1.259  
   1.260 @@ -903,8 +896,7 @@
   1.261          """
   1.262  
   1.263          result = Deferred()
   1.264 -        pc = tuple([tuple(p.program_counter) for p
   1.265 -                    in self.protocols.itervalues()])
   1.266 +        pc = tuple(self.program_counter)
   1.267          n = self.num_players
   1.268          t = self.threshold
   1.269  
     2.1 --- a/viff/test/test_basic_runtime.py	Fri Mar 28 15:30:28 2008 +0100
     2.2 +++ b/viff/test/test_basic_runtime.py	Fri Mar 28 15:48:12 2008 +0100
     2.3 @@ -26,14 +26,9 @@
     2.4  class ProgramCounterTest(RuntimeTestCase):
     2.5      """Program counter tests."""
     2.6  
     2.7 -    def assert_pc(self, runtime, pc):
     2.8 -        """Assert that all protocols has a given program counter."""
     2.9 -        for p in runtime.protocols.itervalues():
    2.10 -            self.assertEquals(p.program_counter, pc)
    2.11 -
    2.12      @protocol
    2.13      def test_initial_value(self, runtime):
    2.14 -        self.assert_pc(runtime, [0])
    2.15 +        self.assertEquals(runtime.program_counter, [0])
    2.16  
    2.17      @protocol
    2.18      def test_simple_operation(self, runtime):
    2.19 @@ -42,9 +37,9 @@
    2.20          Each call should increment the program counter by one.
    2.21          """
    2.22          runtime.synchronize()
    2.23 -        self.assert_pc(runtime, [1])
    2.24 +        self.assertEquals(runtime.program_counter, [1])
    2.25          runtime.synchronize()
    2.26 -        self.assert_pc(runtime, [2])
    2.27 +        self.assertEquals(runtime.program_counter, [2])
    2.28  
    2.29      @protocol
    2.30      def test_complex_operation(self, runtime):
    2.31 @@ -56,9 +51,9 @@
    2.32          # Exclusive-or is calculated as x + y - 2 * x * y, so add,
    2.33          # sub, and mul are called.
    2.34          runtime.xor(self.Zp(0), self.Zp(1))
    2.35 -        self.assert_pc(runtime, [1])
    2.36 +        self.assertEquals(runtime.program_counter, [1])
    2.37          runtime.xor(self.Zp(0), self.Zp(1))
    2.38 -        self.assert_pc(runtime, [2])
    2.39 +        self.assertEquals(runtime.program_counter, [2])
    2.40  
    2.41      @protocol
    2.42      def test_callback(self, runtime):
    2.43 @@ -69,13 +64,13 @@
    2.44          """
    2.45  
    2.46          def verify_program_counter(_):
    2.47 -            self.assert_pc(runtime, [0])
    2.48 +            self.assertEquals(runtime.program_counter, [0])
    2.49  
    2.50          d = Deferred()
    2.51          runtime.schedule_callback(d, verify_program_counter)
    2.52  
    2.53          runtime.synchronize()
    2.54 -        self.assert_pc(runtime, [1])
    2.55 +        self.assertEquals(runtime.program_counter, [1])
    2.56  
    2.57          # Now trigger verify_program_counter.
    2.58          d.callback(None)
    2.59 @@ -92,26 +87,26 @@
    2.60              # First top-level call, so first entry is 1. No calls to
    2.61              # other methods decorated with increment_pc has been made,
    2.62              # so the second entry is 0.
    2.63 -            self.assert_pc(runtime, [1, 0])
    2.64 +            self.assertEquals(runtime.program_counter, [1, 0])
    2.65              method_b(runtime, 1)
    2.66  
    2.67 -            self.assert_pc(runtime, [1, 1])
    2.68 +            self.assertEquals(runtime.program_counter, [1, 1])
    2.69              method_b(runtime, 2)
    2.70  
    2.71              # At this point two sub-calls has been made:
    2.72 -            self.assert_pc(runtime, [1, 2])
    2.73 +            self.assertEquals(runtime.program_counter, [1, 2])
    2.74  
    2.75          @increment_pc
    2.76          def method_b(runtime, count):
    2.77              # This method is called twice from method_a:
    2.78 -            self.assert_pc(runtime, [1, count, 0])
    2.79 +            self.assertEquals(runtime.program_counter, [1, count, 0])
    2.80  
    2.81          # Zero top-level calls:
    2.82 -        self.assert_pc(runtime, [0])
    2.83 +        self.assertEquals(runtime.program_counter, [0])
    2.84          method_a(runtime)
    2.85  
    2.86          # One top-level call:
    2.87 -        self.assert_pc(runtime, [1])
    2.88 +        self.assertEquals(runtime.program_counter, [1])
    2.89  
    2.90      @protocol
    2.91      def test_multiple_callbacks(self, runtime):
    2.92 @@ -120,11 +115,11 @@
    2.93          d2 = Deferred()
    2.94  
    2.95          def verify_program_counter(_, count):
    2.96 -            self.assert_pc(runtime, [1, count, 0])
    2.97 +            self.assertEquals(runtime.program_counter, [1, count, 0])
    2.98  
    2.99          @increment_pc
   2.100          def method_a(runtime):
   2.101 -            self.assert_pc(runtime, [1, 0])
   2.102 +            self.assertEquals(runtime.program_counter, [1, 0])
   2.103  
   2.104              runtime.schedule_callback(d1, verify_program_counter, 1)
   2.105              runtime.schedule_callback(d2, verify_program_counter, 2)