viff

changeset 1181:3171ea9886cb

Seperate incoming data and waiting deferreds.
author Marcel Keller <mkeller@cs.au.dk>
date Wed, 13 May 2009 17:07:00 +0200
parents b44882c6d4f6
children e89fb02c5e3d
files viff/runtime.py
diffstat 1 files changed, 8 insertions(+), 5 deletions(-) [+]
line diff
     1.1 --- a/viff/runtime.py	Wed May 13 16:38:16 2009 +0200
     1.2 +++ b/viff/runtime.py	Wed May 13 17:07:00 2009 +0200
     1.3 @@ -270,6 +270,7 @@
     1.4          self.lost_connection = Deferred()
     1.5          #: Data expected to be received in the future.
     1.6          self.incoming_data = {}
     1.7 +        self.waiting_deferreds = {}
     1.8  
     1.9      def connectionMade(self):
    1.10          self.sendString(str(self.factory.runtime.id))
    1.11 @@ -312,13 +313,14 @@
    1.12  
    1.13                  key = (program_counter, data_type)
    1.14  
    1.15 -                deq = self.incoming_data.setdefault(key, deque())
    1.16 -                if deq and isinstance(deq[0], Deferred):
    1.17 +                if key in self.waiting_deferreds:
    1.18 +                    deq = self.waiting_deferreds[key]
    1.19                      deferred = deq.popleft()
    1.20                      if not deq:
    1.21 -                        del self.incoming_data[key]
    1.22 +                        del self.waiting_deferreds[key]
    1.23                      deferred.callback(data)
    1.24                  else:
    1.25 +                    deq = self.incoming_data.setdefault(key, deque())
    1.26                      deq.append(data)
    1.27              except struct.error, e:
    1.28                  self.factory.runtime.abort(self, e)
    1.29 @@ -637,15 +639,16 @@
    1.30          pc = tuple(self.program_counter)
    1.31          key = (pc, data_type)
    1.32  
    1.33 -        deq = self.protocols[peer_id].incoming_data.setdefault(key, deque())
    1.34 -        if deq and not isinstance(deq[0], Deferred):
    1.35 +        if key in self.protocols[peer_id].incoming_data:
    1.36              # We have already received some data from the other side.
    1.37 +            deq = self.protocols[peer_id].incoming_data[key]
    1.38              data = deq.popleft()
    1.39              if not deq:
    1.40                  del self.protocols[peer_id].incoming_data[key]
    1.41              deferred.callback(data)
    1.42          else:
    1.43              # We have not yet received anything from the other side.
    1.44 +            deq = self.protocols[peer_id].waiting_deferreds.setdefault(key, deque())
    1.45              deq.append(deferred)
    1.46  
    1.47      def _exchange_shares(self, peer_id, field_element):