aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMattias Andrée <maandree@operamail.com>2014-03-07 15:51:24 +0100
committerMattias Andrée <maandree@operamail.com>2014-03-07 15:51:24 +0100
commit4aecd57a40ca118ef808955632396157096563e2 (patch)
tree1d4dfd235bad02708c83384f6d4c183ac48cbf35
parentremove test stuff (diff)
downloadjoin-python-4aecd57a40ca118ef808955632396157096563e2.tar.gz
join-python-4aecd57a40ca118ef808955632396157096563e2.tar.bz2
join-python-4aecd57a40ca118ef808955632396157096563e2.tar.xz
m bug fixes + doc
Signed-off-by: Mattias Andrée <maandree@operamail.com>
-rwxr-xr-xsrc/join.py92
1 files changed, 90 insertions, 2 deletions
diff --git a/src/join.py b/src/join.py
index b33094d..9af8b0d 100755
--- a/src/join.py
+++ b/src/join.py
@@ -23,10 +23,28 @@ import threading
class signal:
+ '''
+ Function decorator for signals
+ '''
def __init__(self, f):
+ '''
+ Constructor
+
+ @param f:(...)→¿R? The decorated function
+ '''
self.f = f
+
def __call__(self, *args, **kwargs):
+ '''
+ Function invocation method
+
+ @param args:*... Positional arguments
+ @param kwargs:*... Named arguments
+ @return :join()→¿R? Object with an argumentless function, `join`, that
+ joins with the signal and returns the signals return.
+ This is an extension to join-calculus
+ '''
class signal_:
def __init__(self, f):
def f_():
@@ -40,27 +58,66 @@ class signal:
return signal_(self.f)
+
class fragment:
+ '''
+ Function decorator for fragments
+ '''
def __init__(self, f):
+ '''
+ Constructor
+
+ @param f:(...)→¿R? The decorated function
+ '''
self.f = f
self.queue = []
self.condition = threading.Condition()
+
def __call__(self, *args, **kwargs):
- self.f(*args, **kwargs)
+ '''
+ Function invocation method
+
+ @param args:*... Positional arguments
+ @param kwargs:*... Named arguments
+ @return :¿R? The value returned by the functon
+ '''
+ rc = self.f(*args, **kwargs)
self.condition.acquire()
self.queue.append((args, kwargs))
self.condition.notify()
self.condition.release()
+ return rc
+
def unjoin(self, args, kwargs):
+ '''
+ Used internally be the module to revert non-selected fragments in join-switches
+
+ @param args:tuple<...> Positional arguments
+ @param kwargs:dict<str, ...> Named arguments
+ '''
self.condition.acquire()
self.queue.insert(0, (args, kwargs))
self.condition.notify()
self.condition.release()
+
def join(*fs):
+ '''
+ Join with fragments
+
+ @param fs:*fragment The fragments
+ @return :list<(args:tuple<...>, kwargs:dict<str, ...>)> The positional arguments and named arguments
+ with which the fragments were invoked
+
+ -- OR --
+
+ @param f:fragment The fragment
+ @return :(args:tuple<...>, kwargs:dict<str, ...>) The positional arguments and named arguments
+ with which the fragment were invoked
+ '''
rc = []
for f in fs:
f.condition.acquire()
@@ -70,7 +127,19 @@ def join(*fs):
return rc[0] if len(fs) == 1 else rc
+
def ordered_join(*f_groups):
+ '''
+ Ordered join-switch, joins with the first group of fragments that returns.
+ If there are matched fragments groups that have already returned, the one
+ that appears first the case set is selected.
+
+ @param f_groups:*itr<fragment> The fragments groups
+ @return :(int, (args:tuple<...>, kwargs:dict<str, ...>)|list<←>)
+ The index (zero-based) of the selected case and the positional arguments
+ and named arguments with which the fragments were invoked (as a list of
+ not exactly one fragemnt)
+ '''
condition = threading.Condition()
rc = None
index = 0
@@ -100,7 +169,19 @@ def ordered_join(*f_groups):
return rc
+
def unordered_join(*f_groups):
+ '''
+ Ordered join-switch, joins with the first group of fragments that returns.
+ If there are matched fragments groups that have already returned, one is
+ selected at random, uniformally.
+
+ @param f_groups:*itr<fragment> The fragments groups
+ @return :(int, (args:tuple<...>, kwargs:dict<str, ...>)|list<←>)
+ The index (zero-based) of the selected case and the positional arguments
+ and named arguments with which the fragments were invoked (as a list of
+ not exactly one fragemnt)
+ '''
ready = [i for i, fs in enumerate(f_groups) if all([len(f.queue) for f in fs])]
if len(ready):
i = ready[random.randrange(len(ready))]
@@ -109,10 +190,17 @@ def unordered_join(*f_groups):
return ordered_join(*f_groups)
+
def concurrently(*fs):
- ts = [threading.Thread(target = f, args = args, kwargs = kwargs) for f in fs]
+ '''
+ Run a set of functions concurrently and wait for all of them to return
+
+ @param fs:*()→void The functions to run
+ '''
+ ts = [threading.Thread(target = f) for f in fs]
for t in ts:
ts.start()
for t in ts:
ts.join()
+