PK %CGp0Xc, c, tests/test_pebble.pyimport os
import time
import signal
import unittest
import threading
# Queue moved from "Queue" (Python 2) to "queue" (Python 3). Catch only
# ImportError: the original bare except would also mask unrelated errors.
try:  # Python 2
    from Queue import Queue
except ImportError:  # Python 3
    from queue import Queue
from pebble import decorators
from pebble import synchronized, sighandler, thread
from pebble import waitfortasks, waitforthreads, waitforqueues
from pebble import Task, TimeoutError, TaskCancelled
# Module-level state shared with the fixtures below.
results = 0                        # written by the SIGALRM handler
semaphore = threading.Semaphore()  # custom lock for @synchronized tests
@synchronized
def synchronized_function():
    """A docstring."""
    # Non-blocking probe of the decorator's module-level lock: returns False
    # while @synchronized itself holds the lock for the call's duration.
    # NOTE: the docstring is asserted verbatim by the tests; do not edit it.
    return decorators._synchronized_lock.acquire(False)
@synchronized(semaphore)
def custom_synchronized_function():
    """A docstring."""
    # Probes the user-supplied semaphore instead of the default lock;
    # False while the decorator holds it.
    return semaphore.acquire(False)
try:
    # SIGALRM/SIGFPE/SIGIO are Unix-only; on platforms without them the
    # import fails and these handler fixtures are simply not defined.
    from signal import SIGALRM, SIGFPE, SIGIO

    @sighandler(SIGALRM)
    def signal_handler(signum, frame):
        """A docstring."""
        # Flips the module-global flag so tests can observe delivery.
        global results
        results = 1

    @sighandler((SIGFPE, SIGIO))
    def signals_handler(signum, frame):
        # Installed for two signals at once; body intentionally empty.
        pass
except ImportError:
    pass
@thread.spawn
def thread_function(value):
    # Sleeps for *value* seconds so callers can exercise waitforthreads
    # and its timeout handling.
    time.sleep(value)
    return value
@thread.spawn
def queue_function(queues, index, value):
    # After sleeping, delivers *value* into queues[index] so waitforqueues
    # has something to detect.
    time.sleep(value)
    queues[index].put(value)
    return value
@thread.concurrent
def concurrent_function(value):
    # Returns a Task (via the decorator); used by the waitfortasks tests.
    time.sleep(value)
    return value
@thread.spawn
def spurious_wakeup_function(value, lock):
    # Sleep half the time, grab the lock (which can cause a spurious wakeup
    # for a thread waiting on it), then sleep the remaining half.
    value = value / 2
    time.sleep(value)
    lock.acquire()
    time.sleep(value)
    return value
class TestSynchronizedDecorator(unittest.TestCase):
    """Tests for the @synchronized decorator."""

    def test_wrapper_decorator_docstring(self):
        """Synchronized docstring of the original function is preserved."""
        self.assertEqual(synchronized_function.__doc__, "A docstring.")

    def test_syncronized_locked(self):
        """Synchronized Lock is acquired
        during execution of decorated function."""
        # acquire(False) inside the function fails -> lock already held.
        self.assertFalse(synchronized_function())

    def test_syncronized_released(self):
        """Synchronized Lock is released
        during execution of decorated function."""
        synchronized_function()
        # After the call returns the lock must be free again.
        self.assertTrue(decorators._synchronized_lock.acquire(False))
        decorators._synchronized_lock.release()

    def test_custom_syncronized_locked(self):
        """Synchronized semaphore is acquired
        during execution of decorated function."""
        self.assertFalse(custom_synchronized_function())

    def test_custom_syncronized_released(self):
        """Synchronized semaphore is acquired
        during execution of decorated function."""
        custom_synchronized_function()
        self.assertTrue(semaphore.acquire(False))
        semaphore.release()
class TestSigHandler(unittest.TestCase):
    """Tests for the @sighandler decorator (skipped in effect on Windows)."""

    def test_wrapper_decorator_docstring(self):
        """Sighandler docstring of the original function is preserved."""
        if os.name != 'nt':
            self.assertEqual(signal_handler.__doc__, "A docstring.")

    def test_sighandler(self):
        """Sighandler installs SIGALRM."""
        if os.name != 'nt':
            self.assertEqual(signal.getsignal(signal.SIGALRM).__name__,
                             signal_handler.__name__)

    def test_sighandler_multiple(self):
        """Sighandler installs SIGFPE and SIGIO."""
        if os.name != 'nt':
            self.assertEqual(signal.getsignal(signal.SIGFPE).__name__,
                             signals_handler.__name__)
            self.assertEqual(signal.getsignal(signal.SIGIO).__name__,
                             signals_handler.__name__)

    def test_sigalarm_sighandler(self):
        """Sighandler for SIGALARM works."""
        if os.name != 'nt':
            # Deliver SIGALRM to ourselves; the handler sets results to 1.
            os.kill(os.getpid(), signal.SIGALRM)
            time.sleep(0.1)
            self.assertEqual(results, 1)
class TestWaitForTasks(unittest.TestCase):
    """Tests for the waitfortasks helper."""

    def test_waitfortasks_single(self):
        """Waitfortasks waits for a single task."""
        task = concurrent_function(0.01)
        self.assertEqual(list(waitfortasks([task]))[0], task)

    def test_waitfortasks_multiple(self):
        """Waitfortasks waits for multiple tasks."""
        tasks = []
        for _ in range(5):
            tasks.append(concurrent_function(0.01))
        time.sleep(0.1)
        self.assertEqual(list(waitfortasks(tasks)), tasks)

    def test_waitfortasks_timeout(self):
        """Waitfortasks returns empty list if timeout."""
        task = concurrent_function(0.1)
        self.assertEqual(list(waitfortasks([task], timeout=0.01)), [])

    def test_waitfortasks_restore(self):
        """Waitfortasks Task object is restored to original one."""
        task = concurrent_function(0.01)
        # waitfortasks patches the Task internally; dir() must be unchanged
        # once it returns.
        expected = sorted(dir(task))
        waitfortasks([task])
        self.assertEqual(sorted(dir(task)), expected)
class TestWaitForThreads(unittest.TestCase):
    """Tests for the waitforthreads helper."""

    def test_waitforthreads_single(self):
        """Waitforthreads waits for a single thread."""
        thread = thread_function(0.01)
        self.assertEqual(list(waitforthreads([thread]))[0], thread)

    def test_waitforthreads_multiple(self):
        """Waitforthreads waits for multiple threads."""
        threads = []
        for _ in range(5):
            threads.append(thread_function(0.01))
        time.sleep(0.1)
        self.assertEqual(list(waitforthreads(threads)), threads)

    def test_waitforthreads_timeout(self):
        """Waitforthreads returns empty list if timeout."""
        thread = thread_function(0.1)
        self.assertEqual(list(waitforthreads([thread], timeout=0.01)), [])

    def test_waitforthreads_restore(self):
        """Waitforthreads get_ident is restored to original one."""
        # waitforthreads temporarily monkey-patches threading's get_ident
        # (named _get_ident on Python 2); it must be restored afterwards.
        if hasattr(threading, 'get_ident'):
            expected = threading.get_ident
        else:
            expected = threading._get_ident
        thread = thread_function(0)
        time.sleep(0.01)
        waitforthreads([thread])
        if hasattr(threading, 'get_ident'):
            self.assertEqual(threading.get_ident, expected)
        else:
            self.assertEqual(threading._get_ident, expected)

    def test_waitforthreads_spurious(self):
        """Waitforthreads tolerates spurious wakeups."""
        lock = threading.RLock()
        thread = spurious_wakeup_function(0.1, lock)
        self.assertEqual(list(waitforthreads([thread])), [thread])
class TestWaitForQueues(unittest.TestCase):
    """Tests for the waitforqueues helper."""

    def setUp(self):
        self.queues = [Queue(), Queue(), Queue()]

    def test_waitforqueues_single(self):
        """Waitforqueues waits for a single queue."""
        queue_function(self.queues, 0, 0.01)
        self.assertEqual(list(waitforqueues(self.queues))[0], self.queues[0])

    def test_waitforqueues_multiple(self):
        """Waitforqueues waits for multiple queues."""
        for index in range(3):
            queue_function(self.queues, index, 0.01)
        time.sleep(0.1)
        self.assertEqual(list(waitforqueues(self.queues)), self.queues)

    def test_waitforqueues_timeout(self):
        """Waitforqueues returns empty list if timeout."""
        queue_function(self.queues, 0, 0.1)
        self.assertEqual(list(waitforqueues(self.queues, timeout=0.01)), [])

    def test_waitforqueues_restore(self):
        """Waitforqueues Queue object is restored to original one."""
        # waitforqueues patches the Queue internally; dir() must be
        # unchanged once it returns.
        expected = sorted(dir(self.queues[0]))
        queue_function(self.queues, 0, 0)
        waitforqueues(self.queues)
        self.assertEqual(sorted(dir(self.queues[0])), expected)
