diff options
| author | Mattias Andrée <maandree@operamail.com> | 2014-03-07 15:28:48 +0100 |
|---|---|---|
| committer | Mattias Andrée <maandree@operamail.com> | 2014-03-07 15:28:48 +0100 |
| commit | 82bf4bdfc282ecee2f7da4b2cd1f5f14641d8dc8 (patch) | |
| tree | 07ff8efcc2f6f858d3e4faf68baa765c728dc329 /src/join.py | |
| parent | m (diff) | |
| download | join-python-82bf4bdfc282ecee2f7da4b2cd1f5f14641d8dc8.tar.gz join-python-82bf4bdfc282ecee2f7da4b2cd1f5f14641d8dc8.tar.bz2 join-python-82bf4bdfc282ecee2f7da4b2cd1f5f14641d8dc8.tar.xz | |
remove test stuff
Signed-off-by: Mattias Andrée <maandree@operamail.com>
Diffstat (limited to 'src/join.py')
| -rwxr-xr-x | src/join.py | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/src/join.py b/src/join.py new file mode 100755 index 0000000..b33094d --- /dev/null +++ b/src/join.py @@ -0,0 +1,118 @@ +#!/usr/bin/python3 +# -*- python -*- +''' +join python – Join-calculus for Python +Copyright © 2014 Mattias Andrée (maandree@member.fsf.org) + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. +''' + +import random +import threading + + +class signal: + def __init__(self, f): + self.f = f + + def __call__(self, *args, **kwargs): + class signal_: + def __init__(self, f): + def f_(): + self.rc = f(*args, **kwargs) + self.rc = None + self.t = threading.Thread(target = f_) + self.t.start() + def join(self): + self.t.join() + return self.rc + return signal_(self.f) + + +class fragment: + def __init__(self, f): + self.f = f + self.queue = [] + self.condition = threading.Condition() + + def __call__(self, *args, **kwargs): + self.f(*args, **kwargs) + self.condition.acquire() + self.queue.append((args, kwargs)) + self.condition.notify() + self.condition.release() + + def unjoin(self, args, kwargs): + self.condition.acquire() + self.queue.insert(0, (args, kwargs)) + self.condition.notify() + self.condition.release() + + +def join(*fs): + rc = [] + for f in fs: + f.condition.acquire() + f.condition.wait() + rc.append(f.queue.pop(0)) + f.condition.release() + return rc[0] if len(fs) == 1 else rc + + +def ordered_join(*f_groups): + condition = threading.Condition() + rc = None + index = 0 + for f_group in f_groups: + def join_(fs, index): + params = join(*fs) + already_done = rc is not None + if not already_done: + condition.acquire() + if rc is None: + params = (index, rc) + condition.notify() + condition.release() + else: + if not already_done: + condition.release() + if len(fs) == 1: + fs[0].unjoin(*params) + else: + for i, f in enumerate(fs): + f.unjoin(*(params[i])) + threading.Thread(target = join_, args = (f_group, index)).start() + index += 1 + condition.acquire() + condition.wait() + condition.release() + return rc + + +def unordered_join(*f_groups): + ready = [i for i, fs in enumerate(f_groups) if all([len(f.queue) for f in fs])] + if len(ready): + i = ready[random.randrange(len(ready))] + return (i, join(*(f_groups[i]))) + else: + return ordered_join(*f_groups) + + +def concurrently(*fs): + ts = [threading.Thread(target = f, args = args, kwargs = kwargs) for f in fs] + for t in ts: + ts.start() + for t in ts: + ts.join() + |
