changeset 1181:3171ea9886cb

Seperate incoming data and waiting deferreds.
author Marcel Keller <mkeller@cs.au.dk>
date Wed, 13 May 2009 17:07:00 +0200
parents b44882c6d4f6
children e89fb02c5e3d
files viff/runtime.py
diffstat 1 files changed, 8 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/viff/runtime.py	Wed May 13 16:38:16 2009 +0200
+++ b/viff/runtime.py	Wed May 13 17:07:00 2009 +0200
@@ -270,6 +270,7 @@
         self.lost_connection = Deferred()
         #: Data expected to be received in the future.
         self.incoming_data = {}
+        self.waiting_deferreds = {}
 
     def connectionMade(self):
         self.sendString(str(self.factory.runtime.id))
@@ -312,13 +313,14 @@
 
                 key = (program_counter, data_type)
 
-                deq = self.incoming_data.setdefault(key, deque())
-                if deq and isinstance(deq[0], Deferred):
+                if key in self.waiting_deferreds:
+                    deq = self.waiting_deferreds[key]
                     deferred = deq.popleft()
                     if not deq:
-                        del self.incoming_data[key]
+                        del self.waiting_deferreds[key]
                     deferred.callback(data)
                 else:
+                    deq = self.incoming_data.setdefault(key, deque())
                     deq.append(data)
             except struct.error, e:
                 self.factory.runtime.abort(self, e)
@@ -637,15 +639,16 @@
         pc = tuple(self.program_counter)
         key = (pc, data_type)
 
-        deq = self.protocols[peer_id].incoming_data.setdefault(key, deque())
-        if deq and not isinstance(deq[0], Deferred):
+        if key in self.protocols[peer_id].incoming_data:
             # We have already received some data from the other side.
+            deq = self.protocols[peer_id].incoming_data[key]
             data = deq.popleft()
             if not deq:
                 del self.protocols[peer_id].incoming_data[key]
             deferred.callback(data)
         else:
             # We have not yet received anything from the other side.
+            deq = self.protocols[peer_id].waiting_deferreds.setdefault(key, deque())
             deq.append(deferred)
 
     def _exchange_shares(self, peer_id, field_element):