class TestTask(unittest.TestCase):
    """Tests for the Task result container.

    Several docstrings below fix a botched search/replace in the original
    ("seself.task" was clearly meant to read "set").
    """

    def setUp(self):
        self.task = Task(0)

    def test_number(self):
        """Task number is reported correctly."""
        t = Task(42)
        self.assertEqual(t.number, 42)

    def test_task_id(self):
        """Task ID is forwarded to it."""
        t = Task(0, identifier='foo')
        self.assertEqual(t.id, 'foo')

    def test_ready(self):
        """Task is ready if results are set."""
        self.task._set(None)
        self.assertTrue(self.task.ready)

    def test_not_read(self):
        """Task is not ready if results are not set."""
        self.assertFalse(self.task.ready)

    def test_cancelled(self):
        """Task is cancelled if cancel() is called."""
        self.task.cancel()
        self.assertTrue(self.task.cancelled)

    def test_not_cancelled(self):
        """Task is not cancelled if cancel() is not called."""
        self.assertFalse(self.task.cancelled)

    def test_started(self):
        """Task is started if timestamp is set."""
        self.task._timestamp = 42
        self.assertTrue(self.task.started)

    def test_not_started(self):
        """Task is not started if timestamp is not set."""
        self.assertFalse(self.task.started)

    def test_success(self):
        """Task is successful if results are set."""
        self.task._set(42)
        self.assertTrue(self.task.success)

    def test_not_success(self):
        """Task is not successful if results are not set."""
        self.assertFalse(self.task.success)

    def test_not_success_exception(self):
        """Task is not successful if results are an Exception."""
        self.task._set(Exception("BOOM"))
        self.assertFalse(self.task.success)

    def test_wait(self):
        """Task wait returns True if results are ready."""
        self.task._set(42)
        self.assertTrue(self.task.wait())

    def test_wait_no_timeout(self):
        """Task wait returns True if timeout does not expire."""
        self.task._set(42)
        self.assertTrue(self.task.wait(timeout=0))

    def test_wait_timeout(self):
        """Task wait returns False if timeout expired."""
        self.assertFalse(self.task.wait(timeout=0))

    def test_get(self):
        """Task values are returned by get if results are set."""
        self.task._set(42)
        self.assertEqual(self.task.get(), 42)

    def test_get_exception(self):
        """Task get raises the exception set as results."""
        self.task._set(Exception("BOOM"))
        self.assertRaises(Exception, self.task.get)

    def test_get_timeout(self):
        """Task get raises TimeoutError if timeout expires."""
        self.assertRaises(TimeoutError, self.task.get, 0)

    def test_get_no_timeout(self):
        """Task values are returned by get if results are set
        before timeout expires."""
        self.task._set(42)
        self.assertEqual(self.task.get(0), 42)

    def test_get_timeout_cancelled(self):
        """Task is cancelled if Timeout expires and cancel is set."""
        try:
            self.task.get(timeout=0, cancel=True)
        except TimeoutError:
            pass
        self.assertTrue(self.task.cancelled)

    def test_cancel(self):
        """Task get raises TaskCancelled if task is cancelled."""
        self.task.cancel()
        self.assertRaises(TaskCancelled, self.task.get)

    def test_set_unique(self):
        """Task _set works only once."""
        self.task._set(42)
        self.task._set(None)
        self.assertEqual(self.task.get(), 42)

    def test_set_not_overriding(self):
        """Task _set does not override a cancelled task."""
        self.task.cancel()
        self.task._set(42)
        self.assertRaises(TaskCancelled, self.task.get)

    def test_cancel_overriding(self):
        """Task cancel does not override a set task."""
        self.task._set(42)
        self.task.cancel()
        self.assertEqual(self.task.get(), 42)
PK %CG
tests/test_thread_spawn.pyimport os
import unittest
# Queue moved from "Queue" (Python 2) to "queue" (Python 3). Catch only
# ImportError: the original bare except would also mask unrelated errors.
try:  # Python 2
    from Queue import Queue
except ImportError:  # Python 3
    from queue import Queue
from pebble import thread
def undecorated(queue, argument, keyword_argument=0):
    """Deliver the sum of the two arguments via *queue* (plain helper)."""
    total = argument + keyword_argument
    queue.put(total)
@thread.spawn(name='foo')
def decorated(queue, argument, keyword_argument=0):
    """A docstring."""
    # Runs in a spawned thread named 'foo'; the result travels via *queue*.
    # NOTE: the docstring is asserted verbatim by test_docstring.
    queue.put(argument + keyword_argument)
class TestThreadSpawnObj(object):
    """Fixture exercising @thread.spawn on the three method kinds."""

    a = 0  # read by the classmethod

    def __init__(self):
        self.b = 1  # read by the instance method

    @classmethod
    @thread.spawn
    def clsmethod(cls, queue):
        queue.put(cls.a)

    @thread.spawn
    def instmethod(self, queue):
        queue.put(self.b)

    @staticmethod
    @thread.spawn
    def stcmethod(queue):
        queue.put(2)
class TestThreadSpawn(unittest.TestCase):
    """Tests for the thread.spawn decorator/launcher."""

    def setUp(self):
        self.spawnobj = TestThreadSpawnObj()

    def test_docstring(self):
        """Thread Spawn docstring is preserved."""
        self.assertEqual(decorated.__doc__, "A docstring.")

    def test_wrong_parameters(self):
        """Thread Spawn raises ValueError if wrong params."""
        self.assertRaises(ValueError, thread.spawn, undecorated,
                          args=[1])

    def test_thread_wrong_decoration(self):
        """Thread Spawn raises ValueError if given wrong params."""
        try:
            @thread.spawn(5, name='foo')
            def wrong():
                return
        except Exception as error:
            self.assertTrue(isinstance(error, ValueError))

    def test_defaults(self):
        """Thread Spawn default values are preserved."""
        queue = Queue()
        thrd = decorated(queue, 1, 1)
        thrd.join()
        self.assertFalse(thrd.daemon)

    def test_arguments(self):
        """Thread Spawn decorator arguments are forwarded."""
        queue = Queue()
        thrd = decorated(queue, 1, 1)
        thrd.join()
        self.assertEqual(thrd.name, 'foo')

    def test_undecorated_results(self):
        """Thread Spawn undecorated results are produced."""
        queue = Queue()
        # BUG FIX: target must be the plain `undecorated` fixture. The
        # original passed the already-decorated function, which spawns a
        # nested thread instead of testing the undecorated path.
        thrd = thread.spawn(target=undecorated, args=[queue, 1],
                            kwargs={'keyword_argument': 1})
        results = queue.get()
        thrd.join()
        self.assertEqual(results, 2)

    def test_decorated_results(self):
        """Thread Spawn results are produced."""
        queue = Queue()
        thrd = decorated(queue, 1, 1)
        results = queue.get()
        thrd.join()
        self.assertEqual(results, 2)

    def test_class_method(self):
        """Thread Spawn decorated classmethods."""
        queue = Queue()
        thrd = TestThreadSpawnObj.clsmethod(queue)
        results = queue.get()
        thrd.join()
        self.assertEqual(results, 0)

    def test_instance_method(self):
        """Thread Spawn decorated instance methods."""
        queue = Queue()
        thrd = self.spawnobj.instmethod(queue)
        results = queue.get()
        thrd.join()
        self.assertEqual(results, 1)

    def test_static_method(self):
        """Thread Spawn decorated static methods."""
        if os.name != 'nt':
            queue = Queue()
            thrd = self.spawnobj.stcmethod(queue)
            results = queue.get()
            thrd.join()
            self.assertEqual(results, 2)
PK %CG tests/__init__.pyPK uKG W~4 ~4 tests/test_process_pool.pyimport os
import time
import signal
import unittest
import threading
try:
from queue import Queue
except ImportError:
from Queue import Queue
import pebble
from pebble import process
from pebble import TaskCancelled, TimeoutError, PoolError, ProcessExpired
# Module-level state shared between the scheduled tasks and the callbacks.
event = threading.Event()  # signalled by callback() when a result lands
initarg = 0                # written by initializer(), read in workers
results = 0                # last successful task result
exception = None           # last exception raised by a task
def callback(task):
    """Record *task*'s outcome in module globals and signal completion.

    The event is set in a ``finally`` clause so the waiting test is always
    woken, matching the class-level callback in TestProcessPool (the
    original only set it on the normal path).
    """
    global results
    global exception
    try:
        results = task.get()
    except Exception as error:
        exception = error
    finally:
        event.set()
def queue_factory():
    """Build the bounded task queue used by the queue-factory tests."""
    bounded = Queue(maxsize=5)
    return bounded
def error_callback(_):
    """Callback that always blows up, to test pool error isolation."""
    raise Exception("BOOM!")
def initializer(value):
    """Worker initializer: record *value* in the module-global initarg."""
    global initarg
    initarg = value
def broken_initializer():
    """Initializer that always fails, to test pool startup errors."""
    raise Exception("BOOM!")
def function(argument, keyword_argument=0):
    """A docstring."""
    # Trivial payload: sum of the positional and keyword argument.
    total = argument + keyword_argument
    return total
def initializer_function():
    # Returns the value stored by initializer(); run inside a worker to
    # verify the initializer actually executed there.
    return initarg
def error_function():
    """Task payload that always raises, to test error propagation."""
    raise Exception("BOOM!")
def long_function():
    # Long enough to outlive the short timeouts used by the tests.
    time.sleep(1)
def pid_function():
    """Sleep briefly, then report the executing process's PID."""
    time.sleep(0.1)
    pid = os.getpid()
    return pid
def sigterm_function():
    # Ignores SIGTERM then blocks, so the pool must escalate beyond a
    # plain terminate() to reclaim the worker (Unix only).
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    time.sleep(100)
def suicide_function():
    # Kills the worker process abruptly (no cleanup, no result) so the
    # pool must report ProcessExpired.
    os._exit(1)
