diff options
| author | Mattias Andrée <maandree@operamail.com> | 2014-03-07 15:51:24 +0100 |
|---|---|---|
| committer | Mattias Andrée <maandree@operamail.com> | 2014-03-07 15:51:24 +0100 |
| commit | 4aecd57a40ca118ef808955632396157096563e2 (patch) | |
| tree | 1d4dfd235bad02708c83384f6d4c183ac48cbf35 | |
| parent | remove test stuff (diff) | |
| download | join-python-4aecd57a40ca118ef808955632396157096563e2.tar.gz join-python-4aecd57a40ca118ef808955632396157096563e2.tar.bz2 join-python-4aecd57a40ca118ef808955632396157096563e2.tar.xz | |
m bug fixes + doc
Signed-off-by: Mattias Andrée <maandree@operamail.com>
| -rwxr-xr-x | src/join.py | 92 |
1 files changed, 90 insertions, 2 deletions
diff --git a/src/join.py b/src/join.py index b33094d..9af8b0d 100755 --- a/src/join.py +++ b/src/join.py @@ -23,10 +23,28 @@ import threading class signal: + ''' + Function decorator for signals + ''' def __init__(self, f): + ''' + Constructor + + @param f:(...)→¿R? The decorated function + ''' self.f = f + def __call__(self, *args, **kwargs): + ''' + Function invocation method + + @param args:*... Positional arguments + @param kwargs:*... Named arguments + @return :join()→¿R? Object with an argumentless function, `join`, that + joins with the signal and returns the signals return. + This is an extension to join-calculus + ''' class signal_: def __init__(self, f): def f_(): @@ -40,27 +58,66 @@ class signal: return signal_(self.f) + class fragment: + ''' + Function decorator for fragments + ''' def __init__(self, f): + ''' + Constructor + + @param f:(...)→¿R? The decorated function + ''' self.f = f self.queue = [] self.condition = threading.Condition() + def __call__(self, *args, **kwargs): - self.f(*args, **kwargs) + ''' + Function invocation method + + @param args:*... Positional arguments + @param kwargs:*... Named arguments + @return :¿R? The value returned by the functon + ''' + rc = self.f(*args, **kwargs) self.condition.acquire() self.queue.append((args, kwargs)) self.condition.notify() self.condition.release() + return rc + def unjoin(self, args, kwargs): + ''' + Used internally be the module to revert non-selected fragments in join-switches + + @param args:tuple<...> Positional arguments + @param kwargs:dict<str, ...> Named arguments + ''' self.condition.acquire() self.queue.insert(0, (args, kwargs)) self.condition.notify() self.condition.release() + def join(*fs): + ''' + Join with fragments + + @param fs:*fragment The fragments + @return :list<(args:tuple<...>, kwargs:dict<str, ...>)> The positional arguments and named arguments + with which the fragments were invoked + + -- OR -- + + @param f:fragment The fragment + @return :(args:tuple<...>, kwargs:dict<str, ...>) The positional arguments and named arguments + with which the fragment were invoked + ''' rc = [] for f in fs: f.condition.acquire() @@ -70,7 +127,19 @@ def join(*fs): return rc[0] if len(fs) == 1 else rc + def ordered_join(*f_groups): + ''' + Ordered join-switch, joins with the first group of fragments that returns. + If there are matched fragments groups that have already returned, the one + that appears first the case set is selected. + + @param f_groups:*itr<fragment> The fragments groups + @return :(int, (args:tuple<...>, kwargs:dict<str, ...>)|list<←>) + The index (zero-based) of the selected case and the positional arguments + and named arguments with which the fragments were invoked (as a list of + not exactly one fragemnt) + ''' condition = threading.Condition() rc = None index = 0 @@ -100,7 +169,19 @@ def ordered_join(*f_groups): return rc + def unordered_join(*f_groups): + ''' + Ordered join-switch, joins with the first group of fragments that returns. + If there are matched fragments groups that have already returned, one is + selected at random, uniformally. + + @param f_groups:*itr<fragment> The fragments groups + @return :(int, (args:tuple<...>, kwargs:dict<str, ...>)|list<←>) + The index (zero-based) of the selected case and the positional arguments + and named arguments with which the fragments were invoked (as a list of + not exactly one fragemnt) + ''' ready = [i for i, fs in enumerate(f_groups) if all([len(f.queue) for f in fs])] if len(ready): i = ready[random.randrange(len(ready))] @@ -109,10 +190,17 @@ def unordered_join(*f_groups): return ordered_join(*f_groups) + def concurrently(*fs): - ts = [threading.Thread(target = f, args = args, kwargs = kwargs) for f in fs] + ''' + Run a set of functions concurrently and wait for all of them to return + + @param fs:*()→void The functions to run + ''' + ts = [threading.Thread(target = f) for f in fs] for t in ts: ts.start() for t in ts: ts.join() + |
