viff

changeset 1218:c43b0e520aba

It is now possible to send data to your self. The sendData and _expect_data primitives now accepts and transmits data send from a player to himself.
author Janus Dam Nielsen <janus.nielsen@alexandra.dk>
date Tue, 06 Oct 2009 10:04:20 +0200
parents 80125f56beaa
children 8a5eb6550111
files viff/runtime.py viff/test/test_runtime.py viff/test/util.py
diffstat 3 files changed, 116 insertions(+), 9 deletions(-) [+]
line diff
     1.1 --- a/viff/runtime.py	Fri Sep 18 19:31:34 2009 +0200
     1.2 +++ b/viff/runtime.py	Tue Oct 06 10:04:20 2009 +0200
     1.3 @@ -382,6 +382,65 @@
     1.4          """Disconnect this protocol instance."""
     1.5          self.transport.loseConnection()
     1.6  
     1.7 +class SelfShareExchanger(ShareExchanger):
     1.8 +
     1.9 +    def __init__(self, id, factory):
    1.10 +        ShareExchanger.__init__(self)
    1.11 +        self.peer_id = id
    1.12 +        self.factory = factory
    1.13 +
    1.14 +    def stringReceived(self, program_counter, data_type, data):
    1.15 +        """Called when a share is received.
    1.16 +
    1.17 +        The string received is unpacked into the program counter, and
    1.18 +        a data part. The data is passed the appropriate Deferred in
    1.19 +        :class:`self.incoming_data`.
    1.20 +        """
    1.21 +        try:
    1.22 +            key = (program_counter, data_type)
    1.23 +            
    1.24 +            if key in self.waiting_deferreds:
    1.25 +                deq = self.waiting_deferreds[key]
    1.26 +                deferred = deq.popleft()
    1.27 +                if not deq:
    1.28 +                    del self.waiting_deferreds[key]
    1.29 +                self.factory.runtime.handle_deferred_data(deferred, data)
    1.30 +            else:
    1.31 +                deq = self.incoming_data.setdefault(key, deque())
    1.32 +                deq.append(data)
    1.33 +        except struct.error, e:
    1.34 +            self.factory.runtime.abort(self, e)
    1.35 +
    1.36 +    def sendData(self, program_counter, data_type, data):
    1.37 +        """Send data to the self.id."""
    1.38 +        self.stringReceived(program_counter, data_type, data)
    1.39 +
    1.40 +    def loseConnection(self):
    1.41 +        """Disconnect this protocol instance."""
    1.42 +        self.lost_connection.callback(self)
    1.43 +        return None
    1.44 +
    1.45 +
    1.46 +class SelfShareExchangerFactory(ReconnectingClientFactory, ServerFactory):
    1.47 +    """Factory for creating SelfShareExchanger protocols."""
    1.48 +
    1.49 +    protocol = SelfShareExchanger
    1.50 +    maxDelay = 3
    1.51 +    factor = 1.234567 # About half of the Twisted default
    1.52 +
    1.53 +    def __init__(self, runtime):
    1.54 +        """Initialize the factory."""
    1.55 +        self.runtime = runtime
    1.56 +
    1.57 +    def identify_peer(self, protocol):
    1.58 +        raise Exception("Is identify_peer necessary?")
    1.59 +
    1.60 +    def clientConnectionLost(self, connector, reason):
    1.61 +        reason.trap(ConnectionDone)
    1.62 +
    1.63 +class FakeTransport(object):
    1.64 +    def close(self):
    1.65 +        return True
    1.66  
    1.67  class ShareExchangerFactory(ReconnectingClientFactory, ServerFactory):
    1.68      """Factory for creating ShareExchanger protocols."""
    1.69 @@ -535,7 +594,9 @@
    1.70          self.players = {}
    1.71          # Add ourselves, but with no protocol since we wont be
    1.72          # communicating with ourselves.
    1.73 -        self.add_player(player, None)
    1.74 +        protocol = SelfShareExchanger(self.id, SelfShareExchangerFactory(self))
    1.75 +        protocol.transport = FakeTransport()
    1.76 +        self.add_player(player, protocol)
    1.77  
    1.78          #: Queue of deferreds and data.
    1.79          self.deferred_queue = deque()
    1.80 @@ -551,9 +612,7 @@
    1.81      def add_player(self, player, protocol):
    1.82          self.players[player.id] = player
    1.83          self.num_players = len(self.players)
    1.84 -        # There is no protocol for ourselves, so we wont add that:
    1.85 -        if protocol is not None:
    1.86 -            self.protocols[player.id] = protocol
    1.87 +        self.protocols[player.id] = protocol
    1.88  
    1.89      def shutdown(self):
    1.90          """Shutdown the runtime.
    1.91 @@ -670,7 +729,6 @@
    1.92          return result
    1.93  
    1.94      def _expect_data(self, peer_id, data_type, deferred):
    1.95 -        assert peer_id != self.id, "Do not expect data from yourself!"
    1.96          # Convert self.program_counter to a hashable value in order to
    1.97          # use it as a key in self.protocols[peer_id].incoming_data.
    1.98          pc = tuple(self.program_counter)
     2.1 --- a/viff/test/test_runtime.py	Fri Sep 18 19:31:34 2009 +0200
     2.2 +++ b/viff/test/test_runtime.py	Tue Oct 06 10:04:20 2009 +0200
     2.3 @@ -27,10 +27,10 @@
     2.4  from random import Random
     2.5  import operator
     2.6  
     2.7 -from twisted.internet.defer import gatherResults
     2.8 +from twisted.internet.defer import gatherResults, Deferred, DeferredList
     2.9  
    2.10  from viff.field import GF256
    2.11 -from viff.runtime import Share
    2.12 +from viff.runtime import Share, SHARE
    2.13  from viff.comparison import Toft05Runtime
    2.14  from viff.test.util import RuntimeTestCase, BinaryOperatorTestCase, protocol
    2.15  
    2.16 @@ -191,6 +191,49 @@
    2.17  
    2.18          return gatherResults([opened_a, opened_b, opened_c])
    2.19  
    2.20 +    @protocol
    2.21 +    def test_send_receive_self(self, runtime):
    2.22 +        """Test send and receive of values."""
    2.23 +        value = 42
    2.24 +        
    2.25 +        pc = tuple(runtime.program_counter)
    2.26 +        runtime.protocols[runtime.id].sendData(pc, SHARE, str(value))
    2.27 +
    2.28 +        d = Deferred()
    2.29 +        runtime._expect_data(runtime.id, SHARE, d)
    2.30 +        def check(x):
    2.31 +            self.assertEquals(int(x), value)
    2.32 +            return x
    2.33 +        d.addCallback(check)
    2.34 +        return d
    2.35 +
    2.36 +    @protocol
    2.37 +    def test_send_receive_self2(self, runtime):
    2.38 +        """Test send and receive of values."""
    2.39 +        value = 42
    2.40 +        
    2.41 +        pc = tuple(runtime.program_counter)
    2.42 +        for peer_id in runtime.players:
    2.43 +            data = str(value)
    2.44 +            runtime.protocols[peer_id].sendData(pc, SHARE, data)
    2.45 +
    2.46 +        ds = []
    2.47 +        for peer_id in runtime.players:
    2.48 +            d = Deferred()
    2.49 +            runtime._expect_data(peer_id, SHARE, d)
    2.50 +            ds.append(d)
    2.51 +
    2.52 +        dls = DeferredList(ds)
    2.53 +        def check(ls):
    2.54 +            for s, x in ls:
    2.55 +                self.assertEquals(int(x), value)
    2.56 +            return ls
    2.57 +
    2.58 +        dls.addCallback(check)
    2.59 +        return dls
    2.60 +
    2.61 +
    2.62 +
    2.63  
    2.64  class ConvertBitShareTest(RuntimeTestCase):
    2.65      runtime_class = Toft05Runtime
     3.1 --- a/viff/test/util.py	Fri Sep 18 19:31:34 2009 +0200
     3.2 +++ b/viff/test/util.py	Tue Oct 06 10:04:20 2009 +0200
     3.3 @@ -22,7 +22,7 @@
     3.4  from twisted.internet import reactor
     3.5  
     3.6  from viff.passive import PassiveRuntime
     3.7 -from viff.runtime import Share, ShareExchanger, ShareExchangerFactory
     3.8 +from viff.runtime import Share, ShareExchanger, ShareExchangerFactory, SelfShareExchanger, SelfShareExchangerFactory, FakeTransport
     3.9  from viff.field import GF
    3.10  from viff.config import generate_configs, load_config
    3.11  from viff.util import rand
    3.12 @@ -220,7 +220,13 @@
    3.13                      # fire.
    3.14                      sentinel = loopbackAsync(server, client)
    3.15                      self.close_sentinels.append(sentinel)
    3.16 -
    3.17 +            else:
    3.18 +                protocol = SelfShareExchanger(id, SelfShareExchangerFactory(runtime))
    3.19 +                protocol.transport = FakeTransport()
    3.20 +                # Keys for when we are the client and when we are the server.
    3.21 +                server_key = (id, id)
    3.22 +                # Store a protocol used when we are the server.
    3.23 +                self.protocols[server_key] = protocol     
    3.24  
    3.25  class BinaryOperatorTestCase:
    3.26      """Test a binary operator.