class TestProcessPool(unittest.TestCase):
    """Tests for pebble's process.Pool."""

    def setUp(self):
        global initarg
        initarg = 0
        self.event = threading.Event()
        self.event.clear()
        self.results = None
        self.exception = None

    def callback(self, task):
        # Instance-level callback: record outcome, always wake the test.
        try:
            self.results = task.get()
        except Exception as error:
            self.exception = error
        finally:
            self.event.set()

    def test_process_pool_queue_factory(self):
        """Process Pool queue factory is called."""
        with process.Pool(queue_factory=queue_factory) as pool:
            self.assertEqual(pool._context.task_queue.maxsize, 5)

    def test_process_pool_single_task(self):
        """Process Pool single task."""
        with process.Pool() as pool:
            task = pool.schedule(function, args=[1],
                                 kwargs={'keyword_argument': 1})
            self.assertEqual(task.get(), 2)

    def test_process_pool_multiple_tasks(self):
        """Process Pool multiple tasks."""
        tasks = []
        with process.Pool() as pool:
            for index in range(5):
                tasks.append(pool.schedule(function, args=[1]))
            self.assertEqual(sum([t.get() for t in tasks]), 5)

    def test_process_pool_callback(self):
        """Process Pool results are forwarded to the callback."""
        with process.Pool() as pool:
            pool.schedule(function, args=[1], callback=self.callback,
                          kwargs={'keyword_argument': 1})
            self.event.wait()
            self.assertEqual(self.results, 2)

    def test_process_pool_error(self):
        """Process Pool errors are raised by task get."""
        with process.Pool() as pool:
            task = pool.schedule(error_function)
            self.assertRaises(Exception, task.get)

    def test_process_pool_error_callback(self):
        """Process Pool errors are forwarded to callback."""
        with process.Pool() as pool:
            pool.schedule(error_function, callback=self.callback)
            self.event.wait()
            self.assertTrue(isinstance(self.exception, Exception))

    def test_process_pool_timeout(self):
        """Process Pool task raises TimeoutError if so."""
        with process.Pool() as pool:
            task = pool.schedule(long_function, timeout=0.1)
            self.assertRaises(TimeoutError, task.get)

    def test_process_pool_timeout_callback(self):
        """Process Pool TimeoutError is forwarded to callback."""
        with process.Pool() as pool:
            pool.schedule(long_function, callback=self.callback, timeout=0.1)
            self.event.wait()
            self.assertTrue(isinstance(self.exception, TimeoutError))

    def test_process_pool_cancel(self):
        """Process Pool task raises TaskCancelled if so."""
        with process.Pool() as pool:
            task = pool.schedule(long_function)
            task.cancel()
            self.assertRaises(TaskCancelled, task.get)

    def test_process_pool_cancel_callback(self):
        """Process Pool TaskCancelled is forwarded to callback."""
        with process.Pool() as pool:
            task = pool.schedule(long_function, callback=self.callback)
            task.cancel()
            self.event.wait()
            self.assertTrue(isinstance(self.exception, TaskCancelled))

    def test_process_pool_different_process(self):
        """Process Pool multiple tasks are handled by different processes."""
        tasks = []
        with process.Pool(workers=2) as pool:
            for i in range(0, 5):
                tasks.append(pool.schedule(pid_function))
            self.assertEqual(len(set([t.get() for t in tasks])), 2)

    def test_process_pool_task_limit(self):
        """Process Pool task limit is honored."""
        tasks = []
        with process.Pool(task_limit=2) as pool:
            for i in range(0, 4):
                tasks.append(pool.schedule(pid_function))
            # Workers are recycled every 2 tasks -> 2 distinct PIDs.
            self.assertEqual(len(set([t.get() for t in tasks])), 2)

    def test_process_pool_stop_timeout(self):
        """Process Pool workers are stopped if task timeout."""
        with process.Pool() as pool:
            task1 = pool.schedule(pid_function)
            pool.schedule(long_function, timeout=0.1)
            task2 = pool.schedule(pid_function)
            self.assertNotEqual(task1.get(), task2.get())

    def test_process_pool_stop_cancel(self):
        """Process Pool workers are stopped if task cancelled."""
        with process.Pool() as pool:
            task1 = pool.schedule(pid_function)
            task = pool.schedule(long_function)
            # sleep if not the task will be cancelled before sending to worker
            time.sleep(0.1)
            task.cancel()
            task2 = pool.schedule(pid_function)
            self.assertNotEqual(task1.get(), task2.get())

    def test_process_pool_schedule_id(self):
        """Process Pool task ID is forwarded to it."""
        with process.Pool() as pool:
            task = pool.schedule(function, args=[1], identifier='foo')
            self.assertEqual(task.id, 'foo')

    def test_process_pool_initializer(self):
        """Process Pool initializer is correctly run."""
        with process.Pool(initializer=initializer, initargs=[1]) as pool:
            task = pool.schedule(initializer_function)
            self.assertEqual(task.get(), 1)

    def test_process_pool_broken_initializer(self):
        """Process Pool broken initializer is notified."""
        with self.assertRaises(PoolError):
            with pebble.process.Pool(initializer=broken_initializer) as pool:
                pool.schedule(function)

    def test_process_pool_running(self):
        """Process Pool is active if a task is scheduled."""
        with process.Pool() as pool:
            pool.schedule(function, args=[1])
            self.assertTrue(pool.active)

    def test_process_pool_stopped(self):
        """Process Pool is not active once stopped."""
        with process.Pool() as pool:
            pool.schedule(function, args=[1])
        # The context manager stops the pool on exit.
        self.assertFalse(pool.active)

    def test_process_pool_close_tasks(self):
        """Process Pool all tasks are performed on close."""
        tasks = []
        pool = process.Pool()
        for index in range(10):
            tasks.append(pool.schedule(function, args=[index]))
        pool.close()
        pool.join()
        # BUG FIX: the original used map(self.assertTrue, ...), which is a
        # lazy iterator on Python 3 and never executed the assertions.
        for task in tasks:
            self.assertTrue(task.ready)

    def test_process_pool_close_stopped(self):
        """Process Pool is stopped after close."""
        pool = process.Pool()
        pool.schedule(function, args=[1])
        pool.close()
        pool.join()
        self.assertFalse(pool.active)

    def test_process_pool_stop_tasks(self):
        """Process Pool not all tasks are performed on stop."""
        tasks = []
        pool = process.Pool()
        for index in range(10):
            tasks.append(pool.schedule(function, args=[index]))
        pool.stop()
        pool.join()
        self.assertTrue(len([t for t in tasks if not t.ready]) > 0)

    def test_process_pool_stop_stopped(self):
        """Process Pool is stopped after stop."""
        pool = process.Pool()
        pool.schedule(function, args=[1])
        pool.stop()
        pool.join()
        self.assertFalse(pool.active)

    def test_process_pool_join_workers(self):
        """Process Pool no worker is running after join."""
        pool = process.Pool(workers=4)
        pool.schedule(function, args=[1])
        pool.stop()
        pool.join()
        self.assertEqual(len(pool._pool_manager.worker_manager.workers), 0)

    def test_process_pool_join_running(self):
        """Process Pool RuntimeError is raised if active pool joined."""
        with process.Pool() as pool:
            pool.schedule(function, args=[1])
            self.assertRaises(RuntimeError, pool.join)

    def test_process_pool_join_tasks_timeout(self):
        """Process Pool TimeoutError is raised if join on long tasks."""
        pool = process.Pool()
        for index in range(2):
            pool.schedule(long_function)
        pool.close()
        self.assertRaises(TimeoutError, pool.join, 0.4)
        pool.stop()
        pool.join()

    def test_process_pool_callback_error(self):
        """Process Pool does not stop if error in callback."""
        with process.Pool() as pool:
            try:
                pool.schedule(function, args=[1], callback=error_callback,
                              kwargs={'keyword_argument': 1})
                pool.schedule(function, args=[1],
                              kwargs={'keyword_argument': 1})
            except Exception:
                self.fail("Error raised")

    def test_process_pool_exception_isolated(self):
        """Process Pool an Exception does not affect other tasks."""
        with process.Pool() as pool:
            task = pool.schedule(error_function)
            try:
                task.get()
            except Exception:
                pass
            task = pool.schedule(function, args=[1],
                                 kwargs={'keyword_argument': 1})
            self.assertEqual(task.get(), 2)

    @unittest.skipIf(os.name == 'nt', "Test won't run on Windows'.")
    def test_process_pool_ignoring_sigterm(self):
        """Process Pool ignored SIGTERM signal are handled on Unix."""
        with process.Pool() as pool:
            task = pool.schedule(sigterm_function, timeout=0.2)
            self.assertRaises(TimeoutError, task.get)

    def test_process_pool_expired_worker(self):
        """Process Pool unexpect death of worker raises ProcessExpired."""
        with process.Pool() as pool:
            task = pool.schedule(suicide_function)
            self.assertRaises(ProcessExpired, task.get)
# DEADLOCK TESTS
# DEADLOCK TESTS
@process.spawn(name='worker_process', daemon=True)
def broken_worker_process_tasks(_, channel):
    """Process failing in receiving new tasks."""
    # Die while holding the channel's reader lock so the pool side would
    # deadlock on it unless the lock timeout kicks in.
    with channel.mutex.reader:
        os._exit(1)
@process.spawn(name='worker_process', daemon=True)
def broken_worker_process_results(_, channel):
    """Process failing in delivering results."""
    # Accept a task, then die holding the writer lock mid-delivery.
    for _ in pebble.process.pool.worker_get_next_task(channel, 2):
        with channel.mutex.writer:
            os._exit(1)
class TestProcessPoolDeadlockOnNewTasks(unittest.TestCase):
    """Pool must recover when a worker dies holding the reader lock."""

    def setUp(self):
        # Swap in the broken worker and shorten the channel lock timeout so
        # the deadlock is detected quickly.
        self.worker_process = pebble.process.pool.worker_process
        pebble.process.pool.worker_process = broken_worker_process_tasks
        pebble.process.channel.LOCK_TIMEOUT = 0.1

    def tearDown(self):
        # Restore the real worker and the production lock timeout.
        pebble.process.pool.worker_process = self.worker_process
        pebble.process.channel.LOCK_TIMEOUT = 60

    def test_pool_deadlock(self):
        """Process Pool no deadlock if reading worker dies locking channel."""
        with self.assertRaises(PoolError):
            with pebble.process.Pool() as pool:
                with self.assertRaises(pebble.ProcessExpired):
                    pool.schedule(function)

    def test_pool_deadlock_stop(self):
        """Process Pool reading deadlocks are stopping the Pool."""
        with self.assertRaises(PoolError):
            pool = pebble.process.Pool()
            for _ in range(10):
                pool.schedule(function)
                time.sleep(0.1)
