viff

changeset 1237:4b6f9e4db99e

Automatic generation of program counters sequences. The benchmark program now automatically generates the needed sequence of program counters by running the benchmark. The generated program counters are then feed to the preprocessor and the benchmark is reexecuted and timing performed.
author Janus Dam Nielsen <janus.nielsen@alexandra.dk>
date Wed, 07 Oct 2009 12:03:06 +0200
parents 5aa1a609ae35
children 199782817963 ed2d02202af0
files apps/benchmark.py viff/active.py viff/runtime.py
diffstat 3 files changed, 81 insertions(+), 70 deletions(-) [+]
line diff
     1.1 --- a/apps/benchmark.py	Wed Oct 07 12:02:23 2009 +0200
     1.2 +++ b/apps/benchmark.py	Wed Oct 07 12:03:06 2009 +0200
     1.3 @@ -63,6 +63,7 @@
     1.4  import viff.reactor
     1.5  viff.reactor.install()
     1.6  from twisted.internet import reactor
     1.7 +from twisted.internet.defer import Deferred
     1.8  
     1.9  from viff.field import GF, GF256, FakeGF
    1.10  from viff.runtime import Runtime, create_runtime, gather_shares, \
    1.11 @@ -87,12 +88,13 @@
    1.12      print "Started", what
    1.13  
    1.14  
    1.15 -def record_stop(_, what):
    1.16 +def record_stop(x, what):
    1.17      stop = time.time()
    1.18      print
    1.19      print "Total time used: %.3f sec" % (stop-start)
    1.20      print "Time per %s operation: %.0f ms" % (what, 1000*(stop-start) / count)
    1.21      print "*" * 6
    1.22 +    return x
    1.23  
    1.24  operations = {"mul": (operator.mul,[]),
    1.25                "compToft05": (operator.ge, [ComparisonToft05Mixin]),
    1.26 @@ -116,7 +118,7 @@
    1.27                    help="the name of the basic runtime to test")
    1.28  parser.add_option("-n", "--num_players", action="store_true", dest="num_players",
    1.29                    help="number of players")
    1.30 -parser.add_option("--mixins", type="choice", choices=mixins.keys(),
    1.31 +parser.add_option("--mixins", type="string",
    1.32                    help="operation to benchmark")
    1.33  parser.add_option("--prss", action="store_true",
    1.34                    help="use PRSS for preprocessing")
    1.35 @@ -138,7 +140,7 @@
    1.36                    help="additional arguments to the runtime, the format is a comma separated list of id=value pairs e.g. --args s=1,d=0,lambda=1")
    1.37  
    1.38  parser.set_defaults(modulus=2**65, threshold=1, count=10,
    1.39 -                    runtime=runtimes.keys()[0], mixins=mixins.keys(), num_players=2, prss=True,
    1.40 +                    runtime=runtimes.keys()[0], mixins="", num_players=2, prss=True,
    1.41                      operation=operations.keys()[0], parallel=True, fake=False)
    1.42  
    1.43  # Add standard VIFF options.
    1.44 @@ -174,53 +176,43 @@
    1.45      def __init__(self, rt, operation):
    1.46          self.rt = rt
    1.47          self.operation = operation
    1.48 -        self.sync_preprocess()
    1.49 -
    1.50 -    def sync_preprocess(self):
    1.51 -        print "Synchronizing preprocessing"
    1.52 +        self.pc = None
    1.53          sys.stdout.flush()
    1.54          sync = self.rt.synchronize()
    1.55 +        self.doTest(sync, lambda x: x)
    1.56          self.rt.schedule_callback(sync, self.preprocess)
    1.57 +        self.doTest(sync, lambda x: self.rt.shutdown())
    1.58 +        
    1.59 +#     def sync_preprocess(self):
    1.60 +#         print "Synchronizing preprocessing"
    1.61 +#         sys.stdout.flush()
    1.62 +#         sync = self.rt.synchronize()
    1.63 +#         self.rt.schedule_callback(sync, self.preprocess)
    1.64  
    1.65 -    def preprocess(self, _):
    1.66 -        program_desc = {}
    1.67 -
    1.68 -        if isinstance(self.rt, BasicActiveRuntime):
    1.69 -            # TODO: Make this optional and maybe automatic. The
    1.70 -            # program descriptions below were found by carefully
    1.71 -            # studying the output reported when the benchmarks were
    1.72 -            # run with no preprocessing. So they are quite brittle.
    1.73 -            if self.operation == operator.mul:
    1.74 -                key = ("generate_triples", (Zp,))
    1.75 -                desc = [(i, 1, 0) for i in range(3, 3 + count)]
    1.76 -                program_desc.setdefault(key, []).extend(desc)
    1.77 -            elif isinstance(self.rt, ComparisonToft05Mixin):
    1.78 -                key = ("generate_triples", (GF256,))
    1.79 -                desc = sum([[(c, 64, i, 1, 1, 0) for i in range(2, 33)] +
    1.80 -                            [(c, 64, i, 3, 1, 0) for i in range(17, 33)]
    1.81 -                            for c in range(3 + 2*count, 3 + 3*count)],
    1.82 -                           [])
    1.83 -                program_desc.setdefault(key, []).extend(desc)
    1.84 -            elif isinstance(self.rt, ComparisonToft07Mixin):
    1.85 -                key = ("generate_triples", (Zp,))
    1.86 -                desc = sum([[(c, 2, 4, i, 2, 1, 0) for i in range(1, 33)] +
    1.87 -                            [(c, 2, 4, 99, 2, 1, 0)] +
    1.88 -                            [(c, 2, 4, i, 1, 0) for i in range(65, 98)]
    1.89 -                            for c in range(3 + 2*count, 3 + 3*count)],
    1.90 -                           [])
    1.91 -                program_desc.setdefault(key, []).extend(desc)
    1.92 -
    1.93 -        if program_desc:
    1.94 +    def preprocess(self, needed_data):
    1.95 +        print "Preprocess", needed_data
    1.96 +        if needed_data:
    1.97              print "Starting preprocessing"
    1.98              record_start("preprocessing")
    1.99 -            preproc = self.rt.preprocess(program_desc)
   1.100 +            preproc = self.rt.preprocess(needed_data)
   1.101              preproc.addCallback(record_stop, "preprocessing")
   1.102 -            self.rt.schedule_callback(preproc, self.begin)
   1.103 +            return preproc
   1.104          else:
   1.105              print "Need no preprocessing"
   1.106 -            self.begin(None)
   1.107 +            return None
   1.108 +
   1.109 +    def doTest(self, d, termination_function):
   1.110 +        print "doTest", self.rt.program_counter
   1.111 +        self.rt.schedule_callback(d, self.begin)
   1.112 +        self.rt.schedule_callback(d, self.sync_test)
   1.113 +#         self.rt.schedule_callback(d, self.countdown, 3)
   1.114 +        self.rt.schedule_callback(d, self.run_test)
   1.115 +        self.rt.schedule_callback(d, self.sync_test)
   1.116 +        self.rt.schedule_callback(d, self.finished, termination_function)
   1.117 +        return d
   1.118  
   1.119      def begin(self, _):
   1.120 +        print "begin", self.rt.program_counter
   1.121          print "Runtime ready, generating shares"
   1.122          self.a_shares = []
   1.123          self.b_shares = []
   1.124 @@ -234,43 +226,49 @@
   1.125              self.a_shares.append(self.rt.input([inputter], Zp, a))
   1.126              self.b_shares.append(self.rt.input([inputter], Zp, b))
   1.127          shares_ready = gather_shares(self.a_shares + self.b_shares)
   1.128 -        self.rt.schedule_callback(shares_ready, self.sync_test)
   1.129 +        return shares_ready
   1.130  
   1.131 -    def sync_test(self, _):
   1.132 +    def sync_test(self, x):
   1.133          print "Synchronizing test start."
   1.134          sys.stdout.flush()
   1.135          sync = self.rt.synchronize()
   1.136 -        self.rt.schedule_callback(sync, self.countdown, 3)
   1.137 +        self.rt.schedule_callback(sync, lambda y: x)
   1.138 +        return sync
   1.139  
   1.140 -    def countdown(self, _, seconds):
   1.141 -        if seconds > 0:
   1.142 -            print "Starting test in %d" % seconds
   1.143 -            sys.stdout.flush()
   1.144 -            reactor.callLater(1, self.countdown, None, seconds - 1)
   1.145 -        else:
   1.146 -            print "Starting test now"
   1.147 -            sys.stdout.flush()
   1.148 -            self.run_test(None)
   1.149 +#     def countdown(self, _, seconds):
   1.150 +#         if seconds > 0:
   1.151 +#             print "Starting test in %d" % seconds
   1.152 +#             sys.stdout.flush()
   1.153 +#             reactor.callLater(1, self.countdown, None, seconds - 1)
   1.154 +#         else:
   1.155 +#             print "Starting test now"
   1.156 +#             sys.stdout.flush()
   1.157 +#             self.run_test(None)
   1.158  
   1.159      def run_test(self, _):
   1.160          raise NotImplemented("Override this abstract method in a sub class.")
   1.161  
   1.162 -    def finished(self, _):
   1.163 +    def finished(self, needed_data, termination_function):
   1.164          sys.stdout.flush()
   1.165  
   1.166          if self.rt._needed_data:
   1.167              print "Missing pre-processed data:"
   1.168 -            for (func, args), pcs in self.rt._needed_data.iteritems():
   1.169 +            for (func, args), pcs in needed_data.iteritems():
   1.170                  print "* %s%s:" % (func, args)
   1.171                  print "  " + pformat(pcs).replace("\n", "\n  ")
   1.172  
   1.173 -        self.rt.shutdown()
   1.174 +        return termination_function(needed_data)
   1.175  
   1.176  # This class implements a benchmark where run_test executes all
   1.177  # operations in parallel.
   1.178  class ParallelBenchmark(Benchmark):
   1.179  
   1.180 -    def run_test(self, _):
   1.181 +    def run_test(self, shares):
   1.182 +        print "rt", self.rt.program_counter, self.pc
   1.183 +        if self.pc != None:
   1.184 +            self.rt.program_counter = self.pc
   1.185 +        else:
   1.186 +            self.pc = list(self.rt.program_counter)
   1.187          c_shares = []
   1.188          record_start("parallel test")
   1.189          while self.a_shares and self.b_shares:
   1.190 @@ -280,24 +278,30 @@
   1.191  
   1.192          done = gather_shares(c_shares)
   1.193          done.addCallback(record_stop, "parallel test")
   1.194 -        self.rt.schedule_callback(done, self.finished)
   1.195 +        def f(x):
   1.196 +            needed_data = self.rt._needed_data
   1.197 +            self.rt._needed_data = {}
   1.198 +            return needed_data
   1.199 +        done.addCallback(f)
   1.200 +        return done
   1.201 +
   1.202  
   1.203  # A benchmark where the operations are executed one after each other.
   1.204  class SequentialBenchmark(Benchmark):
   1.205  
   1.206 -    def run_test(self, _):
   1.207 +    def run_test(self, _, termination_function, d):
   1.208          record_start("sequential test")
   1.209 -        self.single_operation(None)
   1.210 +        self.single_operation(None, termination_function)
   1.211  
   1.212 -    def single_operation(self, _):
   1.213 +    def single_operation(self, _, termination_function):
   1.214          if self.a_shares and self.b_shares:
   1.215              a = self.a_shares.pop()
   1.216              b = self.b_shares.pop()
   1.217              c = self.operation(a, b)
   1.218 -            self.rt.schedule_callback(c, self.single_operation)
   1.219 +            self.rt.schedule_callback(c, self.single_operation, termination_function)
   1.220          else:
   1.221              record_stop(None, "sequential test")
   1.222 -            self.finished(None)
   1.223 +            self.finished(None, termination_function)
   1.224  
   1.225  # Identify the base runtime class.
   1.226  base_runtime_class = runtimes[options.runtime]
     2.1 --- a/viff/active.py	Wed Oct 07 12:02:23 2009 +0200
     2.2 +++ b/viff/active.py	Wed Oct 07 12:03:06 2009 +0200
     2.3 @@ -378,11 +378,11 @@
     2.4      def get_triple(self, field):
     2.5          # This is a waste, but this function is only called if there
     2.6          # are no pre-processed triples left.
     2.7 -        count, result = self.generate_triples(field)
     2.8 +        count, result = self.generate_triples(field, None)
     2.9          result.addCallback(lambda triples: triples[0])
    2.10          return result
    2.11  
    2.12 -    def generate_triples(self, field):
    2.13 +    def generate_triples(self, field, number_of_requested_triples):
    2.14          """Generate multiplication triples.
    2.15  
    2.16          These are random numbers *a*, *b*, and *c* such that ``c =
    2.17 @@ -423,11 +423,11 @@
    2.18  
    2.19      @preprocess("generate_triples")
    2.20      def get_triple(self, field):
    2.21 -        count, result = self.generate_triples(field)
    2.22 +        count, result = self.generate_triples(field, None)
    2.23          result.addCallback(lambda triples: triples[0])
    2.24          return result
    2.25  
    2.26 -    def generate_triples(self, field):
    2.27 +    def generate_triples(self, field, number_of_requested_triples):
    2.28          """Generate a multiplication triple using PRSS.
    2.29  
    2.30          These are random numbers *a*, *b*, and *c* such that ``c =
     3.1 --- a/viff/runtime.py	Wed Oct 07 12:02:23 2009 +0200
     3.2 +++ b/viff/runtime.py	Wed Oct 07 12:03:06 2009 +0200
     3.3 @@ -475,17 +475,21 @@
     3.4      """
     3.5  
     3.6      def preprocess_decorator(method):
     3.7 -
     3.8          @wrapper(method)
     3.9          def preprocess_wrapper(self, *args, **kwargs):
    3.10              pc = tuple(self.program_counter)
    3.11              try:
    3.12 +                self.program_counter[-1] += 1
    3.13                  return self._pool[pc]
    3.14              except KeyError:
    3.15 -                key = (generator, args)
    3.16 -                pcs = self._needed_data.setdefault(key, [])
    3.17 -                pcs.append(pc)
    3.18 -                return method(self, *args, **kwargs)
    3.19 +                try:
    3.20 +                    key = (generator, args)
    3.21 +                    pcs = self._needed_data.setdefault(key, [])
    3.22 +                    pcs.append(pc)
    3.23 +                    self.program_counter.append(0)
    3.24 +                    return method(self, *args, **kwargs)
    3.25 +                finally:
    3.26 +                    self.program_counter.pop()
    3.27  
    3.28          return preprocess_wrapper
    3.29      return preprocess_decorator
    3.30 @@ -808,6 +812,9 @@
    3.31              func = getattr(self, generator)
    3.32              results = []
    3.33              items = 0
    3.34 +            args = list(args)
    3.35 +            args.append(len(program_counters))
    3.36 +            args = tuple(args)
    3.37              while items < len(program_counters):
    3.38                  item_count, result = func(*args)
    3.39                  items += item_count