changeset 1218:c43b0e520aba

It is now possible to send data to your self. The sendData and _expect_data primitives now accepts and transmits data send from a player to himself.
author Janus Dam Nielsen <janus.nielsen@alexandra.dk>
date Tue, 06 Oct 2009 10:04:20 +0200
parents 80125f56beaa
children 8a5eb6550111
files viff/runtime.py viff/test/test_runtime.py viff/test/util.py
diffstat 3 files changed, 116 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
--- a/viff/runtime.py	Fri Sep 18 19:31:34 2009 +0200
+++ b/viff/runtime.py	Tue Oct 06 10:04:20 2009 +0200
@@ -382,6 +382,65 @@
         """Disconnect this protocol instance."""
         self.transport.loseConnection()
 
+class SelfShareExchanger(ShareExchanger):
+
+    def __init__(self, id, factory):
+        ShareExchanger.__init__(self)
+        self.peer_id = id
+        self.factory = factory
+
+    def stringReceived(self, program_counter, data_type, data):
+        """Called when a share is received.
+
+        The string received is unpacked into the program counter, and
+        a data part. The data is passed the appropriate Deferred in
+        :class:`self.incoming_data`.
+        """
+        try:
+            key = (program_counter, data_type)
+            
+            if key in self.waiting_deferreds:
+                deq = self.waiting_deferreds[key]
+                deferred = deq.popleft()
+                if not deq:
+                    del self.waiting_deferreds[key]
+                self.factory.runtime.handle_deferred_data(deferred, data)
+            else:
+                deq = self.incoming_data.setdefault(key, deque())
+                deq.append(data)
+        except struct.error, e:
+            self.factory.runtime.abort(self, e)
+
+    def sendData(self, program_counter, data_type, data):
+        """Send data to the self.id."""
+        self.stringReceived(program_counter, data_type, data)
+
+    def loseConnection(self):
+        """Disconnect this protocol instance."""
+        self.lost_connection.callback(self)
+        return None
+
+
+class SelfShareExchangerFactory(ReconnectingClientFactory, ServerFactory):
+    """Factory for creating SelfShareExchanger protocols."""
+
+    protocol = SelfShareExchanger
+    maxDelay = 3
+    factor = 1.234567 # About half of the Twisted default
+
+    def __init__(self, runtime):
+        """Initialize the factory."""
+        self.runtime = runtime
+
+    def identify_peer(self, protocol):
+        raise Exception("Is identify_peer necessary?")
+
+    def clientConnectionLost(self, connector, reason):
+        reason.trap(ConnectionDone)
+
+class FakeTransport(object):
+    def close(self):
+        return True
 
 class ShareExchangerFactory(ReconnectingClientFactory, ServerFactory):
     """Factory for creating ShareExchanger protocols."""
@@ -535,7 +594,9 @@
         self.players = {}
         # Add ourselves, but with no protocol since we wont be
         # communicating with ourselves.
-        self.add_player(player, None)
+        protocol = SelfShareExchanger(self.id, SelfShareExchangerFactory(self))
+        protocol.transport = FakeTransport()
+        self.add_player(player, protocol)
 
         #: Queue of deferreds and data.
         self.deferred_queue = deque()
@@ -551,9 +612,7 @@
     def add_player(self, player, protocol):
         self.players[player.id] = player
         self.num_players = len(self.players)
-        # There is no protocol for ourselves, so we wont add that:
-        if protocol is not None:
-            self.protocols[player.id] = protocol
+        self.protocols[player.id] = protocol
 
     def shutdown(self):
         """Shutdown the runtime.
@@ -670,7 +729,6 @@
         return result
 
     def _expect_data(self, peer_id, data_type, deferred):
-        assert peer_id != self.id, "Do not expect data from yourself!"
         # Convert self.program_counter to a hashable value in order to
         # use it as a key in self.protocols[peer_id].incoming_data.
         pc = tuple(self.program_counter)
--- a/viff/test/test_runtime.py	Fri Sep 18 19:31:34 2009 +0200
+++ b/viff/test/test_runtime.py	Tue Oct 06 10:04:20 2009 +0200
@@ -27,10 +27,10 @@
 from random import Random
 import operator
 
-from twisted.internet.defer import gatherResults
+from twisted.internet.defer import gatherResults, Deferred, DeferredList
 
 from viff.field import GF256
-from viff.runtime import Share
+from viff.runtime import Share, SHARE
 from viff.comparison import Toft05Runtime
 from viff.test.util import RuntimeTestCase, BinaryOperatorTestCase, protocol
 
@@ -191,6 +191,49 @@
 
         return gatherResults([opened_a, opened_b, opened_c])
 
+    @protocol
+    def test_send_receive_self(self, runtime):
+        """Test send and receive of values."""
+        value = 42
+        
+        pc = tuple(runtime.program_counter)
+        runtime.protocols[runtime.id].sendData(pc, SHARE, str(value))
+
+        d = Deferred()
+        runtime._expect_data(runtime.id, SHARE, d)
+        def check(x):
+            self.assertEquals(int(x), value)
+            return x
+        d.addCallback(check)
+        return d
+
+    @protocol
+    def test_send_receive_self2(self, runtime):
+        """Test send and receive of values."""
+        value = 42
+        
+        pc = tuple(runtime.program_counter)
+        for peer_id in runtime.players:
+            data = str(value)
+            runtime.protocols[peer_id].sendData(pc, SHARE, data)
+
+        ds = []
+        for peer_id in runtime.players:
+            d = Deferred()
+            runtime._expect_data(peer_id, SHARE, d)
+            ds.append(d)
+
+        dls = DeferredList(ds)
+        def check(ls):
+            for s, x in ls:
+                self.assertEquals(int(x), value)
+            return ls
+
+        dls.addCallback(check)
+        return dls
+
+
+
 
 class ConvertBitShareTest(RuntimeTestCase):
     runtime_class = Toft05Runtime
--- a/viff/test/util.py	Fri Sep 18 19:31:34 2009 +0200
+++ b/viff/test/util.py	Tue Oct 06 10:04:20 2009 +0200
@@ -22,7 +22,7 @@
 from twisted.internet import reactor
 
 from viff.passive import PassiveRuntime
-from viff.runtime import Share, ShareExchanger, ShareExchangerFactory
+from viff.runtime import Share, ShareExchanger, ShareExchangerFactory, SelfShareExchanger, SelfShareExchangerFactory, FakeTransport
 from viff.field import GF
 from viff.config import generate_configs, load_config
 from viff.util import rand
@@ -220,7 +220,13 @@
                     # fire.
                     sentinel = loopbackAsync(server, client)
                     self.close_sentinels.append(sentinel)
-
+            else:
+                protocol = SelfShareExchanger(id, SelfShareExchangerFactory(runtime))
+                protocol.transport = FakeTransport()
+                # Keys for when we are the client and when we are the server.
+                server_key = (id, id)
+                # Store a protocol used when we are the server.
+                self.protocols[server_key] = protocol     
 
 class BinaryOperatorTestCase:
     """Test a binary operator.