changeset 546:21fbfaae93e2

Implemented asynchronous loopback tests.
author Martin Geisler <mg@daimi.au.dk>
date Thu, 06 Mar 2008 10:58:28 +0100
parents 91966740ab55
children f558b5d1b197
files viff/test/loopback.py
diffstat 1 files changed, 62 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/viff/test/loopback.py	Tue Mar 04 08:50:52 2008 +0100
+++ b/viff/test/loopback.py	Thu Mar 06 10:58:28 2008 +0100
@@ -60,10 +60,11 @@
 
 # Twisted Imports
 from twisted.protocols import policies
-from twisted.internet import interfaces, protocol, main, defer
+from twisted.internet import interfaces, protocol, main, defer, reactor
 from twisted.python import failure
 from twisted.internet.interfaces import IAddress
 
+from viff.util import rand
 
 class _LoopbackQueue(object):
     """
@@ -108,16 +109,68 @@
     # ITransport
     def __init__(self, q):
         self.q = q
+        self._buffer = ''
+        self._pending = None
+        self._will_disconnect = False
+
+    def close(self):
+        self.q.disconnect = True
+        self._cancel_pending()
+
+    def _cancel_pending(self):
+        if self._pending is not None and self._pending.active():
+            self._pending.cancel()
+
+    def _schedule_write(self):
+        if self.q.disconnect:
+            # We are disconnected, so just return.
+            return
+            
+        if self._pending is None or not self._pending.active():
+            delay = rand.uniform(0, 0.001)
+            self._pending = reactor.callLater(delay, self._send_some_bytes)
+
+    def _send_some_bytes(self):
+        assert self._pending.called
+        assert not self._pending.cancelled
+
+        # Check that there is still something to send:
+        if not self._buffer:
+            # If the buffer is empty and we have been asked to
+            # disconnect, then do so.
+            if self._will_disconnect:
+                self._cancel_pending()
+                # Empty the buffer.
+                bytes = self._buffer
+                self._buffer = ''
+                # Signal the disconnect to those waiting on the queue.
+                self.q.disconnect = True
+                self.q.put(bytes)
+            # Return without scheduling another write.
+            return
+
+        # Cut the buffer in two at a random place and write a chunk to
+        # the protocol:
+        cut = rand.randint(0, len(self._buffer))
+        chunk, self._buffer = self._buffer[:cut], self._buffer[cut:]
+
+        # Schedule another go after a random delay.
+        self._schedule_write()
+
+        # Finally put the chunk into the queue.
+        self.q.put(chunk)
 
     def write(self, bytes):
-        self.q.put(bytes)
+        self._buffer += bytes
+        self._schedule_write()
 
     def writeSequence(self, iovec):
-        self.q.put(''.join(iovec))
+        self._buffer += ''.join(iovec)
+        self._schedule_write()
 
     def loseConnection(self):
-        self.q.disconnect = True
-        self.q.put('')
+        self._will_disconnect = True
+        self._schedule_write()
 
     def getPeer(self):
         return _LoopbackAddress()
@@ -224,6 +277,10 @@
             disconnect = True
             pump(client, clientToServer, server)
         if disconnect:
+            # Forcibly close both transports. This is important to
+            # cancel any pending calls.
+            server.transport.close()
+            client.transport.close()
             # Someone wanted to disconnect, so okay, the connection is gone.
             server.connectionLost(failure.Failure(main.CONNECTION_DONE))
             client.connectionLost(failure.Failure(main.CONNECTION_DONE))