viff

changeset 1435:6d838b2d24a2

Improved hash based broadcast. It now also works with the VIFF reacktor.
author Janus Dam Nielsen <janus.nielsen@alexandra.dk>
date Tue, 30 Mar 2010 11:34:39 +0200
parents 066a6ff68f7c
children a3cc11ca0ca0
files viff/hash_broadcast.py
diffstat 1 files changed, 22 insertions(+), 34 deletions(-) [+]
line diff
     1.1 --- a/viff/hash_broadcast.py	Wed Mar 03 11:24:49 2010 +0100
     1.2 +++ b/viff/hash_broadcast.py	Tue Mar 30 11:34:39 2010 +0200
     1.3 @@ -25,6 +25,8 @@
     1.4      from sha import sha as sha1
     1.5  from viff.constants import TEXT, INCONSISTENTHASH, OK, HASH
     1.6  
     1.7 +from twisted.internet.defer import gatherResults
     1.8 +
     1.9  error_msg = "Player %i, has received an inconsistent hash %s."
    1.10  
    1.11  class InconsistentHashException(Exception):
    1.12 @@ -44,53 +46,41 @@
    1.13              self.protocols[peer_id].sendData(pc, TEXT, message)
    1.14  
    1.15      def _receive_broadcast(self, pc, unique_pc, sender, receivers):
    1.16 -        # The result.
    1.17 -        result = Deferred()
    1.18 -        # The message store.
    1.19 -        message = []
    1.20 -        # The hash store
    1.21 -        g_hashes = {}
    1.22  
    1.23 -        def hash_received(h, unique_pc, peer_id, receivers, a_hashes):
    1.24 -            # Store the hash.
    1.25 -            a_hashes[peer_id] = h
    1.26 -            # If we have received a hash from everybody, then compute check them.
    1.27 -            if len(receivers) == len(a_hashes.keys()):
    1.28 -                # We check if the hashes we received are equal to
    1.29 -                # the hash we computed ourselves.
    1.30 -                s = reduce(lambda x, y: (a_hashes[self.id] == y and x) or INCONSISTENTHASH, [OK] + a_hashes.values())
    1.31 -                if OK == s:
    1.32 -                    # Make the result ready.
    1.33 -                    result.callback(message[0])
    1.34 -                else:
    1.35 -                    raise InconsistentHashException(error_msg % (self.id, a_hashes.values()))
    1.36 -
    1.37 -        def message_received(m, unique_pc, message, receivers, hashes):
    1.38 -            # Store the message.
    1.39 -            message.append(m)
    1.40 +        def message_received(m, unique_pc, receivers):
    1.41              # Compute hash of message.
    1.42              h = sha1(m).hexdigest()
    1.43 -            # Store hash.
    1.44 -            hashes[self.id] = h
    1.45              # Send the hash to all receivers.
    1.46              for peer_id in receivers:
    1.47                  self.protocols[peer_id].sendData(unique_pc, HASH, str(h))
    1.48 -
    1.49 +            return m, h
    1.50 +                
    1.51          # Set up receiver for hashes.
    1.52          # Note, we use the unique_pc to avoid data to cross
    1.53          # method invocation boundaries.
    1.54          for peer_id in receivers:
    1.55 -            d_hash = Deferred().addCallbacks(hash_received,
    1.56 -                                             self.error_handler, 
    1.57 -                                             callbackArgs=(unique_pc, peer_id, receivers, g_hashes))
    1.58 +            hashes = []
    1.59 +            d_hash = Deferred()
    1.60              self._expect_data_with_pc(unique_pc, peer_id, HASH, d_hash)
    1.61 +            hashes.append(d_hash)
    1.62  
    1.63          # Set up receiving of the message.
    1.64          d_message = Deferred().addCallbacks(message_received, 
    1.65                                              self.error_handler, 
    1.66 -                                            callbackArgs=(unique_pc, message, receivers, g_hashes))
    1.67 +                                            callbackArgs=(unique_pc, receivers))
    1.68          self._expect_data(sender, TEXT, d_message)
    1.69 -        return result
    1.70 +
    1.71 +	def combine(ls):                      
    1.72 +            m, h = ls[0]
    1.73 +            s = reduce(lambda x, y: (h == y and x) or INCONSISTENTHASH, [OK] + ls[1:])
    1.74 +            if not OK == s:
    1.75 +		raise InconsistentHashException(error_msg % (self.id, ls))
    1.76 +            return m
    1.77 +
    1.78 +        r = gatherResults([d_message] + hashes)
    1.79 +        r.addCallback(combine)
    1.80 +        return r
    1.81 +
    1.82  
    1.83  
    1.84      def broadcast(self, senders, receivers, message=None):
    1.85 @@ -110,7 +100,7 @@
    1.86          Note: You send implicitly to your self."""
    1.87          assert message is None or self.id in senders
    1.88  
    1.89 -        self.program_counter[-1] += 1
    1.90 +        self.increment_pc()
    1.91  
    1.92          pc = tuple(self.program_counter)
    1.93          if self.id in receivers or self.id in senders:
    1.94 @@ -133,8 +123,6 @@
    1.95              d.callback(message)
    1.96              results = [d]
    1.97  
    1.98 -        self.program_counter[-1] += 1
    1.99 -
   1.100          if len(results) == 1:
   1.101              return results[0]
   1.102