class TestProcessPoolDeadlockOnResults(unittest.TestCase):
    """Pool must recover when a worker dies holding the writer lock."""

    def setUp(self):
        # Swap in the broken worker and shorten the channel lock timeout.
        self.worker_process = pebble.process.pool.worker_process
        pebble.process.pool.worker_process = broken_worker_process_results
        pebble.process.channel.LOCK_TIMEOUT = 0.1

    def tearDown(self):
        pebble.process.pool.worker_process = self.worker_process
        pebble.process.channel.LOCK_TIMEOUT = 60

    def test_pool_deadlock(self):
        """Process Pool no deadlock if writing worker dies locking channel."""
        with self.assertRaises(PoolError):
            with pebble.process.Pool() as pool:
                with self.assertRaises(pebble.ProcessExpired):
                    pool.schedule(function).get()

    def test_pool_deadlock_stop(self):
        """Process Pool writing deadlocks are stopping the Pool."""
        with self.assertRaises(PoolError):
            pool = pebble.process.Pool()
            for _ in range(10):
                pool.schedule(function)
                time.sleep(0.1)
PK %CG|۳@ @ tests/test_process_concurrent.pyimport os
import time
import signal
import unittest
import threading
from pebble import process, TaskCancelled, TimeoutError, ProcessExpired
# Module-level state shared between the concurrent fixtures and callback().
event = threading.Event()  # signalled by callback() on completion
initarg = 0                # reserved for initializer-style tests
results = 0                # last successful result seen by callback()
exception = None           # last exception seen by callback()
def callback(concurrent):
    # Record the concurrent task's outcome in the module globals; the
    # finally clause guarantees the waiting test is always woken.
    global results
    global exception
    try:
        results = concurrent.get()
    except Exception as error:
        exception = error
    finally:
        event.set()
def undecorated_simple():
    """Plain zero-argument helper returning a constant result."""
    return 0
def undecorated(argument, keyword_argument=0):
    """A docstring."""
    total = argument + keyword_argument
    return total
@process.concurrent
def decorated(argument, keyword_argument=0):
    """A docstring."""
    # NOTE: the docstring is asserted verbatim by test_docstring.
    return argument + keyword_argument
@process.concurrent
def error_decorated():
    # Raises inside the worker process; surfaced via task.get().
    raise Exception("BOOM!")
@process.concurrent
def critical_decorated():
    # Kills the worker abruptly so the task reports ProcessExpired.
    os._exit(123)
@process.concurrent
def long_decorated():
    # Long enough to be cancelled before completion.
    time.sleep(1)
@process.concurrent(timeout=0.2)
def timeout_decorated():
    # Sleeps past its own 0.2s timeout -> TimeoutError on get().
    time.sleep(1)
@process.concurrent(callback=callback)
def decorated_callback(argument, keyword_argument=0):
    """A docstring."""
    # Result is delivered through the module-level callback().
    return argument + keyword_argument
@process.concurrent(callback=callback)
def error_decorated_callback():
    # The raised exception must reach the callback, not be swallowed.
    raise Exception("BOOM!")
@process.concurrent(callback=callback)
def long_decorated_callback():
    # Cancelled by tests; TaskCancelled must reach the callback.
    time.sleep(1)
@process.concurrent(callback=callback, timeout=0.2)
def timeout_decorated_callback():
    # Times out after 0.2s; TimeoutError must reach the callback.
    time.sleep(1)
@process.concurrent(timeout=0.2)
def sigterm_decorated():
    # Ignores SIGTERM then blocks, forcing escalation past terminate()
    # when the 0.2s timeout expires (Unix only).
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    time.sleep(100)
class TestProcessConcurrentObj(object):
    """Fixture exercising @process.concurrent on the three method kinds."""

    a = 0  # read by the classmethod

    def __init__(self):
        self.b = 1  # read by the instance method

    @classmethod
    @process.concurrent
    def clsmethod(cls):
        return cls.a

    @process.concurrent
    def instmethod(self):
        return self.b

    @staticmethod
    @process.concurrent
    def stcmethod():
        return 2
class TestProcessConcurrent(unittest.TestCase):
    """Tests for the process.concurrent decorator/launcher."""

    def setUp(self):
        global results
        global exception
        results = 0
        exception = None
        event.clear()
        self.results = 0
        self.concurrentobj = TestProcessConcurrentObj()

    def callback(self, task):
        # Instance-level callback used by test_undecorated_callback.
        self.results = task.get()
        event.set()

    def test_docstring(self):
        """Process Concurrent docstring is preserved."""
        self.assertEqual(decorated.__doc__, "A docstring.")

    def test_wrong_parameters(self):
        """Process Concurrent raises ValueError if wrong params."""
        self.assertRaises(ValueError, process.concurrent, undecorated,
                          args=[1])

    def test_process_wrong_decoration(self):
        """Process Concurrent raises ValueError if given wrong params."""
        try:
            @process.concurrent(5, name='foo')
            def wrong():
                return
        except Exception as error:
            self.assertTrue(isinstance(error, ValueError))

    def test_class_method(self):
        """Process Concurrent decorated classmethods."""
        task = TestProcessConcurrentObj.clsmethod()
        self.assertEqual(task.get(), 0)

    def test_instance_method(self):
        """Process Concurrent decorated instance methods."""
        task = self.concurrentobj.instmethod()
        self.assertEqual(task.get(), 1)

    @unittest.skipIf(os.name == 'nt', "Test won't run on Windows.")
    def test_static_method(self):
        """Process Concurrent decorated static methods (Unix only)."""
        task = self.concurrentobj.stcmethod()
        self.assertEqual(task.get(), 2)

    def test_undecorated_results(self):
        """Process Concurrent undecorated results are produced."""
        task = process.concurrent(target=undecorated_simple)
        self.assertEqual(task.get(), 0)

    def test_undecorated_results_arguments(self):
        """Process Concurrent undecorated with args results are produced."""
        task = process.concurrent(target=undecorated, args=[1],
                                  kwargs={'keyword_argument': 1})
        self.assertEqual(task.get(), 2)

    def test_undecorated_started(self):
        """Process Concurrent undecorated task is set to started."""
        task = process.concurrent(target=undecorated, args=[1],
                                  kwargs={'keyword_argument': 1})
        self.assertTrue(task.started)

    def test_decorated_results(self):
        """Process Concurrent results are produced."""
        task = decorated(1, 1)
        self.assertEqual(task.get(), 2)

    def test_decorated_results_callback(self):
        """Process Concurrent results are forwarded to the callback."""
        decorated_callback(1, 1)
        event.wait()
        self.assertEqual(results, 2)

    def test_error_decorated(self):
        """Process Concurrent errors are raised by concurrent get."""
        task = error_decorated()
        self.assertRaises(Exception, task.get)

    def test_error_decorated_callback(self):
        """Process Concurrent errors are forwarded to callback."""
        error_decorated_callback()
        event.wait()
        self.assertTrue(isinstance(exception, Exception))

    def test_cancel_decorated(self):
        """Process Concurrent raises TaskCancelled if cancelled."""
        task = long_decorated()
        task.cancel()
        self.assertRaises(TaskCancelled, task.get)

    def test_cancel_decorated_callback(self):
        """Process Concurrent TaskCancelled is forwarded to callback."""
        task = long_decorated_callback()
        task.cancel()
        event.wait()
        self.assertTrue(isinstance(exception, TaskCancelled))

    def test_undecorated_callback(self):
        """Process Concurrent undecorated results are forwarded to callback."""
        task = process.concurrent(target=undecorated, args=[1],
                                  kwargs={'keyword_argument': 1},
                                  callback=self.callback)
        event.wait()
        self.assertEqual(task.get(), 2)

    def test_timeout_decorated(self):
        """Process Concurrent raises TimeoutError if it times out."""
        task = timeout_decorated()
        self.assertRaises(TimeoutError, task.get)

    def test_timeout_decorated_callback(self):
        """Process Concurrent TimeoutError is forwarded to callback."""
        timeout_decorated_callback()
        event.wait()
        self.assertTrue(isinstance(exception, TimeoutError))

    def test_undecorated_id(self):
        """Process concurrent ID is forwarded to it."""
        task = process.concurrent(target=undecorated, args=[1],
                                  kwargs={'keyword_argument': 1},
                                  identifier='foo')
        self.assertEqual(task.id, 'foo')

    def test_decorated_dead_process(self):
        """Process Concurrent ProcessExpired is raised if process dies."""
        task = critical_decorated()
        self.assertRaises(ProcessExpired, task.get)

    @unittest.skipIf(os.name == 'nt', "Test won't run on Windows.")
    def test_decorated_pool_ignoring_sigterm(self):
        """Process Concurrent ignored SIGTERM signal are handled on Unix."""
        task = sigterm_decorated()
        self.assertRaises(TimeoutError, task.get)
PK uKG:j@.! .! tests/test_thread_pool.pyimport time
import unittest
import threading
try:
from queue import Queue
except ImportError:
from Queue import Queue
from pebble import thread
from pebble import PoolError, TaskCancelled, TimeoutError
# Module-level state shared between the pool fixtures and callback().
event = threading.Event()  # signalled by callback() when a result lands
initarg = 0                # written by initializer(), read by workers
results = 0                # last successful task result
exception = None           # last exception raised by a task
def callback(task):
    """Record *task*'s outcome in module globals and signal completion.

    The event is set in a ``finally`` clause so the waiting test is always
    woken, matching the class-level callback in TestThreadPool (the
    original only set it on the normal path).
    """
    global results
    global exception
    try:
        results = task.get()
    except Exception as error:
        exception = error
    finally:
        event.set()
