viff

changeset 614:b550d4457d60

Allow sending more than one message per program counter. This is done by using a double-ended queue (a deque) to hold a list of data received or deferreds to be triggered by incoming data. When data arrives the deque is checked to see if there is a matching Deferred that we can trigger. Otherwise the data is appended to the deque. Likewise, when data is expected, the deque is checked for waiting data. If no data is waiting, the Deferred is appended to the deque. Data or Deferreds are appended to the right and popped from the left.
author Martin Geisler <mg@daimi.au.dk>
date Fri, 28 Mar 2008 17:20:22 +0100
parents 7cd68e62366a
children 743ac108951a
files viff/runtime.py
diffstat 1 files changed, 13 insertions(+), 10 deletions(-) [+]
line diff
     1.1 --- a/viff/runtime.py	Fri Mar 28 15:48:12 2008 +0100
     1.2 +++ b/viff/runtime.py	Fri Mar 28 17:20:22 2008 +0100
     1.3 @@ -37,6 +37,7 @@
     1.4  import marshal
     1.5  from optparse import OptionParser, OptionGroup
     1.6  from math import ceil
     1.7 +from collections import deque
     1.8  
     1.9  from viff import shamir
    1.10  from viff.prss import prss
    1.11 @@ -257,11 +258,12 @@
    1.12              program_counter, data_type, data = marshal.loads(string)
    1.13              key = (program_counter, data_type)
    1.14  
    1.15 -            try:
    1.16 -                deferred = self.incoming_data.pop(key)
    1.17 +            deq = self.incoming_data.setdefault(key, deque())
    1.18 +            if deq and isinstance(deq[0], Deferred):
    1.19 +                deferred = deq.popleft()
    1.20                  deferred.callback(data)
    1.21 -            except KeyError:
    1.22 -                self.incoming_data[key] = data
    1.23 +            else:
    1.24 +                deq.append(data)
    1.25  
    1.26              # TODO: marshal.loads can raise EOFError, ValueError, and
    1.27              # TypeError. They should be handled somehow.
    1.28 @@ -532,13 +534,14 @@
    1.29          pc = tuple(self.program_counter)
    1.30          key = (pc, data_type)
    1.31  
    1.32 -        data = self.protocols[peer_id].incoming_data.pop(key, None)
    1.33 -        if data is None:
    1.34 -            # We have not yet received data from the other side.
    1.35 -            self.protocols[peer_id].incoming_data[key] = deferred
    1.36 +        deq = self.protocols[peer_id].incoming_data.setdefault(key, deque())
    1.37 +        if deq and not isinstance(deq[0], Deferred):
    1.38 +            # We have already received some data from the other side.
    1.39 +            data = deq.popleft()
    1.40 +            deferred.callback(data)
    1.41          else:
    1.42 -            # We have already received the data from the other side.
    1.43 -            deferred.callback(data)
    1.44 +            # We have not yet received anything from the other side.
    1.45 +            deq.append(deferred)
    1.46  
    1.47      def _exchange_shares(self, peer_id, field_element):
    1.48          """Exchange shares with another player.