changeset 1435:6d838b2d24a2

Improved hash based broadcast. It now also works with the VIFF reacktor.
author Janus Dam Nielsen <janus.nielsen@alexandra.dk>
date Tue, 30 Mar 2010 11:34:39 +0200
parents 066a6ff68f7c
children a3cc11ca0ca0
files viff/hash_broadcast.py
diffstat 1 files changed, 22 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/viff/hash_broadcast.py	Wed Mar 03 11:24:49 2010 +0100
+++ b/viff/hash_broadcast.py	Tue Mar 30 11:34:39 2010 +0200
@@ -25,6 +25,8 @@
     from sha import sha as sha1
 from viff.constants import TEXT, INCONSISTENTHASH, OK, HASH
 
+from twisted.internet.defer import gatherResults
+
 error_msg = "Player %i, has received an inconsistent hash %s."
 
 class InconsistentHashException(Exception):
@@ -44,53 +46,41 @@
             self.protocols[peer_id].sendData(pc, TEXT, message)
 
     def _receive_broadcast(self, pc, unique_pc, sender, receivers):
-        # The result.
-        result = Deferred()
-        # The message store.
-        message = []
-        # The hash store
-        g_hashes = {}
 
-        def hash_received(h, unique_pc, peer_id, receivers, a_hashes):
-            # Store the hash.
-            a_hashes[peer_id] = h
-            # If we have received a hash from everybody, then compute check them.
-            if len(receivers) == len(a_hashes.keys()):
-                # We check if the hashes we received are equal to
-                # the hash we computed ourselves.
-                s = reduce(lambda x, y: (a_hashes[self.id] == y and x) or INCONSISTENTHASH, [OK] + a_hashes.values())
-                if OK == s:
-                    # Make the result ready.
-                    result.callback(message[0])
-                else:
-                    raise InconsistentHashException(error_msg % (self.id, a_hashes.values()))
-
-        def message_received(m, unique_pc, message, receivers, hashes):
-            # Store the message.
-            message.append(m)
+        def message_received(m, unique_pc, receivers):
             # Compute hash of message.
             h = sha1(m).hexdigest()
-            # Store hash.
-            hashes[self.id] = h
             # Send the hash to all receivers.
             for peer_id in receivers:
                 self.protocols[peer_id].sendData(unique_pc, HASH, str(h))
-
+            return m, h
+                
         # Set up receiver for hashes.
         # Note, we use the unique_pc to avoid data to cross
         # method invocation boundaries.
         for peer_id in receivers:
-            d_hash = Deferred().addCallbacks(hash_received,
-                                             self.error_handler, 
-                                             callbackArgs=(unique_pc, peer_id, receivers, g_hashes))
+            hashes = []
+            d_hash = Deferred()
             self._expect_data_with_pc(unique_pc, peer_id, HASH, d_hash)
+            hashes.append(d_hash)
 
         # Set up receiving of the message.
         d_message = Deferred().addCallbacks(message_received, 
                                             self.error_handler, 
-                                            callbackArgs=(unique_pc, message, receivers, g_hashes))
+                                            callbackArgs=(unique_pc, receivers))
         self._expect_data(sender, TEXT, d_message)
-        return result
+
+	def combine(ls):                      
+            m, h = ls[0]
+            s = reduce(lambda x, y: (h == y and x) or INCONSISTENTHASH, [OK] + ls[1:])
+            if not OK == s:
+		raise InconsistentHashException(error_msg % (self.id, ls))
+            return m
+
+        r = gatherResults([d_message] + hashes)
+        r.addCallback(combine)
+        return r
+
 
 
     def broadcast(self, senders, receivers, message=None):
@@ -110,7 +100,7 @@
         Note: You send implicitly to your self."""
         assert message is None or self.id in senders
 
-        self.program_counter[-1] += 1
+        self.increment_pc()
 
         pc = tuple(self.program_counter)
         if self.id in receivers or self.id in senders:
@@ -133,8 +123,6 @@
             d.callback(message)
             results = [d]
 
-        self.program_counter[-1] += 1
-
         if len(results) == 1:
             return results[0]