From 4aecd57a40ca118ef808955632396157096563e2 Mon Sep 17 00:00:00 2001 From: Mattias Andrée Date: Fri, 7 Mar 2014 15:51:24 +0100 Subject: m bug fixes + doc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Mattias Andrée --- src/join.py | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 90 insertions(+), 2 deletions(-) diff --git a/src/join.py b/src/join.py index b33094d..9af8b0d 100755 --- a/src/join.py +++ b/src/join.py @@ -23,10 +23,28 @@ import threading class signal: + ''' + Function decorator for signals + ''' def __init__(self, f): + ''' + Constructor + + @param f:(...)→¿R? The decorated function + ''' self.f = f + def __call__(self, *args, **kwargs): + ''' + Function invocation method + + @param args:*... Positional arguments + @param kwargs:*... Named arguments + @return :join()→¿R? Object with an argumentless function, `join`, that + joins with the signal and returns the signals return. + This is an extension to join-calculus + ''' class signal_: def __init__(self, f): def f_(): @@ -40,27 +58,66 @@ class signal: return signal_(self.f) + class fragment: + ''' + Function decorator for fragments + ''' def __init__(self, f): + ''' + Constructor + + @param f:(...)→¿R? The decorated function + ''' self.f = f self.queue = [] self.condition = threading.Condition() + def __call__(self, *args, **kwargs): - self.f(*args, **kwargs) + ''' + Function invocation method + + @param args:*... Positional arguments + @param kwargs:*... Named arguments + @return :¿R? The value returned by the functon + ''' + rc = self.f(*args, **kwargs) self.condition.acquire() self.queue.append((args, kwargs)) self.condition.notify() self.condition.release() + return rc + def unjoin(self, args, kwargs): + ''' + Used internally be the module to revert non-selected fragments in join-switches + + @param args:tuple<...> Positional arguments + @param kwargs:dict Named arguments + ''' self.condition.acquire() self.queue.insert(0, (args, kwargs)) self.condition.notify() self.condition.release() + def join(*fs): + ''' + Join with fragments + + @param fs:*fragment The fragments + @return :list<(args:tuple<...>, kwargs:dict)> The positional arguments and named arguments + with which the fragments were invoked + + -- OR -- + + @param f:fragment The fragment + @return :(args:tuple<...>, kwargs:dict) The positional arguments and named arguments + with which the fragment were invoked + ''' rc = [] for f in fs: f.condition.acquire() @@ -70,7 +127,19 @@ def join(*fs): return rc[0] if len(fs) == 1 else rc + def ordered_join(*f_groups): + ''' + Ordered join-switch, joins with the first group of fragments that returns. + If there are matched fragments groups that have already returned, the one + that appears first the case set is selected. + + @param f_groups:*itr The fragments groups + @return :(int, (args:tuple<...>, kwargs:dict)|list<←>) + The index (zero-based) of the selected case and the positional arguments + and named arguments with which the fragments were invoked (as a list of + not exactly one fragemnt) + ''' condition = threading.Condition() rc = None index = 0 @@ -100,7 +169,19 @@ def ordered_join(*f_groups): return rc + def unordered_join(*f_groups): + ''' + Ordered join-switch, joins with the first group of fragments that returns. + If there are matched fragments groups that have already returned, one is + selected at random, uniformally. + + @param f_groups:*itr The fragments groups + @return :(int, (args:tuple<...>, kwargs:dict)|list<←>) + The index (zero-based) of the selected case and the positional arguments + and named arguments with which the fragments were invoked (as a list of + not exactly one fragemnt) + ''' ready = [i for i, fs in enumerate(f_groups) if all([len(f.queue) for f in fs])] if len(ready): i = ready[random.randrange(len(ready))] @@ -109,10 +190,17 @@ def unordered_join(*f_groups): return ordered_join(*f_groups) + def concurrently(*fs): - ts = [threading.Thread(target = f, args = args, kwargs = kwargs) for f in fs] + ''' + Run a set of functions concurrently and wait for all of them to return + + @param fs:*()→void The functions to run + ''' + ts = [threading.Thread(target = f) for f in fs] for t in ts: ts.start() for t in ts: ts.join() + -- cgit v1.2.3-70-g09d2