changeset 667:e41ce22ed035

Basic support for pre-processing. This adds support for pre-processing to the BasicRuntime. The idea is that a protocol run records the needed data using the preprocess decorator, and using a description of this data, the runtime can begin the following executions generating it. The data needed is recorded based on the methods name and program counter, so this assumes that every execution produces the same tree of program counters. This is true for simple programs like the benchmark, but need not be true for programs that branch on the input data. Please see Issue 3: http://tracker.viff.dk/issue3.
author Martin Geisler <mg@daimi.au.dk>
date Sun, 13 Apr 2008 23:24:46 +0200
parents 876f98730e4c
children 66e0f8fdac7c
files apps/benchmark.py viff/runtime.py viff/test/test_active_runtime.py
diffstat 3 files changed, 138 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/apps/benchmark.py	Sun Apr 13 22:16:28 2008 +0200
+++ b/apps/benchmark.py	Sun Apr 13 23:24:46 2008 +0200
@@ -57,6 +57,7 @@
 import time
 from optparse import OptionParser
 import operator
+from pprint import pprint
 
 from twisted.internet import reactor
 
@@ -122,9 +123,23 @@
 class Benchmark:
 
     def __init__(self, rt, operation):
-        print "Runtime ready, starting protocol"
         self.rt = rt
         self.operation = operation
+
+        if isinstance(self.rt, ActiveRuntime) and self.operation == operator.mul:
+            print "Starting preprocessing"
+            program_desc = {
+                ("generate_triples", (Zp,)):
+                    [(i, 1, 0) for i in range(3 + 2*count, 3 + 3*count)]
+                }
+            preproc = rt.preprocess(program_desc)
+            preproc.addCallback(self.begin)
+        else:
+            print "Need no preprocessing"
+            self.begin(None)
+
+    def begin(self, _):
+        print "Runtime ready, starting protocol"
         self.a_shares = [self.rt.prss_share_random(Zp) for _ in range(count)]
         self.b_shares = [self.rt.prss_share_random(Zp) for _ in range(count)]
         shares_ready = gather_shares(self.a_shares + self.b_shares)
@@ -151,6 +166,11 @@
 
     def finished(self, _):
         print "Finished, synchronizing shutdown."
+
+        if self.rt._needed_data:
+            print "Missing pre-processed data:"
+            pprint(self.rt._needed_data)
+
         sync = self.rt.synchronize()
         sync.addCallback(self.shutdown)
 
--- a/viff/runtime.py	Sun Apr 13 22:16:28 2008 +0200
+++ b/viff/runtime.py	Sun Apr 13 23:24:46 2008 +0200
@@ -44,7 +44,7 @@
 from viff.util import wrapper, rand
 
 from twisted.internet import reactor
-from twisted.internet.defer import Deferred, DeferredList, gatherResults
+from twisted.internet.defer import Deferred, DeferredList, gatherResults, succeed
 from twisted.internet.protocol import ClientFactory, ServerFactory
 from twisted.protocols.basic import Int16StringReceiver
 
@@ -370,6 +370,38 @@
     return inc_pc_wrapper
 
 