def queue_factory():
    """Build the bounded task queue used by the queue-factory tests."""
    bounded = Queue(maxsize=5)
    return bounded
def error_callback(task):
    """Callback that always fails; the pool must survive it."""
    raise Exception("BOOM!")
def initializer(value):
    """Worker initializer: record *value* in the module-global initarg."""
    global initarg
    initarg = value
def broken_initializer():
    """Initializer that always fails, to test pool startup errors."""
    raise Exception("BOOM!")
def function(argument, keyword_argument=0):
    """A docstring."""
    # Trivial payload: sum of the positional and keyword argument.
    total = argument + keyword_argument
    return total
def initializer_function():
    # Returns the value stored by initializer(); run inside a worker to
    # verify the initializer actually executed there.
    return initarg
def error_function():
    """Task payload that always raises, to test error propagation."""
    raise Exception("BOOM!")
def long_function():
    # Long enough to outlive the short timeouts used by the tests.
    time.sleep(1)
def tid_function():
    """Sleep briefly, then report the executing thread object."""
    time.sleep(0.1)
    worker = threading.current_thread()
    return worker
class TestThreadPool(unittest.TestCase):
    """Integration tests for pebble's thread.Pool."""

    def setUp(self):
        global initarg
        initarg = 0
        self.event = threading.Event()
        self.event.clear()
        self.results = None
        self.exception = None

    def callback(self, task):
        """Record the task outcome and unblock the waiting test."""
        try:
            self.results = task.get()
        except Exception as error:
            self.exception = error
        finally:
            self.event.set()

    def test_thread_pool_queue_factory(self):
        """Thread Pool queue factory is called."""
        with thread.Pool(queue_factory=queue_factory) as pool:
            self.assertEqual(pool._context.task_queue.maxsize, 5)

    def test_thread_pool_single_task(self):
        """Thread Pool single task."""
        with thread.Pool() as pool:
            task = pool.schedule(function, args=[1],
                                 kwargs={'keyword_argument': 1})
        self.assertEqual(task.get(), 2)

    def test_thread_pool_multiple_tasks(self):
        """Thread Pool multiple tasks."""
        tasks = []
        with thread.Pool() as pool:
            for index in range(5):
                tasks.append(pool.schedule(function, args=[1]))
        self.assertEqual(sum([t.get() for t in tasks]), 5)

    def test_thread_pool_callback(self):
        """Thread Pool results are forwarded to the callback."""
        with thread.Pool() as pool:
            pool.schedule(function, args=[1], callback=self.callback,
                          kwargs={'keyword_argument': 1})
        self.event.wait()
        self.assertEqual(self.results, 2)

    def test_thread_pool_error(self):
        """Thread Pool errors are raised by task get."""
        with thread.Pool() as pool:
            task = pool.schedule(error_function)
        self.assertRaises(Exception, task.get)

    def test_thread_pool_error_callback(self):
        """Thread Pool errors are forwarded to callback."""
        with thread.Pool() as pool:
            pool.schedule(error_function, callback=self.callback)
        self.event.wait()
        self.assertTrue(isinstance(self.exception, Exception))

    def test_thread_pool_cancel(self):
        """Thread Pool task raises TaskCancelled if so."""
        with thread.Pool() as pool:
            task = pool.schedule(long_function)
            task.cancel()
        self.assertRaises(TaskCancelled, task.get)

    def test_thread_pool_cancel_callback(self):
        """Thread Pool TaskCancelled is forwarded to callback."""
        with thread.Pool() as pool:
            task = pool.schedule(long_function, callback=self.callback)
            task.cancel()
        self.event.wait()
        self.assertTrue(isinstance(self.exception, TaskCancelled))

    def test_thread_pool_different_thread(self):
        """Thread Pool multiple tasks are handled by different threads."""
        tasks = []
        with thread.Pool(workers=2) as pool:
            for i in range(0, 5):
                tasks.append(pool.schedule(tid_function))
        self.assertEqual(len(set([t.get() for t in tasks])), 2)

    def test_thread_pool_task_limit(self):
        """Thread Pool task limit is honored."""
        tasks = []
        with thread.Pool(task_limit=2) as pool:
            for i in range(0, 4):
                tasks.append(pool.schedule(tid_function))
        self.assertEqual(len(set([t.get() for t in tasks])), 2)

    def test_thread_pool_schedule_id(self):
        """Thread Pool task ID is forwarded to it."""
        with thread.Pool() as pool:
            task = pool.schedule(function, args=[1], identifier='foo')
        self.assertEqual(task.id, 'foo')

    def test_thread_pool_initializer(self):
        """Thread Pool initializer is correctly run."""
        with thread.Pool(initializer=initializer, initargs=[1]) as pool:
            task = pool.schedule(initializer_function)
        self.assertEqual(task.get(), 1)

    def test_thread_pool_broken_initializer(self):
        """Thread Pool broken initializer is notified."""
        with self.assertRaises(PoolError):
            with thread.Pool(initializer=broken_initializer) as pool:
                pool.schedule(function)

    def test_thread_pool_running(self):
        """Thread Pool is active if a task is scheduled."""
        with thread.Pool() as pool:
            pool.schedule(function, args=[1])
            self.assertTrue(pool.active)

    def test_thread_pool_stopped(self):
        """Thread Pool is not active once stopped."""
        with thread.Pool() as pool:
            pool.schedule(function, args=[1])
        self.assertFalse(pool.active)

    def test_thread_pool_close_tasks(self):
        """Thread Pool all tasks are performed on close."""
        tasks = []
        pool = thread.Pool()
        for index in range(10):
            tasks.append(pool.schedule(function, args=[index]))
        pool.close()
        pool.join()
        # BUGFIX: map() is lazy on Python 3, so the original
        # map(self.assertTrue, [t.ready for t in tasks]) never
        # executed the assertions. Iterate explicitly instead.
        for task in tasks:
            self.assertTrue(task.ready)

    def test_thread_pool_close_stopped(self):
        """Thread Pool is stopped after close."""
        pool = thread.Pool()
        pool.schedule(function, args=[1])
        pool.close()
        pool.join()
        self.assertFalse(pool.active)

    def test_thread_pool_stop_tasks(self):
        """Thread Pool not all tasks are performed on stop."""
        tasks = []
        pool = thread.Pool()
        for index in range(10):
            tasks.append(pool.schedule(long_function, args=[index]))
        pool.stop()
        pool.join()
        self.assertTrue(len([t for t in tasks if not t.ready]) > 0)

    def test_thread_pool_stop_stopped(self):
        """Thread Pool is stopped after stop."""
        pool = thread.Pool()
        pool.schedule(function, args=[1])
        pool.stop()
        pool.join()
        self.assertFalse(pool.active)

    def test_thread_pool_join_workers(self):
        """Thread Pool no worker is running after join."""
        pool = thread.Pool(workers=4)
        pool.schedule(function, args=[1])
        pool.stop()
        pool.join()
        self.assertEqual(len(pool._pool_manager.workers), 0)

    def test_thread_pool_join_running(self):
        """Thread Pool RuntimeError is raised if active pool joined."""
        with thread.Pool() as pool:
            pool.schedule(function, args=[1])
            self.assertRaises(RuntimeError, pool.join)

    def test_thread_pool_join_tasks_timeout(self):
        """Thread Pool TimeoutError is raised if join on long tasks."""
        pool = thread.Pool()
        for index in range(2):
            pool.schedule(long_function)
        pool.close()
        self.assertRaises(TimeoutError, pool.join, 0.4)
        pool.stop()
        pool.join()

    def test_thread_pool_callback_error(self):
        """Thread Pool stop if error in callback."""
        with thread.Pool() as pool:
            pool.schedule(function, args=[1], callback=error_callback,
                          kwargs={'keyword_argument': 1})
            time.sleep(0.1)
            self.assertRaises(PoolError, pool.schedule, function, args=[1])

    def test_thread_pool_exception_isolated(self):
        """Thread Pool an Exception does not affect other tasks."""
        with thread.Pool() as pool:
            task = pool.schedule(error_function)
            try:
                task.get()
            except Exception:
                # the failing task's error is expected; the pool must survive
                pass
            task = pool.schedule(function, args=[1],
                                 kwargs={'keyword_argument': 1})
        self.assertEqual(task.get(), 2)
PK %CG#z.- - tests/test_thread_concurrent.pyimport time
import unittest
import threading
from pebble import thread, TaskCancelled
# Shared state used by the module-level callback below: the Event signals
# callback completion, the globals carry results across threads.
event = threading.Event()
initarg = 0
results = 0
exception = None
def callback(task):
    """Record the outcome of *task* in module globals and wake the test."""
    global results, exception
    try:
        outcome = task.get()
    except Exception as error:
        exception = error
    else:
        results = outcome
    finally:
        event.set()
def undecorated_simple():
    """Zero-argument helper for the plain-function launcher tests."""
    value = 0
    return value
def undecorated(argument, keyword_argument=0):
    """A docstring."""
    # Plain helper handed to thread.concurrent(target=...) in the tests.
    total = argument + keyword_argument
    return total
# Decorated flavour of the helper: calling it returns a pebble Task.
# The "A docstring." text is load-bearing: test_docstring asserts it.
@thread.concurrent
def decorated(argument, keyword_argument=0):
    """A docstring."""
    return argument + keyword_argument
# Concurrent helper that always fails; used to test error delivery.
@thread.concurrent
def error_decorated():
    raise Exception("BOOM!")
# Concurrent helper that sleeps; used to test cancellation mid-run.
@thread.concurrent
def long_decorated():
    time.sleep(1)
# Same as `decorated`, but results flow through the module-level callback.
@thread.concurrent(callback=callback)
def decorated_callback(argument, keyword_argument=0):
    """A docstring."""
    return argument + keyword_argument
