PK%CGp0Xc,c,tests/test_pebble.pyimport os import time import signal import unittest import threading try: # Python 2 from Queue import Queue except: # Python 3 from queue import Queue from pebble import decorators from pebble import synchronized, sighandler, thread from pebble import waitfortasks, waitforthreads, waitforqueues from pebble import Task, TimeoutError, TaskCancelled results = 0 semaphore = threading.Semaphore() @synchronized def synchronized_function(): """A docstring.""" return decorators._synchronized_lock.acquire(False) @synchronized(semaphore) def custom_synchronized_function(): """A docstring.""" return semaphore.acquire(False) try: from signal import SIGALRM, SIGFPE, SIGIO @sighandler(SIGALRM) def signal_handler(signum, frame): """A docstring.""" global results results = 1 @sighandler((SIGFPE, SIGIO)) def signals_handler(signum, frame): pass except ImportError: pass @thread.spawn def thread_function(value): time.sleep(value) return value @thread.spawn def queue_function(queues, index, value): time.sleep(value) queues[index].put(value) return value @thread.concurrent def concurrent_function(value): time.sleep(value) return value @thread.spawn def spurious_wakeup_function(value, lock): value = value / 2 time.sleep(value) lock.acquire() time.sleep(value) return value class TestSynchronizedDecorator(unittest.TestCase): def test_wrapper_decorator_docstring(self): """Synchronized docstring of the original function is preserved.""" self.assertEqual(synchronized_function.__doc__, "A docstring.") def test_syncronized_locked(self): """Synchronized Lock is acquired during execution of decorated function.""" self.assertFalse(synchronized_function()) def test_syncronized_released(self): """Synchronized Lock is released during execution of decorated function.""" synchronized_function() self.assertTrue(decorators._synchronized_lock.acquire(False)) decorators._synchronized_lock.release() def test_custom_syncronized_locked(self): """Synchronized semaphore is acquired during execution of decorated function.""" self.assertFalse(custom_synchronized_function()) def test_custom_syncronized_released(self): """Synchronized semaphore is acquired during execution of decorated function.""" custom_synchronized_function() self.assertTrue(semaphore.acquire(False)) semaphore.release() class TestSigHandler(unittest.TestCase): def test_wrapper_decorator_docstring(self): """Sighandler docstring of the original function is preserved.""" if os.name != 'nt': self.assertEqual(signal_handler.__doc__, "A docstring.") def test_sighandler(self): """Sighandler installs SIGALRM.""" if os.name != 'nt': self.assertEqual(signal.getsignal(signal.SIGALRM).__name__, signal_handler.__name__) def test_sighandler_multiple(self): """Sighandler installs SIGFPE and SIGIO.""" if os.name != 'nt': self.assertEqual(signal.getsignal(signal.SIGFPE).__name__, signals_handler.__name__) self.assertEqual(signal.getsignal(signal.SIGIO).__name__, signals_handler.__name__) def test_sigalarm_sighandler(self): """Sighandler for SIGALARM works.""" if os.name != 'nt': os.kill(os.getpid(), signal.SIGALRM) time.sleep(0.1) self.assertEqual(results, 1) class TestWaitForTasks(unittest.TestCase): def test_waitfortasks_single(self): """Waitfortasks waits for a single task.""" task = concurrent_function(0.01) self.assertEqual(list(waitfortasks([task]))[0], task) def test_waitfortasks_multiple(self): """Waitfortasks waits for multiple tasks.""" tasks = [] for _ in range(5): tasks.append(concurrent_function(0.01)) time.sleep(0.1) self.assertEqual(list(waitfortasks(tasks)), tasks) def test_waitfortasks_timeout(self): """Waitfortasks returns empty list if timeout.""" task = concurrent_function(0.1) self.assertEqual(list(waitfortasks([task], timeout=0.01)), []) def test_waitfortasks_restore(self): """Waitfortasks Task object is restored to original one.""" task = concurrent_function(0.01) expected = sorted(dir(task)) waitfortasks([task]) self.assertEqual(sorted(dir(task)), expected) class TestWaitForThreads(unittest.TestCase): def test_waitforthreads_single(self): """Waitforthreads waits for a single thread.""" thread = thread_function(0.01) self.assertEqual(list(waitforthreads([thread]))[0], thread) def test_waitforthreads_multiple(self): """Waitforthreads waits for multiple threads.""" threads = [] for _ in range(5): threads.append(thread_function(0.01)) time.sleep(0.1) self.assertEqual(list(waitforthreads(threads)), threads) def test_waitforthreads_timeout(self): """Waitforthreads returns empty list if timeout.""" thread = thread_function(0.1) self.assertEqual(list(waitforthreads([thread], timeout=0.01)), []) def test_waitforthreads_restore(self): """Waitforthreads get_ident is restored to original one.""" if hasattr(threading, 'get_ident'): expected = threading.get_ident else: expected = threading._get_ident thread = thread_function(0) time.sleep(0.01) waitforthreads([thread]) if hasattr(threading, 'get_ident'): self.assertEqual(threading.get_ident, expected) else: self.assertEqual(threading._get_ident, expected) def test_waitforthreads_spurious(self): """Waitforthreads tolerates spurious wakeups.""" lock = threading.RLock() thread = spurious_wakeup_function(0.1, lock) self.assertEqual(list(waitforthreads([thread])), [thread]) class TestWaitForQueues(unittest.TestCase): def setUp(self): self.queues = [Queue(), Queue(), Queue()] def test_waitforqueues_single(self): """Waitforqueues waits for a single queue.""" queue_function(self.queues, 0, 0.01) self.assertEqual(list(waitforqueues(self.queues))[0], self.queues[0]) def test_waitforqueues_multiple(self): """Waitforqueues waits for multiple queues.""" for index in range(3): queue_function(self.queues, index, 0.01) time.sleep(0.1) self.assertEqual(list(waitforqueues(self.queues)), self.queues) def test_waitforqueues_timeout(self): """Waitforqueues returns empty list if timeout.""" queue_function(self.queues, 0, 0.1) self.assertEqual(list(waitforqueues(self.queues, timeout=0.01)), []) def test_waitforqueues_restore(self): """Waitforqueues Queue object is restored to original one.""" expected = sorted(dir(self.queues[0])) queue_function(self.queues, 0, 0) waitforqueues(self.queues) self.assertEqual(sorted(dir(self.queues[0])), expected) class TestTask(unittest.TestCase): def setUp(self): self.task = Task(0) def test_number(self): """Task number is reported correctly.""" t = Task(42) self.assertEqual(t.number, 42) def test_task_id(self): """Task ID is forwarded to it.""" t = Task(0, identifier='foo') self.assertEqual(t.id, 'foo') def test_ready(self): """Task is ready if results are seself.task.""" self.task._set(None) self.assertTrue(self.task.ready) def test_not_read(self): """Task is not ready if results are not seself.task.""" self.assertFalse(self.task.ready) def test_cancelled(self): """Task is cancelled if cancel() is called.""" self.task.cancel() self.assertTrue(self.task.cancelled) def test_not_cancelled(self): """Task is not cancelled if cancel() is not called.""" self.assertFalse(self.task.cancelled) def test_started(self): """Task is started if timestamp is self.task.""" self.task._timestamp = 42 self.assertTrue(self.task.started) def test_not_started(self): """Task is not started if timestamp is not seself.task.""" self.assertFalse(self.task.started) def test_success(self): """Task is successful if results are seself.task.""" self.task._set(42) self.assertTrue(self.task.success) def test_not_success(self): """Task is not successful if results are not seself.task.""" self.assertFalse(self.task.success) def test_not_success_exception(self): """Task is not successful if results are an Exception.""" self.task._set(Exception("BOOM")) self.assertFalse(self.task.success) def test_wait(self): """Task wait returns True if results are ready.""" self.task._set(42) self.assertTrue(self.task.wait()) def test_wait_no_timeout(self): """Task wait returns True if timeout does not expire.""" self.task._set(42) self.assertTrue(self.task.wait(timeout=0)) def test_wait_timeout(self): """Task wait returns False if timeout expired.""" self.assertFalse(self.task.wait(timeout=0)) def test_get(self): """Task values are returned by get if results are set.""" self.task._set(42) self.assertEqual(self.task.get(), 42) def test_get_exception(self): """Task get raises the exception set as results.""" self.task._set(Exception("BOOM")) self.assertRaises(Exception, self.task.get) def test_get_timeout(self): """Task get raises TimeoutError if timeout expires.""" self.assertRaises(TimeoutError, self.task.get, 0) def test_get_no_timeout(self): """Task values are returned by get if results are set before timeout expires.""" self.task._set(42) self.assertEqual(self.task.get(0), 42) def test_get_timeout_cancelled(self): """Task is cancelled if Timeout expires and cancel is set.""" try: self.task.get(timeout=0, cancel=True) except TimeoutError: pass self.assertTrue(self.task.cancelled) def test_cancel(self): """Task get raises TaskCancelled if task is cancelled.""" self.task.cancel() self.assertRaises(TaskCancelled, self.task.get) def test_set_unique(self): """Task _set works only once.""" self.task._set(42) self.task._set(None) self.assertEqual(self.task.get(), 42) def test_set_not_overriding(self): """Task _set does not override a cancelled task.""" self.task.cancel() self.task._set(42) self.assertRaises(TaskCancelled, self.task.get) def test_cancel_overriding(self): """Task cancel does not override a set task.""" self.task._set(42) self.task.cancel() self.assertEqual(self.task.get(), 42) PK%CG tests/test_thread_spawn.pyimport os import unittest try: # Python 2 from Queue import Queue except: # Python 3 from queue import Queue from pebble import thread def undecorated(queue, argument, keyword_argument=0): queue.put(argument + keyword_argument) @thread.spawn(name='foo') def decorated(queue, argument, keyword_argument=0): """A docstring.""" queue.put(argument + keyword_argument) class TestThreadSpawnObj(object): a = 0 def __init__(self): self.b = 1 @classmethod @thread.spawn def clsmethod(cls, queue): queue.put(cls.a) @thread.spawn def instmethod(self, queue): queue.put(self.b) @staticmethod @thread.spawn def stcmethod(queue): queue.put(2) class TestThreadSpawn(unittest.TestCase): def setUp(self): self.spawnobj = TestThreadSpawnObj() def test_docstring(self): """Thread Spawn docstring is preserved.""" self.assertEqual(decorated.__doc__, "A docstring.") def test_wrong_parameters(self): """Thread Spawn raises ValueError if wrong params.""" self.assertRaises(ValueError, thread.spawn, undecorated, args=[1]) def test_thread_wrong_decoration(self): """Thread Spawn raises ValueError if given wrong params.""" try: @thread.spawn(5, name='foo') def wrong(): return except Exception as error: self.assertTrue(isinstance(error, ValueError)) def test_defaults(self): """Thread Spawn default values are preserved.""" queue = Queue() thrd = decorated(queue, 1, 1) thrd.join() self.assertFalse(thrd.daemon) def test_arguments(self): """Thread Spawn decorator arguments are forwarded.""" queue = Queue() thrd = decorated(queue, 1, 1) thrd.join() self.assertEqual(thrd.name, 'foo') def test_undecorated_results(self): """Thread Spawn undecorated results are produced.""" queue = Queue() thrd = thread.spawn(target=decorated, args=[queue, 1], kwargs={'keyword_argument': 1}) results = queue.get() thrd.join() self.assertEqual(results, 2) def test_decorated_results(self): """Thread Spawn results are produced.""" queue = Queue() thrd = decorated(queue, 1, 1) results = queue.get() thrd.join() self.assertEqual(results, 2) def test_class_method(self): """Thread Spawn decorated classmethods.""" queue = Queue() thrd = TestThreadSpawnObj.clsmethod(queue) results = queue.get() thrd.join() self.assertEqual(results, 0) def test_instance_method(self): """Thread Spawn decorated instance methods.""" queue = Queue() thrd = self.spawnobj.instmethod(queue) results = queue.get() thrd.join() self.assertEqual(results, 1) def test_static_method(self): """Thread Spawn decorated static methods.""" if os.name != 'nt': queue = Queue() thrd = self.spawnobj.stcmethod(queue) results = queue.get() thrd.join() self.assertEqual(results, 2) PK%CGtests/__init__.pyPKuKGW~4~4tests/test_process_pool.pyimport os import time import signal import unittest import threading try: from queue import Queue except ImportError: from Queue import Queue import pebble from pebble import process from pebble import TaskCancelled, TimeoutError, PoolError, ProcessExpired event = threading.Event() initarg = 0 results = 0 exception = None def callback(task): global results global exception try: results = task.get() except Exception as error: exception = error event.set() def queue_factory(): return Queue(maxsize=5) def error_callback(_): raise Exception("BOOM!") def initializer(value): global initarg initarg = value def broken_initializer(): raise Exception("BOOM!") def function(argument, keyword_argument=0): """A docstring.""" return argument + keyword_argument def initializer_function(): return initarg def error_function(): raise Exception("BOOM!") def long_function(): time.sleep(1) def pid_function(): time.sleep(0.1) return os.getpid() def sigterm_function(): signal.signal(signal.SIGTERM, signal.SIG_IGN) time.sleep(100) def suicide_function(): os._exit(1) class TestProcessPool(unittest.TestCase): def setUp(self): global initarg initarg = 0 self.event = threading.Event() self.event.clear() self.results = None self.exception = None def callback(self, task): try: self.results = task.get() except Exception as error: self.exception = error finally: self.event.set() def test_process_pool_queue_factory(self): """Process Pool queue factory is called.""" with process.Pool(queue_factory=queue_factory) as pool: self.assertEqual(pool._context.task_queue.maxsize, 5) def test_process_pool_single_task(self): """Process Pool single task.""" with process.Pool() as pool: task = pool.schedule(function, args=[1], kwargs={'keyword_argument': 1}) self.assertEqual(task.get(), 2) def test_process_pool_multiple_tasks(self): """Process Pool multiple tasks.""" tasks = [] with process.Pool() as pool: for index in range(5): tasks.append(pool.schedule(function, args=[1])) self.assertEqual(sum([t.get() for t in tasks]), 5) def test_process_pool_callback(self): """Process Pool results are forwarded to the callback.""" with process.Pool() as pool: pool.schedule(function, args=[1], callback=self.callback, kwargs={'keyword_argument': 1}) self.event.wait() self.assertEqual(self.results, 2) def test_process_pool_error(self): """Process Pool errors are raised by task get.""" with process.Pool() as pool: task = pool.schedule(error_function) self.assertRaises(Exception, task.get) def test_process_pool_error_callback(self): """Process Pool errors are forwarded to callback.""" with process.Pool() as pool: pool.schedule(error_function, callback=self.callback) self.event.wait() self.assertTrue(isinstance(self.exception, Exception)) def test_process_pool_timeout(self): """Process Pool task raises TimeoutError if so.""" with process.Pool() as pool: task = pool.schedule(long_function, timeout=0.1) self.assertRaises(TimeoutError, task.get) def test_process_pool_timeout_callback(self): """Process Pool TimeoutError is forwarded to callback.""" with process.Pool() as pool: pool.schedule(long_function, callback=self.callback, timeout=0.1) self.event.wait() self.assertTrue(isinstance(self.exception, TimeoutError)) def test_process_pool_cancel(self): """Process Pool task raises TaskCancelled if so.""" with process.Pool() as pool: task = pool.schedule(long_function) task.cancel() self.assertRaises(TaskCancelled, task.get) def test_process_pool_cancel_callback(self): """Process Pool TaskCancelled is forwarded to callback.""" with process.Pool() as pool: task = pool.schedule(long_function, callback=self.callback) task.cancel() self.event.wait() self.assertTrue(isinstance(self.exception, TaskCancelled)) def test_process_pool_different_process(self): """Process Pool multiple tasks are handled by different processes.""" tasks = [] with process.Pool(workers=2) as pool: for i in range(0, 5): tasks.append(pool.schedule(pid_function)) self.assertEqual(len(set([t.get() for t in tasks])), 2) def test_process_pool_task_limit(self): """Process Pool task limit is honored.""" tasks = [] with process.Pool(task_limit=2) as pool: for i in range(0, 4): tasks.append(pool.schedule(pid_function)) self.assertEqual(len(set([t.get() for t in tasks])), 2) def test_process_pool_stop_timeout(self): """Process Pool workers are stopped if task timeout.""" with process.Pool() as pool: task1 = pool.schedule(pid_function) pool.schedule(long_function, timeout=0.1) task2 = pool.schedule(pid_function) self.assertNotEqual(task1.get(), task2.get()) def test_process_pool_stop_cancel(self): """Process Pool workers are stopped if task cancelled.""" with process.Pool() as pool: task1 = pool.schedule(pid_function) task = pool.schedule(long_function) # sleep if not the task will be cancelled before sending to worker time.sleep(0.1) task.cancel() task2 = pool.schedule(pid_function) self.assertNotEqual(task1.get(), task2.get()) def test_process_pool_schedule_id(self): """Process Pool task ID is forwarded to it.""" with process.Pool() as pool: task = pool.schedule(function, args=[1], identifier='foo') self.assertEqual(task.id, 'foo') def test_process_pool_initializer(self): """Process Pool initializer is correctly run.""" with process.Pool(initializer=initializer, initargs=[1]) as pool: task = pool.schedule(initializer_function) self.assertEqual(task.get(), 1) def test_process_pool_broken_initializer(self): """Process Pool broken initializer is notified.""" with self.assertRaises(PoolError): with pebble.process.Pool(initializer=broken_initializer) as pool: pool.schedule(function) def test_process_pool_running(self): """Process Pool is active if a task is scheduled.""" with process.Pool() as pool: pool.schedule(function, args=[1]) self.assertTrue(pool.active) def test_process_pool_stopped(self): """Process Pool is not active once stopped.""" with process.Pool() as pool: pool.schedule(function, args=[1]) self.assertFalse(pool.active) def test_process_pool_close_tasks(self): """Process Pool all tasks are performed on close.""" tasks = [] pool = process.Pool() for index in range(10): tasks.append(pool.schedule(function, args=[index])) pool.close() pool.join() map(self.assertTrue, [t.ready for t in tasks]) def test_process_pool_close_stopped(self): """Process Pool is stopped after close.""" pool = process.Pool() pool.schedule(function, args=[1]) pool.close() pool.join() self.assertFalse(pool.active) def test_process_pool_stop_tasks(self): """Process Pool not all tasks are performed on stop.""" tasks = [] pool = process.Pool() for index in range(10): tasks.append(pool.schedule(function, args=[index])) pool.stop() pool.join() self.assertTrue(len([t for t in tasks if not t.ready]) > 0) def test_process_pool_stop_stopped(self): """Process Pool is stopped after stop.""" pool = process.Pool() pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertFalse(pool.active) def test_process_pool_join_workers(self): """Process Pool no worker is running after join.""" pool = process.Pool(workers=4) pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertEqual(len(pool._pool_manager.worker_manager.workers), 0) def test_process_pool_join_running(self): """Process Pool RuntimeError is raised if active pool joined.""" with process.Pool() as pool: pool.schedule(function, args=[1]) self.assertRaises(RuntimeError, pool.join) def test_process_pool_join_tasks_timeout(self): """Process Pool TimeoutError is raised if join on long tasks.""" pool = process.Pool() for index in range(2): pool.schedule(long_function) pool.close() self.assertRaises(TimeoutError, pool.join, 0.4) pool.stop() pool.join() def test_process_pool_callback_error(self): """Process Pool does not stop if error in callback.""" with process.Pool() as pool: try: pool.schedule(function, args=[1], callback=error_callback, kwargs={'keyword_argument': 1}) pool.schedule(function, args=[1], kwargs={'keyword_argument': 1}) except Exception: self.fail("Error raised") def test_process_pool_exception_isolated(self): """Process Pool an Exception does not affect other tasks.""" with process.Pool() as pool: task = pool.schedule(error_function) try: task.get() except: pass task = pool.schedule(function, args=[1], kwargs={'keyword_argument': 1}) self.assertEqual(task.get(), 2) @unittest.skipIf(os.name == 'nt', "Test won't run on Windows'.") def test_process_pool_ignoring_sigterm(self): """Process Pool ignored SIGTERM signal are handled on Unix.""" with process.Pool() as pool: task = pool.schedule(sigterm_function, timeout=0.2) self.assertRaises(TimeoutError, task.get) def test_process_pool_expired_worker(self): """Process Pool unexpect death of worker raises ProcessExpired.""" with process.Pool() as pool: task = pool.schedule(suicide_function) self.assertRaises(ProcessExpired, task.get) # DEADLOCK TESTS @process.spawn(name='worker_process', daemon=True) def broken_worker_process_tasks(_, channel): """Process failing in receiving new tasks.""" with channel.mutex.reader: os._exit(1) @process.spawn(name='worker_process', daemon=True) def broken_worker_process_results(_, channel): """Process failing in delivering results.""" for _ in pebble.process.pool.worker_get_next_task(channel, 2): with channel.mutex.writer: os._exit(1) class TestProcessPoolDeadlockOnNewTasks(unittest.TestCase): def setUp(self): self.worker_process = pebble.process.pool.worker_process pebble.process.pool.worker_process = broken_worker_process_tasks pebble.process.channel.LOCK_TIMEOUT = 0.1 def tearDown(self): pebble.process.pool.worker_process = self.worker_process pebble.process.channel.LOCK_TIMEOUT = 60 def test_pool_deadlock(self): """Process Pool no deadlock if reading worker dies locking channel.""" with self.assertRaises(PoolError): with pebble.process.Pool() as pool: with self.assertRaises(pebble.ProcessExpired): pool.schedule(function) def test_pool_deadlock_stop(self): """Process Pool reading deadlocks are stopping the Pool.""" with self.assertRaises(PoolError): pool = pebble.process.Pool() for _ in range(10): pool.schedule(function) time.sleep(0.1) class TestProcessPoolDeadlockOnResults(unittest.TestCase): def setUp(self): self.worker_process = pebble.process.pool.worker_process pebble.process.pool.worker_process = broken_worker_process_results pebble.process.channel.LOCK_TIMEOUT = 0.1 def tearDown(self): pebble.process.pool.worker_process = self.worker_process pebble.process.channel.LOCK_TIMEOUT = 60 def test_pool_deadlock(self): """Process Pool no deadlock if writing worker dies locking channel.""" with self.assertRaises(PoolError): with pebble.process.Pool() as pool: with self.assertRaises(pebble.ProcessExpired): pool.schedule(function).get() def test_pool_deadlock_stop(self): """Process Pool writing deadlocks are stopping the Pool.""" with self.assertRaises(PoolError): pool = pebble.process.Pool() for _ in range(10): pool.schedule(function) time.sleep(0.1) PK%CG|۳@@ tests/test_process_concurrent.pyimport os import time import signal import unittest import threading from pebble import process, TaskCancelled, TimeoutError, ProcessExpired event = threading.Event() initarg = 0 results = 0 exception = None def callback(concurrent): global results global exception try: results = concurrent.get() except Exception as error: exception = error finally: event.set() def undecorated_simple(): return 0 def undecorated(argument, keyword_argument=0): """A docstring.""" return argument + keyword_argument @process.concurrent def decorated(argument, keyword_argument=0): """A docstring.""" return argument + keyword_argument @process.concurrent def error_decorated(): raise Exception("BOOM!") @process.concurrent def critical_decorated(): os._exit(123) @process.concurrent def long_decorated(): time.sleep(1) @process.concurrent(timeout=0.2) def timeout_decorated(): time.sleep(1) @process.concurrent(callback=callback) def decorated_callback(argument, keyword_argument=0): """A docstring.""" return argument + keyword_argument @process.concurrent(callback=callback) def error_decorated_callback(): raise Exception("BOOM!") @process.concurrent(callback=callback) def long_decorated_callback(): time.sleep(1) @process.concurrent(callback=callback, timeout=0.2) def timeout_decorated_callback(): time.sleep(1) @process.concurrent(timeout=0.2) def sigterm_decorated(): signal.signal(signal.SIGTERM, signal.SIG_IGN) time.sleep(100) class TestProcessConcurrentObj(object): a = 0 def __init__(self): self.b = 1 @classmethod @process.concurrent def clsmethod(cls): return cls.a @process.concurrent def instmethod(self): return self.b @staticmethod @process.concurrent def stcmethod(): return 2 class TestProcessConcurrent(unittest.TestCase): def setUp(self): global results global exception results = 0 exception = None event.clear() self.results = 0 self.concurrentobj = TestProcessConcurrentObj() def callback(self, task): self.results = task.get() event.set() def test_docstring(self): """Process Concurrent docstring is preserved.""" self.assertEqual(decorated.__doc__, "A docstring.") def test_wrong_parameters(self): """Process Concurrent raises ValueError if wrong params.""" self.assertRaises(ValueError, process.concurrent, undecorated, args=[1]) def test_process_wrong_decoration(self): """Process Concurrent raises ValueError if given wrong params.""" try: @process.concurrent(5, name='foo') def wrong(): return except Exception as error: self.assertTrue(isinstance(error, ValueError)) def test_class_method(self): """Process Concurrent decorated classmethods.""" task = TestProcessConcurrentObj.clsmethod() self.assertEqual(task.get(), 0) def test_instance_method(self): """Process Concurrent decorated instance methods.""" task = self.concurrentobj.instmethod() self.assertEqual(task.get(), 1) @unittest.skipIf(os.name == 'nt', "Test won't run on Windows.") def test_static_method(self): """Process Concurrent decorated static methods (Unix only).""" task = self.concurrentobj.stcmethod() self.assertEqual(task.get(), 2) def test_undecorated_results(self): """Process Concurrent undecorated results are produced.""" task = process.concurrent(target=undecorated_simple) self.assertEqual(task.get(), 0) def test_undecorated_results_arguments(self): """Process Concurrent undecorated with args results are produced.""" task = process.concurrent(target=undecorated, args=[1], kwargs={'keyword_argument': 1}) self.assertEqual(task.get(), 2) def test_undecorated_started(self): """Process Concurrent undecorated task is set to started.""" task = process.concurrent(target=undecorated, args=[1], kwargs={'keyword_argument': 1}) self.assertTrue(task.started) def test_decorated_results(self): """Process Concurrent results are produced.""" task = decorated(1, 1) self.assertEqual(task.get(), 2) def test_decorated_results_callback(self): """Process Concurrent results are forwarded to the callback.""" decorated_callback(1, 1) event.wait() self.assertEqual(results, 2) def test_error_decorated(self): """Process Concurrent errors are raised by concurrent get.""" task = error_decorated() self.assertRaises(Exception, task.get) def test_error_decorated_callback(self): """Process Concurrent errors are forwarded to callback.""" error_decorated_callback() event.wait() self.assertTrue(isinstance(exception, Exception)) def test_cancel_decorated(self): """Process Concurrent concurrent raises ConcurrentCancelled if so.""" task = long_decorated() task.cancel() self.assertRaises(TaskCancelled, task.get) def test_cancel_decorated_callback(self): """Process Concurrent TaskCancelled is forwarded to callback.""" task = long_decorated_callback() task.cancel() event.wait() self.assertTrue(isinstance(exception, TaskCancelled)) def test_undecorated_callback(self): """Process Concurrent undecorated results are forwarded to callback.""" task = process.concurrent(target=undecorated, args=[1], kwargs={'keyword_argument': 1}, callback=self.callback) event.wait() self.assertEqual(task.get(), 2) def test_timeout_decorated(self): """Process Concurrent concurrent raises TimeoutError if so.""" task = timeout_decorated() self.assertRaises(TimeoutError, task.get) def test_timeout_decorated_callback(self): """Process Concurrent TimeoutError is forwarded to callback.""" timeout_decorated_callback() event.wait() self.assertTrue(isinstance(exception, TimeoutError)) def test_undecorated_id(self): """Process concurrent ID is forwarded to it.""" task = process.concurrent(target=undecorated, args=[1], kwargs={'keyword_argument': 1}, identifier='foo') self.assertEqual(task.id, 'foo') def test_decorated_dead_process(self): """Process Concurrent ProcessExpired is raised if process dies.""" task = critical_decorated() self.assertRaises(ProcessExpired, task.get) @unittest.skipIf(os.name == 'nt', "Test won't run on Windows.") def test_decorated_pool_ignoring_sigterm(self): """Process Concurrent ignored SIGTERM signal are handled on Unix.""" task = sigterm_decorated() self.assertRaises(TimeoutError, task.get) PKuKG:j@.!.!tests/test_thread_pool.pyimport time import unittest import threading try: from queue import Queue except ImportError: from Queue import Queue from pebble import thread from pebble import PoolError, TaskCancelled, TimeoutError event = threading.Event() initarg = 0 results = 0 exception = None def callback(task): global results global exception try: results = task.get() except Exception as error: exception = error event.set() def queue_factory(): return Queue(maxsize=5) def error_callback(task): raise Exception("BOOM!") def initializer(value): global initarg initarg = value def broken_initializer(): raise Exception("BOOM!") def function(argument, keyword_argument=0): """A docstring.""" return argument + keyword_argument def initializer_function(): return initarg def error_function(): raise Exception("BOOM!") def long_function(): time.sleep(1) def tid_function(): time.sleep(0.1) return threading.current_thread() class TestThreadPool(unittest.TestCase): def setUp(self): global initarg initarg = 0 self.event = threading.Event() self.event.clear() self.results = None self.exception = None def callback(self, task): try: self.results = task.get() except Exception as error: self.exception = error finally: self.event.set() def test_thread_pool_queue_factory(self): """Thread Pool queue factory is called.""" with thread.Pool(queue_factory=queue_factory) as pool: self.assertEqual(pool._context.task_queue.maxsize, 5) def test_thread_pool_single_task(self): """Thread Pool single task.""" with thread.Pool() as pool: task = pool.schedule(function, args=[1], kwargs={'keyword_argument': 1}) self.assertEqual(task.get(), 2) def test_thread_pool_multiple_tasks(self): """Thread Pool multiple tasks.""" tasks = [] with thread.Pool() as pool: for index in range(5): tasks.append(pool.schedule(function, args=[1])) self.assertEqual(sum([t.get() for t in tasks]), 5) def test_thread_pool_callback(self): """Thread Pool results are forwarded to the callback.""" with thread.Pool() as pool: pool.schedule(function, args=[1], callback=self.callback, kwargs={'keyword_argument': 1}) self.event.wait() self.assertEqual(self.results, 2) def test_thread_pool_error(self): """Thread Pool errors are raised by task get.""" with thread.Pool() as pool: task = pool.schedule(error_function) self.assertRaises(Exception, task.get) def test_thread_pool_error_callback(self): """Thread Pool errors are forwarded to callback.""" with thread.Pool() as pool: pool.schedule(error_function, callback=self.callback) self.event.wait() self.assertTrue(isinstance(self.exception, Exception)) def test_thread_pool_cancel(self): """Thread Pool task raises TaskCancelled if so.""" with thread.Pool() as pool: task = pool.schedule(long_function) task.cancel() self.assertRaises(TaskCancelled, task.get) def test_thread_pool_cancel_callback(self): """Thread Pool TaskCancelled is forwarded to callback.""" with thread.Pool() as pool: task = pool.schedule(long_function, callback=self.callback) task.cancel() self.event.wait() self.assertTrue(isinstance(self.exception, TaskCancelled)) def test_thread_pool_different_thread(self): """Thread Pool multiple tasks are handled by different threades.""" tasks = [] with thread.Pool(workers=2) as pool: for i in range(0, 5): tasks.append(pool.schedule(tid_function)) self.assertEqual(len(set([t.get() for t in tasks])), 2) def test_thread_pool_task_limit(self): """Thread Pool task limit is honored.""" tasks = [] with thread.Pool(task_limit=2) as pool: for i in range(0, 4): tasks.append(pool.schedule(tid_function)) self.assertEqual(len(set([t.get() for t in tasks])), 2) def test_thread_pool_schedule_id(self): """Thread Pool task ID is forwarded to it.""" with thread.Pool() as pool: task = pool.schedule(function, args=[1], identifier='foo') self.assertEqual(task.id, 'foo') def test_thread_pool_initializer(self): """Thread Pool initializer is correctly run.""" with thread.Pool(initializer=initializer, initargs=[1]) as pool: task = pool.schedule(initializer_function) self.assertEqual(task.get(), 1) def test_thread_pool_broken_initializer(self): """Thread Pool broken initializer is notified.""" with self.assertRaises(PoolError): with thread.Pool(initializer=broken_initializer) as pool: pool.schedule(function) def test_thread_pool_running(self): """Thread Pool is active if a task is scheduled.""" with thread.Pool() as pool: pool.schedule(function, args=[1]) self.assertTrue(pool.active) def test_thread_pool_stopped(self): """Thread Pool is not active once stopped.""" with thread.Pool() as pool: pool.schedule(function, args=[1]) self.assertFalse(pool.active) def test_thread_pool_close_tasks(self): """Thread Pool all tasks are performed on close.""" tasks = [] pool = thread.Pool() for index in range(10): tasks.append(pool.schedule(function, args=[index])) pool.close() pool.join() map(self.assertTrue, [t.ready for t in tasks]) def test_thread_pool_close_stopped(self): """Thread Pool is stopped after close.""" pool = thread.Pool() pool.schedule(function, args=[1]) pool.close() pool.join() self.assertFalse(pool.active) def test_thread_pool_stop_tasks(self): """Thread Pool not all tasks are performed on stop.""" tasks = [] pool = thread.Pool() for index in range(10): tasks.append(pool.schedule(long_function, args=[index])) pool.stop() pool.join() self.assertTrue(len([t for t in tasks if not t.ready]) > 0) def test_thread_pool_stop_stopped(self): """Thread Pool is stopped after stop.""" pool = thread.Pool() pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertFalse(pool.active) def test_thread_pool_join_workers(self): """Thread Pool no worker is running after join.""" pool = thread.Pool(workers=4) pool.schedule(function, args=[1]) pool.stop() pool.join() self.assertEqual(len(pool._pool_manager.workers), 0) def test_thread_pool_join_running(self): """Thread Pool RuntimeError is raised if active pool joined.""" with thread.Pool() as pool: pool.schedule(function, args=[1]) self.assertRaises(RuntimeError, pool.join) def test_thread_pool_join_tasks_timeout(self): """Thread Pool TimeoutError is raised if join on long tasks.""" pool = thread.Pool() for index in range(2): pool.schedule(long_function) pool.close() self.assertRaises(TimeoutError, pool.join, 0.4) pool.stop() pool.join() def test_thread_pool_callback_error(self): """Thread Pool stop if error in callback.""" with thread.Pool() as pool: pool.schedule(function, args=[1], callback=error_callback, kwargs={'keyword_argument': 1}) time.sleep(0.1) self.assertRaises(PoolError, pool.schedule, function, args=[1]) def test_thread_pool_exception_isolated(self): """Thread Pool an Exception does not affect other tasks.""" with thread.Pool() as pool: task = pool.schedule(error_function) try: task.get() except: pass task = pool.schedule(function, args=[1], kwargs={'keyword_argument': 1}) self.assertEqual(task.get(), 2) PK%CG# z.--tests/test_thread_concurrent.pyimport time import unittest import threading from pebble import thread, TaskCancelled event = threading.Event() initarg = 0 results = 0 exception = None def callback(task): global results global exception try: results = task.get() except Exception as error: exception = error finally: event.set() def undecorated_simple(): return 0 def undecorated(argument, keyword_argument=0): """A docstring.""" return argument + keyword_argument @thread.concurrent def decorated(argument, keyword_argument=0): """A docstring.""" return argument + keyword_argument @thread.concurrent def error_decorated(): raise Exception("BOOM!") @thread.concurrent def long_decorated(): time.sleep(1) @thread.concurrent(callback=callback) def decorated_callback(argument, keyword_argument=0): """A docstring.""" return argument + keyword_argument @thread.concurrent(callback=callback) def error_decorated_callback(): raise Exception("BOOM!") @thread.concurrent(callback=callback) def long_decorated_callback(): time.sleep(1) class TestThreadConcurrentObj(object): a = 0 def __init__(self): self.b = 1 @classmethod @thread.concurrent def clsmethod(cls): return cls.a @thread.concurrent def instmethod(self): return self.b @staticmethod @thread.concurrent def stcmethod(): return 2 class TestThreadConcurrent(unittest.TestCase): def setUp(self): global results global exception results = 0 exception = None event.clear() self.concurrentobj = TestThreadConcurrentObj() def callback(self, task): self.results = task.get() event.set() def test_docstring(self): """Thread Concurrent docstring is preserved.""" self.assertEqual(decorated.__doc__, "A docstring.") def test_wrong_parameters(self): """Thread Concurrent raises ValueError if wrong params.""" self.assertRaises(ValueError, thread.concurrent, undecorated, args=[1]) def test_thread_wrong_decoration(self): """Thread Concurrent raises ValueError if given wrong params.""" try: @thread.concurrent(5, name='foo') def wrong(): return except Exception as error: self.assertTrue(isinstance(error, ValueError)) def test_class_method(self): """Thread Concurrent decorated classmethods.""" task = TestThreadConcurrentObj.clsmethod() self.assertEqual(task.get(), 0) def test_instance_method(self): """Thread Concurrent decorated instance methods.""" task = self.concurrentobj.instmethod() self.assertEqual(task.get(), 1) def test_static_method(self): """Thread Concurrent decorated static methods.""" task = self.concurrentobj.stcmethod() self.assertEqual(task.get(), 2) def test_undecorated_results(self): """Process Concurrent undecorated results are produced.""" task = thread.concurrent(target=undecorated_simple) self.assertEqual(task.get(), 0) def test_undecorated_results_arguments(self): """Process Concurrent undecorated with args results are produced.""" task = thread.concurrent(target=undecorated, args=[1], kwargs={'keyword_argument': 1}) self.assertEqual(task.get(), 2) def test_decorated_results(self): """Thread Concurrent results are produced.""" task = decorated(1, 1) self.assertEqual(task.get(), 2) def test_decorated_results_callback(self): """Thread Concurrent results are forwarded to the callback.""" decorated_callback(1, 1) event.wait() self.assertEqual(results, 2) def test_error_decorated(self): """Thread Concurrent errors are raised by task get.""" task = error_decorated() self.assertRaises(Exception, task.get) def test_error_decorated_callback(self): """Thread Concurrent errors are forwarded to callback.""" error_decorated_callback() event.wait() self.assertTrue(isinstance(exception, Exception)) def test_cancel_decorated(self): """Thread Concurrent task raises TaskCancelled if so.""" task = long_decorated() task.cancel() self.assertRaises(TaskCancelled, task.get) def test_cancel_decorated_callback(self): """Thread Concurrent TaskCancelled is forwarded to callback.""" task = long_decorated_callback() task.cancel() event.wait() self.assertTrue(isinstance(exception, TaskCancelled)) def test_undecorated_callback(self): """Thread Concurrent undecorated results are forwarded to callback.""" task = thread.concurrent(target=undecorated, args=[1], kwargs={'keyword_argument': 1}, callback=self.callback) event.wait() self.assertEqual(task.get(), 2) def test_undecorated_id(self): """Thread Concurrent ID is forwarded to it.""" task = thread.concurrent(target=undecorated, args=[1], kwargs={'keyword_argument': 1}, identifier='foo') self.assertEqual(task.id, 'foo') PK%CG^V/tests/test_process_spawn.pyimport os import unittest from multiprocessing import Queue from pebble import process def undecorated(queue, argument, keyword_argument=0): queue.put(argument + keyword_argument) @process.spawn def decorated(queue, argument, keyword_argument=0): """A docstring.""" queue.put(argument + keyword_argument) @process.spawn(name='foo') def decorated_kword(queue, argument, keyword_argument=0): """A docstring.""" queue.put(argument + keyword_argument) class TestProcessSpawnObj(object): a = 0 def __init__(self): self.b = 1 @classmethod @process.spawn def clsmethod(cls, queue): queue.put(cls.a) @process.spawn def instmethod(self, queue): queue.put(self.b) @staticmethod @process.spawn def stcmethod(queue): queue.put(2) class TestProcessSpawn(unittest.TestCase): def setUp(self): self.spawnobj = TestProcessSpawnObj() def test_docstring(self): """Process Spawn docstring is preserved.""" self.assertEqual(decorated.__doc__, "A docstring.") def test_wrong_parameters(self): """Process Spawn raises ValueError if wrong params.""" self.assertRaises(ValueError, process.spawn, undecorated, args=[1]) def test_wrong_decoration(self): """Process Spawn decorator raises ValueError if wrong params.""" try: @process.spawn(5, name='foo') def wrong(): return except Exception as error: self.assertTrue(isinstance(error, ValueError)) def test_undecorated_results(self): """Process Spawn undecorated results are produced.""" queue = Queue() proc = process.spawn(target=decorated_kword, args=[queue, 1]) results = queue.get() proc.join() self.assertEqual(results, 1) def test_undecorated_keyworkd_results(self): """Process Spawn undecorated with keyword, results are produced.""" queue = Queue() proc = process.spawn(target=decorated_kword, args=[queue, 1], kwargs={'keyword_argument': 1}) results = queue.get() proc.join() self.assertEqual(results, 2) def test_defaults(self): """Process Spawn default values are preserved.""" queue = Queue() proc = decorated(queue, 1, 1) proc.join() self.assertFalse(proc.daemon) def test_arguments(self): """Process Spawn decorator arguments are forwarded.""" queue = Queue() proc = decorated_kword(queue, 1, 1) proc.join() self.assertEqual(proc.name, 'foo') def test_decorated_results(self): """Process Spawn results are produced.""" queue = Queue() proc = decorated(queue, 1, 1) results = queue.get() proc.join() self.assertEqual(results, 2) def test_class_method(self): """Process Spawn decorated_kword classmethods.""" queue = Queue() proc = TestProcessSpawnObj.clsmethod(queue) results = queue.get() proc.join() self.assertEqual(results, 0) def test_instance_method(self): """Process Spawn decorated_kword instance methods.""" queue = Queue() proc = self.spawnobj.instmethod(queue) results = queue.get() proc.join() self.assertEqual(results, 1) def test_static_method(self): """Process Spawn decorated_kword static methods.""" if os.name != 'nt': queue = Queue() proc = self.spawnobj.stcmethod(queue) results = queue.get() proc.join() self.assertEqual(results, 2) PK%CG wwpebble/__init__.py__all__ = ['process', 'thread', 'waitfortasks', 'waitforthreads', 'waitforqueues', 'synchronized', 'sighandler', 'Task', 'PebbleError', 'PoolError', 'TimeoutError', 'TaskCancelled', 'ProcessExpired'] from . import thread from . import process from .task import Task from .decorators import synchronized, sighandler from .functions import waitforqueues, waitfortasks, waitforthreads from .exceptions import TimeoutError, ProcessExpired from .exceptions import PebbleError, PoolError, TaskCancelled PK%CG9pebble/decorators.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . import signal import threading from functools import wraps _synchronized_lock = threading.Lock() def synchronized(*args): """A synchronized function prevents two or more callers to interleave its execution preventing race conditions. The synchronized decorator accepts as optional parameter a Lock, RLock or Semaphore object which will be employed to ensure the function's atomicity. If no synchronization object is given, a single threading.Lock will be used. This implies that between different decorated function only one at a time will be executed. """ if callable(args[0]): return decorate_synchronized(args[0], _synchronized_lock) else: def wrap(function): return decorate_synchronized(function, args[0]) return wrap def decorate_synchronized(function, lock): @wraps(function) def wrapper(*args, **kwargs): with lock: return function(*args, **kwargs) return wrapper def sighandler(signals): """Sets the decorated function as signal handler of given *signals*. *signals* can be either a single signal or a list/tuple of multiple ones. """ def wrap(function): set_signal_handlers(signals, function) @wraps(function) def wrapper(*args, **kwargs): return function(*args, **kwargs) return wrapper return wrap def set_signal_handlers(signals, function): if isinstance(signals, (list, tuple)): for signum in signals: signal.signal(signum, function) else: signal.signal(signals, function) PK%CG pebble/utils.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . # Common utility functions from time import sleep from inspect import isclass from itertools import count from traceback import format_exc, print_exc try: # Python 2 from Queue import Queue except: # Python 3 from queue import Queue from .exceptions import PoolError, TimeoutError # Pool states STOPPED = 0 RUNNING = 1 CLOSED = 2 CREATED = 3 EXPIRED = 4 ERROR = 5 def function_handler(launcher, decorator, *args, **kwargs): """Distinguishes between function and decorator usage of spawn and concurrent functions. """ if isfunction(args, kwargs): return launcher(kwargs.pop('target', None), **kwargs) elif issimpledecorator(args, kwargs): return decorator(args[0], launcher) elif isparametrizeddecorator(args, kwargs): def wrap(function): return decorator(function, launcher, **kwargs) return wrap else: raise ValueError("Only keyword arguments are accepted.") def isfunction(args, kwargs): """spawn or concurrent used as regular function.""" if not args and kwargs and 'target' in kwargs: return True else: return False def issimpledecorator(args, kwargs): """spawn or concurrent used as decorator with no parameters.""" if args and not kwargs: return True else: return False def isparametrizeddecorator(args, kwargs): """spawn or concurrent used as decorator with parameters.""" if not args and kwargs: return True else: return False def execute(function, args=(), kwargs={}): """Runs the given function returning its results or exception.""" try: return function(*args, **kwargs) except Exception as error: error.traceback = format_exc() return error PK%CG}*  pebble/functions.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . import threading from time import time from types import MethodType _waitforthreads_lock = threading.Lock() def waitfortasks(tasks, timeout=None): """Waits for one or more *Task* to be ready or until *timeout* expires. *tasks* is a list containing one or more *pebble.Task* objects. If *timeout* is not None the function will block for the specified amount of seconds. The function returns a list containing the ready *Tasks*. """ lock = threading.Condition(threading.Lock()) prepare_tasks(tasks, lock) try: wait_tasks(tasks, lock, timeout) finally: reset_tasks(tasks) return filter(lambda t: t.ready, tasks) def prepare_tasks(tasks, lock): """Replaces task._set() method in order to notify the waiting Condition.""" for task in tasks: task._pebble_lock = lock with task._task_ready: task._pebble_old_method = task._set task._set = MethodType(new_method, task) def wait_tasks(tasks, lock, timeout): with lock: if not any(map(lambda t: t.ready, tasks)): lock.wait(timeout) def reset_tasks(tasks): """Resets original task._set() method.""" for task in tasks: with task._task_ready: task._set = task._pebble_old_method delattr(task, '_pebble_old_method') delattr(task, '_pebble_lock') def waitforqueues(queues, timeout=None): """Waits for one or more *Queue* to be ready or until *timeout* expires. *queues* is a list containing one or more *Queue.Queue* objects. If *timeout* is not None the function will block for the specified amount of seconds. The function returns a list containing the ready *Queues*. """ lock = threading.Condition(threading.Lock()) prepare_queues(queues, lock) try: wait_queues(queues, lock, timeout) finally: reset_queues(queues) return filter(lambda q: not q.empty(), queues) def prepare_queues(queues, lock): """Replaces queue._put() method in order to notify the waiting Condition.""" for queue in queues: queue._pebble_lock = lock with queue.mutex: queue._pebble_old_method = queue._put queue._put = MethodType(new_method, queue) def wait_queues(queues, lock, timeout): with lock: if not any(map(lambda q: not q.empty(), queues)): lock.wait(timeout) def reset_queues(queues): """Resets original queue._put() method.""" for queue in queues: with queue.mutex: queue._put = queue._pebble_old_method delattr(queue, '_pebble_old_method') delattr(queue, '_pebble_lock') def waitforthreads(threads, timeout=None): """Waits for one or more *Thread* to exit or until *timeout* expires. .. note:: Expired *Threads* are not joined by *waitforthreads*. *threads* is a list containing one or more *threading.Thread* objects. If *timeout* is not None the function will block for the specified amount of seconds. The function returns a list containing the ready *Threads*. """ old_function = None lock = threading.Condition(threading.Lock()) def new_function(*args): old_function(*args) with lock: lock.notify_all() old_function = prepare_threads(new_function) try: wait_threads(threads, lock, timeout) finally: reset_threads(old_function) return filter(lambda t: not t.is_alive(), threads) def prepare_threads(new_function): """Replaces threading._get_ident() function in order to notify the waiting Condition.""" with _waitforthreads_lock: if hasattr(threading, 'get_ident'): old_function = threading.get_ident threading.get_ident = new_function else: old_function = threading._get_ident threading._get_ident = new_function return old_function def wait_threads(threads, lock, timeout): timestamp = time() time_left = lambda: timeout - (time() - timestamp) with lock: while not any(map(lambda t: not t.is_alive(), threads)): if timeout is None: lock.wait() elif time_left() > 0: lock.wait(time_left()) else: return def reset_threads(old_function): """Resets original threading._get_ident() function.""" with _waitforthreads_lock: if hasattr(threading, 'get_ident'): threading.get_ident = old_function else: threading._get_ident = old_function def new_method(self, *args): self._pebble_old_method(*args) with self._pebble_lock: self._pebble_lock.notify_all() PKxKGxI<==pebble/pool.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . import time from itertools import count from traceback import print_exc from collections import namedtuple try: from queue import Queue except ImportError: from Queue import Queue from .task import Task from .exceptions import PoolError, TimeoutError SLEEP_UNIT = 0.1 # Pool states STOPPED = 0 RUNNING = 1 CLOSED = 2 CREATED = 3 EXPIRED = 4 ERROR = 5 TaskParameters = namedtuple('TaskParameters', ('function', 'args', 'kwargs')) WorkerParameters = namedtuple('WorkerParameters', ('task_limit', 'initializer', 'initargs')) class BasePool(object): def __init__(self, workers, task_limit, queue_factory, initializer, initargs): self._context = PoolContext(workers, task_limit, queue_factory, initializer, initargs) self._loops = () def __enter__(self): return self def __exit__(self, *args): self.close() self.join() @property def active(self): self._update_pool_state() return self._context.state in (CLOSED, RUNNING) def close(self): """Closes the Pool preventing new tasks from being accepted. Pending tasks will be completed. """ self._context.state = CLOSED def stop(self): """Stops the pool without performing any pending task.""" self._context.state = STOPPED def join(self, timeout=None): """Joins the pool waiting until all workers exited. If *timeout* is set, it block until all workers are done or raises TimeoutError. """ if self._context.state == RUNNING: raise RuntimeError('The Pool is still running') if self._context.state == CLOSED: self._wait_queue_depletion(timeout) self.stop() self.join() else: for loop in self._loops: loop.join() self._stop_pool() def _wait_queue_depletion(self, timeout): tick = time.time() while self.active: if timeout is not None and time.time() - tick > timeout: raise TimeoutError("Tasks are still being executed") elif self._context.task_queue.unfinished_tasks: time.sleep(SLEEP_UNIT) else: return raise PoolError() def schedule(self, function, args=(), kwargs={}, identifier=None, callback=None, timeout=0): """Schedules *function* to be run the Pool. *args* and *kwargs* will be forwareded to the scheduled function respectively as arguments and keyword arguments. If *callback* is a callable it will be executed once the function execution has completed with the returned *Task* as a parameter. *timeout* is an integer, if expires the task will be terminated and *Task.get()* will raise *TimeoutError*. The *identifier* value will be forwarded to the *Task.id* attribute. A *Task* object is returned. """ metadata = TaskParameters(function, args, kwargs) return self._schedule_task(callback, timeout, identifier, metadata) def _schedule_task(self, callback, timeout, identifier, metadata): self._check_pool_state() task = Task(next(self._context.task_counter), callback=callback, timeout=timeout, identifier=identifier, metadata=metadata) self._context.task_queue.put(task) return task def _check_pool_state(self): self._update_pool_state() if self._context.state == ERROR: raise PoolError('Unexpected error within the Pool') elif self._context.state != RUNNING: raise RuntimeError('The Pool is not running') def _update_pool_state(self): if self._context.state == CREATED: self._start_pool() else: for loop in self._loops: if not loop.is_alive(): self._context.state = ERROR def _start_pool(self): raise NotImplementedError("Not implemented") def _stop_pool(self): raise NotImplementedError("Not implemented") class PoolContext(object): def __init__(self, workers, task_limit, queue_factory, initializer, initargs): self.state = CREATED self.workers = workers self.task_counter = count() self.task_queue = create_queue(queue_factory) self.worker_parameters = WorkerParameters(task_limit, initializer, initargs) @property def alive(self): return self.state not in (ERROR, STOPPED) def create_queue(queue_factory): if queue_factory is not None: return queue_factory() else: return Queue() def run_initializer(initializer, initargs): try: initializer(*initargs) return True except Exception: print_exc() return False def task_limit_reached(counter, task_limit): return task_limit > 0 and next(counter) >= task_limit PKtJG`pebble/exceptions.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . class PebbleError(Exception): """Pebble base exception.""" pass class PoolError(PebbleError): """Raised if an error occurred within the Pool.""" pass class TaskCancelled(PebbleError): """Raised if get is called on a cancelled task.""" pass class ChannelError(PebbleError): """Error occurring within the process channel.""" pass class TimeoutError(PebbleError): """Raised when a timeout expires.""" def __init__(self, msg, value=0): super(TimeoutError, self).__init__(msg) self.timeout = value class ProcessExpired(PebbleError): """Raised when process dies unexpectedly.""" def __init__(self, msg, code=0): super(ProcessExpired, self).__init__(msg) self.exitcode = code PK%CG|QN88pebble/task.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . # Pebble's generic objects import threading from traceback import print_exc from .exceptions import TimeoutError, TaskCancelled class Task(object): """Handler to the ongoing task.""" def __init__(self, task_nr, callback=None, timeout=None, identifier=None, metadata=None): self.id = identifier self._timeout = timeout self._number = task_nr self._ready = False self._cancelled = False self._results = None self._task_ready = threading.Condition(threading.Lock()) self._timestamp = 0 self._callback = callback self._metadata = metadata def __str__(self): return self.__repr__() def __repr__(self): return "%s (Task-%d, %s)" % (self.__class__, self.number, self.id) @property def number(self): return self._number @property def ready(self): return self._ready @property def cancelled(self): return self._cancelled @property def started(self): return self._timestamp > 0 and True or False @property def timeout(self): return self._timeout @property def success(self): return (self._ready and not isinstance(self._results, BaseException) or False) def get(self, timeout=None, cancel=False): """Retrieves the produced results, blocks until results are ready. If the executed code raised an error it will be re-raised. If *timeout* is set the call will block until the timeout expires raising *TimeoutError" if results are not yet available. If *cancel* is True while *timeout* is set *Task* will be cancelled once the timeout expires. """ if self.wait(timeout=timeout): if (isinstance(self._results, BaseException)): raise self._results else: return self._results else: if cancel: self.cancel() raise TimeoutError("Task is still running") def wait(self, timeout=None): """Waits until results are ready. If *timeout* is set the call will block until the timeout expires. Returns *True* if results are ready, *False* if timeout expired. """ with self._task_ready: if not self._ready: self._task_ready.wait(timeout) return self._ready def cancel(self): """Cancels the Task.""" self._cancelled = True self.set_results(TaskCancelled("Task cancelled")) def set_results(self, results): """Sets the results within the task and run the installed callback. This function is meant for testing and internal use. """ self._set(results) self._run_callback() def _set(self, results): with self._task_ready: if not self._ready: self._ready = True self._results = results self._task_ready.notify_all() def _run_callback(self): if self._callback is not None: try: self._callback(self) except Exception: print_exc() PK%CG\=<pebble/thread/__init__.py__all__ = ['concurrent', 'spawn', 'Pool'] from pebble.thread.pool import Pool from pebble.thread.decorators import spawn, concurrent PK%CGk> > pebble/thread/decorators.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . from itertools import count from threading import Thread from pebble.task import Task from pebble.thread.utils import decorate from pebble.utils import execute, function_handler _task_counter = count() def spawn(*args, **kwargs): """Spawns a new thread and runs a function within it. *target* is the desired function to be run with the given *args* and *kwargs* parameters; if *daemon* is True, the thread will be stopped if the parent exits (default False). *name* is a string, if assigned will be given to the thread. The *spawn* function works as well as a decorator. Returns the Thread object which is running the *target* function or decorated one. .. note: The decorator accepts the keywords *daemon* and *name* only. If *target* keyword is not specified, the function will act as a decorator. """ return function_handler(launch_thread, decorate, *args, **kwargs) def launch_thread(target, name=None, daemon=False, args=(), kwargs={}): """Launches the target function within a thread.""" thread = Thread(target=target, name=name, args=args, kwargs=kwargs) thread.daemon = daemon thread.start() return thread def concurrent(*args, **kwargs): """Runs the given function in a concurrent thread, taking care of the results and error management. *target* is the desired function to be run with the given *args* and *kwargs* parameters; if *timeout* is set, the thread will be stopped once expired returning TimeoutError as results. If a *callback* is passed, it will be run after the job has finished with the returned *Task* as parameter. The *concurrent* function works as well as a decorator. Returns a *Task* object. .. note: The decorator accepts the keywords *timeout* and *callback* only. If *target* keyword is not specified, the function will act as a decorator. """ return function_handler(launch_task, decorate, *args, **kwargs) def launch_task(function, callback=None, identifier=None, args=(), kwargs={}): """Wraps the target function within a Task and executes it in a separate thread. """ metadata = {'function': function, 'args': args, 'kwargs': kwargs} task = Task(next(_task_counter), callback=callback, metadata=metadata, identifier=identifier) task_worker(task) return task @spawn(daemon=True) def task_worker(task): """Runs the actual function in separate thread.""" function = task._metadata['function'] args = task._metadata['args'] kwargs = task._metadata['kwargs'] results = execute(function, args, kwargs) task.set_results(results) PK%CG+LeLLpebble/thread/utils.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . from functools import wraps def decorate(function, launcher, **properties): """Decorates the given function. *function* represent the target function to be decorated, *launcher* takes care of executing the function with the given decoration *properties*. """ @wraps(function) def wrapper(*args, **kwargs): return launcher(function, args=args, kwargs=kwargs, **properties) return wrapper PKxKG6\ggpebble/thread/pool.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . import time from itertools import count from pebble.utils import execute from pebble.thread.decorators import spawn from pebble.pool import ERROR, RUNNING, SLEEP_UNIT from pebble.pool import BasePool, run_initializer, task_limit_reached class Pool(BasePool): """Allows to schedule jobs within a Pool of Threads. workers is an integer representing the amount of desired thread workers managed by the pool. If worker_task_limit is a number greater than zero, each worker will be restarted after performing an equal amount of tasks. The queue_factory callable allows to replace the internal task buffer of the Pool with a custom one. The callable must return a thread safe object exposing the same interface of the standard Python Queue. initializer must be callable, if passed, it will be called every time a worker is started, receiving initargs as arguments. """ def __init__(self, workers=1, task_limit=0, queue_factory=None, initializer=None, initargs=()): super(Pool, self).__init__(workers, task_limit, queue_factory, initializer, initargs) self._pool_manager = PoolManager(self._context) def _start_pool(self): self._pool_manager.start() self._loops = (pool_manager_loop(self._pool_manager),) self._context.state = RUNNING def _stop_pool(self): self._pool_manager.stop() @spawn(daemon=True, name='pool_manager') def pool_manager_loop(pool_manager): context = pool_manager.context while context.alive: pool_manager.update_status() time.sleep(SLEEP_UNIT) class PoolManager(object): def __init__(self, context): self.context = context self.workers = [] def start(self): self.create_workers() def stop(self): for _ in self.workers: self.context.task_queue.put(None) for worker in tuple(self.workers): self.join_worker(worker) def update_status(self): expired = self.inspect_workers() for worker in expired: self.join_worker(worker) self.create_workers() def inspect_workers(self): return tuple(w for w in self.workers if not w.is_alive()) def create_workers(self): for _ in range(self.context.workers - len(self.workers)): self.workers.append(worker_thread(self.context)) def join_worker(self, worker): worker.join() self.workers.remove(worker) @spawn(name='worker_thread', daemon=True) def worker_thread(context): """The worker thread routines.""" parameters = context.worker_parameters task_limit = parameters.task_limit if parameters.initializer is not None: if not run_initializer(parameters.initializer, parameters.initargs): context.state = ERROR return for task in get_next_task(context, task_limit): execute_next_task(task) context.task_queue.task_done() def get_next_task(context, task_limit): counter = count() queue = context.task_queue while context.alive and not task_limit_reached(counter, task_limit): task = queue.get() if task is not None and not task.cancelled: yield task else: queue.task_done() def execute_next_task(task): parameters = task._metadata task._timestamp = time.time() results = execute(parameters.function, parameters.args, parameters.kwargs) task.set_results(results) PK%CGb@Apebble/process/__init__.py__all__ = ['concurrent', 'spawn', 'Pool'] from pebble.process.pool import Pool from pebble.process.decorators import spawn, concurrent PK%CGLuupebble/process/decorators.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . from time import time from itertools import count from multiprocessing import Pipe, Process from pebble import thread from pebble.task import Task from pebble.exceptions import ProcessExpired from pebble.utils import execute, function_handler from pebble.process.utils import stop, send_results, get_results, decorate _task_counter = count() def spawn(*args, **kwargs): """Spawns a new process and runs a function within it. *target* is the desired function to be run with the given *args* and *kwargs* parameters; if *daemon* is True, the process will be stopped if the parent exits (default False). *name* is a string, if assigned will be given to the process. The *spawn* function works as well as a decorator. Returns the Process object which is running the *target* function or decorated one. .. note: The decorator accepts the keywords *daemon* and *name* only. If *target* keyword is not specified, the function will act as a decorator. """ return function_handler(launch_process, decorate, *args, **kwargs) def launch_process(target, name=None, daemon=False, args=(), kwargs={}): """Launches the target function within a process.""" process = Process(target=target, name=name, args=args, kwargs=kwargs) process.daemon = daemon process.start() return process def concurrent(*args, **kwargs): """Runs the given function in a concurrent process, taking care of the results and error management. *target* is the desired function to be run with the given *args* and *kwargs* parameters; if *timeout* is set, the process will be stopped once expired returning TimeoutError as results. If a *callback* is passed, it will be run after the job has finished with the returned *Task* as parameter. The *concurrent* function works as well as a decorator. Returns a *Task* object. .. note: The decorator accepts the keywords *timeout* and *callback* only. If *target* keyword is not specified, the function will act as a decorator. """ return function_handler(launch_task, decorate, *args, **kwargs) def launch_task(target, timeout=None, callback=None, identifier=None, args=(), kwargs={}): """Wraps the target function within a Task and executes it in a separate process. """ reader, writer = Pipe(duplex=False) worker = task_worker(writer, target, args, kwargs) writer.close() task = ProcessTask(next(_task_counter), worker, callback=callback, timeout=timeout, identifier=identifier) task_manager(task, reader) return task @spawn(daemon=True) def task_worker(pipe, function, args, kwargs): """Runs the actual function in separate process.""" results = execute(function, args, kwargs) send_results(pipe, results) @thread.spawn(daemon=True) def task_manager(task, pipe): """Task's lifecycle manager. Waits for the *Task* to be performed, collects results, runs the callback and cleans up the process. """ worker = task._worker results = get_results(pipe, task.timeout) if isinstance(results, ProcessExpired): results.exitcode = worker.exitcode task.set_results(results) if worker.is_alive(): stop(worker) class ProcessTask(Task): """Extends the *Task* object to support *process* decorator.""" def __init__(self, task_nr, worker, **kwargs): super(ProcessTask, self).__init__(task_nr, **kwargs) self._worker = worker self._timestamp = time() def cancel(self): """Overrides the *Task* cancel method in order to signal it to the *process* decorator handler.""" super(ProcessTask, self).cancel() stop(self._worker) PK%CGJK<<pebble/process/utils.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . import os import sys from select import select from functools import wraps from traceback import format_exc try: # Python 2 from cPickle import PicklingError except: # Python 3 from pickle import PicklingError if os.name in ('posix', 'os2'): from signal import SIGKILL from pebble.exceptions import TimeoutError, ProcessExpired _registered_functions = {} def stop(worker): """Does its best to stop the worker.""" worker.terminate() worker.join(3) if worker.is_alive() and os.name != 'nt': try: os.kill(worker.pid, SIGKILL) worker.join() except OSError: return if worker.is_alive(): raise RuntimeError("Unable to terminate PID %d" % os.getpid()) def get_results(pipe, timeout): """Waits for results and handles communication errors.""" try: if poll(pipe, timeout): return pipe.recv() else: return TimeoutError('Task Timeout', timeout) except (EnvironmentError, EOFError): return ProcessExpired('Abnormal termination') except Exception as error: return error def poll(pipe, timeout): """Python's Pipe.poll blocks undefinitely if data is too big.""" if os.name != 'nt': return select([pipe], [], [], timeout)[0] and True or False else: return pipe.poll(timeout) def send_results(pipe, data): """Send results and handles communication errors.""" try: pipe.send(data) except PicklingError as error: error.traceback = format_exc() pipe.send(error) def decorate(function, launcher, **properties): """Decorates the given function taking care of Windows process decoration issues. *function* represent the target function to be decorated, *launcher* takes care of executing the function with the given decoration *properties*. """ if os.name == 'nt': register_function(function) @wraps(function) def wrapper(*args, **kwargs): if os.name == 'nt': target, args = dump_function(function, args) else: target = function return launcher(target, args=args, kwargs=kwargs, **properties) return wrapper def register_function(function): global _registered_functions _registered_functions[function.__name__] = function def dump_function(function, args): """Dumps a decorated function.""" args = [function.__name__, function.__module__] + list(args) return trampoline, args def trampoline(name, module, *args, **kwargs): """Trampoline function for decorators. Lookups the function between the registered ones; if not found, forces its registering and then executes it. """ function = function_lookup(name, module) return function(*args, **kwargs) def function_lookup(name, module): """Searches the function between the registered ones. If not found, it imports the module forcing its registration. """ try: return _registered_functions[name] except KeyError: # force function registering __import__(module) mod = sys.modules[module] getattr(mod, name) return _registered_functions[name] PKtJGr??pebble/process/channel.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . import os from select import select from contextlib import contextmanager from multiprocessing import RLock, Pipe from pebble.exceptions import ChannelError LOCK_TIMEOUT = 60 def channels(): read0, write0 = Pipe(duplex=False) read1, write1 = Pipe(duplex=False) return Channel(read1, write0), WorkerChannel(read0, write1) class Channel(object): def __init__(self, reader, writer): self.reader = reader self.writer = writer self.poll = self._make_poll_method() def _make_poll_method(self): def unix_poll(timeout=None): return select([self.reader], [], [], timeout)[0] and True or False def windows_poll(timeout=None): return self.reader.poll(timeout) return os.name != 'nt' and unix_poll or windows_poll def recv(self): return self.reader.recv() def send(self, obj): return self.writer.send(obj) class WorkerChannel(Channel): def __init__(self, reader, writer): super(WorkerChannel, self).__init__(reader, writer) self.mutex = ChannelMutex() self.recv = self._make_recv_method() self.send = self._make_send_method() def __getstate__(self): return self.reader, self.writer, self.mutex def __setstate__(self, state): self.reader, self.writer, self.mutex = state self.poll = self._make_poll_method() self.recv = self._make_recv_method() self.send = self._make_send_method() def _make_recv_method(self): def recv(): with self.mutex.reader: return self.reader.recv() return recv def _make_send_method(self): def unix_send(obj): with self.mutex.writer: return self.writer.send(obj) def windows_send(obj): return self.writer.send(obj) return os.name != 'nt' and unix_send or windows_send @property @contextmanager def lock(self): with self.mutex: yield self class ChannelMutex(object): def __init__(self): self.reader_mutex = RLock() self.writer_mutex = os.name != 'nt' and RLock() or None self.acquire = self._make_acquire_method() self.release = self._make_release_method() def __getstate__(self): return self.reader_mutex, self.writer_mutex def __setstate__(self, state): self.reader_mutex, self.writer_mutex = state self.acquire = self._make_acquire_method() self.release = self._make_release_method() def __enter__(self): if self.acquire(): return self else: raise ChannelError("Channel mutex time out") def __exit__(self, *_): self.release() def _make_acquire_method(self): def unix_acquire(): return (self.reader_mutex.acquire(timeout=LOCK_TIMEOUT) and self.writer_mutex.acquire(timeout=LOCK_TIMEOUT)) def windows_acquire(): return self.reader_mutex.acquire(timeout=LOCK_TIMEOUT) return os.name != 'nt' and unix_acquire or windows_acquire def _make_release_method(self): def unix_release(): self.reader_mutex.release() self.writer_mutex.release() def windows_release(): self.reader_mutex.release() return os.name != 'nt' and unix_release or windows_release @property @contextmanager def reader(self): if self.reader_mutex.acquire(timeout=LOCK_TIMEOUT): try: yield self finally: self.reader_mutex.release() else: raise ChannelError("Channel mutex time out") @property @contextmanager def writer(self): if self.writer_mutex.acquire(timeout=LOCK_TIMEOUT): try: yield self finally: self.writer_mutex.release() else: raise ChannelError("Channel mutex time out") PKnxKG,T++pebble/process/pool.py# This file is part of Pebble. # Pebble is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License # as published by the Free Software Foundation, # either version 3 of the License, or (at your option) any later version. # Pebble is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # You should have received a copy of the GNU Lesser General Public License # along with Pebble. If not, see . import os import time from itertools import count from collections import namedtuple from signal import SIG_IGN, SIGINT, signal from pebble import thread from pebble.utils import execute from pebble.pool import RUNNING, SLEEP_UNIT from pebble.pool import BasePool, run_initializer, task_limit_reached from pebble.process.channel import channels from pebble.process.decorators import spawn from pebble.process.utils import stop, send_results from pebble.exceptions import ChannelError, PoolError from pebble.exceptions import TimeoutError, TaskCancelled, ProcessExpired NoMessage = namedtuple('NoMessage', ()) NewTask = namedtuple('NewTask', ('id', 'payload')) Results = namedtuple('Results', ('task', 'results')) AssignedWorker = namedtuple('AssignedWorker', ('pid', )) Acknowledgement = namedtuple('Acknowledgement', ('worker', 'task')) class Pool(BasePool): """Allows to schedule jobs within a Pool of Processes. workers is an integer representing the amount of desired process workers managed by the pool. If worker_task_limit is a number greater than zero, each worker will be restarted after performing an equal amount of tasks. The queue_factory callable allows to replace the internal task buffer of the Pool with a custom one. The callable must return a thread safe object exposing the same interface of the standard Python Queue. initializer must be callable, if passed, it will be called every time a worker is started, receiving initargs as arguments. """ def __init__(self, workers=1, task_limit=0, queue_factory=None, initializer=None, initargs=()): super(Pool, self).__init__(workers, task_limit, queue_factory, initializer, initargs) self._pool_manager = PoolManager(self._context) def _start_pool(self): self._pool_manager.start() self._loops = (task_scheduler_loop(self._pool_manager), pool_manager_loop(self._pool_manager), message_manager_loop(self._pool_manager)) self._context.state = RUNNING def _stop_pool(self): self._pool_manager.stop() def stop(self): """Stops the pool without performing any pending task.""" super(Pool, self).stop() self._context.task_queue.put(None) @thread.spawn(daemon=True, name='task_scheduler') def task_scheduler_loop(pool_manager): for task in pool_get_next_task(pool_manager): pool_manager.schedule(task) def pool_get_next_task(pool_manager): context = pool_manager.context task_queue = context.task_queue while context.alive: task = task_queue.get() if task is not None and not task.cancelled: yield task else: task_queue.task_done() @thread.spawn(daemon=True, name='pool_manager') def pool_manager_loop(pool_manager): context = pool_manager.context while context.alive: pool_manager.update_status() time.sleep(SLEEP_UNIT) @thread.spawn(daemon=True, name='message_manager') def message_manager_loop(pool_manager): for message in get_next_message(pool_manager): pool_manager.process_message(message) def get_next_message(pool_manager): context = pool_manager.context channel = pool_manager.worker_manager.pool_channel while context.alive: if channel.poll(SLEEP_UNIT): yield channel.recv() else: yield NoMessage() class PoolManager(object): """Combines Task and Worker Managers providing a higher level one.""" def __init__(self, context): self.context = context self.task_manager = TaskManager(context.task_queue.task_done) self.worker_manager = WorkerManager(context.workers, context.worker_parameters) def start(self): self.worker_manager.create_workers() def stop(self): self.worker_manager.stop_workers() def schedule(self, task): """Schedules a new Task in the PoolManager.""" self.task_manager.register(task) self.worker_manager.dispatch(task) def process_message(self, message): """Processes a message coming from the workers.""" if isinstance(message, Acknowledgement): self.task_manager.task_start(message.task, message.worker) elif isinstance(message, Results): self.task_manager.task_done(message.task, message.results) def update_status(self): self.update_tasks() self.update_workers() def update_tasks(self): """Handles timing out and cancelled Tasks.""" timeout, cancelled = self.task_manager.inspect_tasks() for task in timeout: self.task_manager.task_done(task.number, TimeoutError('Timeout')) for task in cancelled: self.task_manager.task_done(task.number, TaskCancelled('Cancelled')) for worker in (t._metadata for t in timeout + cancelled): self.worker_manager.stop_worker(worker.pid) def update_workers(self): """Handles unexpected processes termination.""" for expiration in self.worker_manager.inspect_workers(): self.handle_worker_expiration(expiration) self.worker_manager.create_workers() def handle_worker_expiration(self, expiration): worker_id, exitcode = expiration try: task = self.find_expired_task(worker_id) except LookupError: return else: error = ProcessExpired('Abnormal termination', code=exitcode) self.task_manager.task_done(task.number, error) def find_expired_task(self, worker_id): running_tasks = [t for t in tuple(self.task_manager.tasks.values()) if isinstance(t._metadata, AssignedWorker)] if running_tasks: return worker_lookup(running_tasks, worker_id) else: raise PoolError("All workers are dead") class TaskManager(object): """Manages the tasks flow within the Pool. Tasks are registered, acknowledged and completed. Timing out and cancelled tasks are handled as well. """ def __init__(self, task_done_callback): self.tasks = {} self.task_done_callback = task_done_callback def register(self, task): self.tasks[task.number] = task def task_start(self, task_id, worker_id): task = self.tasks[task_id] task._metadata = AssignedWorker(worker_id) task._timestamp = time.time() def task_done(self, task_id, results): """Set the tasks results and run the callback.""" try: task = self.tasks.pop(task_id) except KeyError: return # results of previously timeout/cancelled task else: task.set_results(results) self.task_done_callback() def inspect_tasks(self): """Updates the tasks status. Returns the tasks which have been cancelled or timeod out. """ tasks = tuple(self.tasks.values()) return (tuple(t for t in tasks if self.has_timeout(t)), tuple(t for t in tasks if t.started and t.cancelled)) @staticmethod def has_timeout(task): if task.timeout and task.started: return time.time() - task._timestamp > task.timeout else: return False class WorkerManager(object): """Manages the workers related mechanics within the Pool. Maintains the workers active and encapsulates their communication logic. """ def __init__(self, workers, worker_parameters): self.workers = {} self.workers_number = workers self.worker_parameters = worker_parameters self.pool_channel, self.workers_channel = channels() def dispatch(self, task): self.pool_channel.send(NewTask(task.number, task._metadata)) def inspect_workers(self): """Updates the workers status. Returns the workers which have unexpectedly ended. """ expired = tuple(w for w in self.workers.values() if not w.is_alive()) for worker in expired: self.workers.pop(worker.pid) return ((w.pid, w.exitcode) for w in expired if w.exitcode != 0) def create_workers(self): for _ in range(self.workers_number - len(self.workers)): self.new_worker() def stop_workers(self): for worker_id in tuple(self.workers.keys()): self.stop_worker(worker_id) def new_worker(self): worker = worker_process(self.worker_parameters, self.workers_channel) self.workers[worker.pid] = worker def stop_worker(self, worker_id): try: with self.workers_channel.lock: stop(self.workers.pop(worker_id)) except KeyError: return # worker already expired except ChannelError as error: raise PoolError(error) @spawn(name='worker_process', daemon=True) def worker_process(params, channel): """The worker process routines.""" signal(SIGINT, SIG_IGN) if params.initializer is not None: if not run_initializer(params.initializer, params.initargs): os._exit(1) try: for task in worker_get_next_task(channel, params.task_limit): payload = task.payload results = execute(payload.function, payload.args, payload.kwargs) send_results(channel, Results(task.id, results)) except (EOFError, EnvironmentError) as error: os._exit(error.errno) def worker_get_next_task(channel, task_limit): counter = count() while not task_limit_reached(counter, task_limit): yield fetch_task(channel) def fetch_task(channel): while channel.poll(): try: return task_transaction(channel) except RuntimeError: continue # another worker got the task def task_transaction(channel): """Ensures a task is fetched and acknowledged atomically.""" with channel.lock: if channel.poll(0): task = channel.recv() channel.send(Acknowledgement(os.getpid(), task.id)) else: raise RuntimeError("Race condition between workers") return task def worker_lookup(running_tasks, worker_id): for task in running_tasks: assigned_worker = task._metadata if assigned_worker.pid == worker_id: return task raise LookupError("Not found") PKzKGG[5'Pebble-3.1.14.dist-info/DESCRIPTION.rstPebble ====== Description ----------- Pebble provides a neat API to manage threads and processes within an application. Examples -------- Spawn a function within a thread:: from pebble import thread def function(foo, bar=0): print foo + bar thrd = thread.spawn(target=function, args=[1], kwargs={'bar':2}) thrd.join() Most of the functions work as well as decorators:: from pebble import process @process.spawn(daemon=True) def function(foo, bar=0): print(foo + bar) proc = function(1, bar=2) proc.join() Run a job in a separate process and wait for its results:: from pebble import process @process.concurrent def function(foo, bar=0): return foo + bar task = function(1, bar=2) results = task.get() # blocks until results are ready Pools allow to execute several tasks without the need of spawning a new worker for each one of them:: from threading import current_thread from pebble import thread def task_done(task): results, thread_id = task.get() print "Task %s returned %d from thread %s" % (task.id, results, thread_id) def do_job(foo, bar=0): return foo + bar, current_thread().ident with thread.Pool(workers=5) as pool: for i in range(0, 10): pool.schedule(do_job, args=(i, ), callback=task_done) Check the documentation for more examples. Pebble 4: * use Futures instead of Tasks * move callback and timeout assignments to Futures * merge with concurrent.futures and asyncio modules API PKzKGdBB%Pebble-3.1.14.dist-info/metadata.json{"classifiers": ["Programming Language :: Python", "Programming Language :: Python :: 3", "Development Status :: 5 - Production/Stable", "Intended Audience :: Developers", "Operating System :: OS Independent", "Topic :: Software Development :: Libraries :: Python Modules", "License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)"], "extensions": {"python.details": {"contacts": [{"email": "noxdafox@gmail.com", "name": "Matteo Cafasso", "role": "author"}], "document_names": {"description": "DESCRIPTION.rst"}, "project_urls": {"Home": "https://github.com/noxdafox/pebble"}}}, "generator": "bdist_wheel (0.26.0)", "keywords": ["thread", "process", "pool", "decorator"], "license": "LGPL", "metadata_version": "2.0", "name": "Pebble", "summary": "Threading and multiprocessing eye-candy.", "version": "3.1.14"}PKzKG#Q %Pebble-3.1.14.dist-info/top_level.txtpebble tests PKzKGndnnPebble-3.1.14.dist-info/WHEELWheel-Version: 1.0 Generator: bdist_wheel (0.26.0) Root-Is-Purelib: true Tag: py2-none-any Tag: py3-none-any PKzKGƨj. . Pebble-3.1.14.dist-info/METADATAMetadata-Version: 2.0 Name: Pebble Version: 3.1.14 Summary: Threading and multiprocessing eye-candy. Home-page: https://github.com/noxdafox/pebble Author: Matteo Cafasso Author-email: noxdafox@gmail.com License: LGPL Keywords: thread process pool decorator Platform: UNKNOWN Classifier: Programming Language :: Python Classifier: Programming Language :: Python :: 3 Classifier: Development Status :: 5 - Production/Stable Classifier: Intended Audience :: Developers Classifier: Operating System :: OS Independent Classifier: Topic :: Software Development :: Libraries :: Python Modules Classifier: License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL) Pebble ====== Description ----------- Pebble provides a neat API to manage threads and processes within an application. Examples -------- Spawn a function within a thread:: from pebble import thread def function(foo, bar=0): print foo + bar thrd = thread.spawn(target=function, args=[1], kwargs={'bar':2}) thrd.join() Most of the functions work as well as decorators:: from pebble import process @process.spawn(daemon=True) def function(foo, bar=0): print(foo + bar) proc = function(1, bar=2) proc.join() Run a job in a separate process and wait for its results:: from pebble import process @process.concurrent def function(foo, bar=0): return foo + bar task = function(1, bar=2) results = task.get() # blocks until results are ready Pools allow to execute several tasks without the need of spawning a new worker for each one of them:: from threading import current_thread from pebble import thread def task_done(task): results, thread_id = task.get() print "Task %s returned %d from thread %s" % (task.id, results, thread_id) def do_job(foo, bar=0): return foo + bar, current_thread().ident with thread.Pool(workers=5) as pool: for i in range(0, 10): pool.schedule(do_job, args=(i, ), callback=task_done) Check the documentation for more examples. Pebble 4: * use Futures instead of Tasks * move callback and timeout assignments to Futures * merge with concurrent.futures and asyncio modules API PKzKG z z Pebble-3.1.14.dist-info/RECORDPebble-3.1.14.dist-info/DESCRIPTION.rst,sha256=j9aM4ilAYandrlvvoNBPf5dw99nKyImafsHQR90hbTM,1672 Pebble-3.1.14.dist-info/METADATA,sha256=XZ2qIWiDRlWI4nXU75MYOfCypDxOLNRQlOLK8PO86-o,2350 Pebble-3.1.14.dist-info/RECORD,, Pebble-3.1.14.dist-info/WHEEL,sha256=GrqQvamwgBV4nLoJe0vhYRSWzWsx7xjlt74FT0SWYfE,110 Pebble-3.1.14.dist-info/metadata.json,sha256=T_u2Js_x50RdoH8gjsme042ehQP4e0QOZ5w5GGTU7Lk,834 Pebble-3.1.14.dist-info/top_level.txt,sha256=wKFP-iu-JM5N1064sO5wvmjm3c5heV4ygkBcUDNEShA,13 pebble/__init__.py,sha256=jdEa_BcfCAAL0mB5TczAyr5tLdd7don4rn5AsaYVcwQ,631 pebble/decorators.py,sha256=VNtyRMQw5zaGCjv30Sr-PIR0QC3JiVPVvCkpAZMTLNQ,2288 pebble/exceptions.py,sha256=Pcz9aM0AMNkTf-nOz1qBtx093uSxVcg7anlBSWXgTe4,1428 pebble/functions.py,sha256=o3VAQSQNI_6SU7TKrBu0FwWZVSXQ_Zcco_Dn3EDjWiE,5408 pebble/pool.py,sha256=D-NjUQ3xnWYqZDG-S3DqDos-krVD9la6Ho_xVTGaTTk,5949 pebble/task.py,sha256=sjLQkxfwEzA-kuhuhyqp1x2s8Qr39-3Zu7dKzs8U8Q4,3896 pebble/utils.py,sha256=n9-qCm-PDFO9a4iepK565UXvsP_r78w38YB8NBV9GEY,2448 pebble/process/__init__.py,sha256=_tRWxaojn_FaNASHKjAvg6M06TFCJufLwhSw6-lYXJw,136 pebble/process/channel.py,sha256=SA6U4UGC_S00eUNg-aOZLRf6QLQj0b4ieTsfaRgBRm0,4671 pebble/process/decorators.py,sha256=dC-jrBYGhG38HtrfKL5C_mK8GbZ9d3M334YTBNglCqQ,4469 pebble/process/pool.py,sha256=mo3NJeKsMHhHV5VnI7uSq3C9zySmlbXc0brGgCwbrsE,11191 pebble/process/utils.py,sha256=evBWYbdpW6M0IqcRRr90iCTNZW6xnuF1hMIPBZDU2ro,3900 pebble/thread/__init__.py,sha256=EDhfoa1Tykh3_1FCzu3nOyRVT3F6XN4QLrA7UQcW35A,134 pebble/thread/decorators.py,sha256=tHSiGMv-0O_BG6FVrNxBYEYJoUpewTcTKTgPWW_dC7o,3390 pebble/thread/pool.py,sha256=DSQ8PcjY0Yw2AZvIMbj11JERQod4fKlwTFRjxnpdtr4,4199 pebble/thread/utils.py,sha256=MQyyVrxqgCag75eWw6L8f8mTw17gzpIzuZb07mhIbOA,1100 tests/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 tests/test_pebble.py,sha256=aE8OXCj6-Ex-LHlKYivmA34WSUzKV0tSI4yqhNgyysk,11363 tests/test_process_concurrent.py,sha256=caAtfVW8Ylyb5eeLTtgKzAFG6Hc2mpEqcm0MF2ITcwY,7232 tests/test_process_pool.py,sha256=T5maNbwIHEt33Hi3i0BVMmkYNrsRx7Q9ki4seS7aUu8,13438 tests/test_process_spawn.py,sha256=oj6D_wSoL-3TAgVUj0l_B-3lISrxD8CyD-DFSisioZQ,3724 tests/test_thread_concurrent.py,sha256=Mn9Cv-Com9vpP9Rll8HlgSrt9O2oDgLYg5K4fbHystI,5421 tests/test_thread_pool.py,sha256=fVYKNO5WPE0fd6rv10N-i7cVAlHSJLDvpeM0reFQBkM,8494 tests/test_thread_spawn.py,sha256=TpsbwbHEZPGgfakses7LDwwqynN4RcKhE4EaGke6N_w,3285 PK%CGp0Xc,c,tests/test_pebble.pyPK%CG ,tests/test_thread_spawn.pyPK%CG9tests/__init__.pyPKuKGW~4~49tests/test_process_pool.pyPK%CG|۳@@ ntests/test_process_concurrent.pyPKuKG:j@.!.!tests/test_thread_pool.pyPK%CG# z.--jtests/test_thread_concurrent.pyPK%CG^V/tests/test_process_spawn.pyPK%CG wwpebble/__init__.pyPK%CG9@pebble/decorators.pyPK%CG bpebble/utils.pyPK%CG}*  pebble/functions.pyPKxKGxI<==ppebble/pool.pyPKtJG`pebble/exceptions.pyPK%CG|QN88pebble/task.pyPK%CG\=<(pebble/thread/__init__.pyPK%CGk> > (pebble/thread/decorators.pyPK%CG+LeLL76pebble/thread/utils.pyPKxKG6\gg:pebble/thread/pool.pyPK%CGb@AQKpebble/process/__init__.pyPK%CGLuuLpebble/process/decorators.pyPK%CGJK<<]pebble/process/utils.pyPKtJGr??1mpebble/process/channel.pyPKnxKG,T++pebble/process/pool.pyPKzKGG[5'Pebble-3.1.14.dist-info/DESCRIPTION.rstPKzKGdBB%_Pebble-3.1.14.dist-info/metadata.jsonPKzKG#Q %Pebble-3.1.14.dist-info/top_level.txtPKzKGndnn4Pebble-3.1.14.dist-info/WHEELPKzKGƨj. . ݶPebble-3.1.14.dist-info/METADATAPKzKG z z IPebble-3.1.14.dist-info/RECORDPKO