+def preprocess(generator):
+    """Track calls to this method.
+
+    The decorated method will be replaced with a proxy method which
+    first tries to get the data needed from C{self._pool}, and if that
+    fails it falls back to the original method.
+
+    The C{generator} method is only used to record where the data
+    should be generated from, the method is not actually called.
+
+    @param generator: Use this method as the generator for
+    pre-processed data.
+    @type generator: C{str}
+    """
+
+    def preprocess_decorator(method):
+
+        @wrapper(method)
+        def preprocess_wrapper(self, *args, **kwargs):
+            pc = tuple(self.program_counter)
+            try:
+                return self._pool[pc]
+            except KeyError:
+                key = (generator, args)
+                pcs = self._needed_data.setdefault(key, [])
+                pcs.append(pc)
+                return method(self, *args, **kwargs)
+
+        return preprocess_wrapper
+    return preprocess_decorator
+
+
 class BasicRuntime:
     """Basic VIFF runtime with no crypto.
 
@@ -426,6 +458,11 @@
             from twisted.internet import defer
             defer.setDebugging(True)
 
+        #: Pool of preprocessed data.
+        self._pool = {}
+        #: Description of needed preprocessed data.
+        self._needed_data = {}
+
         #: Current program counter.
         #:
         #: Whenever a share is sent over the network, it must be
@@ -607,6 +644,72 @@
         self._expect_data(peer_id, "share", share)
         return share
 
+    @increment_pc
+    def preprocess(self, program):
+        """Generate preprocess material.
+
+        The C{program} specifies which methods to call and with which
+        arguments. The generator methods called must adhere to the
+        following interface:
+
+          - They must return a C{(int, Deferred)} tuple where the
+            C{int} tells us how many items of pre-processed data the
+            Deferred will yield.
+
+          - The Deferred must yield a C{list} of the promissed length.
+
+          - The C{list} contains the actual data. This data can be
+            either a Deferred or a C{tuple} of Deferreds.
+
+        The L{ActiveRuntime.generate_triples} method is an example of
+        a method fulfilling this interface.
+
+        @param program: A description of the needed data.
+        @type program: C{dict} mapping C{(str, args)} tuples to
+        program counters
+        """
+
+        def update(results, program_counters):
+            # We concatenate the sub-lists in results.
+            results = sum(results, [])
+
+            wait_list = []
+            for result in results:
+                # We allow pre-processing methods to return tuples of
+                # shares or individual shares as their result. Here we
+                # deconstruct result (if possible) and wait on its
+                # individual parts.
+                if isinstance(result, tuple):
+                    wait_list.extend(result)
+                else:
+                    wait_list.append(result)
+
+            # The pool must map program counters to Deferreds to
+            # present a uniform interface for the functions we
+            # pre-process.
+            results = map(succeed, results)
+
+            # Update the pool with pairs of program counter and data.
+            self._pool.update(zip(program_counters, results))
+            # Return a Deferred that waits on the individual results.
+            # This is important to make it possible for the players to
+            # avoid starting before the pre-processing is complete.
+            return gatherResults(wait_list)
+
+        wait_list = []
+        for ((generator, args), program_counters) in program.iteritems():
+            print "Preprocessing %s (%d items)" % (generator, len(program_counters))
+            func = getattr(self, generator)
+            results = []
+            items = 0
+            while items < len(program_counters):
+                item_count, result = func(*args)
+                items += item_count
+                results.append(result)
+            ready = gatherResults(results)
+            ready.addCallback(update, program_counters)
+            wait_list.append(ready)
+        return DeferredList(wait_list)
 
 class Runtime(BasicRuntime):
     """The VIFF runtime.
@@ -1206,12 +1309,11 @@
         return result
 
     @increment_pc
+    @preprocess("generate_triples")
     def get_triple(self, field):
-        # TODO: This is of course insecure... We should move
-        # generate_triples to a preprocessing step and draw the
-        # triples from a pool instead. Also, using only the first
-        # triple is quite wasteful...
-        result = self.generate_triples(field)
+        # This is a waste, but this function is only called if there
+        # are no pre-processed triples left.
+        count, result = self.generate_triples(field)
         result.addCallback(lambda triples: triples[0])
         return result
 
@@ -1220,10 +1322,12 @@
         """Generate multiplication triples.
 
         These are random numbers M{a}, M{b}, and M{c} such that M{c =
-        ab}.
+        ab}. This function can be used in pre-processing.
 
-        @return: C{list} of 3-tuples.
-        @returntype: C{list} of C{(Share, Share, Share)}
+        @return: Number of triples returned and a Deferred which will
+        yield a C{list} of 3-tuples.
+        @returntype: (C{int}, C{list} of Deferred C{(Share, Share,
+        Share)})
         """
         n = self.num_players
         t = self.threshold
@@ -1250,7 +1354,7 @@
 
         result = gatherResults([single_a, single_b, double_c])
         self.schedule_callback(result, make_triple)
-        return result
+        return T, result
 
     @increment_pc
     def _broadcast(self, sender, message=None):
--- a/viff/test/test_active_runtime.py	Sun Apr 13 22:16:28 2008 +0200
+++ b/viff/test/test_active_runtime.py	Sun Apr 13 23:24:46 2008 +0200
@@ -132,6 +132,8 @@
                 results.append(result)
             return gatherResults(results)
 
-        triples = runtime.generate_triples(self.Zp)
+        count, triples = runtime.generate_triples(self.Zp)
+        self.assertEquals(count, runtime.num_players - 2*runtime.threshold)
+
         triples.addCallback(check)
         return triples