# Failing concurrent helper whose error reaches the module-level callback.
@thread.concurrent(callback=callback)
def error_decorated_callback():
    raise Exception("BOOM!")
# Slow concurrent helper used to test cancellation via the callback path.
@thread.concurrent(callback=callback)
def long_decorated_callback():
    time.sleep(1)
class TestThreadConcurrentObj(object):
    """Fixture proving thread.concurrent composes with classmethod,
    instance methods and staticmethod."""
    a = 0  # read by the decorated classmethod

    def __init__(self):
        self.b = 1  # read by the decorated instance method

    @classmethod
    @thread.concurrent
    def clsmethod(cls):
        return cls.a

    @thread.concurrent
    def instmethod(self):
        return self.b

    @staticmethod
    @thread.concurrent
    def stcmethod():
        return 2
class TestThreadConcurrent(unittest.TestCase):
    """Tests for the thread.concurrent decorator/launcher."""

    def setUp(self):
        global results
        global exception
        results = 0
        exception = None
        event.clear()
        self.concurrentobj = TestThreadConcurrentObj()

    def callback(self, task):
        """Instance-level callback storing the result on the test case."""
        self.results = task.get()
        event.set()

    def test_docstring(self):
        """Thread Concurrent docstring is preserved."""
        self.assertEqual(decorated.__doc__, "A docstring.")

    def test_wrong_parameters(self):
        """Thread Concurrent raises ValueError if wrong params."""
        self.assertRaises(ValueError, thread.concurrent, undecorated,
                          args=[1])

    def test_thread_wrong_decoration(self):
        """Thread Concurrent raises ValueError if given wrong params."""
        # BUGFIX: the original try/except silently passed when no error
        # was raised; assertRaises makes the expectation mandatory.
        with self.assertRaises(ValueError):
            @thread.concurrent(5, name='foo')
            def wrong():
                return

    def test_class_method(self):
        """Thread Concurrent decorated classmethods."""
        task = TestThreadConcurrentObj.clsmethod()
        self.assertEqual(task.get(), 0)

    def test_instance_method(self):
        """Thread Concurrent decorated instance methods."""
        task = self.concurrentobj.instmethod()
        self.assertEqual(task.get(), 1)

    def test_static_method(self):
        """Thread Concurrent decorated static methods."""
        task = self.concurrentobj.stcmethod()
        self.assertEqual(task.get(), 2)

    def test_undecorated_results(self):
        """Thread Concurrent undecorated results are produced."""
        # docstring fixed: said "Process Concurrent" (copy-paste).
        task = thread.concurrent(target=undecorated_simple)
        self.assertEqual(task.get(), 0)

    def test_undecorated_results_arguments(self):
        """Thread Concurrent undecorated with args results are produced."""
        task = thread.concurrent(target=undecorated, args=[1],
                                 kwargs={'keyword_argument': 1})
        self.assertEqual(task.get(), 2)

    def test_decorated_results(self):
        """Thread Concurrent results are produced."""
        task = decorated(1, 1)
        self.assertEqual(task.get(), 2)

    def test_decorated_results_callback(self):
        """Thread Concurrent results are forwarded to the callback."""
        decorated_callback(1, 1)
        event.wait()
        self.assertEqual(results, 2)

    def test_error_decorated(self):
        """Thread Concurrent errors are raised by task get."""
        task = error_decorated()
        self.assertRaises(Exception, task.get)

    def test_error_decorated_callback(self):
        """Thread Concurrent errors are forwarded to callback."""
        error_decorated_callback()
        event.wait()
        self.assertTrue(isinstance(exception, Exception))

    def test_cancel_decorated(self):
        """Thread Concurrent task raises TaskCancelled if so."""
        task = long_decorated()
        task.cancel()
        self.assertRaises(TaskCancelled, task.get)

    def test_cancel_decorated_callback(self):
        """Thread Concurrent TaskCancelled is forwarded to callback."""
        task = long_decorated_callback()
        task.cancel()
        event.wait()
        self.assertTrue(isinstance(exception, TaskCancelled))

    def test_undecorated_callback(self):
        """Thread Concurrent undecorated results are forwarded to callback."""
        task = thread.concurrent(target=undecorated, args=[1],
                                 kwargs={'keyword_argument': 1},
                                 callback=self.callback)
        event.wait()
        self.assertEqual(task.get(), 2)

    def test_undecorated_id(self):
        """Thread Concurrent ID is forwarded to it."""
        task = thread.concurrent(target=undecorated, args=[1],
                                 kwargs={'keyword_argument': 1},
                                 identifier='foo')
        self.assertEqual(task.id, 'foo')
PK %CG^V/ tests/test_process_spawn.pyimport os
import unittest
from multiprocessing import Queue
from pebble import process
def undecorated(queue, argument, keyword_argument=0):
    """Compute argument + keyword_argument and push it onto *queue*."""
    total = argument + keyword_argument
    queue.put(total)
# Spawn-decorated helper: calling it starts a child process that puts the
# sum on the queue. "A docstring." is asserted by test_docstring.
@process.spawn
def decorated(queue, argument, keyword_argument=0):
    """A docstring."""
    queue.put(argument + keyword_argument)
# Spawn-decorated helper with a decorator parameter: the child process
# is named 'foo' (checked by test_arguments).
@process.spawn(name='foo')
def decorated_kword(queue, argument, keyword_argument=0):
    """A docstring."""
    queue.put(argument + keyword_argument)
class TestProcessSpawnObj(object):
    """Fixture proving process.spawn composes with classmethod,
    instance methods and staticmethod."""
    a = 0  # read by the decorated classmethod

    def __init__(self):
        self.b = 1  # read by the decorated instance method

    @classmethod
    @process.spawn
    def clsmethod(cls, queue):
        queue.put(cls.a)

    @process.spawn
    def instmethod(self, queue):
        queue.put(self.b)

    @staticmethod
    @process.spawn
    def stcmethod(queue):
        queue.put(2)
class TestProcessSpawn(unittest.TestCase):
    """Tests for the process.spawn decorator/launcher."""

    def setUp(self):
        self.spawnobj = TestProcessSpawnObj()

    def test_docstring(self):
        """Process Spawn docstring is preserved."""
        self.assertEqual(decorated.__doc__, "A docstring.")

    def test_wrong_parameters(self):
        """Process Spawn raises ValueError if wrong params."""
        self.assertRaises(ValueError, process.spawn, undecorated,
                          args=[1])

    def test_wrong_decoration(self):
        """Process Spawn decorator raises ValueError if wrong params."""
        # BUGFIX: the original try/except silently passed when no error
        # was raised; assertRaises makes the expectation mandatory.
        with self.assertRaises(ValueError):
            @process.spawn(5, name='foo')
            def wrong():
                return

    def test_undecorated_results(self):
        """Process Spawn undecorated results are produced."""
        queue = Queue()
        # BUGFIX: the test targeted the already-decorated `decorated_kword`
        # instead of the plain `undecorated` helper it claims to exercise
        # (matching the thread.concurrent test suite).
        proc = process.spawn(target=undecorated, args=[queue, 1])
        results = queue.get()
        proc.join()
        self.assertEqual(results, 1)

    def test_undecorated_keyworkd_results(self):
        """Process Spawn undecorated with keyword, results are produced."""
        queue = Queue()
        # BUGFIX: same decorated_kword -> undecorated target fix as above.
        proc = process.spawn(target=undecorated, args=[queue, 1],
                             kwargs={'keyword_argument': 1})
        results = queue.get()
        proc.join()
        self.assertEqual(results, 2)

    def test_defaults(self):
        """Process Spawn default values are preserved."""
        queue = Queue()
        proc = decorated(queue, 1, 1)
        proc.join()
        self.assertFalse(proc.daemon)

    def test_arguments(self):
        """Process Spawn decorator arguments are forwarded."""
        queue = Queue()
        proc = decorated_kword(queue, 1, 1)
        proc.join()
        self.assertEqual(proc.name, 'foo')

    def test_decorated_results(self):
        """Process Spawn results are produced."""
        queue = Queue()
        proc = decorated(queue, 1, 1)
        results = queue.get()
        proc.join()
        self.assertEqual(results, 2)

    def test_class_method(self):
        """Process Spawn decorated classmethods."""
        queue = Queue()
        proc = TestProcessSpawnObj.clsmethod(queue)
        results = queue.get()
        proc.join()
        self.assertEqual(results, 0)

    def test_instance_method(self):
        """Process Spawn decorated instance methods."""
        queue = Queue()
        proc = self.spawnobj.instmethod(queue)
        results = queue.get()
        proc.join()
        self.assertEqual(results, 1)

    def test_static_method(self):
        """Process Spawn decorated static methods."""
        if os.name != 'nt':
            queue = Queue()
            proc = self.spawnobj.stcmethod(queue)
            results = queue.get()
            proc.join()
            self.assertEqual(results, 2)
PK %CG
w w pebble/__init__.py__all__ = ['process',
'thread',
'waitfortasks',
'waitforthreads',
'waitforqueues',
'synchronized',
'sighandler',
'Task',
'PebbleError',
'PoolError',
'TimeoutError',
'TaskCancelled',
'ProcessExpired']
from . import thread
from . import process
from .task import Task
from .decorators import synchronized, sighandler
from .functions import waitforqueues, waitfortasks, waitforthreads
from .exceptions import TimeoutError, ProcessExpired
from .exceptions import PebbleError, PoolError, TaskCancelled
PK %CG9 pebble/decorators.py# This file is part of Pebble.
# Pebble is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# Pebble is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with Pebble. If not, see .
import signal
import threading
from functools import wraps
_synchronized_lock = threading.Lock()
def synchronized(*args):
    """Prevent two or more callers from interleaving the decorated
    function's execution, avoiding race conditions.

    Optionally accepts a Lock, RLock or Semaphore object used to ensure
    the function's atomicity. Without one, a single module-level
    threading.Lock is shared, so only one of all bare-decorated
    functions runs at a time.
    """
    target = args[0]

    if not callable(target):
        # @synchronized(lock) form: capture the primitive, decorate later.
        def decorator(function):
            return decorate_synchronized(function, target)

        return decorator

    # Bare @synchronized form: guard with the shared module-level lock.
    return decorate_synchronized(target, _synchronized_lock)
