# === riberry/__init__.py ===
from riberry import config, plugins, model, celery, policy, services, exc, app

config.config.enable()
config.config.authentication.enable()


# === riberry/app/__init__.py ===
from .env import current_context, current_riberry_app
from .base import RiberryApplication
from . import actions, context, tasks, util, addons, backends


# === riberry/app/actions/__init__.py ===
from . import artifacts, executions, notify, jobs, reports, shared_data, external_task


# === riberry/app/actions/artifacts.py ===
import json
import sys
import traceback
from typing import Union, Optional

import riberry
from riberry.exc import BaseError
from riberry.model.job import ArtifactType
from .. import current_context as context
from ..util.events import create_event


def create_artifact(filename: str, content: Union[bytes, str], name: str = None,
                    type: Union[str, ArtifactType] = ArtifactType.output, category='Default',
                    data: dict = None, stream: str = None, task_id: str = None, root_id: str = None):
    task_id = task_id or context.current.task_id
    root_id = root_id or context.current.root_id
    stream = stream or context.current.stream

    if name is None:
        name = filename
    if content is not None and not isinstance(content, (bytes, str)):
        content = json.dumps(content)
    if isinstance(content, str):
        content = content.encode()
    if isinstance(type, ArtifactType):
        type = type.value

    try:
        ArtifactType(type)
    except ValueError as exc:
        raise ValueError(f'ArtifactType enum has no value {type!r}. '
                         f'Supported types: {", ".join(ArtifactType.__members__)}') from exc

    create_event(
        'artifact',
        root_id=root_id,
        task_id=task_id,
        data={
            'name': str(name),
            'type': str(type),
            'category': str(category),
            'data': data if isinstance(data, dict) else {},
            'stream': str(stream) if stream else None,
            'filename': str(filename),
        },
        binary=content
    )


def create_artifact_from_traceback(
        name: Optional[str] = None,
        filename: Optional[str] = None,
        category: str = 'Intercepted',
        type: riberry.model.job.ArtifactType = riberry.model.job.ArtifactType.error,
):
    exc_type, exc, tb = sys.exc_info()
    if not exc_type:
        return

    if isinstance(exc, BaseError):
        error_content = f'{traceback.format_exc()}\n\n{"-" * 32}\n\n{json.dumps(exc.output(), indent=2)}'.encode()
    else:
        error_content = traceback.format_exc().encode()

    task = context.current.task
    create_artifact(
        name=name if name else f'Exception {task.name}',
        type=type,
        category=category,
        data={
            'Error Type': exc.__class__.__name__,
            'Error Message': str(exc),
        },
        filename=filename if filename else f'{task.name}-{task.request.id}.log',
        content=error_content
    )


# === riberry/app/actions/executions.py ===
import traceback
import uuid

import pendulum

import riberry
from . import notify
from .. import current_riberry_app
from ..util import execution_tracker as tracker
from ..util.events import create_event


def queue_job_execution(execution: riberry.model.job.JobExecution):
    job = execution.job
    form = job.form
    try:
        execution.status = 'READY'
        execution.task_id = str(uuid.uuid4())
        tracker.start_tracking_execution(root_id=execution.task_id)
        riberry.model.conn.commit()
        execution_task_id = current_riberry_app.start(
            execution_id=execution.id,
            root_id=execution.task_id,
            form=form.internal_name,
        )
    except:
        execution.status = 'FAILURE'
        message = traceback.format_exc().encode()
        execution.artifacts.append(
            riberry.model.job.JobExecutionArtifact(
                job_execution=execution,
                name='Error on Startup',
                type='error',
                category='Fatal',
                filename='startup-error.log',
                size=len(message),
                binary=riberry.model.job.JobExecutionArtifactBinary(
                    binary=message
                )
            )
        )
        riberry.model.conn.commit()
        raise
    else:
        return execution_task_id


def execution_complete(task_id, root_id, status, stream):
    job: riberry.model.job.JobExecution = riberry.model.job.JobExecution.query().filter_by(
        task_id=root_id).first()
    if not job:
        return

    cxt = current_riberry_app.context
    with cxt.scope(root_id=root_id, task_id=root_id, stream=None, step=None, category=None):
        try:
            current_riberry_app.context.event_registry.call(
                event_type=current_riberry_app.context.event_registry.types.on_completion,
                status=status,
            )
        except:
            print('Error occurred while triggering on_completion event.')
            print(traceback.format_exc())
            riberry.model.conn.rollback()

    job.task_id = root_id
    job.status = status
    job.completed = job.updated = pendulum.DateTime.utcnow()
    if not job.started:
        job.started = pendulum.DateTime.utcnow()

    if stream is None:
        stream = riberry.model.job.JobExecutionStream.query().filter_by(task_id=root_id).first()
        if stream is not None:
            stream = stream.name

    if stream is not None:
        create_event(
            name='stream',
            root_id=root_id,
            task_id=root_id,
            data={
                'stream': stream,
                'state': status
            }
        )

    riberry.model.conn.commit()
    notify.workflow_complete(
        task_id=task_id,
        root_id=root_id,
        status=status,
    )


def execution_started(task, job_id, primary_stream):
    root_id = task.request.root_id
    job: riberry.model.job.JobExecution = riberry.model.job.JobExecution.query().filter_by(
        id=job_id,
    ).one()
    job.started = job.updated = pendulum.DateTime.utcnow()
    job.status = 'ACTIVE'
    job.task_id = root_id
    riberry.model.conn.commit()

    create_event(
        name='stream',
        root_id=root_id,
        task_id=root_id,
        data={
            'stream': primary_stream,
            'state': 'ACTIVE',
        }
    )
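# === editor's note: usage sketch (not part of the archive) ===
# A minimal, hedged illustration of how create_artifact might be called from an
# application task. It assumes a fully configured Riberry environment and an
# active task context (root_id/task_id/stream), which the executor establishes
# before a task body runs; the function and artifact names below are invented.
import riberry


def attach_summary(rows):
    # Dict content is JSON-encoded and then UTF-8 encoded by create_artifact;
    # task_id/root_id default to the current context when omitted.
    riberry.app.actions.artifacts.create_artifact(
        filename='summary.json',
        content={'row_count': len(rows)},
        name='Run summary',
        category='Reports',
    )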
# === riberry/app/actions/external_task.py ===
import json
from typing import Optional
import uuid

import riberry


def create_external_task(
        job_execution: riberry.model.job.JobExecution,
        name: str = None,
        type: str = 'external',
        external_task_id: Optional[str] = None,
        input_data: Optional[bytes] = None
):
    if external_task_id is None:
        external_task_id = str(uuid.uuid4())
    if name is None:
        name = external_task_id

    if isinstance(input_data, str):
        input_data = input_data.encode()
    if input_data is not None and not isinstance(input_data, bytes):
        input_data = json.dumps(input_data).encode()

    external_task = riberry.model.job.JobExecutionExternalTask(
        job_execution=job_execution,
        stream_id=None,
        task_id=external_task_id,
        name=name,
        type=type,
        input_data=input_data,
    )
    riberry.model.conn.add(external_task)
    riberry.model.conn.commit()
    return external_task


def mark_as_ready(external_task_id, output_data):
    task: riberry.model.job.JobExecutionExternalTask = riberry.model.job.JobExecutionExternalTask.query().filter_by(
        task_id=external_task_id,
    ).one()

    if isinstance(output_data, str):
        output_data = output_data.encode()
    if output_data is not None and not isinstance(output_data, bytes):
        output_data = json.dumps(output_data).encode()

    task.status = 'READY'
    task.output_data = output_data
    riberry.model.conn.commit()


# === riberry/app/actions/jobs.py ===
import riberry
from ..env import current_context


def create_job(form_name, job_name=None, input_values=None, input_files=None, owner=None, execute=True):
    form: riberry.model.interface.Form = riberry.model.interface.Form.query().filter_by(
        internal_name=form_name,
    ).first()

    job_execution = current_context.current.job_execution
    job_execution_user = owner if owner else job_execution.creator

    with riberry.services.policy.policy_scope(user=job_execution_user):
        job = riberry.services.job.create_job(
            form_id=form.id,
            name=job_name or f'Via {job_execution.job.name} / #{job_execution.id}',
            input_values=input_values or {},
            input_files=input_files or {},
            execute=execute,
            parent_execution=job_execution,
        )
        riberry.model.conn.commit()
        return job


# === riberry/app/actions/notify.py ===
from typing import Optional, List

from .. import current_context as context
from ..util.events import create_event


def notify(notification_type, data=None, task_id=None, root_id=None):
    task_id = task_id or context.current.task_id
    root_id = root_id or context.current.root_id

    create_event(
        'notify',
        root_id=root_id,
        task_id=task_id,
        data={
            'type': notification_type,
            'data': data or {}
        },
        binary=None
    )


def workflow_complete(task_id: str, root_id: str, status: str):
    notify(
        notification_type='workflow_complete',
        data=dict(status=status),
        task_id=task_id,
        root_id=root_id
    )


def send_email(
        subject: str,
        body: str,
        mime_type: Optional[str] = None,
        sender: Optional[str] = None,
        receivers: Optional[List[str]] = None
):
    if isinstance(receivers, str):
        receivers = [receivers]

    notify(
        notification_type='custom-email',
        data={
            'subject': subject,
            'mime_type': mime_type,
            'body': body,
            'from': sender,
            'to': receivers or []
        }
    )


# === riberry/app/actions/reports.py ===
import riberry
from typing import List
from .. import env


def update_all_reports(app_instance=None):
    app_instance = app_instance if app_instance else env.get_instance_model()

    reports: List[riberry.model.job.JobExecutionReport] = riberry.model.job.JobExecutionReport.query().filter(
        riberry.model.job.JobExecutionReport.marked_for_refresh == True,
    ).join(
        riberry.model.job.JobExecution
    ).join(
        riberry.model.job.Job
    ).filter_by(
        instance=app_instance
    ).all()

    cxt = env.current_context
    for report in reports:
        root_id = report.job_execution.task_id
        with cxt.scope(root_id=root_id, task_id=root_id, stream=None, step=None, category=None):
            cxt.event_registry.call(event_type=cxt.event_registry.types.on_report_refresh, report=report.name)
            if report.marked_for_refresh:
                report.marked_for_refresh = False
                riberry.model.conn.commit()


# === riberry/app/actions/shared_data.py ===
import riberry
from typing import List
from .. import env


def update_all_data_items(app_instance=None):
    app_instance = app_instance if app_instance else env.get_instance_model()

    data_items: List[riberry.model.misc.ResourceData] = riberry.model.misc.ResourceData.query().filter(
        riberry.model.misc.ResourceData.marked_for_refresh == True,
        riberry.model.misc.ResourceData.resource_type == riberry.model.misc.ResourceType.job_execution,
    ).join(
        riberry.model.job.JobExecution,
        riberry.model.misc.ResourceData.resource_id == riberry.model.job.JobExecution.id,
    ).join(
        riberry.model.job.Job
    ).filter_by(
        instance=app_instance
    ).all()

    cxt = env.current_context
    for item in data_items:
        root_id = riberry.model.job.JobExecution.query().filter_by(id=item.resource_id).one().task_id
        with cxt.scope(root_id=root_id, task_id=root_id, stream=None, step=None, category=None):
            cxt.event_registry.call(event_type=cxt.event_registry.types.on_data_updated, data_name=item.name)
            cxt.event_registry.call(event_type=cxt.event_registry.types.on_report_refresh, data_name=item.name)
            if item.marked_for_refresh:
                item.marked_for_refresh = False
                riberry.model.conn.commit()


# === riberry/app/addons/__init__.py ===
from .base import Addon


# === riberry/app/addons/base.py ===
import riberry


class Addon:

    def register(self, riberry_app: 'riberry.app.base.RiberryApplication'):
        raise NotImplementedError


# === riberry/app/backends/__init__.py ===
from .base import RiberryApplicationBackend


# === riberry/app/backends/base.py ===
import functools

import riberry
from typing import Dict, AnyStr


class RiberryApplicationBackend:

    def __init__(self, instance):
        self.instance = instance

    def task(self, func=None, **options):
        if callable(func):
            return self.register_task(func=func, **options)
        else:
            return functools.partial(self.task, **options)

    def initialize(self):
        raise NotImplementedError

    def default_addons(self) -> Dict[AnyStr, 'riberry.app.addons.Addon']:
        raise NotImplementedError

    def register_task(self, func, **options):
        raise NotImplementedError

    def task_by_name(self, name: AnyStr):
        raise NotImplementedError

    def start_execution(self, execution_id, root_id, entry_point) -> AnyStr:
        raise NotImplementedError

    def create_receiver_task(self, external_task_id, validator):
        raise NotImplementedError

    def active_task(self):
        raise NotImplementedError

    def __getattr__(self, item):
        return getattr(self.instance, item)


# === riberry/app/backends/impl/__init__.py === (empty)


# === riberry/app/backends/impl/celery/__init__.py ===
from .base import CeleryBackend
from . import addons, patch
# === riberry/app/backends/impl/celery/addons/__init__.py ===
from .background import BackgroundTasks
from .capacity import Capacity
from .external_task_receiver import ExternalTaskReceiver
from .scale import Scale


# === riberry/app/backends/impl/celery/addons/background.py ===
from .base import AddonStartStopStep
import riberry


class BackgroundTasks(riberry.app.addons.Addon):

    def register(self, riberry_app: 'riberry.app.base.RiberryApplication'):
        class ConcreteBackgroundTasksStep(BackgroundTasksStep):
            rib = riberry_app

        riberry_app.backend.steps['worker'].add(ConcreteBackgroundTasksStep)


class BackgroundTasksStep(AddonStartStopStep):
    requires = {'celery.worker.components:Timer'}

    def __init__(self, worker, **_):
        super().__init__(worker=worker, interval=0.6)
        self.lock = riberry.app.util.redis_lock.RedisLock(
            name='step:background', on_acquired=self.on_lock_acquired, interval=500)

    @staticmethod
    def on_lock_acquired():
        try:
            riberry.app.tasks.echo()
            riberry.app.tasks.poll()
            riberry.app.tasks.refresh()
        finally:
            riberry.model.conn.remove()
            riberry.model.conn.raw_engine.dispose()

    def should_run(self) -> bool:
        return True

    def run(self):
        redis_instance = riberry.celery.util.celery_redis_instance()
        self.lock.run(redis_instance=redis_instance)


# === riberry/app/backends/impl/celery/addons/base.py ===
import time

from celery import bootsteps
from celery.utils.log import logger as log

import riberry


class AddonStartStopStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}
    rib: 'riberry.app.base.RiberryApplication'

    def __init__(self, worker, interval, priority=10, **kwargs):
        super().__init__(worker, interval=interval, priority=priority, **kwargs)
        self._timer = None
        self.interval = interval
        self.priority = priority
        self._worker = worker

    @property
    def worker(self):
        return self._worker

    @property
    def worker_state(self):
        return self.worker.state

    @property
    def consumer(self):
        return self.worker.consumer

    def start(self, worker):
        if self.should_run():
            self._timer = worker.timer.call_repeatedly(
                self.interval,
                self.__execute,
                priority=self.priority,
            )

    def stop(self, worker):
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

    def __execute(self):
        start_time = time.time()
        try:
            self.run()
        finally:
            log.debug(f'Completed {type(self).__name__} in {time.time() - start_time:2} seconds')

    def should_run(self) -> bool:
        raise NotImplementedError

    def run(self):
        raise NotImplementedError
# === riberry/app/backends/impl/celery/addons/capacity/__init__.py ===
from contextlib import contextmanager

from ..base import AddonStartStopStep
from .priority_queue import PriorityQueue
import riberry
from celery.utils.log import logger as log


class Capacity(riberry.app.addons.Addon):

    def __init__(self, parameter='capacity', key=None, sep='|', queue_cls=PriorityQueue,
                 blocking: bool = True, block_retry: float = 0.5, r=None):
        self.parameter = parameter
        self.r = r or riberry.celery.util.celery_redis_instance()
        self.sep = sep
        self.queue = queue_cls(r=self.r, key=key, blocking=blocking, block_retry=block_retry)

    @property
    def last_value_key(self):
        return self.queue.make_key(self.queue.key, 'raw')

    @property
    def last_value(self):
        last_value = self.r.get(self.last_value_key)
        return last_value.decode() if isinstance(last_value, bytes) else None

    @last_value.setter
    def last_value(self, value):
        self.r.set(self.last_value_key, value='' if value is None else value)

    @contextmanager
    def borrow(self):
        member, score, version = self.queue.pop()
        try:
            yield member, score, version
        finally:
            self.queue.put(member=member, version=version)

    def register(self, riberry_app: 'riberry.app.base.RiberryApplication'):
        class ConcreteCapacityStep(CapacityStep):
            rib = riberry_app
            capacity = self

        if self.queue.key is None:
            self.queue.key = riberry_app.context.current.riberry_app_instance.internal_name

        riberry_app.backend.steps['worker'].add(ConcreteCapacityStep)


class CapacityStep(AddonStartStopStep):
    capacity: Capacity

    def __init__(self, worker, **_):
        super().__init__(worker=worker, interval=0.5)

    def should_run(self) -> bool:
        return True

    def run(self):
        value = self.rib.context.current.riberry_app_instance.active_schedule_value(name=self.capacity.parameter)
        if self.capacity.last_value is not None and value == self.capacity.last_value:
            return

        self.capacity.last_value = value
        values = [
            part.split(self.capacity.sep) if self.capacity.sep in part else (part, 0)
            for part in (value or '').split(' ')
        ]
        member_scores = {k: int(v) for k, v in values}
        self.capacity.queue.update(member_scores)
        log.info(f'DynamicPriorityParameter: ({self.capacity.queue.free_key}) '
                 f'updated {self.capacity.parameter} queue with {value!r}')


# === riberry/app/backends/impl/celery/addons/capacity/priority_queue.py ===
import time

import redis
import functools
from celery.utils.log import logger as log


class PriorityQueue:

    def __init__(
            self,
            r: redis.Redis,
            key: str,
            prefix: str = 'pq',
            sep: str = ':',
            blocking: bool = True,
            block_retry: float = 0.5):
        self.r: redis.Redis = r
        self.key = key
        self.prefix = prefix
        self.sep = sep
        self.blocking = blocking
        self.block_retry = block_retry

    def make_key(self, *args):
        sub_key = self.sep.join(map(str, args))
        return f'{self.prefix}{self.sep}{sub_key}'

    @property
    def version(self):
        return int(self.r.get(self.version_key) or 0)

    @version.setter
    def version(self, value):
        self.r.set(self.version_key, value=value)

    @property
    def version_key(self):
        return self.make_key(self.key, 'counter')

    @property
    def free_key(self):
        return self.generate_free_key(version=self.version)

    @property
    def lease_key(self):
        return self.generate_lease_key(version=self.version)

    def generate_free_key(self, version):
        return self.make_key(self.key, f'{version:09}', 'free')

    def generate_lease_key(self, version):
        return self.make_key(self.key, f'{version:09}', 'lease')

    def pop(self):
        while True:
            version = self.version
            result = self.r.transaction(
                functools.partial(self.pop_transaction, version=version),
                self.generate_free_key(version=version),
                self.generate_lease_key(version=version),
                value_from_callable=True
            )
            if not result and self.blocking:
                log.warn(f'PriorityQueue: ({self.free_key}) encountered blank key, '
                         f'retrying after {self.block_retry} seconds.')
                time.sleep(self.block_retry)
                continue
            else:
                break

        return result, self.r.zscore(self.generate_free_key(version=version), result), version

    def pop_transaction(self, pipe: redis.client.Pipeline, version):
        free_key, lease_key = self.generate_free_key(version=version), self.generate_lease_key(version=version)
        [(member, score)] = pipe.zrevrange(free_key, 0, 0, withscores=True)
        member = member.decode()

        pipe.multi()
        pipe.zincrby(free_key, value=member, amount=-1)
        pipe.zincrby(lease_key, value=member, amount=1)
        return member

    def put(self, member, version):
        self.r.transaction(
            functools.partial(self.put_transaction, member=member, version=version),
            self.generate_free_key(version=version),
            self.generate_lease_key(version=version),
        )

    def put_transaction(self, pipe: redis.client.Pipeline, member, version):
        free_key, lease_key = self.generate_free_key(version=version), self.generate_lease_key(version=version)
        pipe.multi()
        pipe.zincrby(free_key, value=member, amount=1)
        pipe.zincrby(lease_key, value=member, amount=-1)

    def update(self, member_scores: dict):
        func = functools.partial(self.update_transaction, member_scores=member_scores)
        self.version = self.r.transaction(func, value_from_callable=True)

    def update_transaction(self, pipe: redis.client.Pipeline, member_scores):
        version = self.version + 1
        pipe.multi()
        pipe.zadd(self.generate_free_key(version=version), mapping=member_scores)
        return version

    def items(self):
        return [(k.decode(), v) for k, v in self.r.zrevrange(self.free_key, 0, -1, withscores=True)]

    def leased_items(self):
        return [(k.decode(), v) for k, v in self.r.zrevrange(self.lease_key, 0, -1, withscores=True)]

    def clear(self):
        self.r.delete(self.free_key, self.lease_key)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.pop()
        except ValueError:
            raise StopIteration
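# === editor's note: usage sketch (not part of the archive) ===
# A rough illustration of the PriorityQueue flow: update() publishes a scored
# member set under a bumped version, pop() leases the highest-scored member,
# and put() returns the lease. The localhost Redis connection, key name and
# member names are assumptions for the example only.
import redis

from riberry.app.backends.impl.celery.addons.capacity.priority_queue import PriorityQueue


def example():
    queue = PriorityQueue(r=redis.Redis(host='localhost', port=6379), key='example', blocking=False)
    queue.update({'tenant-a': 3, 'tenant-b': 1})    # tenant-a is favoured 3:1
    member, score, version = queue.pop()            # leases the highest-scored member
    try:
        pass  # do work on behalf of `member` here
    finally:
        queue.put(member=member, version=version)   # return capacity to the same version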
# === riberry/app/backends/impl/celery/addons/external_task_receiver.py ===
from .base import AddonStartStopStep
import riberry
from riberry.app.backends.impl import celery as celery_backend


class ExternalTaskReceiver(riberry.app.addons.Addon):
    RECEIVER_QUEUE = 'rib.external'

    def register(self, riberry_app: 'riberry.app.base.RiberryApplication'):
        class ConcreteExternalTaskReceiverStep(ExternalTaskReceiverStep):
            rib = riberry_app

        riberry_app.backend.steps['worker'].add(ConcreteExternalTaskReceiverStep)
        riberry_app.backend.user_options['worker'].add(self.register_user_options)

        task_routes = {
            celery_backend.CeleryBackend.CHECK_EXTERNAL_TASK_NAME: {'queue': self.RECEIVER_QUEUE},
        }
        if not riberry_app.backend.conf.task_routes:
            riberry_app.backend.conf.task_routes = {}
        riberry_app.backend.conf.task_routes.update(task_routes)

        for addon in riberry_app.addons.values():
            if isinstance(addon, celery_backend.addons.Scale):
                addon.conf.ignore_queues.add(self.RECEIVER_QUEUE)

    @staticmethod
    def register_user_options(parser):
        parser.add_argument(
            '--rib-receiver',
            action='store_true',
            default=False,
            help='Receiver of external Riberry tasks.',
        )


class ExternalTaskReceiverStep(AddonStartStopStep):

    def __init__(self, worker, rib_receiver, **_):
        super().__init__(worker=worker, interval=1)
        self._is_receiver = bool(rib_receiver)

    def should_run(self) -> bool:
        return self._is_receiver

    def run(self):
        active = riberry.model.job.JobExecution.query().filter_by(
            status='ACTIVE'
        ).join(riberry.model.job.Job).filter_by(
            instance=self.rib.context.current.riberry_app_instance,
        ).join(riberry.model.job.JobExecutionExternalTask).filter_by(
            status='READY'
        ).count()

        operation = 'add' if active else 'cancel'
        consumer = self.worker.consumer
        queues = {q.name for q in consumer.task_consumer.queues}

        if operation == 'add' and ExternalTaskReceiver.RECEIVER_QUEUE not in queues:
            consumer.add_task_queue(ExternalTaskReceiver.RECEIVER_QUEUE)
        if operation == 'cancel' and ExternalTaskReceiver.RECEIVER_QUEUE in queues:
            consumer.cancel_task_queue(ExternalTaskReceiver.RECEIVER_QUEUE)


# === riberry/app/backends/impl/celery/addons/scale.py ===
import math

import riberry
from .base import AddonStartStopStep
from celery.utils.log import logger as log


class Scale(riberry.app.addons.Addon):

    def __init__(
            self,
            active_parameter='active',
            concurrency_parameter='concurrency',
            minimum_concurrency=None,
            maximum_concurrency=None,
            check_queues=None,
    ):
        self.conf = ScaleConfiguration(
            active_parameter=active_parameter,
            concurrency_parameter=concurrency_parameter,
            minimum_concurrency=minimum_concurrency,
            maximum_concurrency=maximum_concurrency,
            check_queues=check_queues,
        )
        self.active_parameter = active_parameter
        self.concurrency_parameter = concurrency_parameter
        self.minimum_concurrency = minimum_concurrency
        self.maximum_concurrency = maximum_concurrency

    def register(self, riberry_app):
        class ConcreteScaleStep(ScaleStep):
            conf = self.conf
            rib = riberry_app

        riberry_app.backend.steps['worker'].add(ConcreteScaleStep)
        riberry_app.backend.user_options['worker'].add(self.register_user_options)

    @staticmethod
    def register_user_options(parser):
        parser.add_argument(
            '--rib-scale',
            action='store_true',
            default=False,
            help='Scale concurrency depending on activity (disabled by default)',
        )
        parser.add_argument(
            '--rib-scale-parameter',
            help='The name of the application instance parameter which stores the concurrency value',
        )
        parser.add_argument(
            '--rib-scale-group',
            default='default',
            help='Concurrency is distributed amongst all active workers in the same group',
        )
        parser.add_argument(
            '--rib-scale-max',
            default=None,
            help='Maximum concurrency when auto-scaling (uncapped by default)',
        )
        parser.add_argument(
            '--rib-scale-min',
            default=None,
            help='Minimum concurrency when auto-scaling (zero by default)',
        )
        feature_parser = parser.add_mutually_exclusive_group(required=False)
        feature_parser.add_argument(
            '--rib-scale-check-queues',
            dest='rib_scale_check_queues',
            action='store_true',
            help='Scale down if queues are empty and there are no active tasks'
        )
        feature_parser.add_argument(
            '--rib-scale-ignore-queues',
            dest='rib_scale_check_queues',
            action='store_false',
            help='Do not scale down if queues are empty and there are no active tasks'
        )
        parser.set_defaults(rib_scale_check_queues=True)


class ScaleConfiguration:

    def __init__(
            self,
            scale_concurrency=False,
            scale_group='default',
            active_parameter='active',
            concurrency_parameter='concurrency',
            minimum_concurrency=None,
            maximum_concurrency=None,
            check_queues=None,
    ):
        self.active_parameter = active_parameter
        self.concurrency_parameter = concurrency_parameter
        self.scale = scale_concurrency
        self.scale_group = scale_group
        self.minimum_concurrency = minimum_concurrency
        self.maximum_concurrency = maximum_concurrency
        self.check_queues = check_queues
        self.ignore_queues = set()


class ScaleStep(AddonStartStopStep):
    conf: ScaleConfiguration

    def __init__(self, worker, rib_scale, rib_scale_group, rib_scale_parameter, rib_scale_min, rib_scale_max,
                 rib_scale_check_queues, **_):
        super().__init__(worker=worker, interval=1.0)
        self.lock = riberry.app.util.redis_lock.RedisLock(
            name='step:scale', on_acquired=self.on_lock_acquired, interval=5000)

        self.conf.scale = bool(rib_scale)
        self.conf.scale_group = rib_scale_group or self.conf.scale_group
        self.conf.concurrency_parameter = rib_scale_parameter or self.conf.concurrency_parameter
        self.conf.minimum_concurrency = int(rib_scale_min) if rib_scale_min is not None else self.conf.minimum_concurrency
        self.conf.maximum_concurrency = int(rib_scale_max) if rib_scale_max is not None else self.conf.maximum_concurrency
        self.conf.check_queues = bool(rib_scale_check_queues) if rib_scale_check_queues is not None else self.conf.check_queues

        self.queues = set()
        self.target_concurrency = None
        self.initial_concurrency = None
        self.is_active = False
        self._worker_uuid = self.rib.context.current.WORKER_UUID
        self._instance_name = self.rib.context.current.riberry_app_instance.internal_name

    def should_run(self) -> bool:
        return True

    def run(self):
        redis_instance = riberry.celery.util.celery_redis_instance()
        self.lock.run(redis_instance=redis_instance)
        if not self.consumer.task_consumer:
            return

        self.update(redis_instance=redis_instance)
        self.scale()

    def on_lock_acquired(self):
        r = riberry.celery.util.celery_redis_instance()
        epoch, _ = r.time()
        occurrence = set(r.zrevrangebyscore(name=self.scale_groups_log_key, max=epoch, min=epoch - 60))
        if occurrence:
            r.delete(self.scale_groups_active_temp_key)
            r.sadd(self.scale_groups_active_temp_key, *occurrence)
            r.rename(src=self.scale_groups_active_temp_key, dst=self.scale_groups_active_key)

    @classmethod
    def _tasks_available(cls, r, state, queues):
        return bool(
            len(state.reserved_requests) or
            len(state.active_requests) or
            state.requests or
            not cls._queues_empty(r, queues=queues)
        )

    @staticmethod
    def _queues_empty(r, queues):
        for queue_name in queues:
            for queue in r.keys(f'{queue_name}*'):
                try:
                    if r.llen(queue):
                        return False
                except:
                    continue
        return True

    def report(self, r):
        epoch, _ = r.time()
        r.zadd(name=self.scale_groups_log_key, mapping={self._worker_uuid: epoch})

    @property
    def scale_groups_active_key(self):
        return f'{self._instance_name}:scale-groups:{self.conf.scale_group}:active'

    @property
    def scale_groups_log_key(self):
        return f'{self._instance_name}:scale-groups:{self.conf.scale_group}:log'

    @property
    def scale_groups_active_temp_key(self):
        return f'{self._instance_name}:scale-groups:{self.conf.scale_group}:active-temp'

    def update(self, redis_instance):
        self.queues.update({q.name for q in self.worker.consumer.task_consumer.queues})
        if not self.initial_concurrency:
            self.initial_concurrency = self.worker.consumer.pool.num_processes

        instance = self.rib.context.current.riberry_app_instance
        active_flag = instance.active_schedule_value(name=self.conf.active_parameter, default='Y') == 'Y'
        tasks_available = self._tasks_available(
            r=redis_instance,
            state=self.worker_state,
            queues=self.queues - self.conf.ignore_queues,
        )
        if tasks_available:
            self.report(r=redis_instance)

        self.is_active = active_flag and (tasks_available if self.conf.check_queues else True)
        if not self.is_active:
            self.target_concurrency = 0
            return

        scale_group = list(sorted(b.decode() for b in redis_instance.smembers(self.scale_groups_active_key)))

        if self.conf.scale and self.conf.concurrency_parameter:
            target_concurrency = instance.active_schedule_value(name=self.conf.concurrency_parameter, default=None)
            if target_concurrency is None:
                target_concurrency = self.initial_concurrency
            else:
                target_concurrency = int(target_concurrency)

            target_concurrency *= (1 / len(scale_group)) if self._worker_uuid in scale_group else 0
            if target_concurrency:
                if scale_group.index(self._worker_uuid) == 0:
                    target_concurrency = math.ceil(target_concurrency)
                else:
                    target_concurrency = math.floor(target_concurrency)
        else:
            target_concurrency = self.initial_concurrency

        if self.conf.maximum_concurrency is not None:
            target_concurrency = min(target_concurrency, self.conf.maximum_concurrency)
        if self.conf.minimum_concurrency is not None:
            target_concurrency = max(target_concurrency, self.conf.minimum_concurrency)

        target_concurrency = int(target_concurrency)
        if target_concurrency != self.target_concurrency:
            self.target_concurrency = target_concurrency

    def scale(self):
        actual_concurrency = self.worker.consumer.pool.num_processes
        target_concurrency = self.target_concurrency
        scale_group = list(sorted(
            b.decode() for b in riberry.celery.util.celery_redis_instance().smembers(self.scale_groups_active_key)
        ))
        log.debug(f'A: {self.is_active} C[T]: {self.target_concurrency}, C[A]: {actual_concurrency} M: {scale_group}')

        if target_concurrency == 0:
            for queue in list(self.consumer.task_consumer.queues):
                if queue.name not in self.conf.ignore_queues:
                    self.consumer.cancel_task_queue(queue)
        else:
            queue_names = {q.name for q in self.consumer.task_consumer.queues}
            for queue in self.queues:
                if queue not in queue_names and queue not in self.conf.ignore_queues:
                    self.consumer.add_task_queue(queue)

        if target_concurrency > actual_concurrency:
            if actual_concurrency == 0:
                self.worker.consumer.pool.grow(1)
            else:
                self.worker.consumer.pool.grow(min(target_concurrency - actual_concurrency, 8))
        elif actual_concurrency > target_concurrency:
            self.worker.consumer.pool.shrink(min(actual_concurrency - target_concurrency, 8))


# === riberry/app/backends/impl/celery/addons/subqueue.py ===
from celery import current_task

from .base import AddonStartStopStep
import riberry

KEY = 'rib_subqueue'


def send_task_propagate_subqueue(self, *args, **kwargs):
    kwargs['headers'] = kwargs.get('headers', {})
    if current_task and getattr(current_task.request, KEY, object) != object:
        kwargs['headers'][KEY] = current_task.request.get(KEY)
    else:
        kwargs['headers'][KEY] = kwargs['headers'].get(KEY)


class SubQueue(riberry.app.addons.Addon):
    RECEIVER_QUEUE = 'rib.external'

    def __init__(self, amount, queues):
        self.amount = amount
        self.queues = queues

    def register(self, riberry_app: 'riberry.app.base.RiberryApplication'):
        class ConcreteSubQueueStep(SubQueueStep):
            rib = riberry_app
            amount = self.amount
            queues = self.queues

        riberry_app.backend.steps['worker'].add(ConcreteSubQueueStep)
        riberry.app.backends.impl.celery.patch.patch_send_task(
            instance=riberry_app.backend.instance,
            func=send_task_propagate_subqueue,
        )


class SubQueueStep(AddonStartStopStep):
    queues: list
    amount: int

    def __init__(self, worker, **_):
        super().__init__(worker=worker, interval=1)
        self.executed = False

    def should_run(self) -> bool:
        return True

    def run(self):
        if not self.executed and self.consumer:
            for queue in self.queues:
                for num in range(self.amount):
                    self.consumer.add_task_queue(f'{queue}.{num}')
            self.executed = True
# === riberry/app/backends/impl/celery/base.py ===
from typing import Dict, AnyStr

import celery

import riberry
from . import patch, tasks, addons
from .executor import TaskExecutor


def send_task_process_rib_kwargs(self, *args, **kwargs):
    riberry_properties = {}
    if kwargs.get('kwargs'):
        for key, value in kwargs['kwargs'].items():
            if key.startswith('__rib_'):
                riberry_properties[key.replace('__rib_', '', 1)] = value


class CeleryBackend(riberry.app.backends.RiberryApplicationBackend):
    instance: celery.Celery

    ENTRY_POINT_TASK_NAME = 'riberry.core.app.entry_point'
    CHECK_EXTERNAL_TASK_NAME = 'riberry.core.app.check_external_task'

    def __init__(self, instance):
        super().__init__(instance=instance)
        self.executor = TaskExecutor()

    def initialize(self):
        patch.patch_send_task(instance=self.instance, func=send_task_process_rib_kwargs)

        # Register "entry point" task
        self.task(
            name=self.ENTRY_POINT_TASK_NAME,
        )(self.executor.entry_point_executor())

        # Register "external task checker" task
        self.task(
            name=self.CHECK_EXTERNAL_TASK_NAME,
            max_retries=None,
        )(self.executor.external_task_executor())

    def default_addons(self) -> Dict[AnyStr, 'riberry.app.addons.Addon']:
        return {
            'scale': addons.Scale(),
            'background': addons.BackgroundTasks(),
            'external-receiver': addons.ExternalTaskReceiver(),
        }

    def register_task(self, func, **options) -> celery.Task:
        wrapped_func, options = self.executor.riberry_task_executor_wrapper(func=func, task_options=options)
        return self.instance.task(**options)(wrapped_func)

    def task_by_name(self, name: AnyStr):
        return self.instance.tasks[name]

    def start_execution(self, execution_id, root_id, entry_point) -> AnyStr:
        task = self.task_by_name(riberry.app.RiberryApplication.ENTRY_POINT_TASK_NAME)
        task_signature = task.si(execution_id=execution_id, form=entry_point.form)
        callback_success = tasks.execution_complete.si(status='SUCCESS', stream=entry_point.stream)
        callback_failure = tasks.execution_complete.si(status='FAILURE', stream=entry_point.stream)

        task_signature.options['root_id'] = root_id
        callback_success.options['root_id'] = root_id
        callback_failure.options['root_id'] = root_id

        exec_signature = task_signature.on_error(callback_failure) | callback_success
        exec_signature.options['root_id'] = root_id

        riberry.app.util.events.create_event(
            name='stream',
            root_id=root_id,
            task_id=root_id,
            data={
                'stream': entry_point.stream,
                'state': 'QUEUED',
            }
        )

        return exec_signature.apply_async().id

    def create_receiver_task(self, external_task_id, validator):
        return self.task_by_name(riberry.app.RiberryApplication.CHECK_EXTERNAL_TASK_NAME).si(
            external_task_id=external_task_id,
            validator=validator,
        )

    def active_task(self):
        return celery.current_task


# === riberry/app/backends/impl/celery/executor.py ===
from celery import exceptions as celery_exc, current_task

import riberry
from riberry.app import actions
from riberry.app.misc.signals import task_prerun, task_postrun
from .extension import RiberryTask


class ExecutionComplete(Exception):
    pass


IGNORE_EXCEPTIONS = (
    celery_exc.Retry,
    celery_exc.SoftTimeLimitExceeded,
    celery_exc.TimeLimitExceeded,
)


def _retry_types(task_options):
    return tuple(list(IGNORE_EXCEPTIONS) + task_options.get('autoretry_for', []))


def _attempt_fallback(exc, task_options):
    fallback_on_error_provided, fallback_on_error = (
        'rib_fallback' in task_options,
        task_options.get('rib_fallback')
    )

    if not fallback_on_error_provided:
        actions.artifacts.create_artifact_from_traceback(category='Fatal')
        raise exc

    try:
        result = fallback_on_error() if callable(fallback_on_error) else fallback_on_error
        actions.artifacts.create_artifact_from_traceback(category='Intercepted')
        return result
    except:
        actions.artifacts.create_artifact_from_traceback(category='Fatal (intercept failed)')
        raise exc


class TaskExecutor:

    @property
    def riberry_app(self):
        return riberry.app.env.current_riberry_app

    def external_task_executor(self):
        def _external_task_executor(external_task_id, validator):
            external_task: riberry.model.job.JobExecutionExternalTask = riberry.model.job.JobExecutionExternalTask.query().filter_by(
                task_id=external_task_id,
            ).first()
            task: RiberryTask = self.riberry_app.context.current.task

            if external_task:
                if external_task.status == 'WAITING':
                    raise task.retry(countdown=1)
                elif external_task.status == 'READY':
                    output_data = external_task.output_data
                    if isinstance(output_data, bytes):
                        output_data = output_data.decode()

                    outcomes = self.riberry_app.context.event_registry.call(
                        event_type=self.riberry_app.context.event_registry.types.on_external_result_received,
                        key=validator,
                        kwargs=dict(
                            external_task=external_task,
                            result=output_data,
                        )
                    )
                    if outcomes:
                        assert len(outcomes) == 1, f'Multiple callbacks triggered for {validator}'
                        outcome = outcomes[0]
                        if outcome and outcome.retry:
                            external_task.status = 'WAITING'
                            external_task.input_data = outcome.input_data
                            riberry.model.conn.commit()
                            raise task.retry(countdown=1)

                    external_task.status = 'COMPLETE'
                    riberry.model.conn.commit()
                    return output_data

        return _external_task_executor

    def entry_point_executor(self):
        def _entry_point_executor(execution_id, form: str):
            entry_point = self.riberry_app.entry_points[form]
            actions.executions.execution_started(
                task=self.riberry_app.context.current.task,
                job_id=execution_id,
                primary_stream=entry_point.stream,
            )
            entry_point.func()

        return _entry_point_executor

    def riberry_task_executor(self, func, func_args, func_kwargs, task_options):
        riberry_properties = {}
        for key, value in list(func_kwargs.items()):
            if key.startswith('__rib_'):
                riberry_properties[key.replace('__rib_', '', 1)] = func_kwargs.pop(key)

        with riberry.model.conn:
            with self.riberry_app.context.scope(
                root_id=current_task.request.root_id,
                task_id=current_task.request.id,
                stream=riberry_properties.get('stream'),
                step=riberry_properties.get('step'),
                category=riberry_properties.get('category'),
            ):
                state = None
                mark_workflow_complete = False
                try:
                    task_prerun(context=self.riberry_app.context, props=riberry_properties)
                    result = self._execute_task(func, func_args, func_kwargs)
                    state = 'SUCCESS'
                    return result
                except ExecutionComplete:
                    state = 'FAILURE'
                    raise
                except celery_exc.Ignore:
                    state = 'IGNORED'
                    raise
                except Exception as exc:
                    state = 'FAILURE'
                    mark_workflow_complete = True

                    if isinstance(exc, _retry_types(task_options=task_options)) and not self._max_retries_reached(exc):
                        state = None
                        mark_workflow_complete = False
                        raise

                    result = _attempt_fallback(exc, task_options=task_options)
                    mark_workflow_complete = False
                    state = 'SUCCESS'
                    return result
                finally:
                    if mark_workflow_complete:
                        actions.executions.execution_complete(
                            task_id=self.riberry_app.context.current.task_id,
                            root_id=self.riberry_app.context.current.root_id,
                            status=state,
                            stream=None,
                        )
                    if state is not None:
                        task_postrun(context=self.riberry_app.context, props=riberry_properties, state=state)

    def _max_retries_reached(self, exc):
        active_task = self.riberry_app.context.current.task
        return bool(
            not isinstance(exc, celery_exc.Ignore) and
            active_task.max_retries is not None and
            active_task.request.retries >= active_task.max_retries
        )

    def _execute_task(self, func, args, kwargs):
        job_execution = self.riberry_app.context.current.job_execution
        if job_execution.status in ('FAILURE', 'SUCCESS'):
            raise ExecutionComplete(f'Execution {job_execution!r} is already marked as complete')
        return func(*args, **kwargs)

    def riberry_task_executor_wrapper(self, func, task_options):
        def wrapped_function(*args, **kwargs):
            return self.riberry_task_executor(
                func=func,
                func_args=args,
                func_kwargs=kwargs,
                task_options=task_options,
            )

        if 'name' not in task_options:
            task_options['name'] = riberry.app.util.misc.function_path(func=func)
        task_options['base'] = task_options.get('base') or RiberryTask

        return wrapped_function, task_options


# === riberry/app/backends/impl/celery/extension.py ===
import celery

import riberry


class RiberryTask(celery.Task):

    def signature(self, args=None, *starargs, **starkwargs):
        sig = celery.Task.signature(self, args, *starargs, **starkwargs)
        context = riberry.app.current_context
        stream = context.flow.scoped_stream or context.current.stream

        if stream:
            sig['kwargs']['__rib_stream'] = stream
            if '__rib_step' not in sig['kwargs']:
                sig['kwargs']['__rib_step'] = self.name

        return sig


# === riberry/app/backends/impl/celery/patch.py ===
import types

import celery


def patch_send_task(instance: celery.Celery, func):
    send_task_original = instance.send_task

    def send_task(self, *args, **kwargs):
        func(self, *args, **kwargs)
        return send_task_original(*args, **kwargs)

    instance.send_task = types.MethodType(send_task, instance)


# === riberry/app/backends/impl/celery/tasks.py ===
import riberry
from celery import shared_task


@shared_task(name='riberry.core.execution_complete', bind=True, ignore_result=True)
def execution_complete(task, status, stream):
    with riberry.model.conn:
        return riberry.app.actions.executions.execution_complete(
            task_id=task.request.id,
            root_id=task.request.root_id,
            status=status,
            stream=stream
        )


# === riberry/app/base.py ===
import riberry


class RiberryApplication:
    __registered__ = {}

    ENTRY_POINT_TASK_NAME = 'riberry.core.app.entry_point'
    CHECK_EXTERNAL_TASK_NAME = 'riberry.core.app.check_external_task'

    def __init__(self, name, backend, addons=None):
        self.name = name
        self.__registered__[self.name] = self
        self.context: riberry.app.context.Context = riberry.app.context.Context()
        self.entry_points = {}
        self.backend: riberry.app.backends.RiberryApplicationBackend = backend
        self.backend.initialize()

        self.addons = {
            **self.backend.default_addons(),
            **(addons or {})
        }
        for addon in self.addons.values():
            if addon is not None:
                addon.register(riberry_app=self)

    @classmethod
    def by_name(cls, name):
        return cls.__registered__[name]

    def entry_point(self, form, stream='Overall', step='Entry'):
        def wrapper(func):
            self.entry_points[form] = EntryPoint(
                form=form,
                func=func,
                stream=stream,
                step=step,
            )

        return wrapper

    def task(self, func=None, **options):
        return self.backend.task(func=func, **options)

    def register_task(self, func, **options):
        return self.backend.register_task(func=func, **options)

    def start(self, execution_id, root_id, form) -> str:
        if form not in self.entry_points:
            raise ValueError(f'Application {self.name!r} does not have an entry point with '
                             f'name {form!r} registered.')

        entry_point: EntryPoint = self.entry_points[form]
        with self.context.flow.stream_scope(stream=entry_point.stream):
            return self.backend.start_execution(execution_id=execution_id, root_id=root_id, entry_point=entry_point)


class EntryPoint:

    def __init__(self, form, func, stream, step):
        self.form = form
        self.func = func
        self.stream = stream
        self.step = step
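# === editor's note: usage sketch (not part of the archive) ===
# Rough wiring of a RiberryApplication on top of the Celery backend, based on
# the classes above. The Celery app name, broker URL, form name 'demo_form'
# and task bodies are assumptions; a configured Riberry environment (config,
# database, RIBERRY_INSTANCE) is required for this to actually start.
import celery

import riberry
from riberry.app.backends.impl.celery import CeleryBackend

celery_app = celery.Celery('demo', broker='redis://localhost:6379/0')
rib = riberry.app.RiberryApplication(name='demo', backend=CeleryBackend(instance=celery_app))


@rib.task
def add(x, y):
    # Registered through CeleryBackend.register_task, so it runs inside the
    # Riberry executor (context scope, event emission, fallback handling).
    return x + y


@rib.entry_point(form='demo_form', stream='Overall')
def main():
    # Invoked by the entry-point task when an execution for 'demo_form' starts
    # (see RiberryApplication.start / CeleryBackend.start_execution).
    add.delay(1, 2)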
# === riberry/app/context/__init__.py ===
from contextlib import contextmanager

import riberry
from .artifact import Artifact
from .current import ContextCurrent
from .event_registry import EventRegistry, EventRegistryHelper
from .external_task import ExternalTask
from .flow import Flow
from .input_mapping import InputMappings
from .report import Report
from .shared_data import SharedExecutionData


class Context:

    def __init__(self):
        self.current = ContextCurrent(context=self)
        self.input = InputMappings(context=self)
        self.data = SharedExecutionData(context=self)
        self.flow = Flow(context=self)
        self.artifact = Artifact()
        self.report = Report(context=self)
        self.external_task = ExternalTask(context=self)
        self.event_registry = EventRegistry(context=self)
        self.on = EventRegistryHelper(context=self)

    @contextmanager
    def scope(self, root_id, task_id, stream, category, step):
        with self.current.scope(
            root_id=root_id,
            task_id=task_id,
            stream=stream,
            category=category,
            step=step,
        ):
            yield

    def spawn(self, form_name, job_name=None, input_values=None, input_files=None, owner=None, execute=True):
        return riberry.app.actions.jobs.create_job(
            form_name=form_name,
            job_name=job_name,
            input_files=input_files,
            input_values=input_values,
            owner=owner,
            execute=execute,
        )


# === riberry/app/context/artifact.py ===
from typing import Union, Optional

import riberry
from riberry.model.job import ArtifactType


class Artifact:

    @staticmethod
    def create(
            filename: str,
            content: Union[bytes, str],
            name: str = None,
            type: Union[str, ArtifactType] = ArtifactType.output,
            category='Default',
            data: dict = None,
    ):
        return riberry.app.actions.artifacts.create_artifact(
            filename=filename,
            content=content,
            name=name,
            type=type,
            category=category,
            data=data,
        )

    @staticmethod
    def create_from_traceback(
            name: Optional[str] = None,
            filename: Optional[str] = None,
            category: str = 'Intercepted',
            type: riberry.model.job.ArtifactType = riberry.model.job.ArtifactType.error,
    ):
        return riberry.app.actions.artifacts.create_artifact_from_traceback(
            name=name,
            filename=filename,
            type=type,
            category=category,
        )
# === riberry/app/context/current.py ===
import uuid
from contextlib import contextmanager
from typing import Optional

import riberry


class ContextCurrent:

    def __init__(self, context):
        self.context: riberry.app.context.Context = context
        self._task_stream = None
        self._task_category = None
        self._task_step = None
        self._root_id = None
        self._task_id = None
        self._worker_uuid = str(uuid.uuid4())

    @property
    def WORKER_UUID(self):
        return self._worker_uuid

    @property
    def stream(self):
        return self._task_stream

    @property
    def step(self):
        return self._task_step

    @property
    def backend(self) -> 'riberry.app.backends.RiberryApplicationBackend':
        return self.riberry_app.backend

    @property
    def task(self):
        return self.backend.active_task()

    @property
    def riberry_app(self) -> 'riberry.app.RiberryApplication':
        return riberry.app.current_riberry_app

    @property
    def riberry_app_instance(self) -> riberry.model.application.ApplicationInstance:
        return riberry.app.env.get_instance_model()

    @property
    def task_id(self):
        return self._task_id

    @property
    def root_id(self):
        return self._root_id

    @property
    def job_execution(self) -> Optional[riberry.model.job.JobExecution]:
        return riberry.model.job.JobExecution.query().filter_by(task_id=self.root_id).first()

    @property
    def job(self) -> Optional[riberry.model.job.Job]:
        job_execution = self.job_execution
        return job_execution.job if self.job_execution else None

    @contextmanager
    def scope(self, root_id, task_id, stream, category, step):
        try:
            self._root_id, self._task_id = root_id, task_id
            self._task_stream, self._task_category, self._task_step = stream, category, step
            yield
        finally:
            self._root_id, self._task_id = None, None
            self._task_stream, self._task_category, self._task_step = None, None, None

    @property
    def progress(self) -> str:
        progress: riberry.model.job.JobExecutionProgress = riberry.model.job.JobExecutionProgress.query().filter_by(
            job_execution=self.job_execution,
        ).order_by(
            riberry.model.job.JobExecutionProgress.id.desc(),
        ).limit(
            1
        ).first()
        return progress.message if progress else None

    @progress.setter
    def progress(self, message: str):
        if message == self.progress:
            return

        progress = riberry.model.job.JobExecutionProgress(
            job_execution=self.job_execution,
            message=message,
        )
        riberry.model.conn.add(progress)
        riberry.model.conn.commit()
# === riberry/app/context/event_registry.py ===
import enum
from collections import defaultdict
from functools import partial

import riberry


class EventRegistryTypes(enum.Enum):
    on_completion = 'on_completion'
    on_data_updated = 'on_data_update'
    on_report_refresh = 'on_report_refresh'
    on_external_result_received = 'on_external_result_received'


class EventRegistry:
    types: EventRegistryTypes = EventRegistryTypes

    def __init__(self, context):
        self.context: riberry.app.context.Context = context
        self._registrations = defaultdict(set)

    @staticmethod
    def _make_key(func, **key):
        if not key and func:
            return tuple([
                ('key', riberry.app.util.misc.function_path(func))
            ])
        return tuple(sorted(key.items()))

    def register(self, event_type: EventRegistryTypes, **key):
        def inner(func):
            formatted_key = self._make_key(func, **key)
            self._registrations[(event_type, formatted_key)].add(func)
            return func

        return inner

    def get(self, event_type: EventRegistryTypes, **key):
        key = self._make_key(func=None, **key)
        return self._registrations[(event_type, key)]

    def call(self, event_type: EventRegistryTypes, args=None, kwargs=None, **key):
        functions = self.get(event_type=event_type, **key)
        return [function(*args or (), **kwargs or {}) for function in functions]


class EventRegistryHelper:

    def __init__(self, context):
        self.context: riberry.app.context.Context = context

    @property
    def _register(self):
        return self.context.event_registry.register

    def _register_single_execution_function(self, func, event_type, key, registration_key=None):
        key = riberry.app.util.misc.internal_data_key(key=f'once.{key}')
        func = partial(self.context.data.execute_once, key=key, func=func)
        return self._register(event_type=event_type, **(registration_key or {}))(func)

    def execution_failed(self, func):
        return self._register_single_execution_function(
            func=func,
            event_type=EventRegistryTypes.on_completion,
            key='execution_failed',
            registration_key=dict(
                status='FAILURE',
            )
        )

    def execution_succeeded(self, func):
        return self._register_single_execution_function(
            func=func,
            event_type=EventRegistryTypes.on_completion,
            key='execution_succeeded',
            registration_key=dict(
                status='SUCCESS',
            )
        )

    def data_updated(self, name):
        return self._register(event_type=EventRegistryTypes.on_data_updated, data_name=name)

    def external_result_received(self, func):
        return self._register(event_type=EventRegistryTypes.on_external_result_received)(func)

    def report_refresh(self, report, bindings, renderer=None):
        def inner(func):
            def refresh():
                self.context.report.update(report=report, body=func(), renderer=renderer)

            self._register(event_type=EventRegistryTypes.on_report_refresh, report=report)(refresh)
            for binding in bindings:
                self._register(event_type=EventRegistryTypes.on_data_updated, data_name=binding)(refresh)

            return func

        return inner


# === riberry/app/context/external_task.py ===
import inspect
from typing import Optional

import riberry


class ExternalTask:

    def __init__(self, context):
        self.context: riberry.app.context.Context = context

    def create(
            self,
            name: str = None,
            type: str = 'external',
            external_task_id: Optional[str] = None,
            input_data: Optional[bytes] = None
    ):
        external_task = riberry.app.actions.external_task.create_external_task(
            job_execution=self.context.current.job_execution,
            name=name,
            type=type,
            external_task_id=external_task_id,
            input_data=input_data,
        )
        return ExternalTaskCreationResult(context=self.context, instance=external_task)

    def create_receiver_task(self, external_task_id, validator):
        if inspect.isfunction(validator):
            validator = riberry.app.util.misc.function_path(validator)
        return self.context.current.riberry_app.backend.create_receiver_task(
            external_task_id=external_task_id,
            validator=validator,
        )

    @staticmethod
    def mark_as_ready(external_task_id, output_data):
        return riberry.app.actions.external_task.mark_as_ready(
            external_task_id=external_task_id,
            output_data=output_data
        )


class ExternalTaskCreationResult:

    def __init__(self, context, instance):
        self.context: riberry.app.context.Context = context
        self.instance = instance
        self.task_id = instance.task_id

    def create_receiver_task(self, validator=None):
        return self.context.external_task.create_receiver_task(external_task_id=self.task_id, validator=validator)


# === riberry/app/context/flow.py ===
from contextlib import contextmanager

import riberry


class Flow:

    def __init__(self, context):
        self.context: riberry.app.context.Context = context
        self.scoped_stream = None
        self.scoped_category = None

    @contextmanager
    def stream_scope(self, stream=None, category=None):
        try:
            self.scoped_stream = stream
            self.scoped_category = category
            yield self
        finally:
            self.scoped_stream = None
            self.scoped_category = None

    def start(self, task, stream: str = None):
        return TaskWrap(
            task,
            __rib_stream_start=True,
            __rib_stream=self.cleanse_stream_name(stream),
        )

    def step(self, task, step: str = None, stream: str = None):
        return TaskWrap(
            task,
            __rib_step=step,
            __rib_stream=self.cleanse_stream_name(stream),
        )

    def end(self, task, stream: str = None):
        return TaskWrap(
            task,
            __rib_stream_end=True,
            __rib_stream=self.cleanse_stream_name(stream),
        )

    def cleanse_stream_name(self, stream: str):
        stream = stream or self.scoped_stream
        if not stream:
            raise ValueError('Stream name cannot be blank')
        return str(stream)


class TaskWrap:

    def __init__(self, func, **kwargs):
        self.func = func
        self.kwargs = kwargs

    @property
    def name(self):
        return self.func.name

    def _mixin_kw(self, kwargs):
        return {**self.kwargs, **kwargs}

    def s(self, *args, **kwargs):
        return self.func.s(*args, **self._mixin_kw(kwargs=kwargs))

    def si(self, *args, **kwargs):
        return self.func.si(*args, **self._mixin_kw(kwargs=kwargs))

    def delay(self, *args, **kwargs):
        return self.func.delay(*args, **self._mixin_kw(kwargs=kwargs))
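# === editor's note: usage sketch (not part of the archive) ===
# How the Flow helpers above might annotate a Celery chain with stream and step
# events. The tasks passed in are assumed to be Riberry-registered Celery tasks;
# the 'ETL' stream and 'Transform' step names are invented for the example.
import celery

import riberry


def build_chain(extract, transform, load):
    flow = riberry.app.current_context.flow
    return celery.chain(
        flow.start(extract, stream='ETL').si(),                   # stream QUEUED/ACTIVE events
        flow.step(transform, step='Transform', stream='ETL').s(),  # step-level events
        flow.end(load, stream='ETL').s(),                          # marks the stream complete
    )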
**kwargs): return json.loads(self.bytes(), *args, **kwargs) def csv(self, *args, **kwargs): reader = csv.DictReader(io.StringIO(self.text()), *args, **kwargs) for row in reader: yield row def __bool__(self): return bool(self.instance.binary) PK!x+riberry/app/context/report.pyimport json from functools import partial import riberry class Report: def __init__(self, context): self.context: riberry.app.context.Context = context def model(self, name): job_execution = self.context.current.job_execution report: riberry.model.job.JobExecutionReport = riberry.model.job.JobExecutionReport.query().filter_by( job_execution=job_execution, name=name, ).first() if not report: self.context.data.execute_once( key=riberry.app.util.misc.internal_data_key(f'once.create_report.{name}'), func=partial(self._create_report, name=name, job_execution=job_execution) ) return self.model(name=name) return report @staticmethod def _create_report(name, job_execution): report = riberry.model.job.JobExecutionReport(job_execution=job_execution, name=name) riberry.model.conn.add(report) riberry.model.conn.commit() def mark_for_refresh(self, name): report = self.model(name=name) report.marked_for_refresh = True riberry.model.conn.commit() def update(self, report, body, renderer=None): model = self.model(name=report) model.marked_for_refresh = False model.renderer = renderer or model.renderer model.report = json.dumps(body).encode() riberry.model.conn.commit() PK!Ӓ"riberry/app/context/shared_data.pyimport datetime from collections import Mapping, defaultdict from contextlib import contextmanager from operator import itemgetter from typing import AnyStr, Any, Dict, Iterator import pendulum import time from sqlalchemy.exc import IntegrityError import riberry class SharedExecutionData(Mapping): def __init__(self, context): self.context: riberry.app.context.Context = context self._lock: Dict[AnyStr, riberry.model.misc.ResourceData] = {} self._dirty = set() self._listeners = defaultdict(list) def listen(self, key, callback): self._listeners[key].append(callback) def __getitem__(self, item: AnyStr) -> Any: return self._get_instance(key=item).value def __setitem__(self, key, value): if key not in self._lock: raise PermissionError(f'No lock present for data key {key!r}') if pendulum.DateTime.utcnow() > pendulum.instance(self._lock[key].expiry): raise TimeoutError(f'Lock for data key {key!r} has expired!') self._lock[key].value = value self._dirty.add(key) def __delitem__(self, key): instance = self._get_instance(key=key) riberry.model.conn.delete(instance=instance) riberry.model.conn.commit() def __len__(self) -> int: return riberry.model.misc.ResourceData.query().filter_by( resource_id=self.context.current.job_execution.id, resource_type=riberry.model.misc.ResourceType.job_execution, ).count() def __iter__(self) -> Iterator[AnyStr]: instances = riberry.model.conn.query( riberry.model.misc.ResourceData.name ).filter_by( resource_id=self.context.current.job_execution.id, resource_type=riberry.model.misc.ResourceType.job_execution, ).all() return map(itemgetter(0), instances) def _get_instance(self, key): job_execution = self.context.current.job_execution # check if key exists instance = riberry.model.misc.ResourceData.query().filter_by( resource_id=job_execution.id, resource_type=riberry.model.misc.ResourceType.job_execution, name=key, ).first() # create if it doesn't if not instance: instance = riberry.model.misc.ResourceData( resource_id=job_execution.id, resource_type=riberry.model.misc.ResourceType.job_execution, name=key ) try: 
riberry.model.conn.add(instance) riberry.model.conn.commit() except IntegrityError: riberry.model.conn.rollback() return self._get_instance(key=key) return instance def _acquire_lock(self, key, ttl, poll_interval): instance = self._get_instance(key=key) lock_value = self.context.current.task_id while True: riberry.model.conn.query(riberry.model.misc.ResourceData).filter( (riberry.model.misc.ResourceData.id == instance.id) & ( (riberry.model.misc.ResourceData.lock == None) | (riberry.model.misc.ResourceData.expiry < datetime.datetime.now(tz=datetime.timezone.utc)) ) ).update({ 'lock': lock_value, 'expiry': pendulum.DateTime.utcnow().add(seconds=ttl) }) riberry.model.conn.commit() riberry.model.conn.expire(instance=instance) if instance.lock != lock_value: time.sleep(poll_interval) else: self._lock[key] = instance break def _release_lock(self, key): if self._lock: instance = self._lock.pop(key) instance.lock = None instance.expiry = None instance.marked_for_refresh = instance.marked_for_refresh or (key in self._dirty) riberry.model.conn.commit() if key in self._dirty: for listener in self._listeners[key]: listener(key) self._dirty.remove(key) @contextmanager def lock(self, key, ttl=60, poll_interval=1): try: yield self._acquire_lock(key=key, ttl=ttl, poll_interval=poll_interval) finally: self._release_lock(key=key) def execute_once(self, key, func): with self.context.data.lock(key=key): if not self.context.data[key]: self.context.data[key] = True func() def set(self, key, value, **kwargs): with self.lock(key=key, **kwargs): self[key] = value() if callable(value) else value PK!qvtriberry/app/env.pyimport os import riberry from .base import RiberryApplication from .context import Context from .util.misc import Proxy __cache = dict( app_name={}, ) __cache_app_name = __cache['app_name'] def get_instance_name(raise_on_none=True) -> str: if 'RIBERRY_INSTANCE' not in os.environ and raise_on_none: raise EnvironmentError("Environment variable 'RIBERRY_INSTANCE' not set") return os.environ.get('RIBERRY_INSTANCE') def get_application_name() -> str: instance_name = get_instance_name(raise_on_none=True) if instance_name not in __cache_app_name: __cache_app_name[instance_name] = get_instance_model().application.internal_name return __cache_app_name[instance_name] def get_instance_model() -> riberry.model.application.ApplicationInstance: return riberry.model.application.ApplicationInstance.query().filter_by( internal_name=get_instance_name(raise_on_none=True), ).one() def is_current_instance(instance_name: str) -> bool: return bool(instance_name and get_instance_name(raise_on_none=False) == instance_name) current_riberry_app: RiberryApplication = Proxy( getter=lambda: RiberryApplication.by_name(name=get_application_name()) ) current_context: Context = Proxy( getter=lambda: current_riberry_app.context ) PK!2riberry/app/misc/__init__.py PK!.9 riberry/app/misc/signals.pyfrom celery import signals import riberry from ..util.events import create_event __start_stream_cache = {} @signals.worker_process_init.connect def worker_process_init(*args, **kwargs): riberry.model.conn.raw_engine.dispose() riberry.model.conn.remove() @signals.celeryd_after_setup.connect def celeryd_after_setup(*args, **kwargs): riberry.model.conn.raw_engine.dispose() riberry.model.conn.remove() @signals.before_task_publish.connect def before_task_publish(sender, headers, body, **_): try: root_id = riberry.app.current_context.current.root_id except: return args, kwargs, *_ = body task_id = headers['id'] if '__rib_stream_start' in kwargs: 
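# Publishing a task that carries a stream-start marker emits a 'stream' event in the
# QUEUED state; a '__rib_step' marker additionally emits a QUEUED 'step' event below.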
create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': str(kwargs['__rib_stream']), 'state': 'QUEUED', } ) if '__rib_step' in kwargs: stream, step = kwargs['__rib_stream'], kwargs['__rib_step'] create_event( name='step', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'step': str(step), 'state': 'QUEUED', } ) def task_prerun(context, props): task_id = context.current.task_id root_id = context.current.root_id stream = context.current.stream step = context.current.step if not stream: return if 'stream_start' in props: key = root_id, stream if key not in __start_stream_cache: __start_stream_cache[key] = None if len(__start_stream_cache) > 5000: evict_key = next(iter(__start_stream_cache)) __start_stream_cache.pop(evict_key) create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'state': 'ACTIVE', } ) if step: create_event( name='step', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'step': str(step), 'state': 'ACTIVE', } ) def task_postrun(context, props, state): task_id = context.current.task_id root_id = context.current.root_id stream = context.current.stream step = context.current.step if not stream: return if 'stream_start' in props and state in ('RETRY', 'FAILURE'): create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'state': state, } ) if 'stream_end' in props: create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'state': 'SUCCESS' if state == 'IGNORED' else state or 'FAILURE', } ) if step: create_event( name='step', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'step': str(step), 'state': 'SUCCESS' if state == 'IGNORED' else state or 'FAILURE', } ) PK!}Snnriberry/app/tasks.pyimport pendulum from sqlalchemy import desc, asc import riberry from riberry.app.util import execution_tracker as tracker from . import actions, env def echo(): with riberry.model.conn: app_instance = env.get_instance_model() heartbeat = riberry.model.application.Heartbeat.query().filter_by(instance=app_instance).first() if not heartbeat: heartbeat = riberry.model.application.Heartbeat(instance=app_instance) riberry.model.conn.add(heartbeat) heartbeat.updated = pendulum.DateTime.utcnow() riberry.model.conn.commit() def poll(): with riberry.model.conn: app_instance = env.get_instance_model() tracker.check_stale_execution(app_instance=app_instance) if app_instance.status != 'online': return executions = riberry.model.job.JobExecution.query().filter( riberry.model.job.JobExecution.status == 'RECEIVED' ).join(riberry.model.job.Job).order_by( desc(riberry.model.job.JobExecution.priority), asc(riberry.model.job.JobExecution.created), asc(riberry.model.job.JobExecution.id), ).filter_by(instance=app_instance).all() for execution in executions: execution_task_id = actions.executions.queue_job_execution(execution=execution) print(f'poll - queueing task {execution_task_id}') def refresh(): with riberry.model.conn: app_instance = env.get_instance_model() actions.shared_data.update_all_data_items(app_instance=app_instance) actions.reports.update_all_reports(app_instance=app_instance) PK!~@::riberry/app/util/__init__.pyfrom . import misc, events, execution_tracker, redis_lock PK! 
ariberry/app/util/events.pyimport json import pendulum import riberry def create_event(name, root_id, task_id, data=None, binary=None): if not root_id: return if isinstance(binary, str): binary = binary.encode() evt = riberry.model.misc.Event( name=name, time=pendulum.DateTime.utcnow().timestamp(), task_id=task_id, root_id=root_id, data=json.dumps(data), binary=binary, ) riberry.model.conn.add(evt) riberry.model.conn.commit() riberry.model.conn.flush([evt]) PK!)=#zz%riberry/app/util/execution_tracker.pyfrom typing import List from celery.utils.log import logger from sqlalchemy import desc, asc import riberry def _tracker_key(value): return f'workflow:active:{value}' def start_tracking_execution(root_id): redis = riberry.celery.util.celery_redis_instance() instance = riberry.app.env.get_instance_name() key = _tracker_key(instance) logger.info(f'execution_tracker: Tracking workflow {root_id!r} via key {key!r}') redis.sadd(key, root_id) def check_stale_execution(app_instance): executions: List[riberry.model.job.JobExecution] = riberry.model.job.JobExecution.query().filter( riberry.model.job.JobExecution.status.in_(('ACTIVE', 'READY')) ).join(riberry.model.job.Job).order_by( desc(riberry.model.job.JobExecution.priority), asc(riberry.model.job.JobExecution.created) ).filter_by(instance=app_instance).all() if not executions: return redis = riberry.celery.util.celery_redis_instance() for execution in executions: if not redis.sismember(_tracker_key(app_instance.internal_name), execution.task_id): logger.warn( f'execution_tracker: Identified stale workflow. ' f'Root ID: {execution.task_id}, Execution ID: {execution.id}' ) riberry.app.actions.artifacts.create_artifact( name='Workflow Cancelled', type=riberry.model.job.ArtifactType.error, category='Fatal', filename='fatal.log', content=( f'The current executions\'s ID ({execution.task_id}) was not found within Redis and has ' f'therefore been cancelled. This usually occurs when Redis is flushed while an execution is ' f'in the READY or ACTIVE state.' 
), task_id=execution.task_id, root_id=execution.task_id, ) riberry.app.actions.executions.execution_complete( task_id=execution.task_id, root_id=execution.task_id, status='FAILURE', stream=None ) PK!ְ({{riberry/app/util/misc.pyimport inspect class Proxy: def __init__(self, getter): self.get = getter def __getattr__(self, item): return getattr(self.get(), item) def __repr__(self): return f'Proxy({self.get()})' def function_path(func): return f'{inspect.getmodule(func).__name__}.{func.__name__}' def internal_data_key(key): return f'_internal:{key}' PK!~볎riberry/app/util/redis_lock.pyimport time from redis.exceptions import LockError class RedisLock: def __init__(self, name, on_acquired, interval, min_interval=100): self.name = name self.on_acquired = on_acquired self.interval = interval self.min_interval = min_interval @property def key_timeout(self): return f'lock:{self.name}:timeout' @property def key_lock(self): return f'lock:{self.name}:acquire' def run(self, redis_instance): if redis_instance.get(name=self.key_timeout) is None: try: self._attempt_lock(redis_instance=redis_instance) except LockError: pass def _attempt_lock(self, redis_instance): with redis_instance.lock(name=self.key_lock, blocking_timeout=0.0): print(f"{self.name}: acquired lock...") time_start = time.time() try: self.on_acquired() finally: process_time = time.time() - time_start self._set_timeout(redis_instance=redis_instance, process_time=process_time) def _set_timeout(self, redis_instance, process_time): expiry = int(max(self.interval - (process_time * 1000), self.min_interval)) redis_instance.set(name=self.key_timeout, value='1', px=expiry) print(f'{self.name}: processed task in {process_time:03}, setting lock expiry to {expiry:03} milliseconds')PK!09Triberry/celery/__init__.pyfrom . import background, util PK!%riberry/celery/background/__init__.pyfrom celery import Celery from riberry import config app = Celery(main='background-tasks') app.conf.update(config.config.celery) app.conf.beat_schedule.update({ 'process:execution': { 'task': 'riberry.celery.background.tasks.process_events', 'schedule': config.config.background.events.interval, 'kwargs': { 'event_limit': config.config.background.events.processing_limit }, 'options': {'queue': 'riberry.background.events'} }, 'process:job-schedule': { 'task': 'riberry.celery.background.tasks.job_schedules', 'schedule': config.config.background.schedules.interval, 'options': {'queue': 'riberry.background.schedules'} }, 'process:capacity': { 'task': 'riberry.celery.background.tasks.update_capacity_parameters', 'schedule': config.config.background.capacity.interval, 'options': {'queue': 'riberry.background.schedules'} }, }) app.conf.imports = list(app.conf.imports) + ['riberry.celery.background.tasks'] def register_task(task_path, schedule): app.conf.beat_schedule[task_path] = { 'task': 'riberry.celery.background.tasks.custom_task', 'schedule': schedule, 'args': [task_path], 'options': {'queue': 'riberry.background.custom'} } PK!  
,riberry/celery/background/capacity_config.pyfrom collections import Counter, defaultdict from enum import Enum from typing import Optional, Dict, List, Set import itertools from celery.utils.log import logger from riberry import model class ConsumerStatus(Enum): active = 'active' inactive = 'inactive' class CapacityConsumer: def __init__(self, name: str, status: ConsumerStatus, requested_capacity: Optional[int] = None): self.name = name self.status = status self.requested_capacity = requested_capacity def __repr__(self): return f'CapacityConsumer(name={self.name!r})' @staticmethod def total_capacity(consumers, default=0): return sum(c.requested_capacity or default for c in consumers if c.status == ConsumerStatus.active) @classmethod def distribute(cls, consumers, total_capacity) -> Dict['CapacityConsumer', List[int]]: total_requested_capacity = cls.total_capacity(consumers=consumers, default=total_capacity) or total_capacity return { consumer: ( [(consumer.requested_capacity or total_capacity) / total_requested_capacity * total_capacity] if consumer.status == ConsumerStatus.active else [0] ) for consumer in consumers } @classmethod def from_weight_parameter(cls, parameter_name: str): schedules: List[model.application.ApplicationInstanceSchedule] = \ model.application.ApplicationInstanceSchedule.query().filter_by(parameter=parameter_name).all() instances: Set[model.application.ApplicationInstance] = {sched.instance for sched in schedules} schedule_values = { instance.internal_name: int(instance.active_schedule_value(name=parameter_name, default=0)) for instance in instances } execution_count = execution_count_for_instances(instances=instances) return [ CapacityConsumer( name=instance.internal_name, status=( ConsumerStatus.active if instance.status == 'online' and execution_count[instance.internal_name] else ConsumerStatus.inactive ), requested_capacity=schedule_values[instance.internal_name] ) for instance in instances ] class CapacityProducer: def __init__(self, name: str, capacity: int): self.name = name self.capacity = capacity def __repr__(self): return f'CapacityProducer(name={self.name!r}, capacity={self.capacity})' @staticmethod def total_capacity(producers): return sum(p.capacity for p in producers) @staticmethod def producers_name_pool(producers, distribution_strategy: model.application.CapacityDistributionStrategy): name_lists = sorted( [[producer.name] * producer.capacity for producer in producers], key=len, reverse=True ) if distribution_strategy == model.application.CapacityDistributionStrategy.spread: name_lists = [filter(None, name_list) for name_list in itertools.zip_longest(*name_lists)] return list(itertools.chain.from_iterable(name_lists)) def weighted_schedules(parameter_name: str): schedules = model.application.ApplicationInstanceSchedule.query().filter_by(parameter=parameter_name).all() return schedules def execution_count_for_instances(instances, states=('ACTIVE', 'READY')): execution_count = defaultdict(int) job_executions: List[model.job.JobExecution] = model.job.JobExecution.query().filter( model.job.JobExecution.status.in_(states)).all() for job_execution in job_executions: instance = job_execution.job.instance if instance in instances: execution_count[instance.internal_name] += 1 return execution_count def update_instance_schedule( instance: model.application.ApplicationInstance, capacity: int, producer_allocations: Counter, allocation_config_name: str, capacity_config_name: str ): for sched in list(instance.schedules): if sched.parameter in 
(capacity_config_name, allocation_config_name): model.conn.delete(sched) schedule_capacity = model.application.ApplicationInstanceSchedule( instance=instance, parameter=capacity_config_name, value=str(capacity), priority=100, ) schedule_allocation = model.application.ApplicationInstanceSchedule( instance=instance, parameter=allocation_config_name, value=' '.join(f'{k}|{v}' for k, v in sorted(producer_allocations.items())), priority=101, ) model.conn.add(schedule_capacity) model.conn.add(schedule_allocation) def update_instance_capacities( producers, weight_parameter, capacity_parameter, producer_parameter, distribution_strategy): consumers = CapacityConsumer.from_weight_parameter(parameter_name=weight_parameter) total_capacity = CapacityProducer.total_capacity(producers=producers) capacity_distribution = CapacityConsumer.distribute(consumers=consumers, total_capacity=total_capacity) producer_name_pool = CapacityProducer.producers_name_pool( producers=producers, distribution_strategy=distribution_strategy) logger.info(f'[{weight_parameter}] Total capacity: {total_capacity}') for consumer, capacities in sorted(capacity_distribution.items(), key=lambda x: x[0].name): raw_capacity = round(sum(capacities)) allocated_names = producer_name_pool[:raw_capacity] producer_name_pool = producer_name_pool[raw_capacity:] capacity = len(allocated_names) producer_allocations = Counter(allocated_names) instance = model.application.ApplicationInstance.query().filter_by(internal_name=consumer.name).one() update_instance_schedule( instance=instance, capacity=capacity, producer_allocations=producer_allocations, allocation_config_name=producer_parameter, capacity_config_name=capacity_parameter, ) allocations_formatted = ', '.join([f'{k}: {v:2}' for k, v in sorted(producer_allocations.items())]) or '-' logger.info( f'[{weight_parameter}] {consumer.name} -> capacity: {capacity:2}, allocations: [ {allocations_formatted} ]') PK!,riberry/celery/background/events/__init__.pyPK!`8 44*riberry/celery/background/events/events.pyimport json import smtplib import traceback from collections import defaultdict from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import List import pendulum from sqlalchemy.orm.exc import NoResultFound from riberry import model, config from celery.utils.log import logger def email_notification(host, body, mime_type, subject, sender, recipients: List): try: msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = sender msg['To'] = ', '.join(recipients) msg.attach(MIMEText(body, mime_type)) s = smtplib.SMTP(host) s.sendmail(sender, recipients, msg.as_string()) s.quit() except: logger.warn(f'An error occurred while sending email notification: {traceback.format_exc()}') def handle_artifacts(events: List[model.misc.Event]): job_executions = {} streams = {} to_delete = [] for event in events: try: event_data = json.loads(event.data) stream_name = event_data['stream'] artifact = model.job.JobExecutionArtifact( name=event_data['name'] or 'Untitled', type=model.job.ArtifactType[event_data['type']], category=event_data['category'] or 'Default', filename=event_data['filename'] or 'Untitled', size=len(event.binary) if event.binary else 0, created=pendulum.from_timestamp(event.time), binary=model.job.JobExecutionArtifactBinary(binary=event.binary), data=[ model.job.JobExecutionArtifactData( title=str(title), description=str(description) ) for title, description in event_data['data'].items() if title and description ] ) if event.root_id not in 
job_executions: try: job_executions[event.root_id] = model.job.JobExecution.query().filter_by(task_id=event.root_id).one() except NoResultFound: to_delete.append(event) continue job_execution = job_executions[event.root_id] artifact.job_execution = job_executions[event.root_id] if stream_name: if (stream_name, event.root_id) not in streams: try: streams[(stream_name, event.root_id)] = model.job.JobExecutionStream.query().filter_by( name=stream_name, job_execution=job_execution).one() except NoResultFound: to_delete.append(event) continue stream = streams[(stream_name, event.root_id)] artifact.stream = stream model.conn.add(artifact) except: logger.warn(f'An error occurred processing artifact event {event}: {traceback.format_exc()}') else: to_delete.append(event) return to_delete def handle_steps(events: List[model.misc.Event]): to_delete = [] steps = {} streams = {} job_executions = {} for event in events: try: event_data = json.loads(event.data) event_time = pendulum.from_timestamp(event.time) stream_name = event_data['stream'] if event.root_id not in job_executions: try: job_executions[event.root_id] = model.job.JobExecution.query().filter_by( task_id=event.root_id).one() except NoResultFound: to_delete.append(event) continue job_execution = job_executions[event.root_id] if (stream_name, event.root_id) not in streams: try: streams[(stream_name, event.root_id)] = model.job.JobExecutionStream.query().filter_by( name=stream_name, job_execution=job_execution).one() except NoResultFound: to_delete.append(event) continue stream = streams[(stream_name, event.root_id)] try: if (stream_name, event.task_id) not in steps: steps[(stream_name, event.task_id)] = model.job.JobExecutionStreamStep.query().filter_by( task_id=event.task_id, stream=stream).one() step = steps[(stream_name, event.task_id)] except NoResultFound: step = model.job.JobExecutionStreamStep( name=event_data['step'], created=pendulum.from_timestamp(event.time), updated=pendulum.from_timestamp(event.time), task_id=event.task_id, stream=stream, status=event_data['state'] ) model.conn.add(step) step_updated = pendulum.instance(step.updated, tz='utc') if event_time >= step_updated: status = event_data['state'] step.status = status step.updated = event_time if status == 'ACTIVE': step.started = event_time elif status in ('SUCCESS', 'FAILURED'): step.completed = event_time except: logger.warn(f'An error occurred processing step event {event}: {traceback.format_exc()}') else: to_delete.append(event) return to_delete def handle_streams(events: List[model.misc.Event]): to_delete = [] streams = {} job_executions = {} for event in events: try: event_data = json.loads(event.data) event_time = pendulum.from_timestamp(event.time, tz='utc') stream_name = event_data['stream'] if not stream_name: logger.warn('Empty stream name provided, skipping') to_delete.append(event) continue if event.root_id not in job_executions: try: job_executions[event.root_id] = model.job.JobExecution.query().filter_by( task_id=event.root_id).one() except NoResultFound: to_delete.append(event) continue job_execution = job_executions[event.root_id] try: if (stream_name, event.root_id) not in streams: streams[(stream_name, event.root_id)] = model.job.JobExecutionStream.query().filter_by( name=stream_name, job_execution=job_execution).one() stream = streams[(stream_name, event.root_id)] except NoResultFound: existing_stream = model.job.JobExecutionStream.query().filter_by(task_id=event.task_id).first() if existing_stream: logger.warn(f'Skipping stream event {event}. 
Task ID {event.task_id!r} already exists against ' f'an existing stream (id={existing_stream.id}).\n' f'Details:\n' f' root_id: {event.root_id!r}\n' f' name: {stream_name!r}\n' f' data: {event_data}\n') to_delete.append(event) continue stream = model.job.JobExecutionStream( name=str(event_data['stream']), task_id=event.task_id, created=pendulum.from_timestamp(event.time, tz='utc'), updated=pendulum.from_timestamp(event.time, tz='utc'), status='QUEUED', job_execution=job_execution ) model.conn.add(stream) stream_updated = pendulum.instance(stream.updated, tz='utc') if event_time >= stream_updated: status = event_data['state'] stream.status = status stream.updated = event_time if status == 'ACTIVE' and stream.started is None: stream.started = event_time elif status in ('SUCCESS', 'FAILURE'): stream.completed = event_time except: logger.warn(f'An error occurred processing stream event {event}: {traceback.format_exc()}') else: to_delete.append(event) return to_delete def handle_notifications(events: List[model.misc.Event]): to_delete = [] for event in events: event_data = json.loads(event.data) notification_type = event_data['type'] notification_data = event_data['data'] try: execution: model.job.JobExecution = model.job.JobExecution.query().filter_by(task_id=event.root_id).one() user = execution.creator except NoResultFound: to_delete.append(event) continue if notification_type == 'custom-email' and config.config.email.enabled: try: email_notification( host=config.config.email.smtp_server, body=notification_data['body'], mime_type=notification_data.get('mime_type') or 'plain', subject=notification_data['subject'], sender=notification_data.get('from') or config.config.email.sender, recipients=[user.details.email] + notification_data.get('to', []), ) except: logger.warn(f'An error occurred processing notification type {notification_type}: ' f'{traceback.format_exc()}') elif notification_type == 'workflow_complete': status = str(notification_data['status']).lower() message = f'Completed execution #{execution.id} for job ' \ f'{execution.job.name} with status {str(status).lower()}' notification = model.misc.Notification( type=( model.misc.NotificationType.success if str(status).lower() == 'success' else model.misc.NotificationType.error ), message=message, user_notifications=[ model.misc.UserNotification(user=user) ], targets=[ model.misc.NotificationTarget(target='JobExecution', target_id=execution.id) ] ) model.conn.add(notification) if config.config.email.enabled: email_notification( host=config.config.email.smtp_server, body=message, mime_type='plain', subject=f'Riberry / {status.title()} / {execution.job.name} / execution #{execution.id}', sender=config.config.email.sender, recipients=[user.details.email], ) elif notification_type == 'workflow_started': message = f'Processing execution #{execution.id} for job {execution.job.name}' notification = model.misc.Notification( type=model.misc.NotificationType.info, message=message, user_notifications=[ model.misc.UserNotification(user=execution.creator) ], targets=[ model.misc.NotificationTarget(target='JobExecution', target_id=execution.id) ] ) model.conn.add(notification) if config.config.email.enabled: email_notification( host=config.config.email.smtp_server, body=message, mime_type='plain', subject=f'Riberry / Started / {execution.job.name} / execution #{execution.id}', sender=config.config.email.sender, recipients=[user.details.email], ) else: logger.warn(f'Received unknown notification type {notification_type}') to_delete.append(event) return 
to_delete handlers = { 'stream': handle_streams, 'step': handle_steps, 'artifact': handle_artifacts, 'notify': handle_notifications, } def process(event_limit=None): with model.conn: event_mapping = defaultdict(list) query = model.misc.Event.query().order_by(model.misc.Event.time.asc(), model.misc.Event.id.asc()) if event_limit: query = query.limit(event_limit) events = query.all() if not events: return for event in events: event_mapping[event.name].append(event) to_delete = [] for handler_name, handler_func in handlers.items(): handler_events = event_mapping[handler_name] if handler_events: try: to_delete += handlers[handler_name](handler_events) except: logger.warn(f'Failed to process {handler_name} events: {handler_events}') raise for event in to_delete: logger.info(f'Removing processed event {event}') model.conn.delete(event) model.conn.commit() if __name__ == '__main__': process() PK!O}"riberry/celery/background/tasks.pyimport importlib from riberry import model from riberry.celery.background import capacity_config from riberry.celery.background.events import events from . import app @app.task(ignore_result=True) def process_events(event_limit=None): events.process(event_limit) @app.task(ignore_result=True) def job_schedules(): with model.conn: for schedule in model.job.JobSchedule.query().filter_by(enabled=True).all(): schedule.run() model.conn.commit() @app.task(ignore_result=True) def update_capacity_parameters(): with model.conn: for capacity_configuration in model.application.CapacityConfiguration.query().all(): producers = [ capacity_config.CapacityProducer(producer.internal_name, producer.capacity) for producer in capacity_configuration.producers ] capacity_config.update_instance_capacities( producers=producers, weight_parameter=capacity_configuration.weight_parameter, capacity_parameter=capacity_configuration.capacity_parameter, producer_parameter=capacity_configuration.producer_parameter, distribution_strategy=capacity_configuration.distribution_strategy, ) model.conn.commit() @app.task(ignore_result=True) def custom_task(func_path): module_path, func_name = func_path.split(':') module = importlib.import_module(module_path) func = getattr(module, func_name) func() PK!vPD/D/!riberry/celery/client/__init__.pyimport base64 import functools import os import traceback import uuid import warnings from typing import Dict, Tuple import pendulum from celery import Celery, bootsteps from celery import current_task from celery import exceptions as celery_exc from celery.result import AsyncResult from riberry import model from riberry.celery.client import tasks from riberry.celery.client.dynamic import DynamicParameters from . import wf, signals, scale, dynamic, tracker, control warnings.warn( 'The module riberry.celery.client is deprecated. 
Please use riberry.celery.app instead', DeprecationWarning, stacklevel=2 ) IGNORE_EXCEPTIONS = ( celery_exc.Ignore, celery_exc.Retry, celery_exc.SoftTimeLimitExceeded, celery_exc.TimeLimitExceeded ) BYPASS_ARGS = ( '__ss__', '__se__', '__sb__' ) def current_instance_name(raise_on_none=False) -> str: name = os.getenv('RIBERRY_INSTANCE') if name is None and raise_on_none: raise EnvironmentError("Environment variable 'RIBERRY_INSTANCE' not set") return name def is_current_instance(instance_name: str) -> bool: return bool(instance_name) and current_instance_name(raise_on_none=False) == instance_name def queue_job_execution(execution: model.job.JobExecution): job = execution.job form = job.form app_instance = job.instance application_name = app_instance.application.internal_name workflow_app = Workflow.by_internal_name(internal_name=application_name) try: execution.status = 'READY' execution.task_id = str(uuid.uuid4()) tracker.start_tracking_execution(root_id=execution.task_id) model.conn.commit() task = workflow_app.start( execution_id=execution.id, root_id=execution.task_id, form=form.internal_name, input_values={v.internal_name: v.value for v in job.values}, input_files={v.internal_name: base64.b64encode(v.binary).decode() for v in job.files} ) except: execution.status = 'FAILURE' message = traceback.format_exc().encode() execution.artifacts.append( model.job.JobExecutionArtifact( job_execution=execution, name='Error on Startup', type='error', category='Fatal', filename='startup-error.log', size=len(message), binary=model.job.JobExecutionArtifactBinary( binary=message ) ) ) model.conn.commit() raise else: return task def workflow_complete(task_id, root_id, status, primary_stream): job: model.job.JobExecution = model.job.JobExecution.query().filter_by(task_id=root_id).first() if not job: return job.task_id = root_id job.status = status job.completed = job.updated = pendulum.DateTime.utcnow() if not job.started: job.started = pendulum.DateTime.utcnow() if primary_stream is None: stream = model.job.JobExecutionStream.query().filter_by(task_id=root_id).first() if stream is not None: primary_stream = stream.name if primary_stream is not None: tasks.create_event( name='stream', root_id=root_id, task_id=root_id, data={ 'stream': primary_stream, 'state': status } ) model.conn.commit() wf.notify( notification_type='workflow_complete', data=dict(status=status), task_id=task_id, root_id=root_id ) def is_workflow_complete(task): root_id = task.request.root_id job: model.job.JobExecution = model.job.JobExecution.query().filter_by(task_id=root_id).first() return job.status in ('FAILURE', 'SUCCESS') if job else True def workflow_started(task, job_id, primary_stream): root_id = task.request.root_id job: model.job.JobExecution = model.job.JobExecution.query().filter_by(id=job_id).one() job.started = job.updated = pendulum.DateTime.utcnow() job.status = 'ACTIVE' job.task_id = root_id task.stream = primary_stream model.conn.commit() tasks.create_event( name='stream', root_id=root_id, task_id=root_id, data={ 'stream': primary_stream, 'state': 'ACTIVE' } ) wf.notify(notification_type='workflow_started') def execute_task(func, func_args, func_kwargs, task_kwargs): # noinspection PyBroadException try: return func(*func_args, **func_kwargs) except tuple(list(IGNORE_EXCEPTIONS) + task_kwargs.get('autoretry_for', [])): raise except Exception as exc: wf.artifact_from_traceback(category='Intercepted' if 'rib_fallback' in task_kwargs else 'Fatal') if 'rib_fallback' in task_kwargs: fallback = 
task_kwargs.get('rib_fallback') return fallback() if callable(fallback) else fallback else: workflow_complete(current_task.request.id, current_task.request.root_id, status='FAILURE', primary_stream=None) raise def bypass(func, **task_kwargs): @functools.wraps(func) def inner(*args, **kwargs): if not task_kwargs.get('rib_task', True): return func(*args, **kwargs) with model.conn: if is_workflow_complete(current_task): AsyncResult(current_task.request.id).revoke() raise Exception('Workflow already cancelled') filtered_kwargs = {k: v for k, v in kwargs.items() if k not in BYPASS_ARGS} return execute_task( func=func, func_args=args, func_kwargs=filtered_kwargs, task_kwargs=task_kwargs ) return inner def patch_task(task): def stream_start(stream: str = None): return wf.s(task, stream=stream) def stream_end(stream: str = None): return wf.e(task, stream=stream) def step(step: str = None, stream: str = None): return wf.b(task, step=step, stream=stream) task.stream_start = stream_start task.stream_end = stream_end task.step = step return task def patch_app(app): def task_deco(*args, **kwargs): if len(args) == 1: if callable(args[0]): return patch_task(Celery.task(app, bypass(*args, **kwargs), **kwargs)) raise TypeError('argument 1 to @task() must be a callable') def inner(func): return patch_task(Celery.task(app, **kwargs)(bypass(func, **kwargs))) return inner app.task = task_deco app.task(name='check-external-task', bind=True, max_retries=None)(wf.poll_external_task) return app class WorkflowEntry: def __init__(self, name, func, primary_stream, primary_step): self.name = name self.func = func self.primary_stream = primary_stream self.primary_step = primary_step class Workflow: __registered__ = {} def __init__(self, name, app, beat_queue=None, event_queue=None, scalable_queues=None, dynamic_parameters=None): self.name = name self.beat_queue = beat_queue or 'rib.beat' self.event_queue = event_queue or 'rib.event' self.__registered__[name] = self self.app = patch_app(app) self.scale = scale.ConcurrencyScale(self.app, target_queues=scalable_queues) if scalable_queues else None self.form_entries: Dict[Tuple[str, str], WorkflowEntry] = {} self.entry_point = self._make_entry_point(self.app, self.form_entries) self._extend_cli(app) self._configure_beat_queues(app, self.beat_queue) self._configure_event_queue(app, self.event_queue) self._configure_manual_task_queue(app) self.dynamic_parameters = DynamicParameters( riberry_workflow=self, handlers=dynamic_parameters, beat_queue=self.beat_queue ) @classmethod def by_internal_name(cls, internal_name): return Workflow.__registered__[internal_name] @staticmethod def _configure_beat_queues(app, beat_queue): schedule = { 'poll-executions': { 'task': 'riberry.celery.client.tasks.poll', 'schedule': 2, 'options': {'queue': beat_queue} }, 'echo-status': { 'task': 'riberry.celery.client.tasks.echo', 'schedule': 2, 'options': {'queue': beat_queue} } } if not app.conf.beat_schedule: app.conf.beat_schedule = {} app.conf.beat_schedule.update(schedule) @staticmethod def _configure_event_queue(app, event_queue): task_routes = { 'riberry.celery.client.tasks.event': {'queue': event_queue}, } if not app.conf.task_routes: app.conf.task_routes = {} app.conf.task_routes.update(task_routes) @staticmethod def _configure_manual_task_queue(app): task_routes = { 'check-external-task': {'queue': 'rib.manual'}, } if not app.conf.task_routes: app.conf.task_routes = {} app.conf.task_routes.update(task_routes) @staticmethod def _extend_cli(app): def rib_instance_cli(parser): 
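# Worker CLI hook: the --rib-instance option (falling back to the RIBERRY_INSTANCE
# environment variable) is captured by the RiberryInstanceStep bootstep defined below
# and exported back into the environment when the worker starts.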
parser.add_argument('--rib-instance', default=None, help='Defines the Riberry instance') class RiberryInstanceStep(bootsteps.StartStopStep): def __init__(self, worker, rib_instance=None, **options): super(RiberryInstanceStep, self).__init__(worker, **options) self.rib_instance = rib_instance or os.getenv('RIBERRY_INSTANCE') def start(self, worker): os.environ['RIBERRY_INSTANCE'] = self.rib_instance if not os.environ.get('RIBERRY_TESTSUITE'): app.user_options['worker'].add(rib_instance_cli) app.steps['worker'].add(RiberryInstanceStep) @staticmethod def _make_entry_point(app, form_entries: Dict[Tuple[str, str], WorkflowEntry]): @app.task(bind=True) def entry_point(task, execution_id, name: str, values: Dict, files: Dict): with model.conn: workflow_entry = form_entries[name] workflow_started(task, execution_id, workflow_entry.primary_stream) workflow_entry.func(task, **values, **files) return entry_point def entry(self, name, primary_stream='Overall', primary_step='Entry'): def wrapper(func): self.form_entries[name] = WorkflowEntry( name=name, func=func, primary_stream=primary_stream, primary_step=primary_step, ) return wrapper def start(self, execution_id, root_id, input_name, input_values, input_files): if input_name not in self.form_entries: raise ValueError(f'Application {self.name!r} does not have an entry point with ' f'name {input_name!r} registered.') workflow_entry: WorkflowEntry = self.form_entries[input_name] with wf.stream_context(stream=workflow_entry.primary_stream): body = wf.b(self.entry_point, step=workflow_entry.primary_step).si( execution_id=execution_id, name=input_name, values=input_values, files=input_files, ) callback_success = tasks.workflow_complete.si(status='SUCCESS', primary_stream=workflow_entry.primary_stream) callback_failure = tasks.workflow_complete.si(status='FAILURE', primary_stream=workflow_entry.primary_stream) body.options['root_id'] = root_id callback_success.options['root_id'] = root_id callback_failure.options['root_id'] = root_id task = body.on_error(callback_failure) | callback_success task.options['root_id'] = root_id return task.apply_async() PK!4## riberry/celery/client/control.pyimport os from celery.worker import control @control.control_command() def toggle_external_task_queue(state, operation): if os.environ.get('RIBERRY_EXTERNAL_TASK'): queues = {q.name for q in state.consumer.task_consumer.queues} if operation == 'add' and 'rib.manual' not in queues: state.consumer.add_task_queue('rib.manual') return True if operation == 'cancel' and 'rib.manual' in queues: state.consumer.cancel_task_queue('rib.manual') return True return None PK!f3 )riberry/celery/client/dynamic/__init__.pyfrom typing import List from celery import current_app from celery.utils.log import logger from riberry import model from riberry.celery import client class DynamicParameter: def __init__(self, parameter): self.parameter = parameter def on_received(self, instance: model.application.ApplicationInstance, value: str): raise NotImplementedError def make_dynamic_parameters_task(handlers: List[DynamicParameter]): def dynamic_parameters_task(): if not handlers: return with model.conn: instance: model.application.ApplicationInstance = model.application.ApplicationInstance.query().filter_by( internal_name=client.current_instance_name(raise_on_none=True) ).one() active_schedules = instance.active_schedules handler_mapping = {h.parameter: h for h in handlers} for parameter, schedule in sorted(active_schedules.items(), key=lambda item: -item[1].priority if item[1] else 0): if 
parameter in handler_mapping: value = schedule.value if schedule else None logger.info(f'dynamic-parameters: updating dynamic parameter {parameter!r} with value {value!r}') try: handler_mapping[parameter].on_received(instance=instance, value=value) except: logger.exception(f'dynamic-parameters: {parameter!r} failed') dynamic_parameters_task.__name__ = 'dynamic' return dynamic_parameters_task class DynamicParameters: all_apps = {} def __init__(self, riberry_workflow, handlers, beat_queue): self.workflow = riberry_workflow self.all_apps[self.workflow.app.main] = self self.workflow.app.task(name='dynamic-parameters', rib_task=False)(make_dynamic_parameters_task(handlers=handlers)) self._configure_beat_queues(self.workflow.app, beat_queue) @staticmethod def _configure_beat_queues(app, beat_queue): schedule = { 'dynamic-parameters': { 'task': 'dynamic-parameters', 'schedule': 2, 'options': {'queue': beat_queue} }, } if not app.conf.beat_schedule: app.conf.beat_schedule = {} app.conf.beat_schedule.update(schedule) @classmethod def instance(cls) -> 'DynamicParameters': return cls.all_apps.get(current_app.main) PK!~ lG G +riberry/celery/client/dynamic/parameters.pyfrom contextlib import contextmanager from celery import current_app from celery.utils.log import logger from riberry.celery import client from riberry.celery.client.dynamic import DynamicParameter from riberry.celery.client.dynamic.util import PriorityQueue from riberry.celery.client.scale import redis_queues_empty_workers_idle, ConcurrencyScale from riberry.celery.util import celery_redis_instance class DynamicQueues(DynamicParameter): def __init__(self, parameter='active'): super(DynamicQueues, self).__init__(parameter=parameter) def on_received(self, instance, value): scale = ConcurrencyScale.instance() value = str(value).upper() if value == 'N': logger.info('DynamicQueues: app down') current_app.control.broadcast('scale_down', arguments={'instance': client.current_instance_name()}) elif scale and redis_queues_empty_workers_idle(scale.target_queues): logger.info('DynamicQueues: empty queues') current_app.control.broadcast('scale_down', arguments={'instance': client.current_instance_name()}) elif value == 'Y': logger.info('DynamicQueues: scaling up') current_app.control.broadcast('scale_up', arguments={'instance': client.current_instance_name()}) class DynamicConcurrency(DynamicParameter): def __init__(self, parameter='concurrency'): super(DynamicConcurrency, self).__init__(parameter=parameter) def on_received(self, instance, value): current_app.control.broadcast('scale_to', arguments={ 'concurrency': int(value), 'instance': client.current_instance_name() }) class DynamicPriorityParameter(client.dynamic.DynamicParameter): def __init__( self, parameter='hosts', key=None, sep='|', queue_cls=PriorityQueue, blocking: bool=True, block_retry: int=0.5, r=None): super(DynamicPriorityParameter, self).__init__(parameter=parameter) self.r = r or celery_redis_instance() self.sep = sep self.queue = queue_cls( r=self.r, key=key or client.current_instance_name(raise_on_none=True), blocking=blocking, block_retry=block_retry, ) @property def last_value_key(self): return self.queue.make_key(self.queue.key, 'raw') @property def last_value(self): last_value = self.r.get(self.last_value_key) return last_value.decode() if isinstance(last_value, bytes) else None @last_value.setter def last_value(self, value): self.r.set(self.last_value_key, value=value) def on_received(self, instance, value): if self.last_value is not None and value == self.last_value: 
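# No-op when the raw schedule value is unchanged since the last refresh. Otherwise the
# value is parsed as space-separated 'member|score' pairs (for example 'hostA|3 hostB|1',
# illustrative names only) and pushed into the Redis-backed priority queue below.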
return self.last_value = value values = [ part.split(self.sep) if self.sep in part else (part, 0) for part in (value or '').split(' ') ] member_scores = {k: int(v) for k, v in values} self.queue.update(member_scores) logger.info(f'DynamicPriorityParameter: ({self.queue.free_key}) updated {self.parameter} queue with {value!r}') @contextmanager def borrow(self): member, score, version = self.queue.pop() try: yield member, score, version finally: self.queue.put(member=member, version=version) PK!qu%riberry/celery/client/dynamic/util.pyimport time import redis import functools from celery.utils.log import logger class PriorityQueue: def __init__( self, r: redis.Redis, key: str, prefix: str='pq', sep: str=':', blocking: bool=True, block_retry: int=0.5): self.r: redis.Redis = r self.key = key self.prefix = prefix self.sep = sep self.blocking = blocking self.block_retry = block_retry def make_key(self, *args): sub_key = self.sep.join(map(str, args)) return f'{self.prefix}{self.sep}{sub_key}' @property def version(self): return int(self.r.get(self.version_key) or 0) @version.setter def version(self, value): self.r.set(self.version_key, value=value) @property def version_key(self): return self.make_key(self.key, 'counter') @property def free_key(self): return self.generate_free_key(version=self.version) @property def lease_key(self): return self.generate_lease_key(version=self.version) def generate_free_key(self, version): return self.make_key(self.key, f'{version:09}', 'free') def generate_lease_key(self, version): return self.make_key(self.key, f'{version:09}', 'lease') def pop(self): while True: version = self.version result = self.r.transaction( functools.partial(self.pop_transaction, version=version), self.generate_free_key(version=version), self.generate_lease_key(version=version), value_from_callable=True ) if not result and self.blocking: logger.warn(f'PriorityQueue: ({self.free_key}) encountered blank key, ' f'retrying after {self.block_retry} seconds.') time.sleep(self.block_retry) continue else: break return result, self.r.zscore(self.generate_free_key(version=version), result), version def pop_transaction(self, pipe: redis.client.Pipeline, version): free_key, lease_key = self.generate_free_key(version=version), self.generate_lease_key(version=version) [(member, score)] = pipe.zrevrange(free_key, 0, 0, withscores=True) member = member.decode() pipe.multi() pipe.zincrby(free_key, value=member, amount=-1) pipe.zincrby(lease_key, value=member, amount=1) return member def put(self, member, version): self.r.transaction( functools.partial(self.put_transaction, member=member, version=version), self.generate_free_key(version=version), self.generate_lease_key(version=version), ) def put_transaction(self, pipe: redis.client.Pipeline, member, version): free_key, lease_key = self.generate_free_key(version=version), self.generate_lease_key(version=version) pipe.multi() pipe.zincrby(free_key, value=member, amount=1) pipe.zincrby(lease_key, value=member, amount=-1) def update(self, member_scores: dict): func = functools.partial(self.update_transaction, member_scores=member_scores) self.version = self.r.transaction(func, value_from_callable=True) def update_transaction(self, pipe: redis.client.Pipeline, member_scores): version = self.version + 1 pipe.multi() pipe.zadd(self.generate_free_key(version=version), mapping=member_scores) return version def items(self): return [(k.decode(), v) for k, v in self.r.zrevrange(self.free_key, 0, -1, withscores=True)] def leased_items(self): return [(k.decode(), v) 
for k, v in self.r.zrevrange(self.lease_key, 0, -1, withscores=True)] def clear(self): self.r.delete(self.free_key, self.lease_key) def __iter__(self): return self def __next__(self): try: return self.pop() except ValueError: raise StopIteration PK!*dOmm'riberry/celery/client/scale/__init__.pyfrom urllib.parse import urlparse import redis from celery import current_app, signals from celery.utils.log import logger from celery.worker import control from riberry.celery import client class ConcurrencyScale: all_apps = {} def __init__(self, app, target_queues=None): self.all_apps[app.main] = self self.target_queues = set(target_queues or []) self.initial_queues = set() self.current_concurrency = None @property def target_worker_queues(self): return self.initial_queues & self.target_queues def scale_down(self, consumer): current_concurrency = consumer.pool.num_processes queues_to_skip = set() for queue in consumer.task_consumer.queues: self.initial_queues.add(queue.name) if queue.name not in self.target_queues: queues_to_skip.add(queue.name) else: logger.info(f'scale-down - removing queue: {queue.name}') consumer.cancel_task_queue(queue) if self.current_concurrency is None: self.current_concurrency = current_concurrency if not queues_to_skip and current_concurrency > 0: self.current_concurrency = current_concurrency logger.info(f'scale-down - shrink amount: {current_concurrency}') consumer.pool.shrink(current_concurrency) def scale_up(self, consumer): current_concurrency = consumer.pool.num_processes if self.current_concurrency is None: return queue_names = {q.name for q in consumer.task_consumer.queues} for queue in self.initial_queues: if queue not in queue_names: logger.info(f'scale-up - adding queue: {queue}') consumer.add_task_queue(queue) processes_to_add = self.current_concurrency - current_concurrency max_processes_to_add = min(processes_to_add, 8) if processes_to_add > 0: logger.info(f'scale-up - grow amount: {max_processes_to_add}') consumer.pool.grow(processes_to_add) def scale_to(self, consumer, concurrency): queue_names = {q.name for q in consumer.task_consumer.queues} if not set(queue_names) & self.target_queues: logger.debug( f'scale-to: no target queues ' f'current={", ".join(queue_names or ["N/A"])} target={", ".join(self.target_queues or ["N/A"])}') return current_concurrency = consumer.pool.num_processes process_diff = concurrency - current_concurrency process_diff = min(process_diff, 8) if process_diff > 0: logger.info(f'scale-to: +{process_diff} -> new: {current_concurrency + process_diff} target: {concurrency}') consumer.pool.grow(process_diff) elif process_diff < 0 and concurrency: logger.info(f'scale-to: {process_diff} -> new: {current_concurrency + process_diff} target: {concurrency}') consumer.qos.decrement_eventually(consumer.qos.value - 1) consumer.qos.update() try: consumer.pool.shrink(abs(process_diff)) except Exception as exc: logger.error(exc) return else: prefetch_count = (consumer.pool.num_processes * consumer.prefetch_multiplier) - consumer.qos.value consumer.qos.increment_eventually(n=prefetch_count) self.current_concurrency = process_diff + current_concurrency @classmethod def instance(cls) -> 'ConcurrencyScale': return cls.all_apps.get(current_app.main) @signals.worker_ready.connect def scale_down_on_startup(sender, **_): scale = ConcurrencyScale.instance() scale and scale.scale_down(consumer=sender) @control.control_command() def scale_down(state, instance): scale = ConcurrencyScale.instance() scale and instance and 
client.is_current_instance(instance_name=instance) and scale.scale_down( consumer=state.consumer) @control.control_command() def scale_up(state, instance): scale = ConcurrencyScale.instance() scale and instance and client.is_current_instance(instance_name=instance) and scale.scale_up( consumer=state.consumer) @control.control_command() def scale_to(state, concurrency, instance): scale = ConcurrencyScale.instance() scale and instance and client.is_current_instance(instance_name=instance) and scale.scale_to( consumer=state.consumer, concurrency=concurrency) @control.control_command() def worker_task_count(state, instance): if not instance or not client.is_current_instance(instance_name=instance): return 0 scale = ConcurrencyScale.instance() if not scale or not scale.target_worker_queues: return 0 return sum([len(control.scheduled(state)), len(control.active(state)), len(control.reserved(state))]) def redis_queues_empty_workers_idle(queues): broker_uri = current_app.connection().as_uri(include_password=True) url = urlparse(broker_uri) r = redis.Redis(host=url.hostname, port=url.port, password=url.password) separator = '\x06\x16' priority_steps = [0, 3, 6, 9] for queue in queues: for prio in priority_steps: queue = f'{queue}{separator}{prio}' if prio else queue queue_length = r.llen(queue) if queue_length: return False all_worker_counts = current_app.control.broadcast( 'worker_task_count', reply=True, arguments={ 'instance': client.current_instance_name() } ) if any([sum(r.values()) for r in all_worker_counts]): return False return True PK!wAn  riberry/celery/client/signals.pyfrom celery import signals, current_task from riberry import model from riberry.celery.client import tasks @signals.celeryd_after_setup.connect def setup_direct_queue(sender, instance, **kwargs): model.conn.raw_engine.dispose() @signals.before_task_publish.connect def before_task_publish(sender, headers, body, **_): if not current_task: return root_id = current_task.request.root_id args, kwargs, *_ = body task_id = headers['id'] if '__ss__' in kwargs: tasks.create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': str(kwargs['__ss__']), 'state': 'QUEUED' } ) if '__sb__' in kwargs: stream, step = kwargs['__sb__'] tasks.create_event( name='step', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'step': str(step), 'state': 'QUEUED' } ) @signals.task_prerun.connect def task_prerun(sender, kwargs, **_): task_id = current_task.request.id root_id = current_task.request.root_id current_task.stream = None current_task.step = None if '__ss__' in kwargs: stream = kwargs['__ss__'] current_task.stream = stream tasks.create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'state': 'ACTIVE' } ) if '__se__' in kwargs: stream = kwargs['__se__'] current_task.stream = stream if '__sb__' in kwargs: stream, step = kwargs['__sb__'] current_task.stream = stream current_task.step = step tasks.create_event( name='step', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'step': str(step), 'state': 'ACTIVE' } ) @signals.task_postrun.connect def task_postrun(sender, state, kwargs, **_): task_id = sender.request.id root_id = sender.request.root_id if '__ss__' in kwargs and state not in ('IGNORED', 'SUCCESS'): stream = kwargs['__ss__'] tasks.create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'state': state or 'SS_UK' } ) if '__se__' in kwargs: stream = kwargs['__se__'] tasks.create_event( name='stream', root_id=root_id, 
task_id=task_id, data={ 'stream': str(stream), 'state': state or 'SS_SE' } ) if '__sb__' in kwargs: stream, step = kwargs['__sb__'] current_task.stream = stream current_task.step = step tasks.create_event( name='step', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'step': str(step), 'state': 'SUCCESS' if state == 'IGNORED' else state or 'FAILURE' } ) PK!88riberry/celery/client/tasks.pyimport base64 import json import pendulum from celery import shared_task, current_app from celery.utils.log import logger from sqlalchemy import desc, asc from riberry import model from riberry.celery import client from . import tracker def _toggle_external_task_queue(app_instance): active = model.job.JobExecution.query().filter_by( status='ACTIVE' ).join(model.job.Job).filter_by( instance=app_instance, ).join(model.job.JobExecutionExternalTask).filter_by( status='READY' ).count() operation = 'add' if active else 'cancel' current_app.control.broadcast('toggle_external_task_queue', reply=False, arguments={'operation': operation}) @shared_task(ignore_result=True) def poll(): with model.conn: app_instance: model.application.ApplicationInstance = model.application.ApplicationInstance.query().filter_by( internal_name=client.current_instance_name(raise_on_none=True) ).first() if not app_instance: return tracker.check_stale_execution(app_instance=app_instance) _toggle_external_task_queue(app_instance=app_instance) if app_instance.status != 'online': return executions = model.job.JobExecution.query().filter( model.job.JobExecution.status == 'RECEIVED' ).join(model.job.Job).order_by( desc(model.job.JobExecution.priority), asc(model.job.JobExecution.created), asc(model.job.JobExecution.id), ).filter_by(instance=app_instance).all() for execution in executions: task = client.queue_job_execution(execution=execution) logger.info(f'poll - queueing task {task}') @shared_task(ignore_result=True) def echo(): with model.conn: app_instance: model.application.ApplicationInstance = model.application.ApplicationInstance.query().filter_by( internal_name=client.current_instance_name(raise_on_none=True) ).first() if not app_instance: return heartbeat = model.application.Heartbeat.query().filter_by(instance=app_instance).first() if not heartbeat: heartbeat = model.application.Heartbeat(instance=app_instance) model.conn.add(heartbeat) heartbeat.updated = pendulum.DateTime.utcnow() model.conn.commit() def create_event(name, root_id, task_id, data=None, binary=None): if not root_id: return if current_app.main != 'default': event_call = event.delay else: event_call = event event_call( name=name, time=pendulum.DateTime.utcnow().timestamp(), root_id=root_id, task_id=task_id, data=data, binary=base64.b64encode(binary).decode() if binary is not None else None ) @shared_task(ignore_result=True) def event(name, time, task_id, root_id, data=None, binary=None): with model.conn: evt = model.misc.Event( name=name, time=time, task_id=task_id, root_id=root_id, data=json.dumps(data), binary=base64.b64decode(binary) if binary is not None else None ) model.conn.add(evt) model.conn.commit() @shared_task(queue='event', ignore_result=True) def workflow_step_update(root_id, stream_name, step_name, task_id, status=None, note=None): with model.conn: job = model.job.JobExecution.query().filter_by(task_id=root_id).one() stream = model.job.JobExecutionStream().query().filter_by(job_execution=job, name=stream_name).first() step = model.job.JobExecutionStreamStep.query().filter_by(stream=stream, task_id=task_id).first() if not step: step = 
model.job.JobExecutionStreamStep(name=step_name, task_id=task_id) stream.steps.append(step) if status is not None: step.status = status if note is not None: step.note = note model.conn.commit() @shared_task(queue='event', ignore_result=True) def workflow_stream_update(root_id, stream_name, task_id, status): with model.conn: job = model.job.JobExecution.query().filter_by(task_id=root_id).one() stream = model.job.JobExecutionStream.query().filter_by(job_execution=job, name=stream_name).first() if not stream: stream = model.job.JobExecutionStream(name=stream_name, task_id=task_id) job.streams.append(stream) stream.status = status model.conn.commit() @shared_task(bind=True, ignore_result=True) def workflow_complete(task, status, primary_stream): with model.conn: return client.workflow_complete(task.request.id, task.request.root_id, status, primary_stream) def poll_external_task(self, external_task_id): with model.conn: external_task = model.job.JobExecutionExternalTask.query().filter_by( task_id=external_task_id, ).first() if external_task: if external_task.status == 'WAITING': raise self.retry(countdown=1) elif external_task.status == 'READY': output_data = external_task.output_data if isinstance(output_data, bytes): output_data = output_data.decode() external_task.status = 'COMPLETE' model.conn.commit() return output_data PK!]gu,, riberry/celery/client/tracker.pyfrom typing import List from celery.utils.log import logger from sqlalchemy import desc, asc from riberry import model from riberry.celery import util, client def start_tracking_execution(root_id): r = util.celery_redis_instance() instance = client.current_instance_name(raise_on_none=False) logger.info(f'tracker: Tracking workflow {root_id!r}') r.sadd(f'workflow:active:{instance}', root_id) def check_stale_execution(app_instance): executions: List[model.job.JobExecution] = model.job.JobExecution.query().filter( model.job.JobExecution.status.in_(('ACTIVE', 'READY')) ).join(model.job.Job).order_by( desc(model.job.JobExecution.priority), asc(model.job.JobExecution.created) ).filter_by(instance=app_instance).all() if not executions: return r = util.celery_redis_instance() for execution in executions: if not r.sismember(f'workflow:active:{app_instance.internal_name}', execution.task_id): logger.info(f'tracker: Identified stale workflow {execution.task_id}') client.workflow_complete( task_id=execution.task_id, root_id=execution.task_id, status='FAILURE', primary_stream=None ) client.wf.artifact( name='Workflow Cancelled', type=model.job.ArtifactType.error, category='Fatal', filename='fatal.log', content=str( 'The current workflow\'s ID was not found within Redis and has therefore been cancelled. This ' 'usually occurs when Redis is flushed while an execution is active.' 
).encode(), task_id=execution.task_id, root_id=execution.task_id ) PK!Priberry/celery/client/wf.pyimport json import sys import threading import traceback from contextlib import contextmanager from io import BytesIO from typing import Union, Optional, List from celery import current_task, current_app from riberry import model, config, policy, services from riberry.celery.client.tasks import create_event, poll_external_task from riberry.exc import BaseError from riberry.model.job import ArtifactType _cxt = threading.local() _cxt.stream_name = None class TaskWrap: def __init__(self, func, **kwargs): self.func = func self.kwargs = kwargs @property def name(self): return self.func.name def _mixin_kw(self, kwargs): return {**self.kwargs, **kwargs} def s(self, *args, **kwargs): return self.func.s(*args, **self._mixin_kw(kwargs=kwargs)) def si(self, *args, **kwargs): return self.func.si(*args, **self._mixin_kw(kwargs=kwargs)) def delay(self, *args, **kwargs): return self.func.delay(*args, **self._mixin_kw(kwargs=kwargs)) @contextmanager def stream_context(stream: str): _cxt.stream_name = _validate_stream_name(stream) try: yield finally: _cxt.stream_name = None def _validate_stream_name(stream: str): if not stream and not _cxt.stream_name: raise ValueError('Stream name cannot be blank') return str(stream or _cxt.stream_name) def step(task, step: str = None, stream: str = None): step = step if step else task.name stream = _validate_stream_name(stream) return TaskWrap(task, __sb__=(stream, step)) def stream_start(task, stream: str = None): return TaskWrap(task, __ss__=_validate_stream_name(stream)) def stream_end(task, stream: str = None): return TaskWrap(task, __se__=_validate_stream_name(stream)) s = stream_start e = stream_end b = step def artifact(filename: str, content: Union[bytes, str], name: str=None, type: Union[str, ArtifactType] = ArtifactType.output, category='Default', data: dict=None, stream: str=None, step=None, task_id: str=None, root_id: str=None): task_id = task_id or current_task.request.id root_id = root_id or current_task.request.root_id stream = stream or getattr(current_task, 'stream', None) step = step or getattr(current_task, 'step', None) if name is None: name = filename if isinstance(content, str): content = content.encode() if isinstance(type, ArtifactType): type = type.value try: ArtifactType(type) except ValueError as exc: raise ValueError(f'ArtifactType enum has no value {type!r}.' 
f'Supported types: {", ".join(ArtifactType.__members__)}') from exc create_event( 'artifact', root_id=root_id, task_id=task_id, data={ 'name': str(name), 'type': str(type), 'category': str(category), 'data': data if isinstance(data, dict) else {}, 'stream': str(stream) if stream else None, 'step': str(step) if step else None, 'filename': str(filename), }, binary=content ) def artifact_from_traceback(name=None, filename=None, category='Intercepted', type=model.job.ArtifactType.error): exc_type, exc, tb = sys.exc_info() if not exc_type: return if isinstance(exc, BaseError): error_content = f'{traceback.format_exc()}\n\n{"-"*32}\n\n{json.dumps(exc.output(), indent=2)}'.encode() else: error_content = traceback.format_exc().encode() artifact( name=name if name else f'Exception {current_task.name}', type=type, category=category, data={ 'Error Type': exc.__class__.__name__, 'Error Message': str(exc) }, filename=filename if filename else f'{current_task.name}-{current_task.request.id}.log', content=error_content ) def send_email(subject: str, body: str, mime_type: Optional[str]=None, sender: Optional[str] =None, receivers: Optional[List[str]]=None): if isinstance(receivers, str): receivers = [receivers] notify( notification_type='custom-email', data={ 'subject': subject, 'mime_type': mime_type, 'body': body, 'from': sender, 'to': receivers or [] } ) def notify(notification_type, data=None, task_id=None, root_id=None): task_id = task_id or current_task.request.id root_id = root_id or current_task.request.root_id create_event( 'notify', root_id=root_id, task_id=task_id, data={ 'type': notification_type, 'data': data or {} }, binary=None ) def current_execution() -> Optional[model.job.JobExecution]: return model.job.JobExecution.query().filter_by( task_id=current_task.request.root_id ).first() def create_job(form_internal_name, job_name=None, input_values=None, input_files=None): form: model.interface.Form = model.interface.Form.query().filter_by( internal_name=form_internal_name, ).first() if not input_files: input_files = {} cleansed_input_files = {} for attr, value in input_files.items(): if isinstance(value, str): value = value.encode() if isinstance(value, bytes): value = BytesIO(value) if not hasattr(value, 'read'): raise ValueError(f'wf.create_job:: value for input file {attr!r} must be of type str, bytes or stream') cleansed_input_files[attr] = value job_execution = current_execution() job_execution_user = job_execution.creator policy_provider = config.config.policies.provider with policy.context.scope(subject=job_execution_user, environment=None, policy_engine=policy_provider): job = services.job.create_job( form_id=form.id, name=job_name or f'Via {job_execution.job.name} / #{job_execution.id}', input_values=input_values or {}, input_files=cleansed_input_files, execute=True, parent_execution=job_execution ) model.conn.commit() return job def create_external_task(name, task_type, external_task_id, input_data: bytes = None): external_task = model.job.JobExecutionExternalTask( job_execution=current_execution(), stream_id=None, task_id=external_task_id, name=name, type=task_type, input_data=input_data, ) model.conn.add(external_task) model.conn.commit() return external_task def poll_external_task_sig(external_task_id): return current_app.tasks['check-external-task'].si(external_task_id=external_task_id) PK![ǢE%%riberry/celery/util.pyfrom urllib.parse import urlparse import redis from celery import current_app def celery_redis_instance(): broker_uri = 
current_app.connection().as_uri(include_password=True) url = urlparse(broker_uri) return redis.Redis(host=url.hostname, port=url.port, password=url.password) PK!P?jjriberry/config.pyimport riberry import toml import os import binascii import pathlib import warnings from riberry.util.common import variable_substitution CONF_DEFAULT_BG_SCHED_INTERVAL = 10 CONF_DEFAULT_BG_EVENT_INTERVAL = 2 CONF_DEFAULT_BG_EVENT_PROCESS_LIMIT = 1000 CONF_DEFAULT_BG_CAPACITY_INTERVAL = 5 CONF_DEFAULT_DB_ECHO = False CONF_DEFAULT_DB_CONN_PATH = (pathlib.Path(os.path.expanduser('~')) / '.riberry') / 'model.db' CONF_DEFAULT_DB_CONN_URL = f'sqlite:///{CONF_DEFAULT_DB_CONN_PATH}' CONF_DEFAULT_POLICY_PROVIDER = 'default' CONF_DEFAULT_AUTH_PROVIDER = 'default' CONF_DEFAULT_AUTH_TOKEN_PROVIDER = 'jwt' CONF_DEFAULT_AUTH_TOKEN_PATH = (pathlib.Path(os.path.expanduser('~')) / '.riberry') / 'auth.key' CONF_DEFAULT_AUTH_TOKEN_SIZE = 256 if 'RIBERRY_CONFIG_PATH' in os.environ: _config = variable_substitution(toml.load(os.environ['RIBERRY_CONFIG_PATH'])) else: warnings.warn(message=f'Environment variable \'RIBERRY_CONFIG_PATH\' not declared, ' f'defaulting to default configuration') _config = {} def load_config_value(raw_config, default=None): if 'path' in raw_config: with open(raw_config['path']) as f: return f.read() elif 'envvar' in raw_config: return os.getenv(raw_config['envvar']) elif 'value' in raw_config: return raw_config['value'] else: return default class DatabaseConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} connection_config = self.raw_config.get('connection') or {} self.connection_url = load_config_value(connection_config) if not self.connection_url: CONF_DEFAULT_DB_CONN_PATH.parent.mkdir(exist_ok=True) self.connection_url = CONF_DEFAULT_DB_CONN_URL self.echo = connection_config.get('echo', CONF_DEFAULT_DB_ECHO) self.connection_arguments = self.raw_config.get('arguments', {}) def enable(self): riberry.model.init( url=self.connection_url, echo=self.echo, connection_arguments=self.connection_arguments ) class AuthenticationTokenConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.provider = self.raw_config.get('provider') or CONF_DEFAULT_AUTH_TOKEN_PROVIDER self.secret = load_config_value(self.raw_config) if not self.secret: self.secret = self.make_secret() @staticmethod def make_secret(): CONF_DEFAULT_AUTH_TOKEN_PATH.parent.mkdir(exist_ok=True) if not CONF_DEFAULT_AUTH_TOKEN_PATH.is_file(): with open(CONF_DEFAULT_AUTH_TOKEN_PATH, 'wb') as f: f.write(binascii.hexlify(os.urandom(CONF_DEFAULT_AUTH_TOKEN_SIZE))) with open(CONF_DEFAULT_AUTH_TOKEN_PATH, 'rb') as f: return f.read().decode() class AuthenticationProviderConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.default = self.raw_config.get('default') or CONF_DEFAULT_AUTH_PROVIDER self.supported = self.raw_config.get('supported') or [self.default] class AuthenticationConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.providers = AuthenticationProviderConfig(self.raw_config.get('providers')) self.token = AuthenticationTokenConfig(self.raw_config.get('token')) self._config_cache = {} def __getitem__(self, item): if item not in self._config_cache: for provider in riberry.plugins.plugin_register['authentication']: if provider.name() == item: self._config_cache[item] = provider(self.raw_config.get(item, {})) break else: raise ValueError(f'Authentication provider {item!r} not found') return self._config_cache[item] @property def 
default_provider(self): return self[self.providers.default] def enable(self): for provider_name in self.providers.supported: self[provider_name].on_enabled() class PolicyProviderConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.provider_name = self.raw_config.get('provider') or CONF_DEFAULT_POLICY_PROVIDER self._provider = None @property def provider(self): if self._provider is None: for provider in riberry.plugins.plugin_register['policies']: if provider.name == self.provider_name: self._provider = provider break else: raise ValueError(f'PolicyProviderConfig.provider:: ' f'could not find register provider {self.provider_name!r}') return self._provider class EmailNotificationConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self._enabled = config_dict.get('enabled', False) self.smtp_server = config_dict.get('smtpServer') self.sender = config_dict.get('sender') @property def enabled(self): return bool(self._enabled and self.smtp_server and self.sender) class BackgroundTaskConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.events = BackgroundTaskEventsConfig(self.raw_config.get('events') or {}) self.schedules = BackgroundTaskScheduleConfig(self.raw_config.get('schedules') or {}) self.capacity = BackgroundTaskCapacityConfig(self.raw_config.get('capacity') or {}) class BackgroundTaskEventsConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.interval = config_dict.get('interval', CONF_DEFAULT_BG_EVENT_INTERVAL) self.processing_limit = config_dict.get('limit', CONF_DEFAULT_BG_EVENT_PROCESS_LIMIT) class BackgroundTaskScheduleConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.interval = config_dict.get('interval', CONF_DEFAULT_BG_SCHED_INTERVAL) class BackgroundTaskCapacityConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.interval = config_dict.get('interval', CONF_DEFAULT_BG_CAPACITY_INTERVAL) class RiberryConfig: def __init__(self, config_dict): self.raw_config = config_dict self.authentication = AuthenticationConfig(self.raw_config.get('authentication') or {}) self.policies = PolicyProviderConfig(self.raw_config.get('policies') or {}) self.database = DatabaseConfig(self.raw_config.get('database') or {}) if 'notification' in self.raw_config and isinstance(self.raw_config['notification'], dict): email_config = self.raw_config['notification'].get('email') or {} else: email_config = {} self.email = EmailNotificationConfig(email_config) self.background = BackgroundTaskConfig(self.raw_config.get('background') or {}) @property def celery(self): return self.raw_config.get('celery') or {} def enable(self): self.authentication.enable() self.database.enable() config: RiberryConfig = RiberryConfig(config_dict=_config) PK!0dmriberry/exc.py class BaseError(Exception): __msg__ = None __http_code__ = 500 def __init__(self, target=None, data=None, **args): super(BaseError, self).__init__(self._fmt_message(**args, target=target)) self.target = target self.exc_data = data or {} @classmethod def _fmt_message(cls, **args): return cls.__msg__.format(**args) if cls.__msg__ else f'{cls.__name__} was raised' def output(self): return { 'code': type(self).__name__, 'message': str(self), 'target': self.target, 'data': self.exc_data } class AuthenticationError(BaseError): __msg__ = 'Wrong username or password supplied.' 
__http_code__ = 401 def __init__(self): super(AuthenticationError, self).__init__(target='user') class SessionExpired(BaseError): __msg__ = 'The user\'s session has expired.' __http_code__ = 401 def __init__(self): super(SessionExpired, self).__init__(target='user') class AuthorizationError(BaseError): __msg__ = 'User does not have access to the given resource.' __http_code__ = 403 def __init__(self): super(AuthorizationError, self).__init__(target='user') class ResourceNotFound(BaseError): __msg__ = 'The requested resource does not exist.' __http_code__ = 404 def __init__(self, resource, identifier): super(ResourceNotFound, self).__init__(target=resource, data={ 'id': identifier }) class UnknownError(BaseError): __msg__ = 'An unknown error has occurred.' __http_code__ = 500 def __init__(self, error=None): super(UnknownError, self).__init__() class InputErrorGroup(BaseError): __msg__ = 'One or more errors occurred while validating the request.' __http_code__ = 400 def __init__(self, *errors): super(InputErrorGroup, self).__init__( target=None, data={'errors': []} ) self.extend(errors=errors) def extend(self, errors): self.exc_data['errors'] += [e.output() if isinstance(e, BaseError) else e for e in errors] class RequiredInputError(BaseError): __msg__ = 'Required field {field!r} for {target} not provided.' __http_code__ = 400 def __init__(self, target, field, internal_name=None): super(RequiredInputError, self).__init__( target=target, field=field, data=dict(internal=internal_name) ) class InvalidInputError(BaseError): __msg__ = 'Invalid {field!r} provided for {target}.' __http_code__ = 400 def __init__(self, target, field, internal_name=None): super(InvalidInputError, self).__init__( target=target, field=field, data=dict(internal=internal_name) ) class InvalidEnumError(BaseError): __msg__ = 'Invalid {field!r} provided for {target}. Expected: {allowed_values}.' __http_code__ = 400 def __init__(self, target, field, allowed_values, internal_name=None): super(InvalidEnumError, self).__init__( target=target, field=field, allowed_values=', '.join(repr(value) for value in allowed_values), data=dict(internal=internal_name) ) class UnknownInputError(BaseError): __msg__ = 'Unknown input field {field!r} provided for {target}.' __http_code__ = 400 def __init__(self, target, field): super(UnknownInputError, self).__init__(target=target, field=field) class UniqueInputConstraintError(BaseError): __msg__ = 'Cannot create {target} with {field}: {value!r}. This {field} is already in-use.' __http_code__ = 400 def __init__(self, target, field, value): super(UniqueInputConstraintError, self).__init__(target=target, field=field, value=value) PK!Oriberry/model/__init__.pyimport sqlalchemy import sqlalchemy.orm import sqlalchemy.pool from . 
import misc, application, group, auth, interface, job, base class __ModelProxy: raw_session = None raw_engine = None def __getattr__(self, item): return getattr(self.raw_session, item) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.remove() conn: sqlalchemy.orm.session.Session = __ModelProxy() def init(url='sqlite://', **config): __ModelProxy.raw_engine = sqlalchemy.create_engine( url, echo=config.get('echo', False), poolclass=sqlalchemy.pool.QueuePool, pool_use_lifo=True, pool_pre_ping=True, connect_args=config.get('connection_arguments', {}) ) __ModelProxy.raw_session = sqlalchemy.orm.scoped_session(sqlalchemy.orm.sessionmaker(bind=__ModelProxy.raw_engine)) base.Base.metadata.create_all(__ModelProxy.raw_engine) PK!f,۵,,%riberry/model/application/__init__.pyimport enum from datetime import datetime from typing import List import pendulum from sqlalchemy import Column, String, Boolean, ForeignKey, DateTime, Integer, desc, Enum from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import relationship, validates from riberry import model from riberry.model import base class Application(base.Base): """Contains basic metadata related to an application.""" __tablename__ = 'application' __reprattrs__ = ['name', 'enabled'] # columns id = base.id_builder.build() document_id = Column(base.id_builder.type, ForeignKey(column='document.id')) name: str = Column(String(64), nullable=False, unique=True, comment='The human-readable name of the application.') internal_name: str = Column(String(256), nullable=False, unique=True, comment='The internal name or secondary identifier of the application.') description: str = Column(String(256), comment='A brief description of the application\'s purpose.') type: str = Column(String(64), nullable=False, comment='The type of application.') enabled: bool = Column(Boolean(name='application_enabled'), default=True, comment='Whether or not this application and its instances are enabled (TODO).') # associations instances: List['ApplicationInstance'] = relationship( 'ApplicationInstance', cascade='save-update, merge, delete, delete-orphan', back_populates='application') forms: List['model.interface.Form'] = relationship( 'Form', cascade='save-update, merge, delete, delete-orphan', back_populates='application') document: 'model.misc.Document' = relationship('Document', cascade='save-update, merge, delete, delete-orphan', single_parent=True) class ApplicationInstance(base.Base): """ An ApplicationInstance represents a running instance of an Application. Multiple ApplicationInstances are useful when we want to separate out different types of executions for the same Application. An example of this is when we have an application which can support both long-running and short-running executions. We can spin up a separate instance with separate scheduling to ensure that the long-running jobs don't block the short-running jobs.
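A rough usage sketch (this assumes riberry has been configured so the model connection is initialised, and that an instance with the internal name 'demo-instance' exists; both the name and the session setup are illustrative assumptions):

    from riberry import model

    # Look up a hypothetical instance and inspect its derived status.
    instance = model.application.ApplicationInstance.query().filter_by(
        internal_name='demo-instance',
    ).one()

    # 'created' until a heartbeat is recorded, then 'online', 'offline'
    # (no heartbeat for ~10 seconds) or 'inactive' (an 'active' schedule
    # whose value is 'N').
    print(instance.status)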
""" __tablename__ = 'app_instance' __reprattrs__ = ['name', 'internal_name'] # columns id = base.id_builder.build() application_id = Column(base.id_builder.type, ForeignKey(column='application.id'), nullable=False) name: str = Column(String(64), nullable=False, unique=True, comment='The human-readable name of the application.') internal_name: str = Column(String(256), nullable=False, unique=True, comment='The internal name or secondary identifier of the application instance.') # associations application: 'Application' = relationship('Application', back_populates='instances') heartbeat: 'Heartbeat' = relationship('Heartbeat', cascade='save-update, merge, delete, delete-orphan', uselist=False, back_populates='instance') schedules: List['ApplicationInstanceSchedule'] = relationship( 'ApplicationInstanceSchedule', cascade='save-update, delete, delete-orphan', back_populates='instance', order_by=lambda: ( ApplicationInstanceSchedule.parameter, desc(ApplicationInstanceSchedule.priority), ApplicationInstanceSchedule.start_time, ) ) forms: List['model.interface.Form'] = relationship('Form', cascade='save-update, merge, delete, delete-orphan', back_populates='instance') @property def status(self): if not self.heartbeat: return 'created' diff = base.utc_now() - pendulum.instance(self.heartbeat.updated) if diff.seconds >= 10: return 'offline' if self.active_schedule_value('active', default='Y') == 'N': return 'inactive' return 'online' def active_schedule_value(self, name, default=None, current_time=None): schedule = self.active_schedule(name=name, current_time=current_time) return schedule.value if schedule else default def active_schedule(self, name, current_time=None) -> 'ApplicationInstanceSchedule': schedules = sorted( (s for s in self.schedules if s.parameter == name and s.active(current_time=current_time)), key=lambda s: (-s.priority, s.start_time) ) for schedule in schedules: return schedule @property def active_schedule_values(self): return {name: schedule.value if schedule else None for name, schedule in self.active_schedules.items()} @property def active_schedules(self): parameters = {s.parameter for s in self.schedules} return {param: self.active_schedule(name=param) for param in parameters} class Heartbeat(base.Base): """ The Heartbeat object is used by running ApplicationInstances to report back that they're alive. If an ApplicationInstance's last heartbeat was over 10 seconds ago, we assume that it is offline. """ __tablename__ = 'heartbeat_app_instance' __reprattrs__ = ['instance_id', 'updated'] # columns id = base.id_builder.build() instance_id = Column(base.id_builder.type, ForeignKey('app_instance.id'), nullable=False) created: datetime = Column(DateTime(timezone=True), default=base.utc_now, comment='The first heartbeat we received for the instance.') updated: datetime = Column(DateTime(timezone=True), default=base.utc_now, comment='The last heartbeat we received for the instance.') # associations instance: 'ApplicationInstance' = relationship('ApplicationInstance', back_populates='heartbeat') class ApplicationInstanceSchedule(base.Base): """ The ApplicationInstanceSchedule is a time-based schedule for dynamic parameters. It allows us to define parameters which change depending on the time of the day. There are currently two core parameters: `active` and `concurrency`. These allow us to automatically activate/de-activate and scale up/down our applications during different times of the day. 
Any custom parameter can be defined and consumed by our Celery application as long as we have a custom parameter handler in place which can interpret the parameter's values. """ __tablename__ = 'sched_app_instance' # columns id = base.id_builder.build() instance_id = Column(base.id_builder.type, ForeignKey('app_instance.id'), nullable=False) days = Column(String(27), nullable=False, default='*', comment='Comma-separated list of specific days ("MON,WED"), or "*" for every day.') start_time = Column(String(8), nullable=False, default='00:00:00', comment='The time when this schedule activates.') end_time = Column(String(8), nullable=False, default='23:59:59', comment='The time when this schedule de-activates.') timezone = Column(String(128), nullable=False, default='UTC', comment='The timezone for the given start and end times.') parameter = Column(String(32), nullable=False, comment='The parameter which this schedule applies to.') value = Column(String(512), comment='The value of the given parameter.') priority = Column(Integer, default=64, nullable=False, comment='Priority of the schedule, where higher values mean higher priority.') # associations instance: 'ApplicationInstance' = relationship('ApplicationInstance', back_populates='schedules') # validations @validates('priority') def validate_priority(self, _, priority): assert isinstance(priority, int) and 255 >= priority >= 1, ( f'ApplicationInstanceSchedule.priority must be an integer between 1 and 255 (received {priority})') return priority @validates('start_time') def validate_start_time(self, _, start_time): return self.cleanse_time(time_=start_time) @validates('end_time') def validate_end_time(self, _, end_time): return self.cleanse_time(time_=end_time) @staticmethod def cleanse_time(time_): if isinstance(time_, int): now = pendulum.now() date = pendulum.DateTime(now.year, now.month, now.day) + pendulum.duration(seconds=time_) time_ = date.strftime('%H:%M:%S') if isinstance(time_, str): pendulum.DateTime.strptime(time_, '%H:%M:%S') return time_ @validates('timezone') def validate_timezone(self, _, timezone): pendulum.timezone(timezone) return timezone @validates('days') def validate_days(self, _, days): if days in (None, '*'): return '*' days = [o.strip() for o in str(days).lower().split(',')] invalid_days = set(days) - {'mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'} if invalid_days: raise ValueError(f'Invalid days received: {", ".join(invalid_days)}') return ','.join(days) def active(self, current_time=None): now: pendulum.DateTime = pendulum.instance(current_time) \ if current_time else pendulum.DateTime.now(tz=self.timezone) now = now.replace(microsecond=0) if self.days != '*': all_days = self.days.lower().split(',') if now.format('dd').lower() not in all_days: return False start_dt = pendulum.parse(self.start_time, tz=self.timezone)\ .replace(year=now.year, month=now.month, day=now.day) end_dt = pendulum.parse(self.end_time, tz=self.timezone)\ .replace(year=now.year, month=now.month, day=now.day) return end_dt >= now >= start_dt class CapacityDistributionStrategy(enum.Enum): spread = 'spread' binpack = 'binpack' def __repr__(self): return repr(self.value) class CapacityConfiguration(base.Base): __tablename__ = 'capacity_config' # columns id = base.id_builder.build() weight_parameter = Column(String(32), nullable=False, unique=True, comment='Parameter name which defines the requested capacity weight.') capacity_parameter = Column(String(32), nullable=False, comment='Parameter name which defines the total capacity
allocation.') producer_parameter = Column(String(32), nullable=False, comment='Parameter name which defines the capacity producers.') distribution_strategy = Column( Enum(CapacityDistributionStrategy), nullable=False, default=CapacityDistributionStrategy.binpack, comment='Binpack (fill vertically) or spread (fill horizontally)' ) # associations producers: List['CapacityProducer'] = relationship( 'CapacityProducer', cascade='save-update, merge, delete, delete-orphan', back_populates='configuration' ) class CapacityProducer(base.Base): __tablename__ = 'capacity_producer' # columns id = base.id_builder.build() configuration_id = Column(base.id_builder.type, ForeignKey('capacity_config.id'), nullable=False) name = Column(String(128), nullable=False, comment='The human-readable name of the producer.') internal_name = Column(String(128), nullable=False, comment='The internal name of the producer.') capacity = Column(Integer, nullable=False, comment='The total capacity of the consumer.') # associations configuration: 'CapacityConfiguration' = relationship('CapacityConfiguration', back_populates='producers') PK!& riberry/model/auth/__init__.pyimport datetime import re from typing import AnyStr, Dict, List import jwt import pendulum from sqlalchemy import Column, String, ForeignKey, DateTime, desc from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import relationship, validates, joinedload from riberry import model, exc from riberry.model import base from riberry.config import config class User(base.Base): __tablename__ = 'users' __reprattrs__ = ['username'] id = base.id_builder.build() username = Column(String(48), nullable=False, unique=True) password = Column(String(512)) auth_provider = Column(String(32), nullable=False, default=config.authentication.providers.default) details: 'UserDetails' = relationship('UserDetails', uselist=False, back_populates='user') # associations group_associations: List['model.group.ResourceGroupAssociation'] = model.group.ResourceGroupAssociation.make_relationship( resource_id=id, resource_type=model.misc.ResourceType.user ) jobs: List['model.job.Job'] = relationship( 'Job', order_by=lambda: desc(model.job.Job.created), back_populates='creator') executions: List['model.job.JobExecution'] = relationship( 'JobExecution', order_by=lambda: desc(model.job.JobExecution.updated), back_populates='creator') notifications: List['model.misc.UserNotification'] = relationship( 'UserNotification', order_by=lambda: desc(model.misc.UserNotification.created), back_populates='user') # proxies groups: List['model.group.Group'] = association_proxy('group_associations', 'group') @property def forms(self) -> List['model.interface.Form']: return model.interface.Form.query().filter( (model.group.ResourceGroupAssociation.group_id.in_(o.group_id for o in self.group_associations)) & (model.group.ResourceGroupAssociation.resource_type == model.misc.ResourceType.form) & (model.interface.Form.id == model.group.ResourceGroupAssociation.resource_id) ).all() @property def applications(self) -> List['model.application.Application']: forms = model.interface.Form.query().filter( model.interface.Form.id.in_(form.id for form in self.forms) ).options( joinedload(model.interface.Form.application) ).all() return [form.application for form in forms] @classmethod def authenticate(cls, username, password): existing_user: cls = cls.query().filter_by(username=username).first() if existing_user: provider = config.authentication[existing_user.auth_provider] else: provider = 
config.authentication.default_provider if not provider.authenticate(username=username, password=password): raise exc.AuthenticationError return cls.query().filter_by(username=username).first() @validates('username') def validate_username(self, _, username): if not username or len(username) < 3: raise ValueError(f'User.username :: usernames must be 3+ characters long. Received {repr(username)}') return username @classmethod def secure_password(cls, password: str, provider_name=None) -> bytes: provider = config.authentication[provider_name or config.authentication.providers.default] password = provider.secure_password(password=password) return password class UserDetails(base.Base): __tablename__ = 'user_details' id = base.id_builder.build() user_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=False) user: 'User' = relationship('User', back_populates='details') first_name = Column(String(64)) last_name = Column(String(64)) display_name = Column(String(128)) department = Column(String(128)) email = Column(String(128)) updated: datetime = Column(DateTime(timezone=True), default=base.utc_now) @property def full_name(self): if self.first_name and self.last_name: return f'{self.first_name} {self.last_name}' else: return self.first_name or self.last_name or self.display_name or self.user.username @validates('email') def validate_email(self, _, email): if email and not re.match(r'[^@]+@[^@]+\.[^@]+', email or ''): raise ValueError(f'UserDetails.email :: Invalid email received ({repr(email)})') return email class AuthToken: @staticmethod def create(user: User, expiry_delta: datetime.timedelta=datetime.timedelta(hours=24)) -> AnyStr: iat: pendulum.DateTime = base.utc_now() exp: pendulum.DateTime = iat + expiry_delta return jwt.encode({ 'iat': iat.int_timestamp, 'exp': exp.int_timestamp, 'subject': user.username }, config.authentication.token.secret, algorithm='HS256') @staticmethod def verify(token: AnyStr) -> Dict: try: return jwt.decode(token, config.authentication.token.secret, algorithms=['HS256']) except jwt.ExpiredSignatureError: raise exc.SessionExpired except Exception: raise exc.AuthenticationError PK!n@briberry/model/base.pyimport pendulum from sqlalchemy import Column, Sequence, Integer, MetaData from sqlalchemy.ext.declarative import declarative_base from riberry import model class _BaseMixin: @classmethod def query(cls): return model.conn.query(cls) def __repr__(self): attribute_names = (['id'] if hasattr(self, 'id') else []) + getattr(self, '__reprattrs__', []) attributes = ', '.join(f'{attr}={repr(getattr(self, attr))}' for attr in attribute_names) return f'<{type(self).__name__} {attributes}>' class _IdBuilder: def __init__(self, id_type): self.type = id_type def build(self, sequence='SEQUENCE_PK'): return Column(self.type, Sequence(sequence), primary_key=True) meta = MetaData( naming_convention={ 'ix': 'ix_%(column_0_label)s', 'uq': 'uq_%(table_name)s_%(column_0_N_name)s', 'ck': 'ck_%(table_name)s_%(constraint_name)s', 'fk': 'fk_%(table_name)s_%(column_0_N_name)s_%(referred_table_name)s', 'pk': 'pk_%(table_name)s' } ) Base = declarative_base(cls=_BaseMixin, metadata=meta) id_builder = _IdBuilder(id_type=Integer) def utc_now(): return pendulum.DateTime.utcnow() PK!8w9 riberry/model/group/__init__.pyimport enum from typing import List from sqlalchemy import String, Column, Enum, ForeignKey, sql from sqlalchemy.orm import relationship from riberry import model from riberry.model import base class ResourceGroupAssociation(base.Base): __tablename__ = 'resource_group' 
__reprattrs__ = ['group_id', 'resource_id', 'resource_type'] # columns id = base.id_builder.build() group_id = Column(ForeignKey('groups.id'), nullable=False) resource_id = Column(base.id_builder.type, nullable=False) resource_type = Column(Enum(model.misc.ResourceType), nullable=False) # associations group: 'Group' = relationship('Group', back_populates='resource_associations') @classmethod def make_relationship(cls, resource_id, resource_type): return relationship( 'ResourceGroupAssociation', primaryjoin=lambda: sql.and_( resource_id == ResourceGroupAssociation.resource_id, ResourceGroupAssociation.resource_type == resource_type ), foreign_keys=lambda: ResourceGroupAssociation.resource_id, cascade='save-update, merge, delete, delete-orphan', ) class Group(base.Base): __tablename__ = 'groups' __reprattrs__ = ['name'] # columns id = base.id_builder.build() name: str = Column(String(128), nullable=False, unique=True) _display_name: str = Column('display_name', String(128)) description: str = Column(String(128)) # associations resource_associations: List['ResourceGroupAssociation'] = relationship( 'ResourceGroupAssociation', back_populates='group') user_associations: List['ResourceGroupAssociation'] = relationship( 'ResourceGroupAssociation', primaryjoin=lambda: sql.and_( ResourceGroupAssociation.group_id == Group.id, ResourceGroupAssociation.resource_type == model.misc.ResourceType.user ) ) form_associations: List['ResourceGroupAssociation'] = relationship( 'ResourceGroupAssociation', primaryjoin=lambda: sql.and_( ResourceGroupAssociation.group_id == Group.id, ResourceGroupAssociation.resource_type == model.misc.ResourceType.form ) ) @property def display_name(self): return self._display_name or self.name @property def users(self): return model.auth.User.query().filter( (ResourceGroupAssociation.group_id == self.id) & (ResourceGroupAssociation.resource_type == model.misc.ResourceType.user) & (model.auth.User.id == ResourceGroupAssociation.resource_id) ).all() @property def forms(self): return model.interface.Form.query().filter( (ResourceGroupAssociation.group_id == self.id) & (ResourceGroupAssociation.resource_type == model.misc.ResourceType.form) & (model.interface.Form.id == ResourceGroupAssociation.resource_id) ).all() PK!*y#riberry/model/interface/__init__.pyimport json import mimetypes from typing import List from sqlalchemy import Column, String, Boolean, ForeignKey, Integer, Binary, DateTime, desc from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship, deferred from riberry import model from riberry.model import base class Form(base.Base): """ A Form is an interface to creating jobs for a given ApplicationInstance. 
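A minimal sketch of reading a form and its submitted jobs (the internal name 'demo-form' is illustrative, and a populated, initialised database is assumed):

    from riberry import model

    form = model.interface.Form.query().filter_by(
        internal_name='demo-form',
    ).one()

    print(form.instance.status)             # the instance that executes the form's jobs
    print([job.name for job in form.jobs])  # jobs already submitted against this form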
""" __tablename__ = 'form' __reprattrs__ = ['internal_name', 'version'] # columns id = base.id_builder.build() instance_id = Column(base.id_builder.type, ForeignKey('app_instance.id'), nullable=False) application_id = Column(base.id_builder.type, ForeignKey('application.id'), nullable=False) document_id = Column(base.id_builder.type, ForeignKey(column='document.id')) name: str = Column(String(64), unique=True, nullable=False, comment='The human-readable name of the form.') internal_name: str = Column(String(256), unique=True, nullable=False, comment='The internal name or secondary identifier of the form.') description: str = Column(String(256), comment='A brief description of the form\'s purpose.') version: int = Column(Integer, nullable=False, default=1, comment='The version of the form.') enabled: bool = Column(Boolean(name='form_enabled'), nullable=False, default=True, comment='Whether or not this form is enabled.') # associations instance: 'model.application.ApplicationInstance' = relationship('ApplicationInstance', back_populates='forms') application: 'model.application.Application' = relationship('Application', back_populates='forms') schedules: List['FormSchedule'] = relationship('FormSchedule', back_populates='form') jobs: List['model.job.Job'] = relationship( 'Job', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: desc(model.job.Job.created), back_populates='form' ) group_associations: List['model.group.ResourceGroupAssociation'] = model.group.ResourceGroupAssociation.make_relationship( resource_id=id, resource_type=model.misc.ResourceType.form ) input_value_definitions: List['InputValueDefinition'] = relationship( 'InputValueDefinition', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: InputValueDefinition.id.asc(), back_populates='form' ) input_file_definitions: List['InputFileDefinition'] = relationship( 'InputFileDefinition', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: InputFileDefinition.id.asc(), back_populates='form' ) document: 'model.misc.Document' = relationship('Document', cascade='save-update, merge, delete, delete-orphan', single_parent=True) # proxies groups: List['model.group.Group'] = association_proxy('group_associations', 'group') class FormSchedule(base.Base): __tablename__ = 'sched_form' # columns id = base.id_builder.build() form_id = Column(base.id_builder.type, ForeignKey('form.id'), nullable=False) start = Column(DateTime(timezone=True), nullable=False) end = Column(DateTime(timezone=True), nullable=False) # associations form: 'Form' = relationship('Form', back_populates='schedules') class InputFileDefinition(base.Base): """The InputFileDefinition object defines the properties of an input file.""" __tablename__ = 'input_file_definition' __reprattrs__ = ['name', 'type'] # columns id = base.id_builder.build() form_id = Column(base.id_builder.type, ForeignKey('form.id'), nullable=False) name: str = Column(String(64), nullable=False) internal_name: str = Column(String(256), nullable=False) description: str = Column(String(128)) type: str = Column(String(64), nullable=False) accept: str = Column(String(256)) required: bool = Column(Boolean(name='form_required'), nullable=False, default=True) # associations form: 'Form' = relationship('Form', back_populates='input_file_definitions') class InputValueDefinition(base.Base): """The InputFileDefinition object defines the properties of an input value.""" __tablename__ = 'input_value_definition' __reprattrs__ = ['name', 'type'] # columns id = 
base.id_builder.build() form_id = Column(base.id_builder.type, ForeignKey('form.id'), nullable=False) name: str = Column(String(64), nullable=False) internal_name: str = Column(String(256), nullable=False) description: str = Column(String(128)) type: str = Column(String(64), nullable=False) required: bool = Column(Boolean(name='input_value_definition_required'), nullable=False, default=True) default_binary = Column('defaults', Binary) # associations form: 'Form' = relationship('Form', back_populates='input_value_definitions') allowed_value_enumerations: List['InputValueEnum'] = relationship( 'InputValueEnum', cascade='save-update, merge, delete, delete-orphan', back_populates='definition') # proxies allowed_binaries: List[bytes] = association_proxy( 'allowed_value_enumerations', 'value', creator=lambda value: InputValueEnum(value=value) ) @property def allowed_values(self): return [json.loads(v.decode()) for v in self.allowed_binaries] @hybrid_property def default_value(self): return json.loads(self.default_binary.decode()) if self.default_binary else None @default_value.setter def default_value(self, value): self.default_binary = json.dumps(value).encode() class InputValueEnum(base.Base): """The InputValueEnum object defines a valid enumeration for a given InputValueInstance.""" __tablename__ = 'input_value_enum' __reprattrs__ = ['value'] # columns id = base.id_builder.build() definition_id = Column(base.id_builder.type, ForeignKey('input_value_definition.id'), nullable=False) value = Column(Binary, nullable=False) # associations definition: 'InputValueDefinition' = relationship( 'InputValueDefinition', back_populates='allowed_value_enumerations') class InputValueInstance(base.Base): """The InputValueInstance object contains data for a InputValueDefinition and is linked to a Job.""" __tablename__ = 'input_value_instance' # columns id = base.id_builder.build() job_id = Column(base.id_builder.type, ForeignKey('job.id'), nullable=False) name: str = Column(String(256), nullable=False) internal_name: str = Column(String(256), nullable=False) raw_value: bytes = Column('value', Binary) # associations job: 'model.job.Job' = relationship('Job', back_populates='values') @property def definition(self): return self.job.form @hybrid_property def value(self): return json.loads(self.raw_value.decode()) if self.raw_value else None @value.setter def value(self, value): self.raw_value = json.dumps(value).encode() class InputFileInstance(base.Base): """The InputFileInstance object contains data for a InputFileDefinition and is linked to a Job.""" __tablename__ = 'input_file_instance' __reprattrs__ = ['filename', 'size'] # columns id = base.id_builder.build() job_id = Column(base.id_builder.type, ForeignKey('job.id'), nullable=False) name: str = Column(String(256), nullable=False) internal_name: str = Column(String(256), nullable=False) filename: str = Column(String(512), nullable=False) size: int = Column(Integer, nullable=False) binary: bytes = deferred(Column(Binary)) # associations job: 'model.job.Job' = relationship('Job', back_populates='files') @property def content_type(self): if self.filename.endswith('.log'): # quick fix for missing .log type on unix systems return 'text/plain' return mimetypes.guess_type(self.filename)[0] @property def content_encoding(self): return mimetypes.guess_type(self.filename)[1] PK!Z$KKriberry/model/job/__init__.pyimport enum import functools import json import mimetypes from datetime import datetime from typing import List, Optional import pendulum from croniter import 
croniter from sqlalchemy import Column, String, ForeignKey, DateTime, Boolean, Integer, Binary, Index, Enum, desc, Float, asc, \ UniqueConstraint, select, func, sql from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship, deferred, validates, foreign, remote from riberry import model from riberry.model import base class ArtifactType(enum.Enum): output = 'output' error = 'error' report = 'report' def __repr__(self): return repr(self.value) class Job(base.Base): """ A Job is an object which represents a set of inputs provided to a form. A Job can have one or more JobExecutions which represent the execution of the linked ApplicationInterface on the linked ApplicationInstance for the given input stored against this Job. Jobs are immutable. If we require a different set of values, we'll need to create a new Job. """ __tablename__ = 'job' __reprattrs__ = ['name'] __table_args__ = ( Index('j__idx_form_id', 'form_id'), Index('j__idx_creator_id', 'creator_id'), ) # columns id = base.id_builder.build() form_id = Column(base.id_builder.type, ForeignKey('form.id'), nullable=False) creator_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=False) name: str = Column(String(64), nullable=False, unique=True, comment='The unique name of our job.') created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) # associations creator: 'model.auth.User' = relationship('User') form: 'model.interface.Form' = relationship('Form', back_populates='jobs') executions: List['JobExecution'] = relationship( 'JobExecution', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: desc(JobExecution.updated), back_populates='job') schedules: List['JobSchedule'] = relationship('JobSchedule', cascade='save-update, merge, delete, delete-orphan', back_populates='job') values: List['model.interface.InputValueInstance'] = relationship('InputValueInstance', cascade='save-update, merge, delete, delete-orphan', back_populates='job') files: List['model.interface.InputFileInstance'] = relationship('InputFileInstance', cascade='save-update, merge, delete, delete-orphan', back_populates='job') # proxies instance: 'model.application.ApplicationInstance' = association_proxy('form', 'instance') def execute(self, creator_id): model.conn.add(instance=JobExecution(job=self, creator_id=creator_id)) class JobSchedule(base.Base): """ A JobSchedule object defines a schedule which will trigger an execution for the linked Job. Note that a JobExecution will only be created for an ApplicationInstance which has a status of "online". Applications which are offline or inactive due to an ApplicationInstanceSchedule will not have JobExecutions created. 
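A sketch of attaching an hourly schedule to an existing Job (the cron expression and the job object are illustrative assumptions):

    from riberry import model

    schedule = model.job.JobSchedule(
        job=job,               # an existing Job
        creator=job.creator,
        cron='0 * * * *',      # run at the top of every hour
        limit=0,               # 0 means the schedule is never auto-disabled
    )
    model.conn.add(schedule)
    model.conn.commit()

    print(schedule.next_run)   # next pendulum.DateTime the schedule would fire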
""" __tablename__ = 'sched_job' # columns id = base.id_builder.build() job_id = Column(base.id_builder.type, ForeignKey('job.id'), nullable=False) creator_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=False) enabled: bool = Column(Boolean(name='sched_job_enabled'), default=True, nullable=False, comment='Whether or not this schedule is active.') cron: str = Column(String(24), nullable=False, comment='The cron expression which defines our schedule.') created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False, comment='The time our schedule was created.') last_run: datetime = Column(DateTime(timezone=True), default=None, comment='The last time a job execution was created from our schedule.') limit: int = Column(Integer, default=0, comment='The amount of valid runs for this schedule.') total_runs: int = Column(Integer, default=0, comment='The total amount of runs for this schedule.') # associations job: 'Job' = relationship('Job', back_populates='schedules') creator: 'model.auth.User' = relationship('User') def run(self): if not self.enabled or self.job.instance.status != 'online': return ready_run = None for cron_time in croniter(self.cron, start_time=self.last_run or self.created, ret_type=pendulum.DateTime): cron_time = pendulum.instance(cron_time) if cron_time > base.utc_now(): break ready_run = cron_time if ready_run: self.last_run = ready_run self.total_runs += 1 if self.limit and self.total_runs >= self.limit: self.enabled = False self.job.execute(creator_id=self.creator_id) @property def next_run(self): if not self.enabled: return instance = croniter(self.cron, start_time=self.last_run or self.created, ret_type=pendulum.DateTime) return pendulum.instance(next(instance)) def memoize(func): result = [] @functools.wraps(func) def inner(*args, **kwargs): if result: return result[0] result.append(func(*args, **kwargs)) return result[0] return inner @memoize def _job_execution_select_latest_progress(): return select([ func.max(JobExecutionProgress.id).label('id'), JobExecutionProgress.job_execution_id ]).group_by( JobExecutionProgress.job_execution_id ).alias() class JobExecution(base.Base): """A JobExecution represent a single execution of our Job.""" __tablename__ = 'job_execution' __reprattrs__ = ['job_id', 'task_id', 'status'] __table_args__ = ( Index('j_e__idx_job_id', 'job_id'), Index('j_e__idx_creator_id', 'creator_id'), ) # columns id = base.id_builder.build() job_id = Column(base.id_builder.type, ForeignKey('job.id'), nullable=False) creator_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=False) task_id: str = Column(String(36), unique=True, comment='The internal identifier of our job execution. This is usually the Celery root ID.') status: str = Column(String(24), default='RECEIVED', comment='The current status of our job execution.') created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) started: datetime = Column(DateTime(timezone=True)) completed: datetime = Column(DateTime(timezone=True)) updated: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) priority = Column(Integer, default=64, nullable=False, comment='The priority of this execution. 
This only applies to tasks in the RECEIVED state.') parent_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), comment='The id of the execution which triggered this execution.') # associations creator: 'model.auth.User' = relationship('User') job: 'Job' = relationship('Job', back_populates='executions') streams: List['JobExecutionStream'] = relationship( 'JobExecutionStream', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionStream.id), back_populates='job_execution' ) artifacts: List['JobExecutionArtifact'] = relationship( 'JobExecutionArtifact', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionArtifact.id), back_populates='job_execution') external_tasks: List['JobExecutionExternalTask'] = relationship( 'JobExecutionExternalTask', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionExternalTask.id), back_populates='job_execution' ) reports: List['JobExecutionReport'] = relationship( 'JobExecutionReport', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionReport.id), back_populates='job_execution' ) progress: List['JobExecutionProgress'] = relationship( 'JobExecutionProgress', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: JobExecutionProgress.id.asc(), back_populates='job_execution' ) data: List['model.misc.ResourceData'] = model.misc.ResourceData.make_relationship( resource_id=id, resource_type=model.misc.ResourceType.job_execution, ) latest_progress: 'JobExecutionProgress' = relationship( 'JobExecutionProgress', secondary=lambda: _job_execution_select_latest_progress(), primaryjoin=lambda: JobExecution.id == _job_execution_select_latest_progress().c.job_execution_id, secondaryjoin=lambda: JobExecutionProgress.id == _job_execution_select_latest_progress().c.id, viewonly=True, uselist=False, ) parent_execution: 'JobExecution' = relationship('JobExecution', back_populates='child_executions', remote_side=[id]) child_executions: List['JobExecution'] = relationship('JobExecution', back_populates='parent_execution') # validations @validates('priority') def validate_priority(self, _, priority): assert isinstance(priority, int) and 255 >= priority >= 1, ( f'JobExecution.priority must be an integer between 1 and 255 (received {priority})') return priority @property def stream_status_summary(self): summary = model.conn.query( model.job.JobExecutionStream.status, func.count(model.job.JobExecutionStream.status) ).filter_by( job_execution=self, ).group_by( model.job.JobExecutionStream.status, ).all() return dict(summary) class JobExecutionProgress(base.Base): __tablename__ = 'job_progress' __reprattrs__ = ['message'] # columns id = base.id_builder.build() job_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), nullable=False) created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) message: str = Column(String(256), default=None, comment='Message describing the progress of the job execution.') progress_percentage = Column(Integer, default=None, nullable=True, comment='The completion percentage (0-100) of the job execution.') # associations job_execution: 'JobExecution' = relationship('JobExecution', back_populates='progress') # validations @validates('progress_percentage') def validate_progress_percentage(self, _, progress_percentage): if progress_percentage is not None: progress_percentage = min(max(progress_percentage, 0), 100) return progress_percentage class
JobExecutionStream(base.Base): __tablename__ = 'job_stream' __reprattrs__ = ['name', 'task_id', 'status'] __table_args__ = ( Index('j_s__idx_job_execution_id', 'job_execution_id'), ) # columns id = base.id_builder.build() job_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), nullable=False) task_id: str = Column(String(36), unique=True) parent_stream_id = Column(base.id_builder.type, ForeignKey('job_stream.id')) name: str = Column(String(64), nullable=False) category: str = Column(String(64), nullable=False, default='Overall') status: str = Column(String(24), default='QUEUED') created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) started: datetime = Column(DateTime(timezone=True)) completed: datetime = Column(DateTime(timezone=True)) updated: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) # associations job_execution: 'JobExecution' = relationship('JobExecution', back_populates='streams') steps: List['JobExecutionStreamStep'] = relationship( 'JobExecutionStreamStep', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionStreamStep.id), back_populates='stream', ) artifacts: List['JobExecutionArtifact'] = relationship('JobExecutionArtifact', back_populates='stream') external_tasks: List['JobExecutionExternalTask'] = relationship( 'JobExecutionExternalTask', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionExternalTask.id), back_populates='stream', ) parent_stream: 'JobExecutionStream' = relationship('JobExecutionStream', back_populates='child_streams', remote_side=[id]) child_streams: List['JobExecutionStream'] = relationship('JobExecutionStream', back_populates='parent_stream') class JobExecutionStreamStep(base.Base): __tablename__ = 'job_stream_step' __reprattrs__ = ['name', 'task_id', 'status'] __table_args__ = ( Index('j_s_s__idx_stream_id', 'stream_id'), ) # columns id = base.id_builder.build() stream_id = Column(base.id_builder.type, ForeignKey('job_stream.id'), nullable=False) task_id: str = Column(String(36), unique=True) name: str = Column(String(64)) status: str = Column(String(24), default='RECEIVED') created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) started: datetime = Column(DateTime(timezone=True)) completed: datetime = Column(DateTime(timezone=True)) updated: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) # associations stream: 'JobExecutionStream' = relationship('JobExecutionStream', back_populates='steps') class JobExecutionArtifact(base.Base): __tablename__ = 'job_artifact' __reprattrs__ = ['name', 'filename'] __table_args__ = ( Index('j_a__idx_job_execution_id', 'job_execution_id'), Index('j_a__idx_stream_id', 'stream_id'), ) # columns id = base.id_builder.build() job_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), nullable=False) stream_id = Column(base.id_builder.type, ForeignKey('job_stream.id')) name: str = Column(String(128), nullable=False) type: str = Column(Enum(ArtifactType), nullable=False) category: str = Column(String(128), nullable=False, default='Default') filename: str = Column(String(512), nullable=False) created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) size: int = Column(Integer, nullable=False) # associations job_execution: 'JobExecution' = relationship('JobExecution', back_populates='artifacts') stream: 'JobExecutionStream' = 
relationship('JobExecutionStream', back_populates='artifacts') binary: 'JobExecutionArtifactBinary' = relationship( 'JobExecutionArtifactBinary', cascade='save-update, merge, delete, delete-orphan', back_populates='artifact', uselist=False) data: List['JobExecutionArtifactData'] = relationship( 'JobExecutionArtifactData', cascade='save-update, merge, delete, delete-orphan', back_populates='artifact') @property def content_type(self): if self.filename.endswith('.log'): # quick fix for missing .log type on unix systems return 'text/plain' return mimetypes.guess_type(self.filename)[0] @property def content_encoding(self): return mimetypes.guess_type(self.filename)[1] class JobExecutionArtifactData(base.Base): __tablename__ = 'job_artifact_data' # columns id = base.id_builder.build() title: str = Column(String(64), nullable=False) description: str = Column(String(512), nullable=False) artifact_id = Column(base.id_builder.type, ForeignKey('job_artifact.id'), nullable=False) # associations artifact: 'JobExecutionArtifact' = relationship('JobExecutionArtifact', back_populates='data') class JobExecutionArtifactBinary(base.Base): __tablename__ = 'job_artifact_binary' # columns id = base.id_builder.build() binary: bytes = deferred(Column(Binary, nullable=True)) artifact_id = Column(base.id_builder.type, ForeignKey('job_artifact.id'), nullable=False) # associations artifact: 'JobExecutionArtifact' = relationship('JobExecutionArtifact', back_populates='binary') class JobExecutionExternalTask(base.Base): __tablename__ = 'job_external_task' __reprattrs__ = ['name', 'type'] __table_args__ = ( Index('j_e__idx_job_execution_id', 'job_execution_id'), Index('j_e__idx_stream_id', 'stream_id'), ) # columns id = base.id_builder.build() job_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), nullable=False) stream_id = Column(base.id_builder.type, ForeignKey('job_stream.id')) user_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=True) group_id = Column(base.id_builder.type, ForeignKey('groups.id'), nullable=True) task_id: str = Column(String(64), unique=True) name: str = Column(String(128), nullable=False) type: str = Column(String(24), nullable=False) status: str = Column(String(24), default='WAITING', comment='The current status of the manual task.') input_data: Optional[bytes] = deferred(Column(Binary, nullable=True)) output_data: Optional[bytes] = deferred(Column(Binary, nullable=True)) # associations job_execution: 'JobExecution' = relationship('JobExecution', back_populates='external_tasks') stream: 'JobExecutionStream' = relationship('JobExecutionStream', back_populates='external_tasks') class JobExecutionReport(base.Base): __tablename__ = 'job_report' __reprattrs__ = ['internal_name'] # columns id = base.id_builder.build() job_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), nullable=False) name: str = Column(String(128), nullable=False) renderer: str = Column(String(24), nullable=False, default='unspecified') title: str = Column(String(128), nullable=True) category: str = Column(String(128), nullable=True) key: str = Column(String(128), nullable=True) raw_input_data: Optional[bytes] = deferred(Column('input_data', Binary, nullable=True)) report: Optional[bytes] = deferred(Column(Binary, nullable=True)) marked_for_refresh: bool = Column(Boolean(name='job_report_marked_for_refresh'), nullable=False, default=False) # associations job_execution: 'JobExecution' = relationship('JobExecution', back_populates='reports') @hybrid_property def 
input_data(self): return json.loads(self.raw_input_data.decode()) if self.raw_input_data else None @input_data.setter def input_data(self, value): self.raw_input_data = json.dumps(value).encode() PK!'zriberry/model/misc/__init__.pyimport enum import json from datetime import datetime from typing import List, Optional from sqlalchemy import Binary, String, Column, Float, ForeignKey, Boolean, DateTime, Index, Enum, UniqueConstraint, sql from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship from riberry import model from riberry.model import base class ResourceType(enum.Enum): form = 'Form' user = 'User' job = 'Job' job_execution = 'JobExecution' def __repr__(self): return repr(self.value) class Document(base.Base): __tablename__ = 'document' __reprattrs__ = ['type'] id = base.id_builder.build() type: str = Column(String(24), nullable=False, default='markdown') content: bytes = Column(Binary, nullable=False) class Event(base.Base): __tablename__ = 'event' __reprattrs__ = ['name', 'root_id'] # columns id = base.id_builder.build() name: str = Column(String(64), nullable=False) time: float = Column(Float, nullable=False) root_id: str = Column(String(36), nullable=False) task_id: str = Column(String(36), nullable=False) data: str = Column(String(1024)) binary: bytes = Column(Binary) class NotificationType(enum.Enum): info = 'info' warning = 'warning' success = 'success' error = 'error' alert = 'alert' class UserNotification(base.Base): __tablename__ = 'notification_user' __table_args__ = ( Index('u_n__idx_job_id', 'user_id', 'read'), ) # columns id = base.id_builder.build() user_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=False) notification_id = Column(base.id_builder.type, ForeignKey('notification.id'), nullable=False) created: datetime = Column(DateTime(timezone=True), default=base.utc_now) read = Column(Boolean(name='notification_user_read'), nullable=False, default=False) # associations user: 'model.auth.User' = relationship('User', back_populates='notifications') notification: 'Notification' = relationship('Notification', back_populates='user_notifications') class Notification(base.Base): __tablename__ = 'notification' # columns id = base.id_builder.build() type = Column(Enum(NotificationType), nullable=False, default=NotificationType.info) message = Column(String(128), nullable=False) # associations user_notifications: List['UserNotification'] = relationship('UserNotification', back_populates='notification') targets: List['NotificationTarget'] = relationship('NotificationTarget', back_populates='notification') class NotificationTarget(base.Base): __tablename__ = 'notification_target' # columns id = base.id_builder.build() notification_id = Column(base.id_builder.type, ForeignKey('notification.id'), nullable=False) target = Column(String(128), nullable=False) target_id = Column(String(128), nullable=False) action = Column(String(32)) # associations notification: 'Notification' = relationship('Notification', back_populates='targets') class MenuItem(base.Base): __tablename__ = 'menu_item' # columns id = base.id_builder.build() parent_id = Column(base.id_builder.type, ForeignKey('menu_item.id')) menu_type = Column(String(128), nullable=False) type = Column(String(128), nullable=False) key = Column(String(128), nullable=False) label: Optional[str] = Column(String(128), nullable=True) # associations parent: 'MenuItem' = relationship('MenuItem', back_populates='children', remote_side=[id]) children: List['MenuItem'] = 
relationship('MenuItem', back_populates='parent') class ResourceData(base.Base): __tablename__ = 'resource_data' __reprattrs__ = ['name'] __table_args__ = ( UniqueConstraint('resource_id', 'resource_type', 'name'), ) # columns id = base.id_builder.build() resource_id = Column(base.id_builder.type, nullable=True) resource_type = Column(Enum(ResourceType), nullable=False) name: str = Column(String(256)) raw_value: bytes = Column('value', Binary, nullable=True) lock: str = Column(String(72), nullable=True) expiry: datetime = Column(DateTime(timezone=True), nullable=True) marked_for_refresh: bool = Column(Boolean(name='resource_data_marked_for_refresh'), nullable=False, default=False) @hybrid_property def value(self): return json.loads(self.raw_value.decode()) if self.raw_value else None @value.setter def value(self, value): self.raw_value = json.dumps(value).encode() @classmethod def make_relationship(cls, resource_id, resource_type): return relationship( 'ResourceData', primaryjoin=lambda: sql.and_( resource_id == ResourceData.resource_id, ResourceData.resource_type == resource_type ), order_by=lambda: ResourceData.id.asc(), foreign_keys=lambda: ResourceData.resource_id, cascade='save-update, merge, delete, delete-orphan', ) PK!Ariberry/plugins/__init__.pyfrom . import defaults import importlib import pkgutil from collections import defaultdict plugin_register = defaultdict(set) plugin_register['authentication'].add(defaults.authentication.DefaultAuthenticationProvider) plugin_register['policies'].add(defaults.policies.default_policies) ext_plugins = { name: importlib.import_module(name) for finder, name, ispkg in pkgutil.iter_modules() if name.startswith('riberry_') } PK!d''$riberry/plugins/defaults/__init__.pyfrom . import authentication, policies PK!h^<<*riberry/plugins/defaults/authentication.pyimport hashlib import os from riberry.model import auth from riberry.plugins.interfaces import AuthenticationProvider def make_hash(text) -> bytes: hash_ = hashlib.sha512() hash_.update(text) return hash_.hexdigest().encode() def salted_hash(plain: bytes, salt: bytes): plain_hash: bytes = make_hash(plain) return make_hash(plain_hash + salt) def check_password(input_password: bytes, hashed_password: bytes): size = len(hashed_password) // 2 password_hash, salt = hashed_password[:size], hashed_password[size:] input_hash = salted_hash(input_password, salt) + salt return hashed_password == input_hash def hash_password(password: bytes) -> bytes: salt = make_hash(os.urandom(1024)) return salted_hash(password, salt) + salt class DefaultAuthenticationProvider(AuthenticationProvider): @classmethod def name(cls) -> str: return 'default' def secure_password(self, password: str) -> bytes: return hash_password(password=(password or '').encode()) def authenticate(self, username: str, password: str) -> bool: user = auth.User.query().filter_by(username=username).first() if user: return check_password(input_password=(password or '').encode(), hashed_password=user.password.encode()) else: return False PK!KB^$riberry/plugins/defaults/policies.pyfrom riberry import policy, model from riberry.policy import AttributeContext class RootPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return context.subject is not None class UserPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return True class 
ApplicationPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.application.Application) def condition(self, context: AttributeContext) -> bool: return True class ApplicationViewPolicy(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewApplicationUsingFormRelationshipRule(policy.Rule): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject application: model.application.Application = context.resource for instance in application.instances: for form in instance.forms: if set(user.groups) & set(form.groups): return True return False class GenericCreatePolicy(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'create' def condition(self, context: AttributeContext) -> bool: return True class RejectCreateRule(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return False class ApplicationInstancePolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.application.ApplicationInstance) def condition(self, context: AttributeContext) -> bool: return True class ApplicationInstanceViewPolicy(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewApplicationInstanceUsingFormRelationshipRule(policy.Rule): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject instance: model.application.ApplicationInstance = context.resource for form in instance.forms: if set(user.groups) & set(form.groups): return True return False class ApplicationFormPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.interface.Form) def condition(self, context: AttributeContext) -> bool: return True class ApplicationFormViewPolicy(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewFormRule(policy.Rule): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject form: model.interface.Form = context.resource if set(user.groups) & set(form.groups): return True return False class JobPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.job.Job) def condition(self, context: AttributeContext) -> bool: return True class JobViewPolicy(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewJobUsingFormRelationshipRule(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject job: model.job.Job = context.resource if 
set(user.groups) & set(job.form.groups): return True return False class JobExecutionPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.job.JobExecution) def condition(self, context: AttributeContext) -> bool: return True class JobExecutionViewPolicy(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewJobExecutionUsingFormRelationshipRule(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject execution: model.job.JobExecution = context.resource if set(user.groups) & set(execution.job.form.groups): return True return False class JobExecutionArtifactPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.job.JobExecutionArtifact) def condition(self, context: AttributeContext) -> bool: return True class JobExecutionArtifactViewPolicy(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewJobExecutionArtifactUsingFormRelationshipRule(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject execution: model.job.JobExecutionArtifact = context.resource if set(user.groups) & set(execution.job_execution.job.form.groups): return True return False default_policies = policy.AuthorizationEngine( 'default', RootPolicySet( UserPolicySet( ApplicationPolicySet( ApplicationViewPolicy( ViewApplicationUsingFormRelationshipRule() ), GenericCreatePolicy( RejectCreateRule() ) ), ApplicationInstancePolicySet( ApplicationInstanceViewPolicy( ViewApplicationInstanceUsingFormRelationshipRule() ), GenericCreatePolicy( RejectCreateRule() ) ), ApplicationFormPolicySet( ApplicationFormViewPolicy( ViewFormRule() ), GenericCreatePolicy( RejectCreateRule() ) ), JobPolicySet( JobViewPolicy( ViewJobUsingFormRelationshipRule() ) ), JobExecutionPolicySet( JobExecutionViewPolicy( ViewJobExecutionUsingFormRelationshipRule() ) ), JobExecutionArtifactPolicySet( JobExecutionArtifactViewPolicy( ViewJobExecutionArtifactUsingFormRelationshipRule() ) ) ) ) )PK! riberry/plugins/interfaces.pyimport abc class AuthenticationProvider(metaclass=abc.ABCMeta): def __init__(self, config_dict): self.raw_config = config_dict @classmethod def name(cls) -> str: raise NotImplementedError def authenticate(self, username: str, password: str) -> bool: raise NotImplementedError def secure_password(self, password: bytes) -> bytes: raise NotImplementedError def on_enabled(self): pass PK!j;ͬriberry/policy/__init__.pyfrom .engine import PolicyContext, AuthorizationEngine, PolicySet, Policy, Rule, AttributeContext from .helpers import policy_set, policy, rule context = PolicyContext() PK! 
X/riberry/policy/engine.pyimport abc import functools import inspect from contextlib import contextmanager from threading import local from typing import Set, Union, Optional, Type, Callable from riberry.exc import AuthorizationError class NotApplicable(Exception): pass class AuthorizationEngine: def __init__(self, name, *policy): self.name = name self.policies: Set[Union[PolicySet, Policy]] = set(policy) def authorize(self, context): results = [] for policy_sey in self.policies: try: results.append(policy_sey.authorize(context=context)) except NotApplicable: pass return False if False in results else True class PolicyContext: _local = local() _no_default = object() def __getitem__(self, item): return getattr(self._local, item, None) def __setitem__(self, item, value): setattr(self._local, item, value) @property def enabled(self): return self['enabled'] @enabled.setter def enabled(self, value): self['enabled'] = value @property def subject(self): return self['subject'] @subject.setter def subject(self, value): self['subject'] = value @property def environment(self): return self['environment'] @environment.setter def environment(self, value): self['environment'] = value @property def engine(self) -> AuthorizationEngine: return self['policy_engine'] @engine.setter def engine(self, value): self['policy_engine'] = value @property def on_deny(self) -> Optional[Exception]: return self['on_deny'] @on_deny.setter def on_deny(self, value): self['on_deny'] = value @contextmanager def scope(self, subject, environment, policy_engine, on_deny: Optional[Union[Type, Callable]] = AuthorizationError): self.configure(subject=subject, environment=environment, policy_engine=policy_engine, on_deny=on_deny) yield self.reset() @contextmanager def disabled_scope(self): try: self.enabled = False yield finally: self.enabled = True def configure(self, subject, environment, policy_engine, on_deny: Optional[Union[Type, Callable]] = AuthorizationError): self.enabled = True self.subject = subject self.environment = environment self.engine = policy_engine self.on_deny = on_deny def reset(self): self.enabled = True self.subject = None self.environment = None self.engine = None self.on_deny = None @classmethod def current(cls): return cls() def authorize(self, resource, action, on_deny: Optional[Union[Type, Callable]] = _no_default): if not self.enabled: return True attr_context = AttributeContext( subject=self.subject, environment=self.environment, action=action, resource=resource ) result = self.engine.authorize(attr_context) if result is False: on_deny = self.on_deny if on_deny is self._no_default else on_deny if inspect.isclass(on_deny) and issubclass(on_deny, Exception): raise on_deny elif callable(on_deny): on_deny(attr_context) return result def filter(self, resources, action): return [resource for resource in resources if self.authorize(resource, action, on_deny=None)] def post_filter(self, action): def outer(func): @functools.wraps(func) def inner(*args, **kwargs): result = func(*args, **kwargs) return self.filter(resources=result, action=action) return inner return outer def post_authorize(self, action, on_deny: Optional[Union[Type, Callable]] = _no_default): def outer(func): @functools.wraps(func) def inner(*args, **kwargs): result = func(*args, **kwargs) self.authorize(resource=result, action=action, on_deny=on_deny) return result return inner return outer class AttributeContext: def __init__(self, subject, resource, action, environment): self.subject = subject self.resource = resource self.action = action 
self.environment = environment class AuthorizationElement(metaclass=abc.ABCMeta): def target_clause(self, context: AttributeContext) -> bool: raise NotImplementedError def condition(self, context: AttributeContext) -> bool: raise NotImplementedError def apply(self, context: AttributeContext): if not self.target_clause(context=context): raise NotApplicable return self.condition(context=context) def authorize(self, context): raise NotImplementedError class PolicyCollection(AuthorizationElement, metaclass=abc.ABCMeta): def __init__(self, *collection): self.collection: Set[AuthorizationElement] = set(collection) def authorize(self, context): if not self.apply(context=context): return False results = set() for policy in self.collection: try: results.add(policy.authorize(context=context)) except NotApplicable: pass if any(results): return True return False if False in results else None class PolicySet(PolicyCollection, metaclass=abc.ABCMeta): pass class Policy(PolicyCollection, metaclass=abc.ABCMeta): pass class Rule(AuthorizationElement, metaclass=abc.ABCMeta): def on_permit(self, context): pass def on_deny(self, context): pass def authorize(self, context): result = self.apply(context=context) self.on_permit(context=context) if result else self.on_deny(context=context) return result PK!?iMriberry/policy/helpers.pyfrom .engine import Rule, AttributeContext, Policy class ShorthandRule(Rule): def __init__(self, func): self.func = func def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return self.func(context) class ShorthandPolicy(Policy): def __init__(self, func, *collection): super(ShorthandPolicy, self).__init__(*collection) self.func = func def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return self.func(context) class ShorthandPolicySet(Policy): def __init__(self, func, *collection): super(ShorthandPolicySet, self).__init__(*collection) self.func = func def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return self.func(context) def rule(func): return ShorthandRule(func) def policy(func): def builder(*collection): return ShorthandPolicy(func, *collection) return builder def policy_set(func): def builder(*collection): return ShorthandPolicySet(func, *collection) return builderPK!Gdriberry/services/__init__.pyfrom riberry import policy as rib_policy from . 
import application, application_instance, form, auth, job, job_executions, self, policy def fetch_relationship(model_object, attribute, action): resource = getattr(model_object, attribute) if isinstance(resource, list): return rib_policy.context.filter(resources=resource, action=action) if rib_policy.context.authorize(resource=resource, action=action, on_deny=None) is not False: return resource return None PK!0lriberry/services/application.pyfrom typing import List from riberry import model, policy @policy.context.post_filter(action='view') def all_applications() -> List[model.application.Application]: return model.application.Application.query().all() @policy.context.post_authorize(action='view') def application_by_id(application_id) -> model.application.Application: return model.application.Application.query().filter_by(id=application_id).one() @policy.context.post_authorize(action='view') def application_by_internal_name(internal_name) -> model.application.Application: return model.application.Application.query().filter_by(internal_name=internal_name).one() def create_application(name, internal_name, description, type, document): app = model.application.Application( name=name, internal_name=internal_name, description=description, type=type, document=model.misc.Document(content=document) if document else None ) policy.context.authorize(app, action='create') model.conn.add(app) return app def update_application(application, attributes): for attr in {'name', 'description', 'type'} & set(attributes): setattr(application, attr, attributes[attr]) return application PK!<0(riberry/services/application_instance.pyfrom typing import List, Dict from riberry import model, policy, services @policy.context.post_filter(action='view') def all_application_instances() -> List[model.application.ApplicationInstance]: return model.application.ApplicationInstance.query().all() @policy.context.post_authorize(action='view') def application_instance_by_id(application_instance_id) -> model.application.ApplicationInstance: return model.application.ApplicationInstance.query().filter_by(id=application_instance_id).one() @policy.context.post_authorize(action='view') def application_instance_by_internal_name(internal_name) -> model.application.ApplicationInstance: return model.application.ApplicationInstance.query().filter_by(internal_name=internal_name).one() @policy.context.post_filter(action='view') def instances_by_application_id(application_id) -> List[model.application.ApplicationInstance]: application = services.application.application_by_id(application_id=application_id) return application.instances def create_application_instance(application, name, internal_name, schedules: List[Dict]) -> model.application.ApplicationInstance: application_instance = model.application.ApplicationInstance( application=application, name=name, internal_name=internal_name, schedules=create_application_instance_schedules(attributes_dict=schedules), ) policy.context.authorize(application_instance, action='create') model.conn.add(application_instance) return application_instance def create_application_instance_schedules(attributes_dict): return [ model.application.ApplicationInstanceSchedule( days=schedule['days'], start_time=schedule['start_time'], end_time=schedule['end_time'], timezone=schedule['timezone'], parameter=schedule['parameter'], value=schedule['value'], priority=schedule['priority'], ) for schedule in attributes_dict ] def update_application_instance(application_instance: model.application.ApplicationInstance, 
attributes: Dict): for attr in {'name'} & set(attributes): setattr(application_instance, attr, attributes[attr]) return application_instance PK!,Tbbriberry/services/auth.pyfrom riberry import model from datetime import timedelta from riberry.model.group import Group def authenticate_user(username: str, password: str) -> str: user = model.auth.User.authenticate(username=username, password=password) access_token = model.auth.AuthToken.create(user, expiry_delta=timedelta(days=5)) return access_token.decode() def all_groups(): return Group.query().all() def group_by_id(group_id): return Group.query().filter_by(id=group_id).first() def users_for_group_id(group_id): group = group_by_id(group_id=group_id) return group.users def forms_for_group_id(group_id): group = group_by_id(group_id=group_id) return group.forms def remove_user_from_group(group_id, user_id): _remove_resource_from_group(group_id, user_id, model.misc.ResourceType.user) def add_user_to_group(group_id, user_id): _add_resource_to_group(group_id, user_id, model.misc.ResourceType.user) def remove_form_from_group(group_id, form_id): _remove_resource_from_group(group_id, form_id, model.misc.ResourceType.form) def add_form_to_group(group_id, form_id): _add_resource_to_group(group_id, form_id, model.misc.ResourceType.form) def _find_group_association(group_id, resource_id, resource_type): return model.group.ResourceGroupAssociation.query().filter_by( group_id=group_id, resource_id=resource_id, resource_type=resource_type ).first() def _remove_resource_from_group(group_id, resource_id, resource_type): association = _find_group_association(group_id, resource_id, resource_type) if association: model.conn.delete(association) def _add_resource_to_group(group_id, resource_id, resource_type): association = _find_group_association(group_id, resource_id, resource_type) if not association: association = model.group.ResourceGroupAssociation( group_id=group_id, resource_id=resource_id, resource_type=resource_type ) model.conn.add(association) def create_group(name): group = model.group.Group(name=name) model.conn.add(group) return groupPK!M riberry/services/form.pyfrom typing import List, Dict from sqlalchemy import desc from riberry import model, policy from riberry import services @policy.context.post_filter(action='view') def all_forms() -> List[model.interface.Form]: return model.interface.Form.query().all() @policy.context.post_authorize(action='view') def form_by_id(form_id) -> model.interface.Form: return model.interface.Form.query().filter_by(id=form_id).one() @policy.context.post_authorize(action='view') def form_by_internal_name(internal_name) -> model.interface.Form: return model.interface.Form.query().filter_by( internal_name=internal_name, ).one() @policy.context.post_filter(action='view') def forms_by_application_id(application_id): application = services.application.application_by_id(application_id=application_id) return application.forms def create_form(application, instance, name, internal_name, version, description, input_files, input_values) -> model.interface.Form: form = model.interface.Form( application=application, instance=instance, name=name, internal_name=internal_name, version=version, description=description, input_file_definitions=[model.interface.InputFileDefinition(**d) for d in input_files], input_value_definitions=[model.interface.InputValueDefinition(**d) for d in input_values] ) policy.context.authorize(form, action='create') model.conn.add(form) return form def update_form(form: model.interface.Form, attributes: 
Dict): for attr in {'name', 'version', 'description'} & set(attributes): setattr(form, attr, attributes[attr]) return form @policy.context.post_authorize(action='view') def file_definition_by_internal_name(form, internal_name) -> model.interface.InputFileDefinition: return model.interface.InputFileDefinition.query().filter_by( form=form, internal_name=internal_name, ).one() @policy.context.post_authorize(action='view') def value_definition_by_internal_name(form, internal_name) -> model.interface.InputValueDefinition: return model.interface.InputValueDefinition.query().filter_by( form=form, internal_name=internal_name, ).one() def update_file_definition(definition: model.interface.InputFileDefinition, attributes: Dict): for attr in {'required', 'type', 'name', 'description', 'accept'} & set(attributes): setattr(definition, attr, attributes[attr]) return definition def update_value_definition(definition: model.interface.InputValueDefinition, attributes: Dict): for attr in {'required', 'type', 'name', 'description', 'default_binary'} & set(attributes): setattr(definition, attr, attributes[attr]) if 'allowed_binaries' in attributes: current = set(definition.allowed_binaries) present = set(attributes['allowed_binaries']) removed = current - present if current != present: definition.allowed_binaries = [enum for enum in attributes['allowed_binaries'] if enum not in removed] elif definition.allowed_binaries: definition.allowed_binaries = [] return definition def job_executions_by_id(form_id, limit=50): form = form_by_id(form_id=form_id) return model.job.JobExecution.query().filter( (model.job.JobExecution.job_id == model.job.Job.id) & (model.job.Job.form_id == form.id) ).order_by(desc(model.job.JobExecution.updated)).limit(limit).all() PK!F UÌriberry/services/job.pyimport json from io import BytesIO from typing import Dict from riberry import model, services, policy, exc class InputFileProxy(BytesIO): def __init__(self, obj, filename=None): self.filename = filename super().__init__(obj) @classmethod def from_object(cls, obj, filename='input'): if isinstance(obj, bytes): binary = obj elif isinstance(obj, str): binary = obj.encode() if not filename.endswith('.txt'): filename += '.txt' else: binary = json.dumps(obj).encode() if not filename.endswith('.json'): filename += '.json' return cls(binary, filename) def jobs_by_form_id(form_id): return model.job.Job.query().filter_by(form_id=form_id).all() def verify_inputs(input_value_definitions, input_file_definitions, input_values, input_files): value_map_definitions: Dict[str, 'model.interface.InputValueDefinition'] = {input_def.name: input_def for input_def in input_value_definitions} file_map_definitions: Dict[str, 'model.interface.InputValueDefinition'] = {input_def.name: input_def for input_def in input_file_definitions} value_mapping = {d.internal_name: d.name for d in value_map_definitions.values()} file_mapping = {d.internal_name: d.name for d in file_map_definitions.values()} input_values = {value_mapping.get(k, k): v for k, v in input_values.items()} input_files = {file_mapping.get(k, k): v for k, v in input_files.items()} input_value_mapping = {} input_file_mapping = {} errors = [] for name, definition in value_map_definitions.items(): if name in input_values: value = input_values.pop(name) else: value = definition.default_binary if definition.required and not value: err = exc.RequiredInputError(target='job', field=definition.name, internal_name=definition.internal_name) errors.append(err) continue if definition.allowed_binaries and value: 
values = value if isinstance(value, str): values = [value] if isinstance(values, list): values = [json.dumps(v).encode() if v else v for v in values] if set(values) - set(definition.allowed_binaries) or definition.type != 'text-multiple' and len(values) > 1: err = exc.InvalidEnumError( target='job', field=definition.name, allowed_values=definition.allowed_values, internal_name=definition.internal_name ) errors.append(err) continue input_value_mapping[definition] = value for name in list(input_values): if name in file_mapping and name not in input_files: input_files[file_mapping[name]] = input_values.pop(name) for name, definition in file_map_definitions.items(): if name in input_files: value = input_files.pop(name) else: value = None if definition.required and not value: err = exc.RequiredInputError(target='job', field=definition.name, internal_name=definition.internal_name) errors.append(err) continue input_file_mapping[definition] = value unexpected_inputs = set(input_values) | set(input_files) if unexpected_inputs: for input_ in unexpected_inputs: err = exc.UnknownInputError(target='job', field=input_) errors.append(err) if errors: raise exc.InputErrorGroup(*errors) return input_value_mapping, input_file_mapping def create_job(form_id, name, input_values, input_files, execute, parent_execution=None): form = services.form.form_by_id(form_id=form_id) policy.context.authorize(form, action='view') errors = [] if not name: err = exc.RequiredInputError(target='job', field='name') errors.append(err) else: if model.job.Job.query().filter_by(name=name).first(): err = exc.UniqueInputConstraintError(target='job', field='name', value=name) errors.append(err) try: values_mapping, files_mapping = verify_inputs( input_value_definitions=form.input_value_definitions, input_file_definitions=form.input_file_definitions, input_values=input_values, input_files=input_files ) except exc.InputErrorGroup as e: e.extend(errors) raise else: if errors: raise exc.InputErrorGroup(*errors) input_value_instances = [] input_file_instances = [] values_mapping = { k: (json.dumps(v).encode() if v and not isinstance(v, bytes) else v) for k, v in values_mapping.items() } for definition, value in values_mapping.items(): input_value_instance = model.interface.InputValueInstance( name=definition.name, internal_name=definition.internal_name, raw_value=value ) input_value_instances.append(input_value_instance) for definition, value in files_mapping.items(): filename = definition.internal_name if not hasattr(value, 'read'): value = InputFileProxy.from_object(obj=value, filename=filename) binary = value.read() if isinstance(binary, str): binary = binary.encode() if hasattr(value, 'filename'): filename = value.filename input_file_instance = model.interface.InputFileInstance( name=definition.name, internal_name=definition.internal_name, filename=filename, binary=binary, size=len(binary) if binary else 0 ) input_file_instances.append(input_file_instance) job = model.job.Job( form=form, name=name, files=input_file_instances, values=input_value_instances, creator=policy.context.subject ) policy.context.authorize(job, action='create') if execute: create_job_execution(job, parent_execution=parent_execution) model.conn.add(job) return job @policy.context.post_authorize(action='view') def job_by_id(job_id): return model.job.Job.query().filter_by(id=job_id).one() @policy.context.post_filter(action='view') def job_executions_by_id(job_id): return model.job.JobExecution.query().filter_by(job_id=job_id).all() def 
create_job_execution_by_job_id(job_id): job = job_by_id(job_id=job_id) return create_job_execution(job=job) def create_job_execution(job, parent_execution=None): execution = model.job.JobExecution(job=job, creator=policy.context.subject, parent_execution=parent_execution) policy.context.authorize(execution, action='create') model.conn.add(execution) return execution def input_file_instance_by_id(input_file_instance_id) -> model.interface.InputFileInstance: return model.interface.InputFileInstance.query().filter_by(id=input_file_instance_id).one() def delete_job_by_id(job_id): delete_job(job=job_by_id(job_id=job_id)) @policy.context.post_authorize(action='view') def delete_job(job): model.conn.delete(job) PK!JJ"riberry/services/job_executions.pyfrom riberry.celery import client from riberry import model, policy @policy.context.post_authorize(action='view') def job_execution_by_id(execution_id): return model.job.JobExecution.query().filter_by(id=execution_id).one() @policy.context.post_authorize(action='view') def job_artifact_by_id(artifact_id): return model.job.JobExecutionArtifact.query().filter_by(id=artifact_id).one() @policy.context.post_authorize(action='view') def job_stream_by_id(stream_id): return model.job.JobExecutionStream.query().filter_by(id=stream_id).one() @policy.context.post_authorize(action='view') def delete_job_execution_by_id(execution_id): delete_job_execution(execution=job_execution_by_id(execution_id=execution_id)) @policy.context.post_authorize(action='view') def delete_job_execution(execution): model.conn.delete(execution) @policy.context.post_authorize(action='view') def cancel_job_execution_by_id(execution_id): cancel_job_execution(execution=job_execution_by_id(execution_id=execution_id)) @policy.context.post_authorize(action='view') def cancel_job_execution(execution): if execution.status in ('SUCCESS', 'FAILURE'): return user = policy.context.subject message = 'The current execution was manually cancelled by user {} ({}).'.format( user.username, user.details.display_name ).encode() artifact = model.job.JobExecutionArtifact( name=f'Workflow cancelled by user {user.username}', job_execution=execution, type=model.job.ArtifactType.error, category='Fatal', filename='fatal.log', size=len(message), binary=model.job.JobExecutionArtifactBinary(binary=message), ) model.conn.add(artifact) client.workflow_complete( task_id=execution.task_id, root_id=execution.task_id, status='FAILURE', primary_stream=None) PK!緔riberry/services/policy.pyfrom contextlib import contextmanager from riberry import model, config, policy @contextmanager def policy_scope(user=None, environment=None): if isinstance(user, str): user = model.auth.User.query().filter_by(username=user).one() policy_provider = config.config.policies.provider with policy.context.scope(subject=user, environment=environment, policy_engine=policy_provider): yield PK! 
riberry/services/self.pyfrom typing import List from sqlalchemy import desc from riberry import model, policy def profile() -> model.auth.User: return policy.context.subject def latest_notifications(): user = policy.context.subject return model.misc.UserNotification.query().filter_by(user=user).order_by( desc(model.misc.UserNotification.created) ).limit(32).all() def unread_notification_count(): user = policy.context.subject return model.misc.UserNotification.query().filter_by(read=False, user=user).count() def mark_notifications_as_read(notification_ids: List): user = policy.context.subject notifications: List[model.misc.UserNotification] = model.misc.UserNotification.query().filter( (model.misc.UserNotification.id.in_(notification_ids)) & (model.misc.UserNotification.user == user) & (model.misc.UserNotification.read == False) ).all() for notification in notifications: notification.read = True def mark_all_notifications_as_read(): user = policy.context.subject notifications: List[model.misc.UserNotification] = model.misc.UserNotification.query().filter_by( user=user, read=False ).all() for notification in notifications: notification.read = True PK!riberry/util/__init__.pyPK!_ _ riberry/util/__main__.pyimport json import click from . import user, config_importer, groups @click.group() def cli(): pass @cli.command('import') @click.option('--config-path', '-p', prompt='Configuration path', help='YAML file containing application references.') @click.option('--commit/--rollback', '-c/-r', prompt='Dry run', is_flag=True, help='Gathers all database changes without commits.', default=False) @click.option('--app', '-a', 'apps', help='Restricts the import to the specified app', multiple=True) def importer(config_path, commit, apps): changes = config_importer.import_from_file(config_path, dry_run=not commit, restrict_apps=apps) print(json.dumps(changes, indent=2)) @cli.command('add-user') @click.option('--username', prompt='Username', help="User's username") @click.option('--password', prompt='Password', help="User's password", hide_input=True, confirmation_prompt=True) @click.option('--first-name', prompt='First name', help="User's first name") @click.option('--last-name', prompt='Last name', help="User's last name") @click.option('--display-name', prompt='Full name', help="User's last name") @click.option('--department', prompt='Department', help="User's department") @click.option('--email', prompt='Email', help="User's email") def add_user(username, password, first_name, last_name, display_name, department, email): user_id = user.add_user( username=username, password=password, first_name=first_name, last_name=last_name, display_name=display_name, department=department, email=email, ) print(f'Created user {username} (User ID: {user_id})') @cli.command('user-groups') @click.argument('action', type=click.Choice(['add', 'remove'])) @click.option('--username', '-u', prompt='Username', help="User's username") @click.option('--group', '-g', prompt='Username', help="Group's name") def modify_user_groups(action, username, group): try: if action == 'add': groups.add_user_to_group(username=username, group_name=group) print(f'Added {username} to group {group}') elif action == 'remove': groups.remove_user_from_group(username=username, group_name=group) print(f'Removed {username} from group {group}') except Exception as exc: print(str(exc)) exit(1) if __name__ == '__main__': cli() PK!W@xxriberry/util/common.pyimport os from string import Template def variable_substitution(obj): if isinstance(obj, str): try: 
return Template(template=obj).substitute(os.environ) except KeyError as exc: key = exc.args[0] raise ValueError(f'Environment variable substitution failed for {key!r}. ' f'Does the environment variable exist?') elif isinstance(obj, dict): return {k: variable_substitution(v) for k, v in obj.items()} elif isinstance(obj, (tuple, list, set)): return type(obj)(map(variable_substitution, obj)) else: return obj PK!Pb"?"?riberry/util/config_importer.pyimport json import os import uuid import yaml from sqlalchemy.inspection import inspect from sqlalchemy.orm.exc import NoResultFound from riberry import model, services, policy from riberry.util.common import variable_substitution class Loader(yaml.SafeLoader): """ https://stackoverflow.com/a/9577670 """ def __init__(self, stream): self._root = os.path.split(stream.name)[0] super(Loader, self).__init__(stream) def include(self, node): filename = variable_substitution(os.path.join(self._root, self.construct_scalar(node))) with open(filename, 'r') as f: if filename.endswith('.yaml') or filename.endswith('.yml'): return yaml.load(f, Loader) else: return f.read() Loader.add_constructor('!include', Loader.include) def collection_diff(obj, collection_name, loader): current_collection = set(getattr(obj, collection_name)) new_collection = set(loader()) for stale in current_collection - new_collection: model.conn.delete(stale) setattr(obj, collection_name, list(new_collection)) return obj def model_diff(obj): object_info = inspect(obj) return { name: attr.history for name, attr in object_info.attrs.items() if attr.history.has_changes() } def session_diff(): diff = {} for obj in sorted(model.conn.dirty | model.conn.new | model.conn.deleted, key=lambda o: str(o)): type_ = 'Modified' if obj in model.conn.dirty else 'Added' if obj in model.conn.new else 'Deleted' if obj in model.conn.deleted else 'REF' diff[obj] = type_, model_diff(obj) return diff def import_applications(applications, restrict=None): existing_apps = {a.internal_name: a for a in model.application.Application.query().all()} if not restrict: for stale in set(existing_apps) - set(applications): model.conn.delete(existing_apps[stale]) apps = [] for application, properties in applications.items(): if restrict and application not in restrict: continue app = import_application(internal_name=application, attributes=properties) apps.append(app) return apps def import_application(internal_name, attributes): try: app = services.application.application_by_internal_name(internal_name=internal_name) app = services.application.update_application(app, attributes) except NoResultFound: app = services.application.create_application( internal_name=internal_name, name=attributes.get('name'), description=attributes.get('description'), type=attributes.get('type'), document=None ) if attributes.get('document'): if not app.document: app.document = model.misc.Document() model.conn.add(app.document) app.document.content = attributes['document'].encode() else: if app.document: app.document = None import_instances(app, attributes.get('instances') or {}) import_forms(app, attributes.get('forms') or {}) return app def import_instances(app, instances): return collection_diff( obj=app, collection_name='instances', loader=lambda: { import_instance(app, name, attrs) for name, attrs in instances.items() } ) def import_forms(app, forms): return collection_diff( obj=app, collection_name='forms', loader=lambda: { import_form(app, name, attrs) for name, attrs in forms.items() } ) def import_form(app, internal_name, attributes): try: 
form = services.form.form_by_internal_name(internal_name=internal_name) form = services.form.update_form(form, attributes) except NoResultFound: instance_internal_name = attributes['instance'] instances = [instance for instance in app.instances if instance.internal_name == instance_internal_name] if not instances: raise ValueError(f'Could not find instance {instance_internal_name} for form {internal_name}') form = services.form.create_form( application=app, instance=instances[0], name=attributes.get('name'), internal_name=internal_name, version=attributes.get('version'), description=attributes.get('description'), input_files=[], input_values=[] ) if attributes.get('document'): if not form.document: form.document = model.misc.Document() model.conn.add(form.document) form.document.content = attributes['document'].encode() else: if form.document: form.document = None import_form_inputs( form=form, input_files=attributes.get('inputFiles') or {}, input_values=attributes.get('inputValues') or {}, ) return form def import_input_file_definition(form, internal_name, attributes): try: if not form.id: raise NoResultFound definition = services.form.file_definition_by_internal_name(form=form, internal_name=internal_name) definition = services.form.update_file_definition(definition, attributes) except NoResultFound: definition = model.interface.InputFileDefinition(internal_name=internal_name, **attributes) model.conn.add(definition) return definition def import_input_value_definition(form, internal_name, attributes): mapping = { 'enumerations': ('allowed_binaries', lambda values: [json.dumps(v).encode() for v in values]), 'default': ('default_binary', lambda v: json.dumps(v).encode()), } attributes = dict( (mapping[k][0], mapping[k][1](v)) if k in mapping else (k, v) for k, v in attributes.items() ) try: if not form.id: raise NoResultFound definition = services.form.value_definition_by_internal_name(form=form, internal_name=internal_name) definition = services.form.update_value_definition(definition, attributes) except NoResultFound: definition = model.interface.InputValueDefinition(internal_name=internal_name, **attributes) model.conn.add(definition) return definition def import_form_inputs(form, input_files, input_values): collection_diff( obj=form, collection_name='input_file_definitions', loader=lambda: { import_input_file_definition(form, name, attrs) for name, attrs in input_files.items() } ) collection_diff( obj=form, collection_name='input_value_definitions', loader=lambda: { import_input_value_definition(form, name, attrs) for name, attrs in input_values.items() } ) def import_instance(app, internal_name, attributes): try: instance = services.application_instance.application_instance_by_internal_name(internal_name=internal_name) instance = services.application_instance.update_application_instance(instance, attributes) except NoResultFound: instance = services.application_instance.create_application_instance( application=app, internal_name=internal_name, name=attributes.get('name'), schedules=[] ) current_schedules = {} for schedule in instance.schedules: current_schedules[( schedule.days, schedule.start_time, schedule.end_time, schedule.timezone, schedule.parameter, schedule.value, schedule.priority, )] = schedule loaded_schedules = set(( sched['days'].lower() if isinstance(sched.get('days'), str) else sched.get('days', '*'), model.application.ApplicationInstanceSchedule.cleanse_time( sched.get('startTime') or model.application.ApplicationInstanceSchedule.start_time.default.arg), 
model.application.ApplicationInstanceSchedule.cleanse_time( sched.get('endTime') or model.application.ApplicationInstanceSchedule.end_time.default.arg), sched.get('timeZone') or model.application.ApplicationInstanceSchedule.timezone.default.arg, sched['parameter'], str(sched['value']) if sched['value'] not in ('', None) else None, sched.get('priority') or model.application.ApplicationInstanceSchedule.priority.default.arg, ) for sched in attributes.get('schedules', [])) for stale in set(current_schedules) - loaded_schedules: model.conn.delete(current_schedules[stale]) new_schedules = [] for schedule in loaded_schedules: if schedule in current_schedules: continue days, start_time, end_time, timezone, parameter, value, priority = schedule new_schedules.append( dict( days=days, start_time=start_time, end_time=end_time, timezone=timezone, parameter=parameter, value=value, priority=priority, ) ) instance.schedules += services.application_instance.create_application_instance_schedules(new_schedules) return instance def import_groups(applications, restrict): created_groups = {} for application, properties in applications.items(): if restrict and application not in restrict: continue for form_internal_name, form_info in properties.get('forms', {}).items(): groups = form_info.get('groups') or [] form: model.interface.Form = model.interface.Form.query().filter_by( internal_name=form_internal_name, ).first() if not form: print(f'import_groups:: Form {form_internal_name} not found, skipping {application}') continue associations = {assoc.group.name: assoc for assoc in form.group_associations} stale_assocs = set(associations) - set(groups) for stale in stale_assocs: model.conn.delete(associations[stale]) for group_name in groups: if group_name in associations: continue group = model.group.Group.query().filter_by(name=group_name).first() if not group: group = created_groups.get(group_name) if not group: group = services.auth.create_group(name=group_name) created_groups[group_name] = group association = model.group.ResourceGroupAssociation( resource_id=form.id, resource_type=model.misc.ResourceType.form, group=group ) model.conn.add(association) def _convert_value(value): if isinstance(value, list): return [_convert_value(v) for v in value] if isinstance(value, model.base.Base): return dict( name=type(value).__name__, id=getattr(value, 'id', None) ) if isinstance(value, str) and len(value) > 512: return f'(trimmed) {value[:512]}' if isinstance(value, bytes): return f'bytes (size: {len(value)})' import enum if isinstance(value, enum.Enum): return value.name return value def json_diff(diff): output = {'Modified': [], 'Added': [], 'Deleted': []} for obj, (diff_type, changes) in diff.items(): if not changes and diff_type not in ('Added', 'Deleted'): continue entry = {} for k, v in changes.items(): attr = {} if v.deleted: attr['Deleted'] = _convert_value(v.deleted[0]) if v.added and v.added[0]: attr['Added'] = _convert_value(v.added[0]) entry[k] = attr obj_dict = _convert_value(obj) output[diff_type].append({ 'id': obj_dict['id'], 'type': obj_dict['name'], 'attributes': entry }) return output def import_menu_for_forms(menu): for item in model.misc.MenuItem.query().all(): model.conn.delete(item) for item in menu: menu_item = import_menu_item(item, menu_type='forms', parent=None) model.conn.add(menu_item) def import_menu_item(item, menu_type, parent=None): menu_item = model.misc.MenuItem(parent=parent, menu_type=menu_type) if isinstance(item, dict): menu_item.key = str(uuid.uuid4()) menu_item.type = 'branch' 
menu_item.label = list(item.keys())[0] menu_item.children = [ import_menu_item(child, menu_type, menu_item) for child in list(item.values())[0] ] elif isinstance(item, str): menu_item.key = item menu_item.type = 'leaf' return menu_item def import_config(config, formatter=None, restrict_apps=None): applications = config.get('applications') or {} import_applications(applications=applications, restrict=restrict_apps) if not restrict_apps: import_capacities(config.get('capacity-configuration') or {}) diff = session_diff() model.conn.flush() import_menu_for_forms(menu=config.get('menues', {}).get('forms', {})) import_groups(applications=applications, restrict=restrict_apps) for k, v in session_diff().items(): if k in diff: diff[k] = diff[k][0], {**diff[k][1], **v[1]} else: diff[k] = v model.conn.flush() return formatter(diff) if formatter else diff def import_capacities(capacities): existing = {c.weight_parameter: c for c in model.application.CapacityConfiguration.query().all()} for stale in set(existing) - set(capacities): model.conn.delete(existing[stale]) for weight_name, properties in capacities.items(): import_capacity(weight_name, properties) def import_capacity(weight_parameter, properties): capacity_config: model.application.CapacityConfiguration = \ model.application.CapacityConfiguration.query().filter_by(weight_parameter=weight_parameter).first() if not capacity_config: capacity_config = model.application.CapacityConfiguration(weight_parameter=weight_parameter) capacity_config.capacity_parameter = properties['parameters']['capacity'] capacity_config.producer_parameter = properties['parameters']['producer'] capacity_config.distribution_strategy = ( model.application.CapacityDistributionStrategy(properties['strategy']) if 'strategy' in properties else model.application.CapacityConfiguration.distribution_strategy.default.arg ) new_producers = {p['internalName']: p for p in properties['producers']} # Update existing, delete old for producer in capacity_config.producers: if producer.internal_name not in new_producers: model.conn.delete(producer) else: producer_config = new_producers.pop(producer.internal_name) producer.name = producer_config.get('name') or producer.internal_name producer.capacity = producer_config['capacity'] # Add new for internal_name, producer_config in new_producers.items(): capacity_config.producers.append( model.application.CapacityProducer( internal_name=internal_name, name=producer_config.get('name') or internal_name, capacity=producer_config['capacity'], ) ) model.conn.add(capacity_config) def import_from_file(config_path, dry_run=True, formatter=json_diff, restrict_apps=None): with open(config_path) as f: config = variable_substitution(yaml.load(f, Loader) or {}) with model.conn.no_autoflush, policy.context.disabled_scope(): output = import_config(config, formatter=formatter, restrict_apps=restrict_apps) if dry_run: model.conn.rollback() else: model.conn.commit() return output PK!+ ]==riberry/util/groups.pyfrom typing import Tuple from riberry import model, services from sqlalchemy.orm.exc import NoResultFound def _find_user_and_group(username: str, group_name: str) -> Tuple[model.auth.User, model.group.Group]: try: user = model.auth.User.query().filter_by(username=username).one() except NoResultFound: raise ValueError(f'Could not find user with username {username!r}') try: group = model.group.Group.query().filter_by(name=group_name).one() except NoResultFound: raise ValueError(f'Could not find group with name {group_name!r}') return user, group def 
add_user_to_group(username: str, group_name: str): user, group = _find_user_and_group(username=username, group_name=group_name) services.auth.add_user_to_group(group_id=group.id, user_id=user.id) model.conn.commit() def remove_user_from_group(username: str, group_name: str): user, group = _find_user_and_group(username=username, group_name=group_name) services.auth.remove_user_from_group(group_id=group.id, user_id=user.id) model.conn.commit() PK!Ȟ66riberry/util/user.pyfrom riberry import model def add_user(username, password, first_name, last_name, display_name, department, email): user = model.auth.User( username=username, password=model.auth.User.secure_password(password).decode(), auth_provider='default', details=model.auth.UserDetails( first_name=first_name, last_name=last_name, display_name=display_name, department=department, email=email ) ) model.conn.add(user) model.conn.commit() return user.id PK!((riberry-0.9.2.dist-info/LICENSEMIT License Copyright (c) 2018 srafehi Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK! riberry-0.9.2.dist-info/WHEEL [compressed data omitted] PK! riberry-0.9.2.dist-info/METADATA [compressed data omitted] PK! riberry-0.9.2.dist-info/RECORD [compressed data and archive index omitted]
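# --- Illustrative sketch (not part of the distribution) ---------------------
# Shape of the 'capacity-configuration' section consumed by import_capacities
# in riberry/util/config_importer.py, shown as the Python mapping produced by
# yaml.load. Only the key names ('parameters', 'capacity', 'producer',
# 'strategy', 'producers', 'internalName', 'name') come from that code; every
# value below is a made-up example.
example_capacities = {
    'active_jobs': {  # the weight parameter this configuration is keyed on
        'parameters': {
            'capacity': 'instance_capacity',  # capacity parameter name
            'producer': 'instance_producer',  # producer parameter name
        },
        # 'strategy': '<a CapacityDistributionStrategy value>',  # optional; model default when omitted
        'producers': [
            {'internalName': 'worker_primary', 'name': 'Primary worker', 'capacity': 8},
            {'internalName': 'worker_backup', 'capacity': 2},  # 'name' falls back to internalName
        ],
    },
}
# Importing this section creates or refreshes one CapacityConfiguration with
# two CapacityProducer rows; existing configurations whose weight parameter no
# longer appears in the mapping are deleted.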
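# --- Usage sketch (not part of the distribution) ----------------------------
# A minimal, hedged example of driving the utilities above from a maintenance
# script. It assumes a configured Riberry environment (model connection and
# riberry config), a YAML file at ./riberry.yml, and illustrative user/group
# names; none of these are defined by the package itself.
from riberry.util import config_importer, groups, user

# Preview an import without committing: dry_run=True rolls the session back
# and returns the json_diff structure, roughly:
#   {'Modified': [{'id': 3, 'type': 'Form', 'attributes': {'enabled': {'Deleted': False, 'Added': True}}}],
#    'Added': [...], 'Deleted': [...]}
diff = config_importer.import_from_file('riberry.yml', dry_run=True)
print(diff)

# Apply the same configuration for real.
config_importer.import_from_file('riberry.yml', dry_run=False)

# Create a user and attach it to an existing group (a missing user or group
# raises ValueError via _find_user_and_group).
user.add_user(
    username='jsmith',
    password='change-me',
    first_name='Jane',
    last_name='Smith',
    display_name='Jane Smith',
    department='Operations',
    email='jane.smith@example.com',
)
groups.add_user_to_group(username='jsmith', group_name='operators')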