def decorate_synchronized(function, lock):
    """Wrap *function* so every call runs while holding *lock*."""
    @wraps(function)
    def guarded(*args, **kwargs):
        lock.acquire()
        try:
            return function(*args, **kwargs)
        finally:
            lock.release()

    return guarded
def sighandler(signals):
    """Install the decorated function as handler of the given *signals*.

    *signals* may be a single signal number or a list/tuple of them.
    """
    def decorator(function):
        # Handlers are installed at decoration time, not at call time.
        set_signal_handlers(signals, function)

        @wraps(function)
        def handler(*args, **kwargs):
            return function(*args, **kwargs)

        return handler

    return decorator
def set_signal_handlers(signals, function):
    """Bind *function* to each signal in *signals* (scalar or sequence)."""
    if not isinstance(signals, (list, tuple)):
        signals = (signals,)
    for signum in signals:
        signal.signal(signum, function)
PK %CG pebble/utils.py# This file is part of Pebble.
# Pebble is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# Pebble is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with Pebble. If not, see .
# Common utility functions
from time import sleep
from inspect import isclass
from itertools import count
from traceback import format_exc, print_exc
try: # Python 2
from Queue import Queue
except: # Python 3
from queue import Queue
from .exceptions import PoolError, TimeoutError
# Pool states (see pebble.pool for the transitions: CREATED -> RUNNING,
# then CLOSED/STOPPED on shutdown, ERROR on internal failure;
# EXPIRED's use is not visible here -- TODO confirm).
STOPPED = 0
RUNNING = 1
CLOSED = 2
CREATED = 3
EXPIRED = 4
ERROR = 5
def function_handler(launcher, decorator, *args, **kwargs):
    """Dispatch spawn/concurrent calls between their three usage forms:
    plain function call (``target=`` keyword), bare decorator, or
    decorator with parameters.
    """
    if isfunction(args, kwargs):
        target = kwargs.pop('target', None)
        return launcher(target, **kwargs)
    if issimpledecorator(args, kwargs):
        return decorator(args[0], launcher)
    if isparametrizeddecorator(args, kwargs):
        def wrap(function):
            return decorator(function, launcher, **kwargs)

        return wrap
    raise ValueError("Only keyword arguments are accepted.")
def isfunction(args, kwargs):
    """Return True when spawn/concurrent is used as a regular function.

    That form passes no positional arguments and names the callable via
    the ``target`` keyword.
    """
    # 'target' in kwargs already implies kwargs is non-empty, and the
    # if/else returning literal True/False was redundant.
    return not args and 'target' in kwargs
def issimpledecorator(args, kwargs):
    """Return True when spawn/concurrent is used as a bare decorator.

    That form receives the decorated function as the sole positional
    argument and no keywords.
    """
    # Direct boolean expression instead of if/else returning True/False.
    return bool(args) and not kwargs
def isparametrizeddecorator(args, kwargs):
    """Return True when spawn/concurrent is a decorator with parameters.

    That form receives only keyword arguments; the decorated function
    arrives later through the returned wrapper.
    """
    # Direct boolean expression instead of if/else returning True/False.
    return not args and bool(kwargs)
def execute(function, args=(), kwargs=None):
    """Runs the given function returning its results or exception.

    On failure the exception object is *returned* (not raised), with the
    formatted traceback attached as its ``traceback`` attribute.
    """
    # None sentinel instead of a mutable {} default argument.
    if kwargs is None:
        kwargs = {}
    try:
        return function(*args, **kwargs)
    except Exception as error:
        error.traceback = format_exc()
        return error
PK %CG}* pebble/functions.py# This file is part of Pebble.
# Pebble is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# Pebble is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with Pebble. If not, see .
import threading
from time import time
from types import MethodType
_waitforthreads_lock = threading.Lock()
def waitfortasks(tasks, timeout=None):
    """Waits for one or more *Task* to be ready or until *timeout* expires.

    *tasks* is a list containing one or more *pebble.Task* objects.
    If *timeout* is not None the function will block
    for the specified amount of seconds.

    The function returns a list containing the ready *Tasks*.
    """
    lock = threading.Condition(threading.Lock())

    prepare_tasks(tasks, lock)
    try:
        wait_tasks(tasks, lock, timeout)
    finally:
        reset_tasks(tasks)

    # BUGFIX: filter() is a lazy iterator on Python 3; materialize it so
    # the documented list return type (and len()/indexing) holds.
    return [task for task in tasks if task.ready]
def prepare_tasks(tasks, lock):
    """Replaces task._set() method in order to notify the waiting Condition."""
    for task in tasks:
        # Stash the condition and the original _set on the task itself so
        # reset_tasks() can undo the patch; the swap happens under the
        # task's own condition to avoid racing a concurrent completion.
        task._pebble_lock = lock
        with task._task_ready:
            task._pebble_old_method = task._set
            task._set = MethodType(new_method, task)
def wait_tasks(tasks, lock, timeout):
    """Block on *lock* until a task is ready or *timeout* elapses."""
    with lock:
        already_done = any(task.ready for task in tasks)
        if not already_done:
            lock.wait(timeout)
def reset_tasks(tasks):
    """Resets original task._set() method."""
    for task in tasks:
        with task._task_ready:
            # Restore the original bound method and drop the attributes
            # injected by prepare_tasks().
            task._set = task._pebble_old_method
            delattr(task, '_pebble_old_method')
            delattr(task, '_pebble_lock')
def waitforqueues(queues, timeout=None):
    """Waits for one or more *Queue* to be ready or until *timeout* expires.

    *queues* is a list containing one or more *Queue.Queue* objects.
    If *timeout* is not None the function will block
    for the specified amount of seconds.

    The function returns a list containing the ready *Queues*.
    """
    lock = threading.Condition(threading.Lock())

    prepare_queues(queues, lock)
    try:
        wait_queues(queues, lock, timeout)
    finally:
        reset_queues(queues)

    # BUGFIX: filter() is a lazy iterator on Python 3; materialize it so
    # the documented list return type holds.
    return [queue for queue in queues if not queue.empty()]
def prepare_queues(queues, lock):
    """Replaces queue._put() method in order to notify the waiting Condition."""
    for queue in queues:
        # Stash the condition and the original _put on the queue object so
        # reset_queues() can undo the patch; swap under the queue's mutex.
        queue._pebble_lock = lock
        with queue.mutex:
            queue._pebble_old_method = queue._put
            queue._put = MethodType(new_method, queue)
def wait_queues(queues, lock, timeout):
    """Block on *lock* until a queue is non-empty or *timeout* elapses."""
    with lock:
        if all(queue.empty() for queue in queues):
            lock.wait(timeout)
def reset_queues(queues):
    """Resets original queue._put() method."""
    for queue in queues:
        with queue.mutex:
            # Restore the original method and drop the injected attributes.
            queue._put = queue._pebble_old_method
            delattr(queue, '_pebble_old_method')
            delattr(queue, '_pebble_lock')
def waitforthreads(threads, timeout=None):
    """Waits for one or more *Thread* to exit or until *timeout* expires.

    .. note::

       Expired *Threads* are not joined by *waitforthreads*.

    *threads* is a list containing one or more *threading.Thread* objects.
    If *timeout* is not None the function will block
    for the specified amount of seconds.

    The function returns a list containing the ready *Threads*.
    """
    old_function = None
    lock = threading.Condition(threading.Lock())

    def new_function(*args):
        # Exiting threads call the (patched) ident function during
        # teardown: forward to the real one, then wake the waiter.
        old_function(*args)
        with lock:
            lock.notify_all()

    old_function = prepare_threads(new_function)
    try:
        wait_threads(threads, lock, timeout)
    finally:
        reset_threads(old_function)

    # BUGFIX: filter() is a lazy iterator on Python 3; materialize it so
    # the documented list return type holds.
    return [thread for thread in threads if not thread.is_alive()]
def prepare_threads(new_function):
    """Replaces threading._get_ident() function in order to notify
    the waiting Condition."""
    # NOTE(review): this patches a process-wide hook; _waitforthreads_lock
    # only guards the swap itself, so overlapping waitforthreads calls
    # still share a single hook -- confirm this is intended.
    with _waitforthreads_lock:
        if hasattr(threading, 'get_ident'):  # Python 3
            old_function = threading.get_ident
            threading.get_ident = new_function
        else:  # Python 2
            old_function = threading._get_ident
            threading._get_ident = new_function

        return old_function
def wait_threads(threads, lock, timeout):
    """Block until at least one thread has exited or *timeout* runs out."""
    started = time()

    def remaining():
        return timeout - (time() - started)

    with lock:
        # Keep waiting while every thread is still alive.
        while all(worker.is_alive() for worker in threads):
            if timeout is None:
                lock.wait()
            elif remaining() > 0:
                lock.wait(remaining())
            else:
                return
def reset_threads(old_function):
    """Resets original threading._get_ident() function."""
    # Put back whichever ident function prepare_threads() swapped out.
    with _waitforthreads_lock:
        if hasattr(threading, 'get_ident'):  # Python 3
            threading.get_ident = old_function
        else:  # Python 2
            threading._get_ident = old_function
def new_method(self, *args):
    """Patched method bound by prepare_tasks()/prepare_queues(): runs the
    original implementation, then notifies the waiting Condition.

    Relies on the _pebble_old_method/_pebble_lock attributes injected by
    the prepare_* helpers.
    """
    self._pebble_old_method(*args)
    with self._pebble_lock:
        self._pebble_lock.notify_all()
PK xKGxI<= = pebble/pool.py# This file is part of Pebble.
# Pebble is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# Pebble is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with Pebble. If not, see .
import time
from itertools import count
from traceback import print_exc
from collections import namedtuple
try:
from queue import Queue
except ImportError:
from Queue import Queue
from .task import Task
from .exceptions import PoolError, TimeoutError
# Polling interval (seconds) used while draining the task queue.
SLEEP_UNIT = 0.1

# Pool states
STOPPED = 0
RUNNING = 1
CLOSED = 2
CREATED = 3
EXPIRED = 4
ERROR = 5

# Everything a worker needs to execute one scheduled task.
TaskParameters = namedtuple('TaskParameters', ('function',
                                               'args',
                                               'kwargs'))
# Per-worker configuration shared through the pool context.
WorkerParameters = namedtuple('WorkerParameters', ('task_limit',
                                                   'initializer',
                                                   'initargs'))
class BasePool(object):
    """Common machinery shared by the concrete Pools.

    Subclasses implement _start_pool()/_stop_pool(); lifecycle state lives
    in self._context and the management loops in self._loops.
    """
    def __init__(self, workers, task_limit, queue_factory,
                 initializer, initargs):
        self._context = PoolContext(workers, task_limit, queue_factory,
                                    initializer, initargs)
        self._loops = ()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Context-manager exit completes pending tasks before returning.
        self.close()
        self.join()

    @property
    def active(self):
        # Refresh state first: this also lazily starts a CREATED pool and
        # detects dead management loops.
        self._update_pool_state()
        return self._context.state in (CLOSED, RUNNING)

    def close(self):
        """Closes the Pool preventing new tasks from being accepted.

        Pending tasks will be completed.
        """
        self._context.state = CLOSED

    def stop(self):
        """Stops the pool without performing any pending task."""
        self._context.state = STOPPED

    def join(self, timeout=None):
        """Joins the pool waiting until all workers exited.

        If *timeout* is set, it block until all workers are done
        or raises TimeoutError.
        """
        if self._context.state == RUNNING:
            raise RuntimeError('The Pool is still running')
        if self._context.state == CLOSED:
            # Drain the queue first, then recurse once in STOPPED state.
            self._wait_queue_depletion(timeout)
            self.stop()
            self.join()
        else:
            for loop in self._loops:
                loop.join()
            self._stop_pool()

    def _wait_queue_depletion(self, timeout):
        # Poll the queue until it drains, the timeout expires, or the pool
        # stops being active (which means it died while draining).
        tick = time.time()
        while self.active:
            if timeout is not None and time.time() - tick > timeout:
                raise TimeoutError("Tasks are still being executed")
            elif self._context.task_queue.unfinished_tasks:
                time.sleep(SLEEP_UNIT)
            else:
                return
        raise PoolError()

    def schedule(self, function, args=(), kwargs={}, identifier=None,
                 callback=None, timeout=0):
        """Schedules *function* to be run the Pool.

        *args* and *kwargs* will be forwarded to the scheduled function
        respectively as arguments and keyword arguments.

        If *callback* is a callable it will be executed once the function
        execution has completed with the returned *Task* as a parameter.

        *timeout* is an integer, if expires the task will be terminated
        and *Task.get()* will raise *TimeoutError*.

        The *identifier* value will be forwarded to the *Task.id* attribute.

        A *Task* object is returned.
        """
        metadata = TaskParameters(function, args, kwargs)
        return self._schedule_task(callback, timeout, identifier, metadata)

    def _schedule_task(self, callback, timeout, identifier, metadata):
        self._check_pool_state()
        task = Task(next(self._context.task_counter), callback=callback,
                    timeout=timeout, identifier=identifier, metadata=metadata)
        self._context.task_queue.put(task)
        return task

    def _check_pool_state(self):
        # Reject scheduling on anything but a healthy RUNNING pool.
        self._update_pool_state()
        if self._context.state == ERROR:
            raise PoolError('Unexpected error within the Pool')
        elif self._context.state != RUNNING:
            raise RuntimeError('The Pool is not running')

    def _update_pool_state(self):
        if self._context.state == CREATED:
            # Lazy start: workers spin up on first use.
            self._start_pool()
        else:
            # A dead management loop means the pool is broken.
            for loop in self._loops:
                if not loop.is_alive():
                    self._context.state = ERROR

    def _start_pool(self):
        raise NotImplementedError("Not implemented")

    def _stop_pool(self):
        raise NotImplementedError("Not implemented")
class PoolContext(object):
    """Mutable state shared between a Pool facade and its worker loops."""

    def __init__(self, workers, task_limit, queue_factory,
                 initializer, initargs):
        self.state = CREATED
        self.workers = workers
        self.task_counter = count()
        self.task_queue = create_queue(queue_factory)
        self.worker_parameters = WorkerParameters(task_limit,
                                                  initializer, initargs)

    @property
    def alive(self):
        # Alive means neither broken nor explicitly stopped.
        broken = self.state == ERROR
        stopped = self.state == STOPPED
        return not (broken or stopped)
def create_queue(queue_factory):
    """Build the pool's task queue, via *queue_factory* when provided."""
    if queue_factory is None:
        return Queue()
    return queue_factory()
def run_initializer(initializer, initargs):
    """Invoke the worker initializer; report success as a boolean."""
    try:
        initializer(*initargs)
    except Exception:
        # Print the traceback so the failure is visible, then report it.
        print_exc()
        return False
    return True
def task_limit_reached(counter, task_limit):
    """True when *task_limit* is enabled and *counter* has hit it.

    Advances *counter* as a side effect of the check (only when the
    limit is enabled, matching the original short-circuit).
    """
    if task_limit <= 0:
        return False
    return next(counter) >= task_limit
PK tJG` pebble/exceptions.py# This file is part of Pebble.
# Pebble is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# Pebble is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with Pebble. If not, see .
class PebbleError(Exception):
    """Pebble base exception: root of the library's error hierarchy,
    catch this to handle any Pebble failure."""
    pass
class PoolError(PebbleError):
    """Raised if an error occurred within the Pool."""
    pass
class TaskCancelled(PebbleError):
    """Raised if get is called on a cancelled task."""
    pass
class ChannelError(PebbleError):
    """Error occurring within the process channel."""
    pass
class TimeoutError(PebbleError):
    """Raised when a timeout expires."""
    def __init__(self, msg, value=0):
        super(TimeoutError, self).__init__(msg)
        # The timeout value that expired, when supplied (0 otherwise).
        self.timeout = value
class ProcessExpired(PebbleError):
    """Raised when process dies unexpectedly."""
    def __init__(self, msg, code=0):
        super(ProcessExpired, self).__init__(msg)
        # Exit code of the dead process, when known (0 otherwise).
        self.exitcode = code
PK %CG|QN8 8 pebble/task.py# This file is part of Pebble.
# Pebble is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License
# as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# Pebble is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with Pebble. If not, see .
# Pebble's generic objects
import threading
from traceback import print_exc
from .exceptions import TimeoutError, TaskCancelled
class Task(object):
    """Handler to the ongoing task."""
    def __init__(self, task_nr, callback=None, timeout=None, identifier=None,
                 metadata=None):
        self.id = identifier
        self._timeout = timeout
        self._number = task_nr
        self._ready = False
        self._cancelled = False
        self._results = None
        # Condition guarding _ready/_results between producer and waiters.
        self._task_ready = threading.Condition(threading.Lock())
        self._timestamp = 0
        self._callback = callback
        self._metadata = metadata

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return "%s (Task-%d, %s)" % (self.__class__, self.number, self.id)

    @property
    def number(self):
        """Progressive number assigned at scheduling time."""
        return self._number

    @property
    def ready(self):
        """True once results (or an error) have been delivered."""
        return self._ready

    @property
    def cancelled(self):
        """True if cancel() was called on this task."""
        return self._cancelled

    @property
    def started(self):
        """True once the task began executing (its timestamp was set)."""
        # Plain comparison instead of the legacy `and True or False` idiom.
        return self._timestamp > 0

    @property
    def timeout(self):
        return self._timeout

    @property
    def success(self):
        """True when the task completed without raising."""
        # Simplified from the original `(A and not B or False)` expression.
        return self._ready and not isinstance(self._results, BaseException)

    def get(self, timeout=None, cancel=False):
        """Retrieves the produced results, blocks until results are ready.

        If the executed code raised an error it will be re-raised.

        If *timeout* is set the call will block until the timeout expires
        raising *TimeoutError* if results are not yet available.

        If *cancel* is True while *timeout* is set *Task* will be cancelled
        once the timeout expires.
        """
        if self.wait(timeout=timeout):
            if isinstance(self._results, BaseException):
                raise self._results
            else:
                return self._results
        else:
            if cancel:
                self.cancel()
            raise TimeoutError("Task is still running")

    def wait(self, timeout=None):
        """Waits until results are ready.

        If *timeout* is set the call will block until the timeout expires.

        Returns *True* if results are ready, *False* if timeout expired.
        """
        with self._task_ready:
            if not self._ready:
                self._task_ready.wait(timeout)
            return self._ready

    def cancel(self):
        """Cancels the Task."""
        self._cancelled = True
        self.set_results(TaskCancelled("Task cancelled"))

    def set_results(self, results):
        """Sets the results within the task and run the installed callback.

        This function is meant for testing and internal use.
        """
        self._set(results)
        self._run_callback()

    def _set(self, results):
        # First writer wins: once _ready flips, later deliveries are ignored.
        with self._task_ready:
            if not self._ready:
                self._ready = True
                self._results = results
                self._task_ready.notify_all()

    def _run_callback(self):
        if self._callback is not None:
            try:
                self._callback(self)
            except Exception:
                # A faulty callback must never kill the delivering worker.
                print_exc()
PK %CG\=<