# --- riberry/__init__.py ---
import logging

from riberry import log, config, plugins, model, celery, policy, services, exc, app

__version__ = '0.10.14'

log.root_name = __name__
log.logger = logging.getLogger(log.root_name)
log.init()

config.config.enable()
config.config.authentication.enable()


# --- riberry/app/__init__.py ---
from .env import current_context, current_riberry_app
from .base import RiberryApplication, RiberryApplicationConfig
from . import actions, context, tasks, util, addons, backends


# --- riberry/app/actions/__init__.py ---
from . import artifacts, executions, notify, jobs, reports, shared_data, external_task


# --- riberry/app/actions/artifacts.py ---
import json
import sys
import traceback
from typing import Union, Optional

import riberry
from riberry.exc import BaseError
from riberry.model.job import ArtifactType
from .. import current_context as context
from ..util.events import create_event


def create_artifact(filename: str, content: Union[bytes, str], name: str = None,
                    type: Union[str, ArtifactType] = ArtifactType.output, category='Default',
                    data: dict = None, stream: str = None, task_id: str = None, root_id: str = None):
    task_id = task_id or context.current.task_id
    root_id = root_id or context.current.root_id
    stream = stream or context.current.stream

    if name is None:
        name = filename

    if content is not None and not isinstance(content, (bytes, str)):
        content = json.dumps(content)
    if isinstance(content, str):
        content = content.encode()

    if isinstance(type, ArtifactType):
        type = type.value
    try:
        ArtifactType(type)
    except ValueError as exc:
        raise ValueError(f'ArtifactType enum has no value {type!r}. '
                         f'Supported types: {", ".join(ArtifactType.__members__)}') from exc

    create_event(
        'artifact',
        root_id=root_id,
        task_id=task_id,
        data={
            'name': str(name),
            'type': str(type),
            'category': str(category),
            'data': data if isinstance(data, dict) else {},
            'stream': str(stream) if stream else None,
            'filename': str(filename),
        },
        binary=content
    )


def create_artifact_from_traceback(
        name: Optional[str] = None,
        filename: Optional[str] = None,
        category: str = 'Intercepted',
        type: riberry.model.job.ArtifactType = riberry.model.job.ArtifactType.error,
):
    exc_type, exc, tb = sys.exc_info()
    if not exc_type:
        return

    if isinstance(exc, BaseError):
        error_content = f'{traceback.format_exc()}\n\n{"-" * 32}\n\n{json.dumps(exc.output(), indent=2)}'.encode()
    else:
        error_content = traceback.format_exc().encode()

    create_artifact(
        name=name if name else f'Exception {context.current.task_name}',
        type=type,
        category=category,
        data={
            'Error Type': exc.__class__.__name__,
            'Error Message': str(exc),
        },
        filename=filename if filename else f'{context.current.task_name}-{context.current.task_id}.log',
        content=error_content
    )
# --- riberry/app/actions/executions.py ---
import traceback
import uuid

import pendulum

import riberry
from . import notify
from .. import current_riberry_app
from ..util import execution_tracker as tracker
from ..util.events import create_event

log = riberry.log.make(__name__)


def queue_job_execution(execution: riberry.model.job.JobExecution, track_executions: bool = True):
    job = execution.job
    form = job.form

    try:
        execution.status = 'READY'
        execution.task_id = str(uuid.uuid4())
        if track_executions:
            tracker.start_tracking_execution(root_id=execution.task_id)
        riberry.model.conn.commit()
        execution_task_id = current_riberry_app.start(
            execution_id=execution.id,
            root_id=execution.task_id,
            form=form.internal_name,
        )
    except:
        execution.status = 'FAILURE'
        message = traceback.format_exc().encode()
        execution.artifacts.append(
            riberry.model.job.JobExecutionArtifact(
                job_execution=execution,
                name='Error on Startup',
                type='error',
                category='Fatal',
                filename='startup-error.log',
                size=len(message),
                binary=riberry.model.job.JobExecutionArtifactBinary(binary=message)
            )
        )
        riberry.model.conn.commit()
        raise
    else:
        return execution_task_id


def execution_complete(task_id, root_id, status, stream, context):
    job: riberry.model.job.JobExecution = riberry.model.job.JobExecution.query().filter_by(task_id=root_id).first()
    if not job:
        return

    if context:
        with context.scope(root_id=root_id, task_id=root_id, task_name=None, stream=None, step=None, category=None):
            try:
                context.event_registry.call(
                    event_type=context.event_registry.types.on_completion,
                    status=status,
                )
            except:
                log.exception('Error occurred while triggering on_completion event.')
                riberry.model.conn.rollback()

    job.task_id = root_id
    job.status = status
    job.completed = job.updated = pendulum.DateTime.utcnow()
    if not job.started:
        job.started = pendulum.DateTime.utcnow()

    if stream is None:
        stream = riberry.model.job.JobExecutionStream.query().filter_by(task_id=root_id).first()
        if stream is not None:
            stream = stream.name

    if stream is not None:
        create_event(
            name='stream',
            root_id=root_id,
            task_id=root_id,
            data={'stream': stream, 'state': status}
        )

    riberry.model.conn.commit()
    notify.workflow_complete(task_id=task_id, root_id=root_id, status=status)


def execution_started(task_id, root_id, job_id, primary_stream):
    job: riberry.model.job.JobExecution = riberry.model.job.JobExecution.query().filter_by(id=job_id).one()

    job.started = job.updated = pendulum.DateTime.utcnow()
    job.status = 'ACTIVE'
    job.task_id = root_id
    riberry.model.conn.commit()

    create_event(
        name='stream',
        root_id=root_id,
        task_id=task_id,
        data={'stream': primary_stream, 'state': 'ACTIVE'}
    )

    notify.workflow_started(task_id=task_id, root_id=root_id)


# --- riberry/app/actions/external_task.py ---
import json
from typing import Optional
import uuid

import riberry


def create_external_task(
        job_execution: riberry.model.job.JobExecution,
        name: str = None,
        type: str = 'external',
        external_task_id: Optional[str] = None,
        input_data: Optional[bytes] = None
):
    if external_task_id is None:
        external_task_id = str(uuid.uuid4())
    if name is None:
        name = external_task_id

    if isinstance(input_data, str):
        input_data = input_data.encode()
    if input_data is not None and not isinstance(input_data, bytes):
        input_data = json.dumps(input_data).encode()

    external_task = riberry.model.job.JobExecutionExternalTask(
        job_execution=job_execution,
        stream_id=None,
        task_id=external_task_id,
        name=name,
        type=type,
        input_data=input_data,
    )
    riberry.model.conn.add(external_task)
    riberry.model.conn.commit()

    return external_task
def mark_as_ready(external_task_id, output_data):
    task: riberry.model.job.JobExecutionExternalTask = riberry.model.job.JobExecutionExternalTask.query().filter_by(
        task_id=external_task_id,
    ).one()

    if isinstance(output_data, str):
        output_data = output_data.encode()
    if output_data is not None and not isinstance(output_data, bytes):
        output_data = json.dumps(output_data).encode()

    task.status = 'READY'
    task.output_data = output_data
    riberry.model.conn.commit()


# --- riberry/app/actions/jobs.py ---
import riberry
from ..env import current_context


def create_job(form_name, job_name=None, input_values=None, input_files=None, owner=None, execute=True):
    form: riberry.model.interface.Form = riberry.model.interface.Form.query().filter_by(
        internal_name=form_name,
    ).first()

    job_execution = current_context.current.job_execution
    job_execution_user = owner if owner else job_execution.creator

    with riberry.services.policy.policy_scope(user=job_execution_user):
        job = riberry.services.job.create_job(
            form_id=form.id,
            name=job_name or f'Via {job_execution.job.name} / #{job_execution.id}',
            input_values=input_values or {},
            input_files=input_files or {},
            execute=execute,
            parent_execution=job_execution,
        )
        riberry.model.conn.commit()

    return job


# --- riberry/app/actions/notify.py ---
from typing import Optional, List

from .. import current_context as context
from ..util.events import create_event


def notify(notification_type, data=None, task_id=None, root_id=None):
    task_id = task_id or context.current.task_id
    root_id = root_id or context.current.root_id

    create_event(
        'notify',
        root_id=root_id,
        task_id=task_id,
        data={'type': notification_type, 'data': data or {}},
        binary=None,
    )


def workflow_complete(task_id: str, root_id: str, status: str):
    notify(notification_type='workflow_complete', data=dict(status=status), task_id=task_id, root_id=root_id)


def workflow_started(task_id: str, root_id: str):
    notify(notification_type='workflow_started', data=None, task_id=task_id, root_id=root_id)


def send_email(
        subject: str,
        body: str,
        mime_type: Optional[str] = None,
        sender: Optional[str] = None,
        receivers: Optional[List[str]] = None
):
    if isinstance(receivers, str):
        receivers = [receivers]

    notify(
        notification_type='custom-email',
        data={
            'subject': subject,
            'mime_type': mime_type,
            'body': body,
            'from': sender,
            'to': receivers or [],
        }
    )


# --- riberry/app/actions/reports.py ---
import riberry
from typing import List
from .. import env


def update_all_reports(app_instance=None):
    app_instance = app_instance if app_instance else env.get_instance_model()

    reports: List[riberry.model.job.JobExecutionReport] = riberry.model.job.JobExecutionReport.query().filter(
        riberry.model.job.JobExecutionReport.marked_for_refresh == True,
    ).join(
        riberry.model.job.JobExecution
    ).join(
        riberry.model.job.Job
    ).filter_by(
        instance=app_instance
    ).all()

    cxt = env.current_context
    for report in reports:
        root_id = report.job_execution.task_id
        with cxt.scope(root_id=root_id, task_id=root_id, task_name=None, stream=None, step=None, category=None):
            cxt.event_registry.call(event_type=cxt.event_registry.types.on_report_refresh, report=report.name)
            if report.marked_for_refresh:
                report.marked_for_refresh = False
                riberry.model.conn.commit()
# --- riberry/app/actions/shared_data.py ---
import riberry
from typing import List
from .. import env


def update_all_data_items(app_instance=None):
    app_instance = app_instance if app_instance else env.get_instance_model()

    data_items: List[riberry.model.misc.ResourceData] = riberry.model.misc.ResourceData.query().filter(
        riberry.model.misc.ResourceData.marked_for_refresh == True,
        riberry.model.misc.ResourceData.resource_type == riberry.model.misc.ResourceType.job_execution,
    ).join(
        riberry.model.job.JobExecution,
        riberry.model.misc.ResourceData.resource_id == riberry.model.job.JobExecution.id,
    ).join(
        riberry.model.job.Job
    ).filter_by(
        instance=app_instance
    ).all()

    cxt = env.current_context
    for item in data_items:
        root_id = riberry.model.job.JobExecution.query().filter_by(id=item.resource_id).one().task_id
        with cxt.scope(root_id=root_id, task_id=root_id, task_name=None, stream=None, step=None, category=None):
            cxt.event_registry.call(event_type=cxt.event_registry.types.on_data_updated, data_name=item.name)
            cxt.event_registry.call(event_type=cxt.event_registry.types.on_report_refresh, data_name=item.name)
            if item.marked_for_refresh:
                item.marked_for_refresh = False
                riberry.model.conn.commit()


# --- riberry/app/addons/__init__.py ---
from .base import Addon


# --- riberry/app/addons/base.py ---
import riberry


class Addon:

    def register(self, riberry_app: 'riberry.app.base.RiberryApplication'):
        raise NotImplementedError


# --- riberry/app/backends/__init__.py ---
from .base import RiberryApplicationBackend


# --- riberry/app/backends/base.py ---
import functools

import riberry
from typing import Dict, AnyStr


class RiberryApplicationBackend:
    default_stream_name = 'Overall'
    default_step_name = 'Entry'

    def __init__(self, instance):
        self.instance = instance

    def task(self, func=None, **options):
        if callable(func):
            return self.register_task(func=func, **options)
        else:
            return functools.partial(self.task, **options)

    def initialize(self):
        raise NotImplementedError

    def default_addons(self) -> Dict[AnyStr, 'riberry.app.addons.Addon']:
        raise NotImplementedError

    def register_task(self, func, **options):
        raise NotImplementedError

    def task_by_name(self, name: AnyStr):
        raise NotImplementedError

    def start_execution(self, execution_id, root_id, entry_point) -> AnyStr:
        raise NotImplementedError

    def create_receiver_task(self, external_task_id, validator):
        raise NotImplementedError

    def active_task(self):
        raise NotImplementedError

    def __getattr__(self, item):
        return getattr(self.instance, item)


# --- riberry/app/backends/impl/__init__.py ---
# (empty module)
# --- riberry/app/backends/impl/celery/__init__.py ---
from .base import CeleryBackend
from . import addons, patch


# --- riberry/app/backends/impl/celery/addons/__init__.py ---
from .background import BackgroundTasks
from .capacity import Capacity
from .external_task_receiver import ExternalTaskReceiver
from .scale import Scale


# --- riberry/app/backends/impl/celery/addons/background.py ---
from .base import AddonStartStopStep
import riberry


class BackgroundTasks(riberry.app.addons.Addon):

    def register(self, riberry_app: 'riberry.app.base.RiberryApplication'):
        class ConcreteBackgroundTasksStep(BackgroundTasksStep):
            rib = riberry_app

        riberry_app.backend.steps['worker'].add(ConcreteBackgroundTasksStep)


class BackgroundTasksStep(AddonStartStopStep):
    requires = {'celery.worker.components:Timer'}

    def __init__(self, worker, **_):
        super().__init__(worker=worker, interval=1.0)
        self.lock = riberry.app.util.redis_lock.RedisLock(name='step:background', on_acquired=self.on_lock_acquired, interval=900)

    @staticmethod
    def on_lock_acquired():
        try:
            riberry.app.tasks.echo()
            riberry.app.tasks.poll()
            riberry.app.tasks.refresh()
        finally:
            riberry.model.conn.remove()
            riberry.model.conn.raw_engine.dispose()

    def should_run(self) -> bool:
        return True

    def run(self):
        redis_instance = riberry.celery.util.celery_redis_instance()
        self.lock.run(redis_instance=redis_instance)


# --- riberry/app/backends/impl/celery/addons/base.py ---
import time

from celery import bootsteps
from redis import RedisError

import riberry
from celery.utils.log import logger as log


class AddonStartStopStep(bootsteps.StartStopStep):
    requires = {'celery.worker.components:Timer'}
    rib: 'riberry.app.base.RiberryApplication'

    def __init__(self, worker, interval, priority=10, **kwargs):
        super().__init__(worker, interval=interval, priority=priority, **kwargs)
        self._timer = None
        self.interval = interval
        self.priority = priority
        self._worker = worker

    @property
    def worker(self):
        return self._worker

    @property
    def worker_state(self):
        return self.worker.state

    @property
    def consumer(self):
        return self.worker.consumer

    def start(self, worker):
        if self.should_run():
            self._timer = worker.timer.call_repeatedly(self.interval, self.__execute, priority=self.priority)

    def stop(self, worker):
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

    def __execute(self):
        start_time = time.time()
        try:
            log.debug(f'Started {self.step_name}')
            riberry.model.conn.raw_engine.dispose()
            with riberry.model.conn:
                self.run()
        except RedisError:
            log.exception(f'Encountered redis exception while executing {self.step_name}')
        finally:
            log.debug(f'Completed {self.step_name} in {time.time() - start_time:2} seconds')

    def should_run(self) -> bool:
        raise NotImplementedError

    def run(self):
        raise NotImplementedError

    @property
    def step_name(self):
        return type(self).__name__
# --- riberry/app/backends/impl/celery/addons/capacity/__init__.py ---
from sqlalchemy.util.compat import contextmanager

from ..base import AddonStartStopStep
from .priority_queue import PriorityQueue
import riberry
from celery.utils.log import logger as log


class Capacity(riberry.app.addons.Addon):

    def __init__(self, parameter='capacity', key=None, sep='|', queue_cls=PriorityQueue,
                 blocking: bool = True, block_retry: float = 0.5, r=None):
        self.parameter = parameter
        self.r = r or riberry.celery.util.celery_redis_instance()
        self.sep = sep
        self.queue = queue_cls(r=self.r, key=key, blocking=blocking, block_retry=block_retry)

    @property
    def last_value_key(self):
        return self.queue.make_key(self.queue.key, 'raw')

    @property
    def last_value(self):
        last_value = self.r.get(self.last_value_key)
        return last_value.decode() if isinstance(last_value, bytes) else None

    @last_value.setter
    def last_value(self, value):
        self.r.set(self.last_value_key, value=value)

    @contextmanager
    def borrow(self):
        member, score, version = self.queue.pop()
        try:
            yield member, score, version
        finally:
            self.queue.put(member=member, version=version)

    def register(self, riberry_app: 'riberry.app.base.RiberryApplication'):
        class ConcreteCapacityStep(CapacityStep):
            rib = riberry_app
            capacity = self

        if self.queue.key is None:
            self.queue.key = riberry_app.context.current.riberry_app_instance.internal_name

        riberry_app.backend.steps['worker'].add(ConcreteCapacityStep)


class CapacityStep(AddonStartStopStep):
    capacity: Capacity

    def __init__(self, worker, **_):
        super().__init__(worker=worker, interval=1.0)
        self.lock = riberry.app.util.redis_lock.RedisLock(name='step:capacity', on_acquired=self.on_lock_acquired, interval=900)

    def should_run(self) -> bool:
        return True

    def on_lock_acquired(self):
        value = self.rib.context.current.riberry_app_instance.active_schedule_value(name=self.capacity.parameter) or ''
        if self.capacity.last_value is not None and value == self.capacity.last_value:
            log.warn(f'DynamicPriorityParameter: ({self.capacity.queue.free_key}) is unchanged')
            return

        self.capacity.last_value = value
        values = [
            part.split(self.capacity.sep) if self.capacity.sep in part else (part, 0)
            for part in value.split(' ')
        ]
        member_scores = {k: int(v) for k, v in values}
        self.capacity.queue.update(member_scores)
        log.warn(f'DynamicPriorityParameter: ({self.capacity.queue.free_key}) updated {self.capacity.parameter} queue with {value!r}')

    def run(self):
        redis_instance = riberry.celery.util.celery_redis_instance()
        self.lock.run(redis_instance=redis_instance)
# --- riberry/app/backends/impl/celery/addons/capacity/priority_queue.py ---
import time

import redis
import functools
from celery.utils.log import logger as log


class PriorityQueue:

    def __init__(self, r: redis.Redis, key: str, prefix: str = 'pq', sep: str = ':',
                 blocking: bool = True, block_retry: float = 0.5):
        self.r: redis.Redis = r
        self.key = key
        self.prefix = prefix
        self.sep = sep
        self.blocking = blocking
        self.block_retry = block_retry

    def make_key(self, *args):
        sub_key = self.sep.join(map(str, args))
        return f'{self.prefix}{self.sep}{sub_key}'

    @property
    def version(self):
        return int(self.r.get(self.version_key) or 0)

    @version.setter
    def version(self, value):
        self.r.set(self.version_key, value=value)

    @property
    def version_key(self):
        return self.make_key(self.key, 'counter')

    @property
    def free_key(self):
        return self.generate_free_key(version=self.version)

    @property
    def lease_key(self):
        return self.generate_lease_key(version=self.version)

    def generate_free_key(self, version):
        return self.make_key(self.key, f'{version:09}', 'free')

    def generate_lease_key(self, version):
        return self.make_key(self.key, f'{version:09}', 'lease')

    def pop(self):
        while True:
            version = self.version
            result = self.r.transaction(
                functools.partial(self.pop_transaction, version=version),
                self.generate_free_key(version=version),
                self.generate_lease_key(version=version),
                value_from_callable=True
            )
            if not result and self.blocking:
                log.warn(f'PriorityQueue: ({self.free_key}) encountered blank key, '
                         f'retrying after {self.block_retry} seconds.')
                time.sleep(self.block_retry)
                continue
            else:
                break

        return result, self.r.zscore(self.generate_free_key(version=version), result), version

    def pop_transaction(self, pipe: redis.client.Pipeline, version):
        free_key, lease_key = self.generate_free_key(version=version), self.generate_lease_key(version=version)
        [(member, score)] = pipe.zrevrange(free_key, 0, 0, withscores=True)
        member = member.decode()

        pipe.multi()
        pipe.zincrby(free_key, value=member, amount=-1)
        pipe.zincrby(lease_key, value=member, amount=1)
        return member

    def put(self, member, version):
        self.r.transaction(
            functools.partial(self.put_transaction, member=member, version=version),
            self.generate_free_key(version=version),
            self.generate_lease_key(version=version),
        )

    def put_transaction(self, pipe: redis.client.Pipeline, member, version):
        free_key, lease_key = self.generate_free_key(version=version), self.generate_lease_key(version=version)
        pipe.multi()
        pipe.zincrby(free_key, value=member, amount=1)
        pipe.zincrby(lease_key, value=member, amount=-1)

    def update(self, member_scores: dict):
        func = functools.partial(self.update_transaction, member_scores=member_scores)
        self.version = self.r.transaction(func, value_from_callable=True)

    def update_transaction(self, pipe: redis.client.Pipeline, member_scores):
        version = self.version + 1
        pipe.multi()
        pipe.zadd(self.generate_free_key(version=version), mapping=member_scores)
        return version

    def items(self):
        return [(k.decode(), v) for k, v in self.r.zrevrange(self.free_key, 0, -1, withscores=True)]

    def leased_items(self):
        return [(k.decode(), v) for k, v in self.r.zrevrange(self.lease_key, 0, -1, withscores=True)]

    def clear(self):
        self.r.delete(self.free_key, self.lease_key)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.pop()
        except ValueError:
            raise StopIteration
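# --- Illustrative usage sketch (editor's addition, not part of the archive) ---
# A minimal sketch of driving PriorityQueue directly, mirroring what Capacity.borrow()
# does. Assumptions: a local Redis instance is reachable on the default port, and the
# key name 'demo-capacity' plus the member names are hypothetical.
import redis

from riberry.app.backends.impl.celery.addons.capacity.priority_queue import PriorityQueue

queue = PriorityQueue(r=redis.Redis(), key='demo-capacity', blocking=False)
queue.update({'tenant-a': 3, 'tenant-b': 1})   # rebuild the "free" sorted set under a new version

member, score, version = queue.pop()           # lease the member with the highest remaining score
try:
    pass  # do capacity-bound work for `member`
finally:
    queue.put(member=member, version=version)  # return the lease against the same version's keys
# --- end of sketch ---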
# --- riberry/app/backends/impl/celery/addons/external_task_receiver.py ---
from .base import AddonStartStopStep
import riberry
from riberry.app.backends.impl import celery as celery_backend


class ExternalTaskReceiver(riberry.app.addons.Addon):
    RECEIVER_QUEUE = 'rib.external'

    def register(self, riberry_app: 'riberry.app.base.RiberryApplication'):
        class ConcreteExternalTaskReceiverStep(ExternalTaskReceiverStep):
            rib = riberry_app

        riberry_app.backend.steps['worker'].add(ConcreteExternalTaskReceiverStep)
        riberry_app.backend.user_options['worker'].add(self.register_user_options)

        task_routes = {
            celery_backend.CeleryBackend.CHECK_EXTERNAL_TASK_NAME: {'queue': self.RECEIVER_QUEUE},
        }
        if not riberry_app.backend.conf.task_routes:
            riberry_app.backend.conf.task_routes = {}
        riberry_app.backend.conf.task_routes.update(task_routes)

        for addon in riberry_app.addons.values():
            if isinstance(addon, celery_backend.addons.Scale):
                addon.conf.ignore_queues.add(self.RECEIVER_QUEUE)

    @staticmethod
    def register_user_options(parser):
        parser.add_argument('--rib-receiver', action='store_true', default=False,
                            help='Receiver of external Riberry tasks.')


class ExternalTaskReceiverStep(AddonStartStopStep):

    def __init__(self, worker, rib_receiver, **_):
        super().__init__(worker=worker, interval=1)
        self._is_receiver = bool(rib_receiver)

    def should_run(self) -> bool:
        return self._is_receiver

    def run(self):
        active = riberry.model.job.JobExecution.query().filter_by(
            status='ACTIVE'
        ).join(riberry.model.job.Job).filter_by(
            instance=self.rib.context.current.riberry_app_instance,
        ).join(riberry.model.job.JobExecutionExternalTask).filter_by(
            status='READY'
        ).count()

        operation = 'add' if active else 'cancel'
        consumer = self.worker.consumer
        queues = {q.name for q in consumer.task_consumer.queues}

        if operation == 'add' and ExternalTaskReceiver.RECEIVER_QUEUE not in queues:
            consumer.add_task_queue(ExternalTaskReceiver.RECEIVER_QUEUE)
        if operation == 'cancel' and ExternalTaskReceiver.RECEIVER_QUEUE in queues:
            consumer.cancel_task_queue(ExternalTaskReceiver.RECEIVER_QUEUE)


# --- riberry/app/backends/impl/celery/addons/scale.py ---
import math

import riberry
from .base import AddonStartStopStep
from celery.utils.log import logger as log


class Scale(riberry.app.addons.Addon):

    def __init__(self, active_parameter='active', concurrency_parameter='concurrency',
                 minimum_concurrency=None, maximum_concurrency=None, check_queues=None):
        self.conf = ScaleConfiguration(
            active_parameter=active_parameter,
            concurrency_parameter=concurrency_parameter,
            minimum_concurrency=minimum_concurrency,
            maximum_concurrency=maximum_concurrency,
            check_queues=check_queues,
        )
        self.active_parameter = active_parameter
        self.concurrency_parameter = concurrency_parameter
        self.minimum_concurrency = minimum_concurrency
        self.maximum_concurrency = maximum_concurrency

    def register(self, riberry_app):
        class ConcreteScaleStep(ScaleStep):
            conf = self.conf
            rib = riberry_app

        riberry_app.backend.steps['worker'].add(ConcreteScaleStep)
        riberry_app.backend.user_options['worker'].add(self.register_user_options)

    @staticmethod
    def register_user_options(parser):
        parser.add_argument('--rib-scale', action='store_true', default=False,
                            help='Scale concurrency depending on activity (disabled by default)')
        parser.add_argument('--rib-scale-parameter',
                            help='The name of the application instance parameter which stores the concurrency value')
        parser.add_argument('--rib-scale-group', default='default',
                            help='Concurrency is distributed amongst all active workers in the same group')
        parser.add_argument('--rib-scale-max', default=None,
                            help='Maximum concurrency when auto-scaling (uncapped by default)')
        parser.add_argument('--rib-scale-min', default=None,
                            help='Minimum concurrency when auto-scaling (zero by default)')
        feature_parser = parser.add_mutually_exclusive_group(required=False)
        feature_parser.add_argument('--rib-scale-check-queues', dest='rib_scale_check_queues', action='store_true',
                                    help='Scale down if queues are empty and there are no active tasks')
        feature_parser.add_argument('--rib-scale-ignore-queues', dest='rib_scale_check_queues', action='store_false',
                                    help='Do not scale down if queues are empty and there are no active tasks')
        parser.set_defaults(rib_scale_check_queues=True)


class ScaleConfiguration:

    def __init__(self, scale_concurrency=False, scale_group='default', active_parameter='active',
                 concurrency_parameter='concurrency', minimum_concurrency=None, maximum_concurrency=None,
                 check_queues=None):
        self.active_parameter = active_parameter
        self.concurrency_parameter = concurrency_parameter
        self.scale = scale_concurrency
        self.scale_group = scale_group
        self.minimum_concurrency = minimum_concurrency
        self.maximum_concurrency = maximum_concurrency
        self.check_queues = check_queues
        self.ignore_queues = set()


class ScaleStep(AddonStartStopStep):
    conf: ScaleConfiguration

    def __init__(self, worker, rib_scale, rib_scale_group, rib_scale_parameter, rib_scale_min,
                 rib_scale_max, rib_scale_check_queues, **_):
        super().__init__(worker=worker, interval=1.0)
        self.conf.scale = bool(rib_scale)
        self.conf.scale_group = rib_scale_group or self.conf.scale_group
        self.conf.concurrency_parameter = rib_scale_parameter or self.conf.concurrency_parameter
        self.conf.minimum_concurrency = int(rib_scale_min) if rib_scale_min is not None else self.conf.minimum_concurrency
        self.conf.maximum_concurrency = int(rib_scale_max) if rib_scale_max is not None else self.conf.maximum_concurrency
        self.conf.check_queues = bool(rib_scale_check_queues) if rib_scale_check_queues is not None else self.conf.check_queues
        self.lock = riberry.app.util.redis_lock.RedisLock(name=f'step:scale:{self.conf.scale_group}', on_acquired=self.on_lock_acquired, interval=5000)
        self.queues = set()
        self.target_concurrency = None
        self.initial_concurrency = None
        self.is_active = False
        self._worker_uuid = self.rib.context.current.WORKER_UUID
        self._instance_name = self.rib.context.current.riberry_app_instance.internal_name
        self.idle_counter = 0

    def should_run(self) -> bool:
        return True

    def run(self):
        redis_instance = riberry.celery.util.celery_redis_instance()
        self.lock.run(redis_instance=redis_instance)
        if not self.consumer.task_consumer:
            return
        self.update(redis_instance=redis_instance)
        self.scale()

    def on_lock_acquired(self):
        r = riberry.celery.util.celery_redis_instance()
        epoch, _ = r.time()
        occurrence = set(r.zrevrangebyscore(name=self.scale_groups_log_key, max=epoch, min=epoch - 60))
        if occurrence:
            r.delete(self.scale_groups_active_temp_key)
            r.sadd(self.scale_groups_active_temp_key, *occurrence)
            r.rename(src=self.scale_groups_active_temp_key, dst=self.scale_groups_active_key)

    @classmethod
    def _tasks_available(cls, r, state, queues):
        return bool(
            len(state.reserved_requests) or
            len(state.active_requests) or
            state.requests or
            not cls._queues_empty(r, queues=queues)
        )

    @staticmethod
    def _queues_empty(r, queues):
        for queue_name in queues:
            for queue in r.scan_iter(f'{queue_name}*'):
                try:
                    queue_length = r.llen(queue)
                    log.debug(f'ScaleStep:: Queue length of {queue.decode()!r} is {queue_length}')
                    if queue_length:
                        return False
                except:
                    continue
        return True

    def report(self, r):
        epoch, _ = r.time()
        r.zadd(name=self.scale_groups_log_key, mapping={self._worker_uuid: epoch})

    @property
    def scale_groups_active_key(self):
        return f'{self._instance_name}:scale-groups:{self.conf.scale_group}:active'

    @property
    def scale_groups_log_key(self):
        return f'{self._instance_name}:scale-groups:{self.conf.scale_group}:log'

    @property
    def scale_groups_active_temp_key(self):
        return f'{self._instance_name}:scale-groups:{self.conf.scale_group}:active-temp'

    def update(self, redis_instance):
        self.queues.update({q.name for q in self.worker.consumer.task_consumer.queues})
        if not self.initial_concurrency:
            self.initial_concurrency = self.worker.consumer.pool.num_processes

        instance = self.rib.context.current.riberry_app_instance
        active_flag = instance.active_schedule_value(name=self.conf.active_parameter, default='Y') == 'Y'
        tasks_available = self._tasks_available(r=redis_instance, state=self.worker_state, queues=self.queues - self.conf.ignore_queues)
        if tasks_available:
            self.report(r=redis_instance)

        self.is_active = active_flag and (tasks_available if self.conf.check_queues else True)
        if not self.is_active:
            if self.idle_counter > 10 or self.target_concurrency is None:
                self.target_concurrency = 0
            self.idle_counter += 1
            return

        self.idle_counter = 0
        scale_group = list(sorted(b.decode() for b in redis_instance.smembers(self.scale_groups_active_key)))
        if self.conf.scale and self.conf.concurrency_parameter:
            target_concurrency = instance.active_schedule_value(name=self.conf.concurrency_parameter, default=None)
            if target_concurrency is None:
                target_concurrency = self.initial_concurrency
            else:
                target_concurrency = int(target_concurrency)
                target_concurrency *= (1 / len(scale_group)) if self._worker_uuid in scale_group else 0
                if target_concurrency:
                    if scale_group.index(self._worker_uuid) == 0:
                        target_concurrency = math.ceil(target_concurrency)
                    else:
                        target_concurrency = math.floor(target_concurrency)
        else:
            target_concurrency = self.initial_concurrency

        if self.conf.maximum_concurrency is not None:
            target_concurrency = min(target_concurrency, self.conf.maximum_concurrency)
        if self.conf.minimum_concurrency is not None:
            target_concurrency = max(target_concurrency, self.conf.minimum_concurrency)

        target_concurrency = int(target_concurrency)
        if target_concurrency != self.target_concurrency:
            self.target_concurrency = target_concurrency

    def scale(self):
        actual_concurrency = self.worker.consumer.pool.num_processes
        target_concurrency = self.target_concurrency

        scale_group = list(sorted(b.decode() for b in riberry.celery.util.celery_redis_instance().smembers(self.scale_groups_active_key)))
        log.debug(f'A: {self.is_active} C[T]: {self.target_concurrency}, C[A]: {actual_concurrency}, P: {self.worker.consumer.qos.value}, M: {scale_group}')

        if target_concurrency == 0:
            for queue in list(self.consumer.task_consumer.queues):
                if queue.name not in self.conf.ignore_queues:
                    self.consumer.cancel_task_queue(queue)
        else:
            queue_names = {q.name for q in self.consumer.task_consumer.queues}
            for queue in self.queues:
                if queue not in queue_names and queue not in self.conf.ignore_queues:
                    self.consumer.add_task_queue(queue)

        if target_concurrency > actual_concurrency:
            if actual_concurrency == 0:
                self.worker.consumer.pool.grow(1)
            else:
                self.worker.consumer.pool.grow(min(target_concurrency - actual_concurrency, 8))
            log.info(f'ScaleStep:: Scaled concurrency up to {self.worker.consumer.pool.num_processes} concurrency (target: {self.target_concurrency}, prefetch: {self.worker.consumer.qos.value})')
        elif actual_concurrency > target_concurrency:
            self.worker.consumer.qos.decrement_eventually(n=1)
            self.worker.consumer.qos.update()
            self.worker.consumer.pool.shrink(min(actual_concurrency - target_concurrency, 8))
            log.info(f'ScaleStep:: Scaled concurrency down to {self.worker.consumer.pool.num_processes} concurrency (target: {self.target_concurrency}, prefetch: {self.worker.consumer.qos.value})')

        prefetch_target = self.worker.consumer.pool.num_processes * self.worker.consumer.prefetch_multiplier
        prefetch_difference = prefetch_target - self.worker.consumer.qos.value
        if prefetch_difference > 0:
            self.worker.consumer.qos.increment_eventually(n=abs(prefetch_difference))
            self.worker.consumer.qos.update()
            log.info(f'ScaleStep:: Scaled prefetch up to {self.worker.consumer.qos.value} (+{prefetch_difference})')
        elif prefetch_difference < 0 and self.worker.consumer.qos.value != 1:
            self.worker.consumer.qos.decrement_eventually(n=abs(prefetch_difference))
            self.worker.consumer.qos.update()
            log.info(f'ScaleStep:: Scaled prefetch down to {self.worker.consumer.qos.value} ({prefetch_difference})')
# --- riberry/app/backends/impl/celery/base.py ---
from typing import Dict, AnyStr

import celery

import riberry
from . import patch, tasks, addons
from .executor import TaskExecutor


def send_task_process_rib_kwargs(self, *args, **kwargs):
    riberry_properties = {}
    if kwargs.get('kwargs'):
        for key, value in kwargs['kwargs'].items():
            if key.startswith('__rib_'):
                riberry_properties[key.replace('__rib_', '', 1)] = value


class CeleryBackend(riberry.app.backends.RiberryApplicationBackend):
    instance: celery.Celery

    ENTRY_POINT_TASK_NAME = 'riberry.core.app.entry_point'
    CHECK_EXTERNAL_TASK_NAME = 'riberry.core.app.check_external_task'

    def __init__(self, instance):
        super().__init__(instance=instance)
        self.executor = TaskExecutor()

    def initialize(self):
        patch.patch_send_task(instance=self.instance, func=send_task_process_rib_kwargs)

        # Register "entry point" task
        self.task(
            name=self.ENTRY_POINT_TASK_NAME,
        )(self.executor.entry_point_executor())

        # Register "external task checker" task
        self.task(
            name=self.CHECK_EXTERNAL_TASK_NAME,
            max_retries=None,
        )(self.executor.external_task_executor())

    def default_addons(self) -> Dict[AnyStr, 'riberry.app.addons.Addon']:
        return {
            'scale': addons.Scale(),
            'background': addons.BackgroundTasks(),
            'external-receiver': addons.ExternalTaskReceiver(),
        }

    def register_task(self, func, **options) -> celery.Task:
        wrapped_func, options = self.executor.riberry_task_executor_wrapper(func=func, task_options=options)
        return self.instance.task(**options)(wrapped_func)

    def task_by_name(self, name: AnyStr):
        return self.instance.tasks[name]

    def start_execution(self, execution_id, root_id, entry_point) -> AnyStr:
        task = self.task_by_name(self.ENTRY_POINT_TASK_NAME)
        task_signature = task.si(execution_id=execution_id, form=entry_point.form)
        callback_success = tasks.execution_complete.si(status='SUCCESS', stream=entry_point.stream)
        callback_failure = tasks.execution_complete.si(status='FAILURE', stream=entry_point.stream)

        task_signature.options['root_id'] = root_id
        callback_success.options['root_id'] = root_id
        callback_failure.options['root_id'] = root_id

        exec_signature = task_signature.on_error(callback_failure) | callback_success
        exec_signature.options['root_id'] = root_id

        riberry.app.util.events.create_event(
            name='stream',
            root_id=root_id,
            task_id=root_id,
            data={'stream': entry_point.stream, 'state': 'QUEUED'}
        )

        return exec_signature.apply_async().id

    def create_receiver_task(self, external_task_id, validator):
        return self.task_by_name(self.CHECK_EXTERNAL_TASK_NAME).si(
            external_task_id=external_task_id,
            validator=validator,
        )

    def active_task(self):
        return celery.current_task


# --- riberry/app/backends/impl/celery/executor.py ---
from celery import exceptions as celery_exc, current_task

import riberry
from riberry.app import actions
from riberry.app.misc.signals import task_prerun, task_postrun
from .extension import RiberryTask


class ExecutionComplete(Exception):
    pass


IGNORE_EXCEPTIONS = (
    celery_exc.Retry,
    celery_exc.SoftTimeLimitExceeded,
    celery_exc.TimeLimitExceeded,
)


def _retry_types(task_options):
    return tuple(list(IGNORE_EXCEPTIONS) + task_options.get('autoretry_for', []))


def _attempt_fallback(exc, task_options):
    fallback_on_error_provided, fallback_on_error = (
        'rib_fallback' in task_options,
        task_options.get('rib_fallback')
    )

    if not fallback_on_error_provided:
        actions.artifacts.create_artifact_from_traceback(category='Fatal')
        raise exc

    try:
        result = fallback_on_error() if callable(fallback_on_error) else fallback_on_error
        actions.artifacts.create_artifact_from_traceback(category='Intercepted')
        return result
    except:
        actions.artifacts.create_artifact_from_traceback(category='Fatal (intercept failed)')
        raise exc


class TaskExecutor:

    @property
    def riberry_app(self):
        return riberry.app.env.current_riberry_app

    def external_task_executor(self):
        def _external_task_executor(external_task_id, validator):
            external_task: riberry.model.job.JobExecutionExternalTask = riberry.model.job.JobExecutionExternalTask.query().filter_by(
                task_id=external_task_id,
            ).first()
            task: RiberryTask = self.riberry_app.context.current.task

            if external_task:
                if external_task.status == 'WAITING':
                    raise task.retry(countdown=1)
                elif external_task.status == 'READY':
                    output_data = external_task.output_data
                    if isinstance(output_data, bytes):
                        output_data = output_data.decode()

                    outcomes = self.riberry_app.context.event_registry.call(
                        event_type=self.riberry_app.context.event_registry.types.on_external_result_received,
                        key=validator,
                        kwargs=dict(external_task=external_task, result=output_data)
                    )
                    if outcomes:
                        assert len(outcomes) == 1, f'Multiple callbacks triggered for {validator}'
                        outcome = outcomes[0]
                        if outcome and outcome.retry:
                            external_task.status = 'WAITING'
                            external_task.input_data = outcome.input_data
                            riberry.model.conn.commit()
                            raise task.retry(countdown=1)

                    external_task.status = 'COMPLETE'
                    riberry.model.conn.commit()
                    return output_data

        return _external_task_executor

    def entry_point_executor(self):
        def _entry_point_executor(execution_id, form: str):
            entry_point = self.riberry_app.entry_points[form]
            actions.executions.execution_started(
                task_id=self.riberry_app.context.current.task_id,
                root_id=self.riberry_app.context.current.root_id,
                job_id=execution_id,
                primary_stream=entry_point.stream,
            )
            entry_point.func()

        return _entry_point_executor

    def riberry_task_executor(self, func, func_args, func_kwargs, task_options):
        riberry_properties = {}
        for key, value in list(func_kwargs.items()):
            if key.startswith('__rib_'):
                riberry_properties[key.replace('__rib_', '', 1)] = func_kwargs.pop(key)

        with riberry.model.conn:
            with self.riberry_app.context.scope(
                    root_id=current_task.request.root_id,
                    task_id=current_task.request.id,
                    task_name=current_task.name,
                    stream=riberry_properties.get('stream'),
                    step=riberry_properties.get('step'),
                    category=riberry_properties.get('category'),
            ):
                state = None
                mark_workflow_complete = False
                try:
                    task_prerun(context=self.riberry_app.context, props=riberry_properties)
                    result = self._execute_task(func, func_args, func_kwargs)
                    state = 'SUCCESS'
                    return result
                except ExecutionComplete:
                    state = 'FAILURE'
                    raise
                except celery_exc.Ignore:
                    state = 'IGNORED'
                    raise
                except Exception as exc:
                    state = 'FAILURE'
                    mark_workflow_complete = True
                    if isinstance(exc, _retry_types(task_options=task_options)) and not self._max_retries_reached(exc):
                        state = None
                        mark_workflow_complete = False
                        raise
                    result = _attempt_fallback(exc, task_options=task_options)
                    mark_workflow_complete = False
                    state = 'SUCCESS'
                    return result
                finally:
                    if mark_workflow_complete:
                        actions.executions.execution_complete(
                            task_id=self.riberry_app.context.current.task_id,
                            root_id=self.riberry_app.context.current.root_id,
                            status=state,
                            stream=None,
                            context=self.riberry_app.context,
                        )
                    if state is not None:
                        task_postrun(context=self.riberry_app.context, props=riberry_properties, state=state)

    def _max_retries_reached(self, exc):
        active_task = self.riberry_app.context.current.task
        return bool(
            not isinstance(exc, celery_exc.Ignore) and
            active_task.max_retries is not None and
            active_task.request.retries >= active_task.max_retries
        )

    def _execute_task(self, func, args, kwargs):
        job_execution = self.riberry_app.context.current.job_execution
        if job_execution.status in ('FAILURE', 'SUCCESS'):
            raise ExecutionComplete(f'Execution {job_execution!r} is already marked as complete')
        return func(*args, **kwargs)

    def riberry_task_executor_wrapper(self, func, task_options):
        def wrapped_function(*args, **kwargs):
            return self.riberry_task_executor(
                func=func,
                func_args=args,
                func_kwargs=kwargs,
                task_options=task_options,
            )

        if 'name' not in task_options:
            task_options['name'] = riberry.app.util.misc.function_path(func=func)
        task_options['base'] = task_options.get('base') or RiberryTask

        return wrapped_function, task_options


# --- riberry/app/backends/impl/celery/extension.py ---
import celery

import riberry


class RiberryTask(celery.Task):

    def signature(self, args=None, *starargs, **starkwargs):
        sig = celery.Task.signature(self, args, *starargs, **starkwargs)
        context = riberry.app.current_context

        stream = sig.get('kwargs', {}).get('__rib_stream') or context.flow.scoped_stream or context.current.stream
        if stream:
            sig['kwargs']['__rib_stream'] = stream
            if '__rib_step' not in sig['kwargs']:
                sig['kwargs']['__rib_step'] = self.name

        return sig


# --- riberry/app/backends/impl/celery/patch.py ---
import types

import celery


def patch_send_task(instance: celery.Celery, func):
    send_task_original = instance.send_task

    def send_task(self, *args, **kwargs):
        func(self, *args, **kwargs)
        return send_task_original(*args, **kwargs)

    instance.send_task = types.MethodType(send_task, instance)


# --- riberry/app/backends/impl/celery/tasks.py ---
import riberry
from celery import shared_task


@shared_task(name='riberry.core.execution_complete', bind=True, ignore_result=True)
def execution_complete(task, status, stream):
    with riberry.model.conn:
        return riberry.app.actions.executions.execution_complete(
            task_id=task.request.id,
            root_id=task.request.root_id,
            status=status,
            stream=stream,
            context=riberry.app.current_context,
        )


# --- riberry/app/backends/impl/pool/__init__.py ---
from . import log
from .base import RiberryPoolBackend, entry_point
from .log import configure as configure_logging
from .exc import Defer
# --- riberry/app/backends/impl/pool/base.py ---
import os
import signal
import threading
from typing import AnyStr, Dict, Optional

import riberry
from riberry.app import RiberryApplication, current_context as cxt
from . import tasks
from .task_queue import TaskQueue, Task, TaskDefinition

log = riberry.log.make(__name__)


def entry_point(form_name, **kwargs):
    form: riberry.model.interface.Form = riberry.model.interface.Form.query().filter_by(internal_name=form_name).one()
    try:
        app: RiberryApplication = RiberryApplication.by_name(form.application.internal_name)
    except KeyError:
        RiberryApplication(name=form.application.internal_name, backend=RiberryPoolBackend())
        app: RiberryApplication = RiberryApplication.by_name(form.application.internal_name)
    return app.entry_point(form_name, **kwargs)


class RiberryPoolBackend(riberry.app.backends.RiberryApplicationBackend):
    _local = threading.local()

    def __init__(self):
        super().__init__(instance=None)
        self.task_queue: TaskQueue = TaskQueue(backend=self, limit=3)
        self.tasks = {}
        self._exit = threading.Event()
        self._threads = []

    def _create_thread(self, thread_name, target):
        thread = threading.Thread(name=thread_name, target=target)
        self._threads.append(thread)
        return thread

    def start(self):
        signal.signal(signal.SIGTERM, self._stop_signal)
        signal.signal(signal.SIGINT, self._stop_signal)
        signal.signal(signal.SIGHUP, self._stop_signal)

        for thread in self._threads:
            thread.start()
        for thread in self._threads:
            thread.join()

    # noinspection PyUnusedLocal
    def _stop_signal(self, signum, frame):
        self.stop()

    def stop(self):
        log.info('Stopping app')
        self._exit.set()

    def initialize(self):
        self._create_thread('backend.executor', lambda: tasks.run_task(
            name='Task Executor',
            func=lambda: tasks.execution_listener(self.task_queue),
            interval=0,
            exit_event=self._exit,
        ))
        self._create_thread('backend.queue_external', lambda: tasks.run_task(
            name='External Task Receiver',
            func=lambda: tasks.queue_receiver_tasks(self.task_queue),
            interval=5,
            exit_event=self._exit,
        ))
        self._create_thread('backend.background', lambda: tasks.run_task(
            name='Background Operations',
            func=lambda: tasks.background(self.task_queue),
            interval=5,
            exit_event=self._exit,
        ))

    def default_addons(self) -> Dict[AnyStr, 'riberry.app.addons.Addon']:
        return {}

    def start_execution(self, execution_id, root_id, entry_point) -> AnyStr:
        self.task_queue.submit_entry_task(execution_id=execution_id, root_id=root_id, entry_point=entry_point)
        return root_id

    def create_receiver_task(self, external_task_id, validator):
        cxt.data.set(f'external:{external_task_id}:validator', validator)

    def register_task(self, func, name=None, stream=None, step=None, **options):
        name = name or riberry.app.util.misc.function_path(func)
        assert name not in self.tasks, f'Multiple registrations for task {name!r}'
        assert 'after' in options and not self.external_task_callback(options['after']), (
            f'"after" argument not supplied to task {name!r}'
        )
        self.tasks[name] = TaskDefinition(
            func=func,
            name=name,
            stream=stream or self.default_stream_name,
            step=step or name,
            options=options,
        )

    def task_by_name(self, name: AnyStr) -> TaskDefinition:
        return self.tasks[name]

    def external_task_callback(self, name: AnyStr) -> Optional[TaskDefinition]:
        for task in self.tasks.values():
            if task.options.get('after') == name:
                return task
        return None

    def active_task(self):
        return getattr(self._local, 'task', None)

    def set_active_task(self, task: Task):
        self._local.task = task


# --- riberry/app/backends/impl/pool/exc.py ---
class Defer(Exception):
    pass
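# --- Illustrative usage sketch (editor's addition, not part of the archive) ---
# A minimal sketch of wiring the pool backend up, assuming riberry is configured and a
# form with internal name 'demo_form' exists; 'demo_app' and the entry-point body are
# hypothetical. Raising Defer inside a task leaves the execution open: the pool executor
# treats Defer as "not finished yet" and skips the completion event.
import riberry
from riberry.app import RiberryApplication
from riberry.app.backends.impl.pool import RiberryPoolBackend, Defer

backend = RiberryPoolBackend()
app = RiberryApplication(name='demo_app', backend=backend)


@app.entry_point('demo_form')
def start():
    # Entry point for the form; do the first unit of work here, or raise Defer to retry later.
    ...


# backend.start() blocks and runs the executor, external-task receiver, and background threads.
# --- end of sketch ---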
# --- riberry/app/backends/impl/pool/log.py ---
import logging
import sys
import threading

import riberry


class Filter(logging.Filter):

    def filter(self, record):
        if riberry.app.current_context.current.root_id:
            record.root_id = f'root={riberry.app.current_context.current.root_id}'
        else:
            record.root_id = '-'

        if riberry.app.current_context.current.task_id:
            record.prefix = f'[id={riberry.app.current_context.current.task_id}] '
        else:
            record.prefix = ''

        if riberry.app.current_context.current.step:
            record.context = f'{riberry.app.current_context.current.step}'
        elif threading.current_thread() == threading.main_thread():
            record.context = 'backend.main'
        else:
            record.context = record.threadName

        return record


def configure(log_level='ERROR'):
    handler = logging.StreamHandler(stream=sys.stdout)
    handler.setFormatter(
        logging.Formatter('%(levelname)-8s | %(asctime)-s | %(context)-41s | %(root_id)-41s | %(prefix)s%(message)s')
    )
    handler.addFilter(Filter())
    riberry.log.logger.addHandler(handler)
    riberry.log.logger.setLevel(log_level.upper())


# --- riberry/app/backends/impl/pool/task_queue/__init__.py ---
from .base import Task, TaskDefinition, TaskQueue, TaskCounter


# --- riberry/app/backends/impl/pool/task_queue/base.py ---
import threading
import uuid
from functools import wraps
from queue import Queue, Full

import riberry


class TaskDefinition:

    def __init__(self, func, name, stream, step, options):
        self.func = func
        self.name = name
        self.stream = stream
        self.step = step
        self.options = options


class Task:

    def __init__(self, task_id: str, execution_id: int, definition: TaskDefinition):
        self.id = task_id
        self.execution_id = execution_id
        self.definition = definition


class TaskCounter:

    def __init__(self):
        self._value = 0
        self._lock = threading.RLock()

    def increment(self):
        with self._lock:
            self._value += 1
            return self._value

    def decrement(self):
        with self._lock:
            self._value -= 1
            return self._value

    @property
    def lock(self):
        return self._lock

    @property
    def value(self):
        return self._value


class TaskQueue:
    queue_cls = Queue

    def __init__(self, backend, queue=None, limit=None):
        self.backend = backend
        self.queue = queue or self.queue_cls()
        self.limit = limit
        self.counter = TaskCounter()

    @property
    def lock(self):
        return self.counter.lock

    def limit_reached(self):
        return bool(self.limit is not None and self.counter.value >= self.limit)

    def submit_receiver_task(self, external_task: riberry.model.job.JobExecutionExternalTask):
        self._submit(make_receiver_task(backend=self.backend, external_task=external_task))

    def submit_entry_task(self, execution_id: int, root_id: str, entry_point: riberry.app.base.EntryPoint):
        self._submit(make_entry_task(execution_id=execution_id, root_id=root_id, entry_point=entry_point))

    def _submit(self, task: Task):
        with self.lock:
            if self.limit_reached():
                raise Full
            self.queue.put_nowait(task)
            self.counter.increment()


def _make_external_task_wrapper(task_id, func):
    @wraps(func)
    def wrapper():
        task = riberry.model.job.JobExecutionExternalTask.query().filter_by(id=task_id).one()
        task.status = 'COMPLETE'
        riberry.model.conn.commit()
        func(task.output_data)

    return wrapper


def make_receiver_task(backend, external_task: riberry.model.job.JobExecutionExternalTask) -> Task:
    definition = backend.external_task_callback(external_task.name)
    return Task(
        task_id=str(uuid.uuid4()),
        execution_id=external_task.job_execution.id,
        definition=TaskDefinition(
            func=_make_external_task_wrapper(external_task.id, definition.func),
            name=definition.step,
            stream=definition.stream,
            step=definition.step,
            options=definition.options,
        ),
    )
def make_entry_task(execution_id: int, root_id: str, entry_point: riberry.app.base.EntryPoint):
    return Task(
        task_id=root_id,
        execution_id=execution_id,
        definition=TaskDefinition(
            func=entry_point.func,
            name=entry_point.step,
            stream=entry_point.stream,
            step=entry_point.step,
            options={},
        )
    )


# --- riberry/app/backends/impl/pool/tasks/__init__.py ---
from threading import Event

import riberry
from .background import background
from .executor import execution_listener
from .external_task_receiver import queue_receiver_tasks

log = riberry.log.make(__name__)


def run_task(name, func, interval, exit_event: Event):
    log.debug('Started task %s', name)
    while not exit_event.is_set():
        try:
            func()
        except:
            log.exception('Error occurred while processing task %s', name)
        finally:
            if interval:
                exit_event.wait(interval)
    log.debug('Stopped task %s', name)


# --- riberry/app/backends/impl/pool/tasks/background.py ---
import logging

import riberry
from ..task_queue import TaskQueue

log = logging.getLogger(__name__)


def background(queue: TaskQueue):
    riberry.app.tasks.echo()

    with queue.lock:
        if not queue.limit_reached():
            riberry.app.tasks.poll(track_executions=False, filter_func=lambda _: not queue.limit_reached())
        else:
            log.debug('Queue limit reached, skipped task polling')

    riberry.app.tasks.refresh()
# --- riberry/app/backends/impl/pool/tasks/executor.py ---
import threading
from contextlib import contextmanager
from queue import Empty
import time

import riberry
from riberry.app.misc.signals import task_prerun, task_postrun
from ..exc import Defer
from ..task_queue import TaskQueue, Task

log = riberry.log.make(__name__)


def execution_listener(task_queue: TaskQueue):
    try:
        task: Task = task_queue.queue.get(timeout=2.0)
    except Empty:
        return

    execution_thread = threading.Thread(
        name=f'{task.execution_id}:{task.id}:{task.definition.name}',
        target=execute,
        args=(task, task_queue)
    )
    execution_thread.start()


def execute(task: Task, task_queue: TaskQueue):
    try:
        job_execution: riberry.model.job.JobExecution = riberry.model.conn.query(
            riberry.model.job.JobExecution
        ).filter_by(
            id=task.execution_id,
        ).one()

        if job_execution.task_id == task.id:
            task_scope = execute_entry_task(job_execution=job_execution, task=task)
        else:
            task_scope = execute_receiver_task(job_execution=job_execution, task=task)

        context_scope = riberry.app.current_context.scope(
            root_id=job_execution.task_id,
            task_id=task.id,
            task_name=task.definition.name,
            stream=task.definition.stream,
            category=None,
            step=task.definition.step,
        )

        with context_scope, task_scope:
            execute_task(job_execution=job_execution, task=task)
    finally:
        task_queue.counter.decrement()


@contextmanager
def execute_entry_task(job_execution: riberry.model.job.JobExecution, task: Task):
    riberry.app.actions.executions.execution_started(
        task_id=task.id,
        root_id=job_execution.task_id,
        job_id=task.execution_id,
        primary_stream=task.definition.stream,
    )
    yield


# noinspection PyUnusedLocal
@contextmanager
def execute_receiver_task(job_execution: riberry.model.job.JobExecution, task: Task):
    yield


def execute_task(job_execution: riberry.model.job.JobExecution, task: Task):
    task_prerun(context=riberry.app.current_context, props={})
    riberry.app.current_riberry_app.backend.set_active_task(task=task)

    status = 'SUCCESS'
    start_time = time.time()
    try:
        task.definition.func()
    except Defer:
        status = None
    except Exception as exc:
        status = 'FAILURE'
        riberry.app.actions.artifacts.create_artifact_from_traceback(category='Fatal')
        log.exception('Failed with exception: %s', exc)
    finally:
        end_time = time.time()
        log.info('Completed in %.4f seconds', end_time - start_time)
        if status:
            riberry.app.actions.executions.execution_complete(
                task_id=task.id,
                root_id=job_execution.task_id,
                status=status,
                stream=task.definition.stream,
                context=riberry.app.current_context,
            )
        task_postrun(context=riberry.app.current_context, props={}, state=status or 'IGNORED')
        riberry.app.current_riberry_app.backend.set_active_task(task=None)


# --- riberry/app/backends/impl/pool/tasks/external_task_receiver.py ---
from typing import List

import riberry
from riberry.app import current_context as ctx
from ..task_queue import TaskQueue


def ready_external_tasks() -> List[riberry.model.job.JobExecutionExternalTask]:
    return riberry.model.conn.query(
        riberry.model.job.JobExecutionExternalTask
    ).filter_by(
        status='READY',
    ).join(riberry.model.job.JobExecution).filter_by(
        status='ACTIVE',
    ).join(riberry.model.job.Job).filter_by(
        instance=ctx.current.riberry_app_instance,
    ).all()


def queue_receiver_tasks(queue: TaskQueue):
    with queue.lock:
        if not queue.limit_reached():
            with riberry.model.conn:
                external_tasks = ready_external_tasks()
                while external_tasks and not queue.limit_reached():
                    queue.submit_receiver_task(external_tasks.pop())


# --- riberry/app/base.py ---
import riberry


class RiberryApplicationConfig:

    def __init__(self, **kwargs):
        self.enable_steps = kwargs.get('enable_steps', True)


class RiberryApplication:
    __registered__ = {}

    def __init__(self, *, name, backend, config=None, addons=None):
        self.name = name
        self.config = config or RiberryApplicationConfig()
        self.__registered__[self.name] = self
        self.context: riberry.app.context.Context = riberry.app.context.Context()
        self.entry_points = {}
        self.backend: riberry.app.backends.RiberryApplicationBackend = backend
        self.backend.initialize()

        self.addons = {
            **self.backend.default_addons(),
            **(addons or {})
        }
        for addon in self.addons.values():
            if addon is not None:
                addon.register(riberry_app=self)

    @classmethod
    def by_name(cls, name):
        return cls.__registered__[name]

    def entry_point(self, form, stream=None, step=None):
        stream = stream or self.backend.default_stream_name
        step = step or self.backend.default_step_name

        def wrapper(func):
            self.entry_points[form] = EntryPoint(form=form, func=func, stream=stream, step=step)
            return func

        return wrapper

    def task(self, func=None, **options):
        return self.backend.task(func=func, **options)

    def register_task(self, func, **options):
        return self.backend.register_task(func=func, **options)

    def start(self, execution_id, root_id, form) -> str:
        if form not in self.entry_points:
            raise ValueError(f'Application {self.name!r} does not have an entry point with '
                             f'name {form!r} registered.')

        entry_point: EntryPoint = self.entry_points[form]
        with self.context.flow.stream_scope(stream=entry_point.stream):
            return self.backend.start_execution(execution_id=execution_id, root_id=root_id, entry_point=entry_point)


class EntryPoint:

    def __init__(self, form, func, stream, step):
        self.form = form
        self.func = func
        self.stream = stream
        self.step = step
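# --- Illustrative usage sketch (editor's addition, not part of the archive) ---
# A minimal sketch of wiring a Celery-backed Riberry application, assuming riberry is
# configured and a form with internal name 'demo_form' exists; the broker settings,
# 'demo' name, and function bodies are hypothetical.
import celery
import riberry
from riberry.app import RiberryApplication
from riberry.app.backends.impl.celery import CeleryBackend

celery_app = celery.Celery('demo')
app = RiberryApplication(name='demo', backend=CeleryBackend(instance=celery_app))


@app.entry_point('demo_form')
def main():
    # First step of the execution; context.flow.start/step/end wrap task signatures with
    # __rib_stream/__rib_step kwargs so downstream events are attributed to the right stream.
    ...


@app.task
def process_step():
    # Registered through CeleryBackend.register_task, so it runs inside the Riberry
    # execution scope (prerun/postrun signals, artifacts on failure, etc.).
    ...
# --- end of sketch ---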
# --- riberry/app/context/__init__.py ---
from contextlib import contextmanager

import riberry
from .artifact import Artifact
from .current import ContextCurrent
from .event_registry import EventRegistry, EventRegistryHelper
from .external_task import ExternalTask
from .flow import Flow
from .input_mapping import InputMappings
from .report import Report
from .shared_data import SharedExecutionData


class Context:

    def __init__(self):
        self.current = ContextCurrent(context=self)
        self.input = InputMappings(context=self)
        self.data = SharedExecutionData(context=self)
        self.flow = Flow(context=self)
        self.artifact = Artifact()
        self.report = Report(context=self)
        self.external_task = ExternalTask(context=self)
        self.event_registry = EventRegistry(context=self)
        self.on = EventRegistryHelper(context=self)

    @contextmanager
    def scope(self, root_id, task_id, task_name, stream, category, step):
        with self.current.scope(
                root_id=root_id,
                task_id=task_id,
                task_name=task_name,
                stream=stream,
                category=category,
                step=step,
        ):
            yield

    def spawn(self, form_name, job_name=None, input_values=None, input_files=None, owner=None, execute=True):
        return riberry.app.actions.jobs.create_job(
            form_name=form_name,
            job_name=job_name,
            input_files=input_files,
            input_values=input_values,
            owner=owner,
            execute=execute,
        )


# --- riberry/app/context/artifact.py ---
from typing import Union, Optional

import riberry
from riberry.model.job import ArtifactType


class Artifact:

    @staticmethod
    def create(
            filename: str,
            content: Union[bytes, str],
            name: str = None,
            type: Union[str, ArtifactType] = ArtifactType.output,
            category='Default',
            data: dict = None,
    ):
        return riberry.app.actions.artifacts.create_artifact(
            filename=filename,
            content=content,
            name=name,
            type=type,
            category=category,
            data=data,
        )

    @staticmethod
    def create_from_traceback(
            name: Optional[str] = None,
            filename: Optional[str] = None,
            category: str = 'Intercepted',
            type: riberry.model.job.ArtifactType = riberry.model.job.ArtifactType.error,
    ):
        return riberry.app.actions.artifacts.create_artifact_from_traceback(
            name=name,
            filename=filename,
            type=type,
            category=category,
        )


# --- riberry/app/context/current.py ---
import threading
import uuid
from contextlib import contextmanager
from typing import Optional

import riberry


class ContextCurrent:
    _state = threading.local()

    def __init__(self, context):
        self.context: riberry.app.context.Context = context
        self._worker_uuid = str(uuid.uuid4())

    def _get_state(self, key, default=None):
        _data = getattr(self._state, 'state', {})
        return _data.get(key, default)

    def _set_state(self, **state):
        self._state.state = state

    @property
    def WORKER_UUID(self):
        return self._worker_uuid

    @property
    def stream(self):
        return self._get_state('stream')

    @property
    def step(self):
        return self._get_state('step')

    @property
    def category(self):
        return self._get_state('category')

    @property
    def backend(self) -> 'riberry.app.backends.RiberryApplicationBackend':
        return self.riberry_app.backend

    @property
    def task(self):
        return self.backend.active_task()

    @property
    def riberry_app(self) -> 'riberry.app.RiberryApplication':
        return riberry.app.current_riberry_app

    @property
    def riberry_app_instance(self) -> riberry.model.application.ApplicationInstance:
        return riberry.app.env.get_instance_model()

    @property
    def task_id(self):
        return self._get_state('task_id')

    @property
    def task_name(self):
        return self._get_state('task_name')

    @property
    def root_id(self):
        return self._get_state('root_id')

    @property
    def job_execution(self) -> Optional[riberry.model.job.JobExecution]:
        return riberry.model.job.JobExecution.query().filter_by(task_id=self.root_id).first()

    @property
    def job(self) -> Optional[riberry.model.job.Job]:
        job_execution = self.job_execution
        return job_execution.job if self.job_execution else None

    @contextmanager
    def scope(self, root_id, task_id, task_name, stream, category, step):
        try:
            self._set_state(
                root_id=root_id,
                task_name=task_name,
                task_id=task_id,
                step=step,
                stream=stream,
                category=category,
            )
            yield
        finally:
            self._set_state()
riberry.model.job.JobExecutionProgress.query().filter_by( job_execution=self.job_execution, ).order_by( riberry.model.job.JobExecutionProgress.id.desc(), ).limit( 1 ).first() return progress.message if progress else None @progress.setter def progress(self, message: str): if message == self.progress: return progress = riberry.model.job.JobExecutionProgress( job_execution=self.job_execution, message=message, ) riberry.model.conn.add(progress) riberry.model.conn.commit() PK!_ln %riberry/app/context/event_registry.pyimport enum from collections import defaultdict from functools import partial import riberry class EventRegistryTypes(enum.Enum): on_completion = 'on_completion' on_data_updated = 'on_data_update' on_report_refresh = 'on_report_refresh' on_external_result_received = 'on_external_result_received' class EventRegistry: types: EventRegistryTypes = EventRegistryTypes def __init__(self, context): self.context: riberry.app.context.Context = context self._registrations = defaultdict(set) @staticmethod def _make_key(func, **key): if not key and func: return tuple([ ('key', riberry.app.util.misc.function_path(func)) ]) return tuple(sorted(key.items())) def register(self, event_type: EventRegistryTypes, **key): def inner(func): formatted_key = self._make_key(func, **key) self._registrations[(event_type, formatted_key)].add(func) return func return inner def get(self, event_type: EventRegistryTypes, **key): key = self._make_key(func=None, **key) return self._registrations[(event_type, key)] def call(self, event_type: EventRegistryTypes, args=None, kwargs=None, **key): functions = self.get(event_type=event_type, **key) return [function(*args or (), **kwargs or {}) for function in functions] class EventRegistryHelper: def __init__(self, context): self.context: riberry.app.context.Context = context @property def _register(self): return self.context.event_registry.register def _register_single_execution_function(self, func, event_type, key, registration_key=None): key = riberry.app.util.misc.internal_data_key(key=f'once.{key}') func = partial(self.context.data.execute_once, key=key, func=func) return self._register(event_type=event_type, **(registration_key or {}))(func) def execution_failed(self, func): return self._register_single_execution_function( func=func, event_type=EventRegistryTypes.on_completion, key='execution_failed', registration_key=dict( status='FAILURE', ) ) def execution_succeeded(self, func): return self._register_single_execution_function( func=func, event_type=EventRegistryTypes.on_completion, key='execution_succeeded', registration_key=dict( status='SUCCESS', ) ) def data_updated(self, name): return self._register(event_type=EventRegistryTypes.on_data_updated, data_name=name) def external_result_received(self, func): return self._register(event_type=EventRegistryTypes.on_external_result_received)(func) def report_refresh(self, report, bindings, renderer=None): def inner(func): def refresh(): self.context.report.update(report=report, body=func(), renderer=renderer) self._register(event_type=EventRegistryTypes.on_report_refresh, report=report)(refresh) for binding in bindings: self._register(event_type=EventRegistryTypes.on_data_updated, data_name=binding)(refresh) return func return inner PK!hPlj$riberry/app/context/external_task.pyimport inspect from typing import Optional import riberry class ExternalTask: def __init__(self, context): self.context: riberry.app.context.Context = context def create( self, name: str = None, type: str = 'external', external_task_id: 
Optional[str] = None, input_data: Optional[bytes] = None ): external_task = riberry.app.actions.external_task.create_external_task( job_execution=self.context.current.job_execution, name=name, type=type, external_task_id=external_task_id, input_data=input_data, ) return ExternalTaskCreationResult(context=self.context, instance=external_task) def create_receiver_task(self, external_task_id, validator): if inspect.isfunction(validator): validator = riberry.app.util.misc.function_path(validator) return self.context.current.riberry_app.backend.create_receiver_task( external_task_id=external_task_id, validator=validator, ) @staticmethod def mark_as_ready(external_task_id, output_data): return riberry.app.actions.external_task.mark_as_ready( external_task_id=external_task_id, output_data=output_data ) class ExternalTaskCreationResult: def __init__(self, context, instance): self.context: riberry.app.context.Context = context self.instance = instance self.task_id = instance.task_id def create_receiver_task(self, validator=None): return self.context.external_task.create_receiver_task(external_task_id=self.task_id, validator=validator) PK!PFXffriberry/app/context/flow.pyfrom contextlib import contextmanager import riberry class Flow: def __init__(self, context): self.context: riberry.app.context.Context = context self.scoped_stream = None self.scoped_category = None @contextmanager def stream_scope(self, stream=None, category=None): try: self.scoped_stream = stream self.scoped_category = category yield self finally: self.scoped_stream = None self.scoped_category = None def start(self, task, stream: str = None): return TaskWrap( task, __rib_stream_start=True, __rib_stream=self.cleanse_stream_name(stream), ) def step(self, task, step: str = None, stream: str = None): return TaskWrap( task, __rib_step=step, __rib_stream=self.cleanse_stream_name(stream), ) def end(self, task, stream: str = None): return TaskWrap( task, __rib_stream_end=True, __rib_stream=self.cleanse_stream_name(stream), ) def cleanse_stream_name(self, stream: str): stream = stream or self.scoped_stream if not stream: raise ValueError('Stream name cannot be blank') return str(stream) class TaskWrap: def __init__(self, func, **kwargs): self.func = func self.kwargs = kwargs @property def name(self): return self.func.name def _mixin_kw(self, kwargs): return {**self.kwargs, **kwargs} def s(self, *args, **kwargs): return self.func.s(*args, **self._mixin_kw(kwargs=kwargs)) def si(self, *args, **kwargs): return self.func.si(*args, **self._mixin_kw(kwargs=kwargs)) def delay(self, *args, **kwargs): return self.func.delay(*args, **self._mixin_kw(kwargs=kwargs)) PK!;&zK K $riberry/app/context/input_mapping.pyimport csv import io import json from collections import Mapping from operator import itemgetter from typing import Union, AnyStr, Iterator, Any, Dict import riberry class InputMappings: def __init__(self, context): self.values = InputValueMapping(context=context) self.files = InputFileMapping(context=context) class InputMapping(Mapping): def __init__(self, context, cls): self.context: riberry.app.context.Context = context self.cls = cls def _get_value(self, instance): raise NotImplementedError def get(self, item, default=None): if isinstance(item, str): return super().get(item, default=default) return [super(InputMapping, self).get(_, default) for _ in item] def __getitem__(self, item: Union[AnyStr, Iterator[AnyStr]]) -> Any: query = self.cls.query().filter(self.cls.job == self.context.current.job) if isinstance(item, str): instance = 
query.filter_by(internal_name=item).first() if not instance: raise KeyError(item) return self._get_value(instance) else: instances = query.filter(self.cls.internal_name.in_(item)).all() mapping = {instance.internal_name: self._get_value(instance) for instance in instances} return tuple(mapping[key] for key in item) def __len__(self) -> int: return self.cls.query().filter_by(job=self.context.current.job).count() def __iter__(self) -> Iterator[AnyStr]: instances = riberry.model.conn.query( self.cls.internal_name ).filter_by( job=self.context.current.job ).all() return map(itemgetter(0), instances) @property def dict(self) -> Dict[AnyStr, Any]: return { instance.internal_name: instance.value for instance in self.cls.query().filter_by(job=self.context.current.job).all() } class InputValueMapping(InputMapping): def __init__(self, context): super().__init__(context=context, cls=riberry.model.interface.InputValueInstance) def _get_value(self, instance): return instance.value class InputFileMapping(InputMapping): def __init__(self, context): super().__init__(context=context, cls=riberry.model.interface.InputFileInstance) def _get_value(self, instance): return InputFileReader(instance) def __getitem__(self, item: Union[AnyStr, Iterator[AnyStr]]) -> Union[ 'InputFileReader', Iterator['InputFileReader']]: return super().__getitem__(item) class InputFileReader: instance: riberry.model.interface.InputFileInstance def __init__(self, file_instance): self.instance = file_instance @property def filename(self): return self.instance.filename @property def size(self): return self.instance.size def bytes(self) -> bytes: return self.instance.binary def text(self, *args, **kwargs) -> str: return self.bytes().decode(*args, **kwargs) def json(self, *args, **kwargs): return json.loads(self.bytes(), *args, **kwargs) def csv(self, *args, **kwargs): reader = csv.DictReader(io.StringIO(self.text()), *args, **kwargs) for row in reader: yield row def __bool__(self): return bool(self.instance.binary) PK!x+riberry/app/context/report.pyimport json from functools import partial import riberry class Report: def __init__(self, context): self.context: riberry.app.context.Context = context def model(self, name): job_execution = self.context.current.job_execution report: riberry.model.job.JobExecutionReport = riberry.model.job.JobExecutionReport.query().filter_by( job_execution=job_execution, name=name, ).first() if not report: self.context.data.execute_once( key=riberry.app.util.misc.internal_data_key(f'once.create_report.{name}'), func=partial(self._create_report, name=name, job_execution=job_execution) ) return self.model(name=name) return report @staticmethod def _create_report(name, job_execution): report = riberry.model.job.JobExecutionReport(job_execution=job_execution, name=name) riberry.model.conn.add(report) riberry.model.conn.commit() def mark_for_refresh(self, name): report = self.model(name=name) report.marked_for_refresh = True riberry.model.conn.commit() def update(self, report, body, renderer=None): model = self.model(name=report) model.marked_for_refresh = False model.renderer = renderer or model.renderer model.report = json.dumps(body).encode() riberry.model.conn.commit() PK!qpX"riberry/app/context/shared_data.pyimport datetime from collections import Mapping, defaultdict from contextlib import contextmanager from operator import itemgetter from typing import AnyStr, Any, Dict, Iterator import pendulum import time from sqlalchemy.exc import IntegrityError import riberry class SharedExecutionData(Mapping): def 
__init__(self, context): self.context: riberry.app.context.Context = context self._lock: Dict[AnyStr, riberry.model.misc.ResourceData] = {} self._dirty = set() self._listeners = defaultdict(list) def listen(self, key, callback): self._listeners[key].append(callback) def __getitem__(self, item: AnyStr) -> Any: return self._get_instance(key=item).value def __setitem__(self, key, value): if key not in self._lock: raise PermissionError(f'No lock present for data key {key!r}') if pendulum.DateTime.utcnow() > pendulum.instance(self._lock[key].expiry): raise TimeoutError(f'Lock for data key {key!r} has expired!') self._lock[key].value = value self._dirty.add(key) def __delitem__(self, key): instance = self._get_instance(key=key) riberry.model.conn.delete(instance=instance) riberry.model.conn.commit() def __len__(self) -> int: return riberry.model.misc.ResourceData.query().filter_by( resource_id=self.context.current.job_execution.id, resource_type=riberry.model.misc.ResourceType.job_execution, ).count() def __iter__(self) -> Iterator[AnyStr]: instances = riberry.model.conn.query( riberry.model.misc.ResourceData.name ).filter_by( resource_id=self.context.current.job_execution.id, resource_type=riberry.model.misc.ResourceType.job_execution, ).all() return map(itemgetter(0), instances) def _get_instance(self, key): job_execution = self.context.current.job_execution # check if key exists instance = riberry.model.misc.ResourceData.query().filter_by( resource_id=job_execution.id, resource_type=riberry.model.misc.ResourceType.job_execution, name=key, ).first() # create if it doesn't if not instance: instance = riberry.model.misc.ResourceData( resource_id=job_execution.id, resource_type=riberry.model.misc.ResourceType.job_execution, name=key ) try: riberry.model.conn.add(instance) riberry.model.conn.commit() except IntegrityError: riberry.model.conn.rollback() return self._get_instance(key=key) return instance def _acquire_lock(self, key, ttl, poll_interval): instance = self._get_instance(key=key) lock_value = self.context.current.task_id while True: riberry.model.conn.query(riberry.model.misc.ResourceData).filter( (riberry.model.misc.ResourceData.id == instance.id) & ( (riberry.model.misc.ResourceData.lock == None) | (riberry.model.misc.ResourceData.expiry < datetime.datetime.now(tz=datetime.timezone.utc)) ) ).update({ 'lock': lock_value, 'expiry': pendulum.DateTime.utcnow().add(seconds=ttl) }, synchronize_session=False) riberry.model.conn.commit() riberry.model.conn.expire(instance=instance) if instance.lock != lock_value: time.sleep(poll_interval) else: self._lock[key] = instance break def _release_lock(self, key): if self._lock: instance = self._lock.pop(key) instance.lock = None instance.expiry = None instance.marked_for_refresh = instance.marked_for_refresh or (key in self._dirty) riberry.model.conn.commit() if key in self._dirty: for listener in self._listeners[key]: listener(key) self._dirty.remove(key) @contextmanager def lock(self, key, ttl=60, poll_interval=1): try: yield self._acquire_lock(key=key, ttl=ttl, poll_interval=poll_interval) finally: self._release_lock(key=key) def execute_once(self, key, func): with self.context.data.lock(key=key): if not self.context.data[key]: self.context.data[key] = True func() def set(self, key, value, **kwargs): with self.lock(key=key, **kwargs): self[key] = value() if callable(value) else value PK!qvtriberry/app/env.pyimport os import riberry from .base import RiberryApplication from .context import Context from .util.misc import Proxy __cache = dict( 
app_name={}, ) __cache_app_name = __cache['app_name'] def get_instance_name(raise_on_none=True) -> str: if 'RIBERRY_INSTANCE' not in os.environ and raise_on_none: raise EnvironmentError("Environment variable 'RIBERRY_INSTANCE' not set") return os.environ.get('RIBERRY_INSTANCE') def get_application_name() -> str: instance_name = get_instance_name(raise_on_none=True) if instance_name not in __cache_app_name: __cache_app_name[instance_name] = get_instance_model().application.internal_name return __cache_app_name[instance_name] def get_instance_model() -> riberry.model.application.ApplicationInstance: return riberry.model.application.ApplicationInstance.query().filter_by( internal_name=get_instance_name(raise_on_none=True), ).one() def is_current_instance(instance_name: str) -> bool: return bool(instance_name and get_instance_name(raise_on_none=False) == instance_name) current_riberry_app: RiberryApplication = Proxy( getter=lambda: RiberryApplication.by_name(name=get_application_name()) ) current_context: Context = Proxy( getter=lambda: current_riberry_app.context ) PK!2riberry/app/misc/__init__.py PK!zzriberry/app/misc/signals.pyfrom celery import signals import riberry from ..util.events import create_event __stream_cache = {} def try_cache_stream(root_id, stream_name, stream_state): key = root_id, stream_name, stream_state if key not in __stream_cache: __stream_cache[key] = None if len(__stream_cache) > 10000: evict_key = next(iter(__stream_cache)) __stream_cache.pop(evict_key) return True return False @signals.worker_process_init.connect def worker_process_init(*args, **kwargs): riberry.model.conn.raw_engine.dispose() riberry.model.conn.remove() @signals.celeryd_after_setup.connect def celeryd_after_setup(*args, **kwargs): riberry.model.conn.raw_engine.dispose() riberry.model.conn.remove() @signals.before_task_publish.connect def before_task_publish(sender, headers, body, **_): try: root_id = riberry.app.current_context.current.root_id except: return args, kwargs, *_ = body task_id = headers['id'] if '__rib_stream_start' in kwargs: stream = str(kwargs['__rib_stream']) if try_cache_stream(root_id=root_id, stream_name=stream, stream_state='QUEUED'): create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': stream, 'state': 'QUEUED', } ) if '__rib_step' in kwargs and riberry.app.current_riberry_app.config.enable_steps: stream, step = kwargs['__rib_stream'], kwargs['__rib_step'] create_event( name='step', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'step': str(step), 'state': 'QUEUED', } ) def task_prerun(context, props): task_id = context.current.task_id root_id = context.current.root_id stream = context.current.stream step = context.current.step if not stream: return if 'stream_start' in props: if try_cache_stream(root_id=root_id, stream_name=stream, stream_state='ACTIVE'): create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'state': 'ACTIVE', } ) if step and riberry.app.current_riberry_app.config.enable_steps: create_event( name='step', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'step': str(step), 'state': 'ACTIVE', } ) def task_postrun(context, props, state): task_id = context.current.task_id root_id = context.current.root_id stream = context.current.stream step = context.current.step if not stream: return if 'stream_start' in props and state in ('RETRY', 'FAILURE'): create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'state': state, } ) if 
'stream_end' in props: create_event( name='stream', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'state': 'SUCCESS' if state == 'IGNORED' else state or 'FAILURE', } ) if step and riberry.app.current_riberry_app.config.enable_steps: create_event( name='step', root_id=root_id, task_id=task_id, data={ 'stream': str(stream), 'step': str(step), 'state': 'SUCCESS' if state == 'IGNORED' else state or 'FAILURE', } ) PK! riberry/app/tasks.pyfrom typing import Callable, Optional, List import pendulum from sqlalchemy import desc, asc import riberry from riberry.app.util import execution_tracker as tracker from . import actions, env log = riberry.log.make(__name__) def echo(): with riberry.model.conn: app_instance = env.get_instance_model() heartbeat = riberry.model.application.Heartbeat.query().filter_by(instance=app_instance).first() if not heartbeat: heartbeat = riberry.model.application.Heartbeat(instance=app_instance) riberry.model.conn.add(heartbeat) heartbeat.updated = pendulum.DateTime.utcnow() riberry.model.conn.commit() def poll( track_executions: bool = True, filter_func: Optional[Callable[[riberry.model.job.JobExecution], bool]] = None, ): with riberry.model.conn: app_instance = env.get_instance_model() if track_executions: tracker.check_stale_execution(app_instance=app_instance) instance_name = app_instance.internal_name if app_instance.status != 'online': log.debug(f'Instance {instance_name!r} is not online, skipped polling executions') return if app_instance.active_schedule_value('accept', default='Y') == 'N': log.debug(f'Instance {instance_name!r} is not accepting new executions, skipped polling executions') return executions: List[riberry.model.job.JobExecution] = riberry.model.job.JobExecution.query().filter( riberry.model.job.JobExecution.status == 'RECEIVED' ).join(riberry.model.job.Job).order_by( desc(riberry.model.job.JobExecution.priority), asc(riberry.model.job.JobExecution.created), asc(riberry.model.job.JobExecution.id), ).filter_by(instance=app_instance).all() for execution in executions: if execution_limit_reached(app_instance=app_instance): log.debug(f'Instance {instance_name!r} has reached the allowed limit of active/ready executions') return if callable(filter_func) and not filter_func(execution): continue execution_task_id = actions.executions.queue_job_execution( execution=execution, track_executions=track_executions) log.info(f'Queueing execution: id={execution.id!r}, root={execution_task_id!r}, job={execution.job.name!r}') def execution_limit_reached(app_instance: riberry.model.application.ApplicationInstance) -> bool: limit_raw = str(app_instance.active_schedule_value('limit')) if not limit_raw.isdigit(): return False limit = int(limit_raw) if limit > 0: active_execution_count = riberry.model.job.JobExecution.query().filter( riberry.model.job.JobExecution.status.in_(('READY', 'ACTIVE')) ).join( riberry.model.job.Job ).filter_by( instance=app_instance ).count() return active_execution_count >= limit return False def refresh(): with riberry.model.conn: app_instance = env.get_instance_model() actions.shared_data.update_all_data_items(app_instance=app_instance) actions.reports.update_all_reports(app_instance=app_instance) PK!~@::riberry/app/util/__init__.pyfrom . 
import misc, events, execution_tracker, redis_lock PK!g4riberry/app/util/events.pyimport json import traceback import pendulum import riberry def create_event(name, root_id, task_id, data=None, binary=None): if not root_id: return if isinstance(binary, str): binary = binary.encode() evt = riberry.model.misc.Event( name=name, time=pendulum.DateTime.utcnow().timestamp(), task_id=task_id, root_id=root_id, data=json.dumps(data), binary=binary, ) try: riberry.model.conn.add(evt) riberry.model.conn.commit() riberry.model.conn.flush([evt]) except: traceback.print_exc() riberry.model.conn.rollback() PK!k: ܫ%riberry/app/util/execution_tracker.pyfrom typing import List from sqlalchemy import desc, asc import riberry log = riberry.log.make(__name__) def _tracker_key(value): return f'workflow:active:{value}' def start_tracking_execution(root_id): redis = riberry.celery.util.celery_redis_instance() instance = riberry.app.env.get_instance_name() key = _tracker_key(instance) log.info(f'execution_tracker: Tracking workflow {root_id!r} via key {key!r}') redis.sadd(key, root_id) def check_stale_execution(app_instance): executions: List[riberry.model.job.JobExecution] = riberry.model.job.JobExecution.query().filter( riberry.model.job.JobExecution.status.in_(('ACTIVE', 'READY')) ).join(riberry.model.job.Job).order_by( desc(riberry.model.job.JobExecution.priority), asc(riberry.model.job.JobExecution.created) ).filter_by(instance=app_instance).all() if not executions: return redis = riberry.celery.util.celery_redis_instance() for execution in executions: if not redis.sismember(_tracker_key(app_instance.internal_name), execution.task_id): log.warning( f'execution_tracker: Identified stale workflow. ' f'Root ID: {execution.task_id}, Execution ID: {execution.id}' ) riberry.app.actions.artifacts.create_artifact( name='Workflow Cancelled', type=riberry.model.job.ArtifactType.error, category='Fatal', filename='fatal.log', content=( f'The current executions\'s ID ({execution.task_id}) was not found within Redis and has ' f'therefore been cancelled. This usually occurs when Redis is flushed while an execution is ' f'in the READY or ACTIVE state.' 
), task_id=execution.task_id, root_id=execution.task_id, ) riberry.app.actions.executions.execution_complete( task_id=execution.task_id, root_id=execution.task_id, status='FAILURE', stream=None, context=riberry.app.current_context, ) PK!ְ({{riberry/app/util/misc.pyimport inspect class Proxy: def __init__(self, getter): self.get = getter def __getattr__(self, item): return getattr(self.get(), item) def __repr__(self): return f'Proxy({self.get()})' def function_path(func): return f'{inspect.getmodule(func).__name__}.{func.__name__}' def internal_data_key(key): return f'_internal:{key}' PK!iriberry/app/util/redis_lock.pyimport time from redis.exceptions import LockError import riberry log = riberry.log.make(__name__) class RedisLock: def __init__(self, name, on_acquired, interval, min_interval=100): self.name = name self.on_acquired = on_acquired self.interval = interval self.min_interval = min_interval @property def key_timeout(self): return f'lock:{self.name}:timeout' @property def key_lock(self): return f'lock:{self.name}:acquire' def run(self, redis_instance): if redis_instance.get(name=self.key_timeout) is None: try: self._attempt_lock(redis_instance=redis_instance) except LockError: pass def _attempt_lock(self, redis_instance): with redis_instance.lock(name=self.key_lock, timeout=60, blocking_timeout=0.0): log.debug(f"{self.name}: acquired lock...") time_start = time.time() try: self.on_acquired() finally: process_time = time.time() - time_start self._set_timeout(redis_instance=redis_instance, process_time=process_time) def _set_timeout(self, redis_instance, process_time): expiry = int(max(self.interval - (process_time * 1000), self.min_interval)) redis_instance.set(name=self.key_timeout, value='1', px=expiry) log.debug(f'{self.name}: processed task in {process_time:03}, setting lock expiry to {expiry:03} milliseconds') PK!09Triberry/celery/__init__.pyfrom . import background, util PK!%riberry/celery/background/__init__.pyfrom celery import Celery from riberry import config app = Celery(main='background-tasks') app.conf.update(config.config.celery) app.conf.beat_schedule.update({ 'process:execution': { 'task': 'riberry.celery.background.tasks.process_events', 'schedule': config.config.background.events.interval, 'kwargs': { 'event_limit': config.config.background.events.processing_limit }, 'options': {'queue': 'riberry.background.events'} }, 'process:job-schedule': { 'task': 'riberry.celery.background.tasks.job_schedules', 'schedule': config.config.background.schedules.interval, 'options': {'queue': 'riberry.background.schedules'} }, 'process:capacity': { 'task': 'riberry.celery.background.tasks.update_capacity_parameters', 'schedule': config.config.background.capacity.interval, 'options': {'queue': 'riberry.background.schedules'} }, }) app.conf.imports = list(app.conf.imports) + ['riberry.celery.background.tasks'] def register_task(task_path, schedule): app.conf.beat_schedule[task_path] = { 'task': 'riberry.celery.background.tasks.custom_task', 'schedule': schedule, 'args': [task_path], 'options': {'queue': 'riberry.background.custom'} } PK!  
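# --- Usage sketch (not part of the package) ---
# A minimal, hypothetical example of wiring a custom periodic task into the background
# Celery app defined above via register_task(). The module path
# 'my_project.maintenance:purge_temp_files' and the 300-second interval are assumptions;
# register_task() adds a beat_schedule entry that points at the generic
# riberry.celery.background.tasks.custom_task wrapper, which imports and calls the
# referenced callable on each beat. This would normally run at import time, before the
# beat scheduler starts.
from riberry.celery.background import register_task

register_task(task_path='my_project.maintenance:purge_temp_files', schedule=300)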
,riberry/celery/background/capacity_config.pyfrom collections import Counter, defaultdict from enum import Enum from typing import Optional, Dict, List, Set import itertools from celery.utils.log import logger from riberry import model class ConsumerStatus(Enum): active = 'active' inactive = 'inactive' class CapacityConsumer: def __init__(self, name: str, status: ConsumerStatus, requested_capacity: Optional[int] = None): self.name = name self.status = status self.requested_capacity = requested_capacity def __repr__(self): return f'CapacityConsumer(name={self.name!r})' @staticmethod def total_capacity(consumers, default=0): return sum(c.requested_capacity or default for c in consumers if c.status == ConsumerStatus.active) @classmethod def distribute(cls, consumers, total_capacity) -> Dict['CapacityConsumer', List[int]]: total_requested_capacity = cls.total_capacity(consumers=consumers, default=total_capacity) or total_capacity return { consumer: ( [(consumer.requested_capacity or total_capacity) / total_requested_capacity * total_capacity] if consumer.status == ConsumerStatus.active else [0] ) for consumer in consumers } @classmethod def from_weight_parameter(cls, parameter_name: str): schedules: List[model.application.ApplicationInstanceSchedule] = \ model.application.ApplicationInstanceSchedule.query().filter_by(parameter=parameter_name).all() instances: Set[model.application.ApplicationInstance] = {sched.instance for sched in schedules} schedule_values = { instance.internal_name: int(instance.active_schedule_value(name=parameter_name, default=0)) for instance in instances } execution_count = execution_count_for_instances(instances=instances) return [ CapacityConsumer( name=instance.internal_name, status=( ConsumerStatus.active if instance.status == 'online' and execution_count[instance.internal_name] else ConsumerStatus.inactive ), requested_capacity=schedule_values[instance.internal_name] ) for instance in instances ] class CapacityProducer: def __init__(self, name: str, capacity: int): self.name = name self.capacity = capacity def __repr__(self): return f'CapacityProducer(name={self.name!r}, capacity={self.capacity})' @staticmethod def total_capacity(producers): return sum(p.capacity for p in producers) @staticmethod def producers_name_pool(producers, distribution_strategy: model.application.CapacityDistributionStrategy): name_lists = sorted( [[producer.name] * producer.capacity for producer in producers], key=len, reverse=True ) if distribution_strategy == model.application.CapacityDistributionStrategy.spread: name_lists = [filter(None, name_list) for name_list in itertools.zip_longest(*name_lists)] return list(itertools.chain.from_iterable(name_lists)) def weighted_schedules(parameter_name: str): schedules = model.application.ApplicationInstanceSchedule.query().filter_by(parameter=parameter_name).all() return schedules def execution_count_for_instances(instances, states=('ACTIVE', 'READY')): execution_count = defaultdict(int) job_executions: List[model.job.JobExecution] = model.job.JobExecution.query().filter( model.job.JobExecution.status.in_(states)).all() for job_execution in job_executions: instance = job_execution.job.instance if instance in instances: execution_count[instance.internal_name] += 1 return execution_count def update_instance_schedule( instance: model.application.ApplicationInstance, capacity: int, producer_allocations: Counter, allocation_config_name: str, capacity_config_name: str ): for sched in list(instance.schedules): if sched.parameter in 
(capacity_config_name, allocation_config_name): model.conn.delete(sched) schedule_capacity = model.application.ApplicationInstanceSchedule( instance=instance, parameter=capacity_config_name, value=str(capacity), priority=100, ) schedule_allocation = model.application.ApplicationInstanceSchedule( instance=instance, parameter=allocation_config_name, value=' '.join(f'{k}|{v}' for k, v in sorted(producer_allocations.items())), priority=101, ) model.conn.add(schedule_capacity) model.conn.add(schedule_allocation) def update_instance_capacities( producers, weight_parameter, capacity_parameter, producer_parameter, distribution_strategy): consumers = CapacityConsumer.from_weight_parameter(parameter_name=weight_parameter) total_capacity = CapacityProducer.total_capacity(producers=producers) capacity_distribution = CapacityConsumer.distribute(consumers=consumers, total_capacity=total_capacity) producer_name_pool = CapacityProducer.producers_name_pool( producers=producers, distribution_strategy=distribution_strategy) logger.info(f'[{weight_parameter}] Total capacity: {total_capacity}') for consumer, capacities in sorted(capacity_distribution.items(), key=lambda x: x[0].name): raw_capacity = round(sum(capacities)) allocated_names = producer_name_pool[:raw_capacity] producer_name_pool = producer_name_pool[raw_capacity:] capacity = len(allocated_names) producer_allocations = Counter(allocated_names) instance = model.application.ApplicationInstance.query().filter_by(internal_name=consumer.name).one() update_instance_schedule( instance=instance, capacity=capacity, producer_allocations=producer_allocations, allocation_config_name=producer_parameter, capacity_config_name=capacity_parameter, ) allocations_formatted = ', '.join([f'{k}: {v:2}' for k, v in sorted(producer_allocations.items())]) or '-' logger.info( f'[{weight_parameter}] {consumer.name} -> capacity: {capacity:2}, allocations: [ {allocations_formatted} ]') PK!,riberry/celery/background/events/__init__.pyPK!aΩ55*riberry/celery/background/events/events.pyimport json import smtplib import traceback from collections import defaultdict from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import List import pendulum from sqlalchemy.orm.exc import NoResultFound from riberry import model, config from celery.utils.log import logger def email_notification(host, body, mime_type, subject, sender, recipients: List): if not recipients: logger.warn('Attempted to send email notification with no recipients provided.') return try: msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = sender msg['To'] = ', '.join(recipients) msg.attach(MIMEText(body, mime_type)) s = smtplib.SMTP(host) s.sendmail(sender, recipients, msg.as_string()) s.quit() except: logger.warn(f'An error occurred while sending email notification: {traceback.format_exc()}') def handle_artifacts(events: List[model.misc.Event]): job_executions = {} streams = {} to_delete = [] for event in events: try: event_data = json.loads(event.data) stream_name = event_data['stream'] artifact = model.job.JobExecutionArtifact( name=event_data['name'] or 'Untitled', type=model.job.ArtifactType[event_data['type']], category=event_data['category'] or 'Default', filename=event_data['filename'] or 'Untitled', size=len(event.binary) if event.binary else 0, created=pendulum.from_timestamp(event.time), binary=model.job.JobExecutionArtifactBinary(binary=event.binary), data=[ model.job.JobExecutionArtifactData( title=str(title), description=str(description) ) for 
title, description in event_data['data'].items() if title and description ] ) if event.root_id not in job_executions: try: job_executions[event.root_id] = model.job.JobExecution.query().filter_by(task_id=event.root_id).one() except NoResultFound: to_delete.append(event) continue job_execution = job_executions[event.root_id] artifact.job_execution = job_executions[event.root_id] if stream_name: if (stream_name, event.root_id) not in streams: try: streams[(stream_name, event.root_id)] = model.job.JobExecutionStream.query().filter_by( name=stream_name, job_execution=job_execution).one() except NoResultFound: to_delete.append(event) continue stream = streams[(stream_name, event.root_id)] artifact.stream = stream model.conn.add(artifact) except: logger.warn(f'An error occurred processing artifact event {event}: {traceback.format_exc()}') else: to_delete.append(event) return to_delete def handle_steps(events: List[model.misc.Event]): to_delete = [] steps = {} streams = {} job_executions = {} for event in events: try: event_data = json.loads(event.data) event_time = pendulum.from_timestamp(event.time) stream_name = event_data['stream'] if event.root_id not in job_executions: try: job_executions[event.root_id] = model.job.JobExecution.query().filter_by( task_id=event.root_id).one() except NoResultFound: to_delete.append(event) continue job_execution = job_executions[event.root_id] if (stream_name, event.root_id) not in streams: try: streams[(stream_name, event.root_id)] = model.job.JobExecutionStream.query().filter_by( name=stream_name, job_execution=job_execution).one() except NoResultFound: to_delete.append(event) continue stream = streams[(stream_name, event.root_id)] try: if (stream_name, event.task_id) not in steps: steps[(stream_name, event.task_id)] = model.job.JobExecutionStreamStep.query().filter_by( task_id=event.task_id, stream=stream).one() step = steps[(stream_name, event.task_id)] except NoResultFound: step = model.job.JobExecutionStreamStep( name=event_data['step'], created=pendulum.from_timestamp(event.time), updated=pendulum.from_timestamp(event.time), task_id=event.task_id, stream=stream, status=event_data['state'] ) model.conn.add(step) step_updated = pendulum.instance(step.updated, tz='utc') if event_time >= step_updated: status = event_data['state'] step.status = status step.updated = event_time if status == 'ACTIVE': step.started = event_time elif status in ('SUCCESS', 'FAILURE'): step.completed = event_time except: logger.warn(f'An error occurred processing step event {event}: {traceback.format_exc()}') else: to_delete.append(event) return to_delete def handle_streams(events: List[model.misc.Event]): to_delete = [] streams = {} job_executions = {} for event in events: try: event_data = json.loads(event.data) event_time = pendulum.from_timestamp(event.time, tz='utc') stream_name = event_data['stream'] if not stream_name: logger.warn('Empty stream name provided, skipping') to_delete.append(event) continue if event.root_id not in job_executions: try: job_executions[event.root_id] = model.job.JobExecution.query().filter_by( task_id=event.root_id).one() except NoResultFound: to_delete.append(event) continue job_execution = job_executions[event.root_id] try: if (stream_name, event.root_id) not in streams: streams[(stream_name, event.root_id)] = model.job.JobExecutionStream.query().filter_by( name=stream_name, job_execution=job_execution).one() stream = streams[(stream_name, event.root_id)] except NoResultFound: existing_stream = 
model.job.JobExecutionStream.query().filter_by(task_id=event.task_id).first() if existing_stream: logger.warn(f'Skipping stream event {event}. Task ID {event.task_id!r} already exists against ' f'an existing stream (id={existing_stream.id}).\n' f'Details:\n' f' root_id: {event.root_id!r}\n' f' name: {stream_name!r}\n' f' data: {event_data}\n') to_delete.append(event) continue stream = model.job.JobExecutionStream( name=str(event_data['stream']), task_id=event.task_id, created=pendulum.from_timestamp(event.time, tz='utc'), updated=pendulum.from_timestamp(event.time, tz='utc'), status='QUEUED', job_execution=job_execution ) model.conn.add(stream) stream_updated = pendulum.instance(stream.updated, tz='utc') if event_time >= stream_updated: status = event_data['state'] stream.status = status stream.updated = event_time if status == 'ACTIVE' and stream.started is None: stream.started = event_time elif status in ('SUCCESS', 'FAILURE'): stream.completed = event_time except: logger.warn(f'An error occurred processing stream event {event}: {traceback.format_exc()}') else: to_delete.append(event) return to_delete def handle_notifications(events: List[model.misc.Event]): to_delete = [] for event in events: event_data = json.loads(event.data) notification_type = event_data['type'] notification_data = event_data['data'] try: execution: model.job.JobExecution = model.job.JobExecution.query().filter_by(task_id=event.root_id).one() user = execution.creator except NoResultFound: to_delete.append(event) continue if notification_type == 'custom-email' and config.config.email.enabled: try: email_notification( host=config.config.email.smtp_server, body=notification_data['body'], mime_type=notification_data.get('mime_type') or 'plain', subject=notification_data['subject'], sender=notification_data.get('from') or config.config.email.sender, recipients=list(filter(None, [user.details.email] + notification_data.get('to', []))), ) except: logger.warn(f'An error occurred processing notification type {notification_type}: ' f'{traceback.format_exc()}') elif notification_type == 'workflow_complete': status = str(notification_data['status']).lower() message = f'Completed execution #{execution.id} for job ' \ f'{execution.job.name} with status {str(status).lower()}' notification = model.misc.Notification( type=( model.misc.NotificationType.success if str(status).lower() == 'success' else model.misc.NotificationType.error ), message=message, user_notifications=[ model.misc.UserNotification(user=user) ], targets=[ model.misc.NotificationTarget(target='JobExecution', target_id=execution.id) ] ) model.conn.add(notification) if config.config.email.enabled and user.details.email: email_notification( host=config.config.email.smtp_server, body=message, mime_type='plain', subject=f'Riberry / {status.title()} / {execution.job.name} / execution #{execution.id}', sender=config.config.email.sender, recipients=[user.details.email], ) elif notification_type == 'workflow_started': message = f'Processing execution #{execution.id} for job {execution.job.name}' notification = model.misc.Notification( type=model.misc.NotificationType.info, message=message, user_notifications=[ model.misc.UserNotification(user=execution.creator) ], targets=[ model.misc.NotificationTarget(target='JobExecution', target_id=execution.id) ] ) model.conn.add(notification) if config.config.email.enabled and user.details.email: email_notification( host=config.config.email.smtp_server, body=message, mime_type='plain', subject=f'Riberry / Started / 
{execution.job.name} / execution #{execution.id}', sender=config.config.email.sender, recipients=[user.details.email], ) else: logger.warn(f'Received unknown notification type {notification_type}') to_delete.append(event) return to_delete handlers = { 'stream': handle_streams, 'step': handle_steps, 'artifact': handle_artifacts, 'notify': handle_notifications, } def process(event_limit=None): with model.conn: event_mapping = defaultdict(list) query = model.misc.Event.query().order_by(model.misc.Event.time.asc(), model.misc.Event.id.asc()) if event_limit: query = query.limit(event_limit) events = query.all() if not events: return for event in events: event_mapping[event.name].append(event) to_delete = [] for handler_name, handler_func in handlers.items(): handler_events = event_mapping[handler_name] if handler_events: try: to_delete += handlers[handler_name](handler_events) except: logger.warn(f'Failed to process {handler_name} events: {handler_events}') raise for event in to_delete: logger.info(f'Removing processed event {event}') model.conn.delete(event) model.conn.commit() if __name__ == '__main__': process() PK!O}"riberry/celery/background/tasks.pyimport importlib from riberry import model from riberry.celery.background import capacity_config from riberry.celery.background.events import events from . import app @app.task(ignore_result=True) def process_events(event_limit=None): events.process(event_limit) @app.task(ignore_result=True) def job_schedules(): with model.conn: for schedule in model.job.JobSchedule.query().filter_by(enabled=True).all(): schedule.run() model.conn.commit() @app.task(ignore_result=True) def update_capacity_parameters(): with model.conn: for capacity_configuration in model.application.CapacityConfiguration.query().all(): producers = [ capacity_config.CapacityProducer(producer.internal_name, producer.capacity) for producer in capacity_configuration.producers ] capacity_config.update_instance_capacities( producers=producers, weight_parameter=capacity_configuration.weight_parameter, capacity_parameter=capacity_configuration.capacity_parameter, producer_parameter=capacity_configuration.producer_parameter, distribution_strategy=capacity_configuration.distribution_strategy, ) model.conn.commit() @app.task(ignore_result=True) def custom_task(func_path): module_path, func_name = func_path.split(':') module = importlib.import_module(module_path) func = getattr(module, func_name) func() PK![ǢE%%riberry/celery/util.pyfrom urllib.parse import urlparse import redis from celery import current_app def celery_redis_instance(): broker_uri = current_app.connection().as_uri(include_password=True) url = urlparse(broker_uri) return redis.Redis(host=url.hostname, port=url.port, password=url.password) PK!-::riberry/cli/__init__.pyfrom . import root, commands def main(): root.cli() PK!~>riberry/cli/__main__.pyfrom . import main main() PK!So! riberry/cli/commands/__init__.pyfrom . import conf, run, web PK!  
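# --- Usage sketch (not part of the package) ---
# A hypothetical illustration of emitting a 'notify' event from application code so that
# the background handle_notifications() handler above can deliver it. The payload shape
# ({'type': ..., 'data': {...}}) is inferred from that handler; the subject and body
# values are made up for the example.
from riberry.app import current_context as ctx
from riberry.app.util.events import create_event

create_event(
    name='notify',
    root_id=ctx.current.root_id,
    task_id=ctx.current.task_id,
    data={
        'type': 'custom-email',
        'data': {
            'subject': 'Example notification',
            'body': 'Hello from a Riberry execution.',
            'mime_type': 'plain',
        },
    },
)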
riberry/cli/commands/conf.py
import shutil

import click

import riberry
from ..root import cli


@cli.command('conf')
@click.argument('path')
def import_config(path):
    target = str(riberry.config.CONF_DEFAULT_PATH)
    shutil.copy(path, target)
    print(f'Copied {path!r} to {target!r}.')

riberry/cli/commands/run.py
import importlib
import os

import click

import riberry
from riberry.app.backends.impl.pool.base import RiberryPoolBackend
from riberry.app.backends.impl.pool.log import configure as log_configure
from ..root import cli


@click.group()
def run():
    pass


@run.command()
@click.option('--module', '-m', required=True, help='Module containing Riberry pool application')
@click.option('--instance', '-i', help='Riberry application instance to run')
@click.option('--log-level', '-l', default='ERROR', help='Log level')
@click.option('--concurrency', '-c', default=None, help='Task concurrency', type=int)
def pool(module, instance, log_level, concurrency):
    if instance is not None:
        os.environ['RIBERRY_INSTANCE'] = instance
    log_configure(log_level=log_level)
    importlib.import_module(module)
    backend: RiberryPoolBackend = riberry.app.current_riberry_app.backend
    backend.task_queue.limit = concurrency
    backend.start()


cli.add_command(run)

riberry/cli/commands/web.py
import importlib

import click

from ..root import cli


@cli.command('web')
@click.option('--module', '-m', show_default=True, default='riberry_web:main', help='Callable to start the web server.')
@click.option('--host', '-h', show_default=True, default='127.0.0.1', help='Bind to given host.')
@click.option('--port', '-p', show_default=True, default=5445, help='Bind to given port.')
@click.option('--log-level', '-l', show_default=True, default='info', help='Logging level.')
@click.pass_context
def run_webapp(cxt, module: str, host: str, port: int, log_level: str):
    module_dot_path, callable_name = module.split(':')
    try:
        mod = importlib.import_module(module_dot_path)
    except ModuleNotFoundError:
        print(f'Could not find module {module_dot_path}.')
        return cxt.exit(1)
    try:
        callable_obj = getattr(mod, callable_name)
    except AttributeError:
        print(f'Could not find callable {callable_name} in module {module_dot_path}.')
        return cxt.exit(1)
    callable_obj(host=host, port=port, log_level=log_level)

riberry/cli/root.py
import click

import riberry


@click.group()
@click.version_option(riberry.__version__, prog_name='riberry')
def cli():
    pass
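# --- Usage sketch (not part of the package) ---
# A minimal, hypothetical module of the kind the `riberry run pool --module` command
# above imports: it constructs a RiberryApplication (the name must match the target
# application's internal name, since the CLI resolves the app via the RIBERRY_INSTANCE
# environment variable) and registers an entry point for a form. RiberryPoolBackend's
# constructor arguments are not visible in this listing, so the bare call is an
# assumption, as are the 'my-app' and 'my-form' identifiers.
import riberry
from riberry.app.backends.impl.pool.base import RiberryPoolBackend

app = riberry.app.RiberryApplication(name='my-app', backend=RiberryPoolBackend())


@app.entry_point(form='my-form')
def run_my_form():
    context = riberry.app.current_context
    context.artifact.create(filename='hello.txt', content='Hello from my-form')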
6##riberry/config.pyimport binascii import os import pathlib import warnings import toml from appdirs import AppDirs import riberry from riberry.util.common import variable_substitution APP_DIRS = AppDirs(appname='riberry') APP_DIR_USER_DATA = pathlib.Path(APP_DIRS.user_data_dir) APP_DIR_USER_CONF = pathlib.Path(APP_DIRS.user_config_dir) APP_DIR_USER_DATA.mkdir(parents=True, exist_ok=True) APP_DIR_USER_CONF.mkdir(parents=True, exist_ok=True) CONF_DEFAULT_BG_SCHED_INTERVAL = 10 CONF_DEFAULT_BG_EVENT_INTERVAL = 2 CONF_DEFAULT_BG_EVENT_PROCESS_LIMIT = 1000 CONF_DEFAULT_BG_CAPACITY_INTERVAL = 5 CONF_DEFAULT_DB_CONN_PATH = APP_DIR_USER_DATA / 'model.db' CONF_DEFAULT_DB_CONN_URL = f'sqlite:///{CONF_DEFAULT_DB_CONN_PATH}' CONF_DEFAULT_POLICY_PROVIDER = 'default' CONF_DEFAULT_AUTH_PROVIDER = 'default' CONF_DEFAULT_AUTH_TOKEN_PROVIDER = 'jwt' CONF_DEFAULT_AUTH_TOKEN_PATH = APP_DIR_USER_DATA / 'auth.key' CONF_DEFAULT_AUTH_TOKEN_SIZE = 256 CONF_DEFAULT_PATH = APP_DIR_USER_CONF / 'conf.toml' if 'RIBERRY_CONFIG_PATH' in os.environ: _config = variable_substitution(toml.load(os.environ['RIBERRY_CONFIG_PATH'])) elif CONF_DEFAULT_PATH.exists(): _config = variable_substitution(toml.load(str(CONF_DEFAULT_PATH))) else: warnings.warn(message=f'Environment variable \'RIBERRY_CONFIG_PATH\' not declared, ' f'config at default path {CONF_DEFAULT_PATH} not found, ' f'using in-memory configuration') _config = {} def load_config_value(raw_config, default=None): if 'path' in raw_config: with open(raw_config['path']) as f: return f.read() elif 'envvar' in raw_config: return os.getenv(raw_config['envvar']) elif 'value' in raw_config: return raw_config['value'] else: return default class DatabaseConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} connection_config = self.raw_config.get('connection') or {} self.connection_url = load_config_value(connection_config) if not self.connection_url: CONF_DEFAULT_DB_CONN_PATH.parent.mkdir(exist_ok=True) self.connection_url = CONF_DEFAULT_DB_CONN_URL self.engine_settings = self.raw_config.get('engine', {}) self.connection_arguments = self.raw_config.get('arguments', {}) def enable(self): riberry.model.init( url=self.connection_url, engine_settings=self.engine_settings, connection_arguments=self.connection_arguments, ) class AuthenticationTokenConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.provider = self.raw_config.get('provider') or CONF_DEFAULT_AUTH_TOKEN_PROVIDER self.secret = load_config_value(self.raw_config) if not self.secret: self.secret = self.make_secret() @staticmethod def make_secret(): CONF_DEFAULT_AUTH_TOKEN_PATH.parent.mkdir(exist_ok=True) if not CONF_DEFAULT_AUTH_TOKEN_PATH.is_file(): with open(CONF_DEFAULT_AUTH_TOKEN_PATH, 'wb') as f: f.write(binascii.hexlify(os.urandom(CONF_DEFAULT_AUTH_TOKEN_SIZE))) with open(CONF_DEFAULT_AUTH_TOKEN_PATH, 'rb') as f: return f.read().decode() class AuthenticationProviderConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.default = self.raw_config.get('default') or CONF_DEFAULT_AUTH_PROVIDER self.supported = self.raw_config.get('supported') or [self.default] class AuthenticationConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.providers = AuthenticationProviderConfig(self.raw_config.get('providers')) self.token = AuthenticationTokenConfig(self.raw_config.get('token')) self._config_cache = {} def __getitem__(self, item): if item not in self._config_cache: for provider in 
riberry.plugins.plugin_register['authentication']: if provider.name() == item: self._config_cache[item] = provider(self.raw_config.get(item, {})) break else: raise ValueError(f'Authentication provider {item!r} not found') return self._config_cache[item] @property def default_provider(self): return self[self.providers.default] def enable(self): for provider_name in self.providers.supported: self[provider_name].on_enabled() class PolicyProviderConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.provider_name = self.raw_config.get('provider') or CONF_DEFAULT_POLICY_PROVIDER self._provider = None @property def provider(self): if self._provider is None: for provider in riberry.plugins.plugin_register['policies']: if provider.name == self.provider_name: self._provider = provider break else: raise ValueError(f'PolicyProviderConfig.provider:: ' f'could not find register provider {self.provider_name!r}') return self._provider class EmailNotificationConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self._enabled = config_dict.get('enabled', False) self.smtp_server = config_dict.get('smtpServer') self.sender = config_dict.get('sender') @property def enabled(self): return bool(self._enabled and self.smtp_server and self.sender) class BackgroundTaskConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.events = BackgroundTaskEventsConfig(self.raw_config.get('events') or {}) self.schedules = BackgroundTaskScheduleConfig(self.raw_config.get('schedules') or {}) self.capacity = BackgroundTaskCapacityConfig(self.raw_config.get('capacity') or {}) class BackgroundTaskEventsConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.interval = config_dict.get('interval', CONF_DEFAULT_BG_EVENT_INTERVAL) self.processing_limit = config_dict.get('limit', CONF_DEFAULT_BG_EVENT_PROCESS_LIMIT) class BackgroundTaskScheduleConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.interval = config_dict.get('interval', CONF_DEFAULT_BG_SCHED_INTERVAL) class BackgroundTaskCapacityConfig: def __init__(self, config_dict): self.raw_config = config_dict or {} self.interval = config_dict.get('interval', CONF_DEFAULT_BG_CAPACITY_INTERVAL) class RiberryConfig: def __init__(self, config_dict): self.raw_config = config_dict self.authentication = AuthenticationConfig(self.raw_config.get('authentication') or {}) self.policies = PolicyProviderConfig(self.raw_config.get('policies') or {}) self.database = DatabaseConfig(self.raw_config.get('database') or {}) if 'notification' in self.raw_config and isinstance(self.raw_config['notification'], dict): email_config = self.raw_config['notification'].get('email') or {} else: email_config = {} self.email = EmailNotificationConfig(email_config) self.background = BackgroundTaskConfig(self.raw_config.get('background') or {}) @property def celery(self): return self.raw_config.get('celery') or {} def enable(self): self.authentication.enable() self.database.enable() config: RiberryConfig = RiberryConfig(config_dict=_config) PK!0dmriberry/exc.py class BaseError(Exception): __msg__ = None __http_code__ = 500 def __init__(self, target=None, data=None, **args): super(BaseError, self).__init__(self._fmt_message(**args, target=target)) self.target = target self.exc_data = data or {} @classmethod def _fmt_message(cls, **args): return cls.__msg__.format(**args) if cls.__msg__ else f'{cls.__name__} was raised' def output(self): return { 'code': type(self).__name__, 'message': 
str(self), 'target': self.target, 'data': self.exc_data } class AuthenticationError(BaseError): __msg__ = 'Wrong username or password supplied.' __http_code__ = 401 def __init__(self): super(AuthenticationError, self).__init__(target='user') class SessionExpired(BaseError): __msg__ = 'The user\'s session has expired.' __http_code__ = 401 def __init__(self): super(SessionExpired, self).__init__(target='user') class AuthorizationError(BaseError): __msg__ = 'User does not have access to the given resource.' __http_code__ = 403 def __init__(self): super(AuthorizationError, self).__init__(target='user') class ResourceNotFound(BaseError): __msg__ = 'The requested resource does not exist.' __http_code__ = 404 def __init__(self, resource, identifier): super(ResourceNotFound, self).__init__(target=resource, data={ 'id': identifier }) class UnknownError(BaseError): __msg__ = 'An unknown error has occurred.' __http_code__ = 500 def __init__(self, error=None): super(UnknownError, self).__init__() class InputErrorGroup(BaseError): __msg__ = 'One or more errors occurred while validating the request.' __http_code__ = 400 def __init__(self, *errors): super(InputErrorGroup, self).__init__( target=None, data={'errors': []} ) self.extend(errors=errors) def extend(self, errors): self.exc_data['errors'] += [e.output() if isinstance(e, BaseError) else e for e in errors] class RequiredInputError(BaseError): __msg__ = 'Required field {field!r} for {target} not provided.' __http_code__ = 400 def __init__(self, target, field, internal_name=None): super(RequiredInputError, self).__init__( target=target, field=field, data=dict(internal=internal_name) ) class InvalidInputError(BaseError): __msg__ = 'Invalid {field!r} provided for {target}.' __http_code__ = 400 def __init__(self, target, field, internal_name=None): super(InvalidInputError, self).__init__( target=target, field=field, data=dict(internal=internal_name) ) class InvalidEnumError(BaseError): __msg__ = 'Invalid {field!r} provided for {target}. Expected: {allowed_values}.' __http_code__ = 400 def __init__(self, target, field, allowed_values, internal_name=None): super(InvalidEnumError, self).__init__( target=target, field=field, allowed_values=', '.join(repr(value) for value in allowed_values), data=dict(internal=internal_name) ) class UnknownInputError(BaseError): __msg__ = 'Unknown input field {field!r} provided for {target}.' __http_code__ = 400 def __init__(self, target, field): super(UnknownInputError, self).__init__(target=target, field=field) class UniqueInputConstraintError(BaseError): __msg__ = 'Cannot create {target} with {field}: {value!r}. This {field} is already in-use.' __http_code__ = 400 def __init__(self, target, field, value): super(UniqueInputConstraintError, self).__init__(target=target, field=field, value=value) PK!Yy  riberry/log.pyimport logging # noinspection PyTypeChecker root_name: str = None # noinspection PyTypeChecker logger: logging.Logger = None def make(name: str) -> logging.Logger: return logging.getLogger(name=name) def init(): logger.addHandler(logging.NullHandler()) PK!Cv^^riberry/model/__init__.pyfrom typing import Union import sqlalchemy import sqlalchemy.orm import sqlalchemy.pool from . 
import misc, application, group, auth, interface, job, base ScopedSessionExt = Union[sqlalchemy.orm.Session, sqlalchemy.orm.scoping.ScopedSession] class __ModelProxy: raw_session: ScopedSessionExt = None raw_engine = None def __getattr__(self, item): return getattr(self.raw_session, item) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is not None: self.raw_session.rollback() self.raw_session.remove() # noinspection PyTypeChecker conn: ScopedSessionExt = __ModelProxy() def init(url='sqlite://', engine_settings=None, connection_arguments=None): engine_defaults = dict( pool_use_lifo=True, pool_pre_ping=True, pool_recycle=360, ) engine_settings = {**engine_defaults, **(engine_settings or {})} connection_arguments = connection_arguments or {} __ModelProxy.raw_engine = sqlalchemy.create_engine( url, poolclass=sqlalchemy.pool.QueuePool, **engine_settings, connect_args=connection_arguments, ) __ModelProxy.raw_session = sqlalchemy.orm.scoped_session(sqlalchemy.orm.sessionmaker(bind=__ModelProxy.raw_engine)) base.Base.metadata.create_all(__ModelProxy.raw_engine) PK!f,۵,,%riberry/model/application/__init__.pyimport enum from datetime import datetime from typing import List import pendulum from sqlalchemy import Column, String, Boolean, ForeignKey, DateTime, Integer, desc, Enum from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import relationship, validates from riberry import model from riberry.model import base class Application(base.Base): """Contains basic metadata related to an application.""" __tablename__ = 'application' __reprattrs__ = ['name', 'enabled'] # columns id = base.id_builder.build() document_id = Column(base.id_builder.type, ForeignKey(column='document.id')) name: str = Column(String(64), nullable=False, unique=True, comment='The human-readable name of the application.') internal_name: str = Column(String(256), nullable=False, unique=True, comment='The internal name or secondary identifier of the application.') description: str = Column(String(256), comment='A brief description of the application\'s purpose.') type: str = Column(String(64), nullable=False, comment='The type of application.') enabled: bool = Column(Boolean(name='application_enabled'), default=True, comment='Whether or not this application and its instances are enabled (TODO).') # associations instances: List['ApplicationInstance'] = relationship( 'ApplicationInstance', cascade='save-update, merge, delete, delete-orphan', back_populates='application') forms: List['model.interface.Form'] = relationship( 'Form', cascade='save-update, merge, delete, delete-orphan', back_populates='application') document: 'model.misc.Document' = relationship('Document', cascade='save-update, merge, delete, delete-orphan', single_parent=True) class ApplicationInstance(base.Base): """ An ApplicationInstance represents a running instance of an Application. Multiple ApplicationInstances are useful when we want to separate out different types of executions for the same Application. An example of this is when we have an application which can support both long-running and short-running executions. We can spin up a separate instance with separate scheduling to ensure that the long-running jobs don't consume block the short-running jobs. 
""" __tablename__ = 'app_instance' __reprattrs__ = ['name', 'internal_name'] # columns id = base.id_builder.build() application_id = Column(base.id_builder.type, ForeignKey(column='application.id'), nullable=False) name: str = Column(String(64), nullable=False, unique=True, comment='The human-readable name of the application.') internal_name: str = Column(String(256), nullable=False, unique=True, comment='The internal name or secondary identifier of the application instance.') # associations application: 'Application' = relationship('Application', back_populates='instances') heartbeat: 'Heartbeat' = relationship('Heartbeat', cascade='save-update, merge, delete, delete-orphan', uselist=False, back_populates='instance') schedules: List['ApplicationInstanceSchedule'] = relationship( 'ApplicationInstanceSchedule', cascade='save-update, delete, delete-orphan', back_populates='instance', order_by=lambda: ( ApplicationInstanceSchedule.parameter, desc(ApplicationInstanceSchedule.priority), ApplicationInstanceSchedule.start_time, ) ) forms: List['model.interface.Form'] = relationship('Form', cascade='save-update, merge, delete, delete-orphan', back_populates='instance') @property def status(self): if not self.heartbeat: return 'created' diff = base.utc_now() - pendulum.instance(self.heartbeat.updated) if diff.seconds >= 10: return 'offline' if self.active_schedule_value('active', default='Y') == 'N': return 'inactive' return 'online' def active_schedule_value(self, name, default=None, current_time=None): schedule = self.active_schedule(name=name, current_time=current_time) return schedule.value if schedule else default def active_schedule(self, name, current_time=None) -> 'ApplicationInstanceSchedule': schedules = sorted( (s for s in self.schedules if s.parameter == name and s.active(current_time=current_time)), key=lambda s: (-s.priority, s.start_time) ) for schedule in schedules: return schedule @property def active_schedule_values(self): return {name: schedule.value if schedule else None for name, schedule in self.active_schedules.items()} @property def active_schedules(self): parameters = {s.parameter for s in self.schedules} return {param: self.active_schedule(name=param) for param in parameters} class Heartbeat(base.Base): """ The Heartbeat object is used by running ApplicationInstances to report back that they're alive. If an ApplicationInstance's last heartbeat was over 10 seconds ago, we assume that it is offline. """ __tablename__ = 'heartbeat_app_instance' __reprattrs__ = ['instance_id', 'updated'] # columns id = base.id_builder.build() instance_id = Column(base.id_builder.type, ForeignKey('app_instance.id'), nullable=False) created: datetime = Column(DateTime(timezone=True), default=base.utc_now, comment='The first heartbeat we received for the instance.') updated: datetime = Column(DateTime(timezone=True), default=base.utc_now, comment='The last heartbeat we received for the instance.') # associations instance: 'ApplicationInstance' = relationship('ApplicationInstance', back_populates='heartbeat') class ApplicationInstanceSchedule(base.Base): """ The ApplicationInstanceSchedule is a time-based schedule for dynamic parameters. It allows us to define parameters which change depending on the time of the day. There are currently two core parameters: `active` and `concurrency`. These allow us to automatically activate/de-activate and scale up/down our applications during different times of the day. 
Any custom parameter can be defined and consumed by our Celery application as long as we have a custom parameter handler in place which can interpret the parameter's values. """ __tablename__ = 'sched_app_instance' # columns id = base.id_builder.build() instance_id = Column(base.id_builder.type, ForeignKey('app_instance.id'), nullable=False) days = Column(String(27), nullable=False, default='*', comment='Comma-separated list of specific days ("MON,WED"), or "*" for every day.') start_time = Column(String(8), nullable=False, default='00:00:00', comment='The time when this schedule activates.') end_time = Column(String(8), nullable=False, default='23:59:59', comment='The time when this schedule de-activates.') timezone = Column(String(128), nullable=False, default='UTC', comment='The timezone of for the given start and end times.') parameter = Column(String(32), nullable=False, comment='The parameter which this schedule applies to.') value = Column(String(512), comment='The value of the given parameter.') priority = Column(Integer, default=64, nullable=False, comment='Priority of the schedule, where higher values mean higher priority.') # associations instance: 'ApplicationInstance' = relationship('ApplicationInstance', back_populates='schedules') # validations @validates('priority') def validate_priority(self, _, priority): assert isinstance(priority, int) and 255 >= priority >= 1, ( f'ApplicationInstanceSchedule.priority must be an integer between 1 and 255 (received {priority})') return priority @validates('start_time') def validate_start_time(self, _, start_time): return self.cleanse_time(time_=start_time) @validates('end_time') def validate_end_time(self, _, end_time): return self.cleanse_time(time_=end_time) @staticmethod def cleanse_time(time_): if isinstance(time_, int): now = pendulum.now() date = pendulum.DateTime(now.year, now.month, now.day) + pendulum.duration(seconds=time_) time_ = date.strftime('%H:%M:%S') if isinstance(time_, str): pendulum.DateTime.strptime(time_, '%H:%M:%S') return time_ @validates('timezone') def validate_timezone(self, _, timezone): pendulum.timezone(timezone) return timezone @validates('days') def validate_days(self, _, days): if days in (None, '*'): return '*' days = [o.strip() for o in str(days).lower().split(',')] invalid_days = set(days) - {'mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'} if invalid_days: raise ValueError(f'Invalid days received: {", ".join(invalid_days)}') return ','.join(days) def active(self, current_time=None): now: pendulum.DateTime = pendulum.instance(current_time) \ if current_time else pendulum.DateTime.now(tz=self.timezone) now = now.replace(microsecond=0) if self.days != '*': all_days = self.days.lower().split(',') if now.format('dd').lower() not in all_days: return False start_dt = pendulum.parse(self.start_time, tz=self.timezone)\ .replace(year=now.year, month=now.month, day=now.day) end_dt = pendulum.parse(self.end_time, tz=self.timezone)\ .replace(year=now.year, month=now.month, day=now.day) return end_dt >= now >= start_dt class CapacityDistributionStrategy(enum.Enum): spread = 'spread' binpack = 'binpack' def __repr__(self): return repr(self.value) class CapacityConfiguration(base.Base): __tablename__ = 'capacity_config' # columns id = base.id_builder.build() weight_parameter = Column(String(32), nullable=False, unique=True, comment='Parameter name which defines the requested capacity weight.') capacity_parameter = Column(String(32), nullable=False, comment='Parameter name which defines the total capacity 
allocation.') producer_parameter = Column(String(32), nullable=False, comment='Parameter name which defines the capacity producers.') distribution_strategy = Column( Enum(CapacityDistributionStrategy), nullable=False, default=CapacityDistributionStrategy.binpack, comment='Binpack (fill vertically) or spread (fill horizontally)' ) # associations producers: List['CapacityProducer'] = relationship( 'CapacityProducer', cascade='save-update, merge, delete, delete-orphan', back_populates='configuration' ) class CapacityProducer(base.Base): __tablename__ = 'capacity_producer' # columns id = base.id_builder.build() configuration_id = Column(base.id_builder.type, ForeignKey('capacity_config.id'), nullable=False) name = Column(String(128), nullable=False, comment='The human-readable name of the producer.') internal_name = Column(String(128), nullable=False, comment='The internal name of the producer.') capacity = Column(Integer, nullable=False, comment='The total capacity of the consumer.') # associations configuration: 'CapacityConfiguration' = relationship('CapacityConfiguration', back_populates='producers') PK!& riberry/model/auth/__init__.pyimport datetime import re from typing import AnyStr, Dict, List import jwt import pendulum from sqlalchemy import Column, String, ForeignKey, DateTime, desc from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import relationship, validates, joinedload from riberry import model, exc from riberry.model import base from riberry.config import config class User(base.Base): __tablename__ = 'users' __reprattrs__ = ['username'] id = base.id_builder.build() username = Column(String(48), nullable=False, unique=True) password = Column(String(512)) auth_provider = Column(String(32), nullable=False, default=config.authentication.providers.default) details: 'UserDetails' = relationship('UserDetails', uselist=False, back_populates='user') # associations group_associations: List['model.group.ResourceGroupAssociation'] = model.group.ResourceGroupAssociation.make_relationship( resource_id=id, resource_type=model.misc.ResourceType.user ) jobs: List['model.job.Job'] = relationship( 'Job', order_by=lambda: desc(model.job.Job.created), back_populates='creator') executions: List['model.job.JobExecution'] = relationship( 'JobExecution', order_by=lambda: desc(model.job.JobExecution.updated), back_populates='creator') notifications: List['model.misc.UserNotification'] = relationship( 'UserNotification', order_by=lambda: desc(model.misc.UserNotification.created), back_populates='user') # proxies groups: List['model.group.Group'] = association_proxy('group_associations', 'group') @property def forms(self) -> List['model.interface.Form']: return model.interface.Form.query().filter( (model.group.ResourceGroupAssociation.group_id.in_(o.group_id for o in self.group_associations)) & (model.group.ResourceGroupAssociation.resource_type == model.misc.ResourceType.form) & (model.interface.Form.id == model.group.ResourceGroupAssociation.resource_id) ).all() @property def applications(self) -> List['model.application.Application']: forms = model.interface.Form.query().filter( model.interface.Form.id.in_(form.id for form in self.forms) ).options( joinedload(model.interface.Form.application) ).all() return [form.application for form in forms] @classmethod def authenticate(cls, username, password): existing_user: cls = cls.query().filter_by(username=username).first() if existing_user: provider = config.authentication[existing_user.auth_provider] else: provider = 
config.authentication.default_provider if not provider.authenticate(username=username, password=password): raise exc.AuthenticationError return cls.query().filter_by(username=username).first() @validates('username') def validate_username(self, _, username): if not username or len(username) < 3: raise ValueError(f'User.username :: usernames must be 3+ characters long. Received {repr(username)}') return username @classmethod def secure_password(cls, password: str, provider_name=None) -> bytes: provider = config.authentication[provider_name or config.authentication.providers.default] password = provider.secure_password(password=password) return password class UserDetails(base.Base): __tablename__ = 'user_details' id = base.id_builder.build() user_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=False) user: 'User' = relationship('User', back_populates='details') first_name = Column(String(64)) last_name = Column(String(64)) display_name = Column(String(128)) department = Column(String(128)) email = Column(String(128)) updated: datetime = Column(DateTime(timezone=True), default=base.utc_now) @property def full_name(self): if self.first_name and self.last_name: return f'{self.first_name} {self.last_name}' else: return self.first_name or self.last_name or self.display_name or self.user.username @validates('email') def validate_email(self, _, email): if email and not re.match(r'[^@]+@[^@]+\.[^@]+', email or ''): raise ValueError(f'UserDetails.email :: Invalid email received ({repr(email)})') return email class AuthToken: @staticmethod def create(user: User, expiry_delta: datetime.timedelta=datetime.timedelta(hours=24)) -> AnyStr: iat: pendulum.DateTime = base.utc_now() exp: pendulum.DateTime = iat + expiry_delta return jwt.encode({ 'iat': iat.int_timestamp, 'exp': exp.int_timestamp, 'subject': user.username }, config.authentication.token.secret, algorithm='HS256') @staticmethod def verify(token: AnyStr) -> Dict: try: return jwt.decode(token, config.authentication.token.secret, algorithms=['HS256']) except jwt.ExpiredSignatureError: raise exc.SessionExpired except Exception: raise exc.AuthenticationError PK!A%%riberry/model/base.pyfrom typing import Union import pendulum from sqlalchemy import Column, Sequence, Integer, MetaData from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta from riberry import model class _BaseMixin: @classmethod def query(cls): return model.conn.query(cls) def __repr__(self): attribute_names = (['id'] if hasattr(self, 'id') else []) + getattr(self, '__reprattrs__', []) attributes = ', '.join(f'{attr}={repr(getattr(self, attr))}' for attr in attribute_names) return f'<{type(self).__name__} {attributes}>' class _IdBuilder: def __init__(self, id_type): self.type = id_type def build(self, sequence='SEQUENCE_PK'): return Column(self.type, Sequence(sequence), primary_key=True) meta = MetaData( naming_convention={ 'ix': 'ix_%(column_0_label)s', 'uq': 'uq_%(table_name)s_%(column_0_N_name)s', 'ck': 'ck_%(table_name)s_%(constraint_name)s', 'fk': 'fk_%(table_name)s_%(column_0_N_name)s_%(referred_table_name)s', 'pk': 'pk_%(table_name)s' } ) DeclarativeMetaExt = Union[_BaseMixin, DeclarativeMeta] Base: DeclarativeMetaExt = declarative_base(cls=_BaseMixin, metadata=meta) id_builder = _IdBuilder(id_type=Integer) def utc_now(): return pendulum.DateTime.utcnow() PK!8w9 riberry/model/group/__init__.pyimport enum from typing import List from sqlalchemy import String, Column, Enum, ForeignKey, sql from sqlalchemy.orm import relationship from riberry 
import model from riberry.model import base class ResourceGroupAssociation(base.Base): __tablename__ = 'resource_group' __reprattrs__ = ['group_id', 'resource_id', 'resource_type'] # columns id = base.id_builder.build() group_id = Column(ForeignKey('groups.id'), nullable=False) resource_id = Column(base.id_builder.type, nullable=False) resource_type = Column(Enum(model.misc.ResourceType), nullable=False) # associations group: 'Group' = relationship('Group', back_populates='resource_associations') @classmethod def make_relationship(cls, resource_id, resource_type): return relationship( 'ResourceGroupAssociation', primaryjoin=lambda: sql.and_( resource_id == ResourceGroupAssociation.resource_id, ResourceGroupAssociation.resource_type == resource_type ), foreign_keys=lambda: ResourceGroupAssociation.resource_id, cascade='save-update, merge, delete, delete-orphan', ) class Group(base.Base): __tablename__ = 'groups' __reprattrs__ = ['name'] # columns id = base.id_builder.build() name: str = Column(String(128), nullable=False, unique=True) _display_name: str = Column('display_name', String(128)) description: str = Column(String(128)) # associations resource_associations: List['ResourceGroupAssociation'] = relationship( 'ResourceGroupAssociation', back_populates='group') user_associations: List['ResourceGroupAssociation'] = relationship( 'ResourceGroupAssociation', primaryjoin=lambda: sql.and_( ResourceGroupAssociation.group_id == Group.id, ResourceGroupAssociation.resource_type == model.misc.ResourceType.user ) ) form_associations: List['ResourceGroupAssociation'] = relationship( 'ResourceGroupAssociation', primaryjoin=lambda: sql.and_( ResourceGroupAssociation.group_id == Group.id, ResourceGroupAssociation.resource_type == model.misc.ResourceType.form ) ) @property def display_name(self): return self._display_name or self.name @property def users(self): return model.auth.User.query().filter( (ResourceGroupAssociation.group_id == self.id) & (ResourceGroupAssociation.resource_type == model.misc.ResourceType.user) & (model.auth.User.id == ResourceGroupAssociation.resource_id) ).all() @property def forms(self): return model.interface.Form.query().filter( (ResourceGroupAssociation.group_id == self.id) & (ResourceGroupAssociation.resource_type == model.misc.ResourceType.form) & (model.interface.Form.id == ResourceGroupAssociation.resource_id) ).all() PK!*y#riberry/model/interface/__init__.pyimport json import mimetypes from typing import List from sqlalchemy import Column, String, Boolean, ForeignKey, Integer, Binary, DateTime, desc from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship, deferred from riberry import model from riberry.model import base class Form(base.Base): """ A Form is an interface to creating jobs for a given ApplicationInstance. 
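As a hedged, illustrative example (it assumes the model layer is initialised and that a form with this internal name exists), a form and its expected inputs can be inspected through the query helper on the base model:

    from riberry import model

    form = model.interface.Form.query().filter_by(internal_name='reporting-daily').one()
    for definition in form.input_value_definitions:
        print(definition.internal_name, definition.required, definition.allowed_values)
    for definition in form.input_file_definitions:
        print(definition.internal_name, definition.required, definition.accept)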
""" __tablename__ = 'form' __reprattrs__ = ['internal_name', 'version'] # columns id = base.id_builder.build() instance_id = Column(base.id_builder.type, ForeignKey('app_instance.id'), nullable=False) application_id = Column(base.id_builder.type, ForeignKey('application.id'), nullable=False) document_id = Column(base.id_builder.type, ForeignKey(column='document.id')) name: str = Column(String(64), unique=True, nullable=False, comment='The human-readable name of the form.') internal_name: str = Column(String(256), unique=True, nullable=False, comment='The internal name or secondary identifier of the form.') description: str = Column(String(256), comment='A brief description of the form\'s purpose.') version: int = Column(Integer, nullable=False, default=1, comment='The version of the form.') enabled: bool = Column(Boolean(name='form_enabled'), nullable=False, default=True, comment='Whether or not this form is enabled.') # associations instance: 'model.application.ApplicationInstance' = relationship('ApplicationInstance', back_populates='forms') application: 'model.application.Application' = relationship('Application', back_populates='forms') schedules: List['FormSchedule'] = relationship('FormSchedule', back_populates='form') jobs: List['model.job.Job'] = relationship( 'Job', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: desc(model.job.Job.created), back_populates='form' ) group_associations: List['model.group.ResourceGroupAssociation'] = model.group.ResourceGroupAssociation.make_relationship( resource_id=id, resource_type=model.misc.ResourceType.form ) input_value_definitions: List['InputValueDefinition'] = relationship( 'InputValueDefinition', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: InputValueDefinition.id.asc(), back_populates='form' ) input_file_definitions: List['InputFileDefinition'] = relationship( 'InputFileDefinition', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: InputFileDefinition.id.asc(), back_populates='form' ) document: 'model.misc.Document' = relationship('Document', cascade='save-update, merge, delete, delete-orphan', single_parent=True) # proxies groups: List['model.group.Group'] = association_proxy('group_associations', 'group') class FormSchedule(base.Base): __tablename__ = 'sched_form' # columns id = base.id_builder.build() form_id = Column(base.id_builder.type, ForeignKey('form.id'), nullable=False) start = Column(DateTime(timezone=True), nullable=False) end = Column(DateTime(timezone=True), nullable=False) # associations form: 'Form' = relationship('Form', back_populates='schedules') class InputFileDefinition(base.Base): """The InputFileDefinition object defines the properties of an input file.""" __tablename__ = 'input_file_definition' __reprattrs__ = ['name', 'type'] # columns id = base.id_builder.build() form_id = Column(base.id_builder.type, ForeignKey('form.id'), nullable=False) name: str = Column(String(64), nullable=False) internal_name: str = Column(String(256), nullable=False) description: str = Column(String(128)) type: str = Column(String(64), nullable=False) accept: str = Column(String(256)) required: bool = Column(Boolean(name='form_required'), nullable=False, default=True) # associations form: 'Form' = relationship('Form', back_populates='input_file_definitions') class InputValueDefinition(base.Base): """The InputFileDefinition object defines the properties of an input value.""" __tablename__ = 'input_value_definition' __reprattrs__ = ['name', 'type'] # columns id = 
base.id_builder.build() form_id = Column(base.id_builder.type, ForeignKey('form.id'), nullable=False) name: str = Column(String(64), nullable=False) internal_name: str = Column(String(256), nullable=False) description: str = Column(String(128)) type: str = Column(String(64), nullable=False) required: bool = Column(Boolean(name='input_value_definition_required'), nullable=False, default=True) default_binary = Column('defaults', Binary) # associations form: 'Form' = relationship('Form', back_populates='input_value_definitions') allowed_value_enumerations: List['InputValueEnum'] = relationship( 'InputValueEnum', cascade='save-update, merge, delete, delete-orphan', back_populates='definition') # proxies allowed_binaries: List[bytes] = association_proxy( 'allowed_value_enumerations', 'value', creator=lambda value: InputValueEnum(value=value) ) @property def allowed_values(self): return [json.loads(v.decode()) for v in self.allowed_binaries] @hybrid_property def default_value(self): return json.loads(self.default_binary.decode()) if self.default_binary else None @default_value.setter def default_value(self, value): self.default_binary = json.dumps(value).encode() class InputValueEnum(base.Base): """The InputValueEnum object defines a valid enumeration for a given InputValueInstance.""" __tablename__ = 'input_value_enum' __reprattrs__ = ['value'] # columns id = base.id_builder.build() definition_id = Column(base.id_builder.type, ForeignKey('input_value_definition.id'), nullable=False) value = Column(Binary, nullable=False) # associations definition: 'InputValueDefinition' = relationship( 'InputValueDefinition', back_populates='allowed_value_enumerations') class InputValueInstance(base.Base): """The InputValueInstance object contains data for a InputValueDefinition and is linked to a Job.""" __tablename__ = 'input_value_instance' # columns id = base.id_builder.build() job_id = Column(base.id_builder.type, ForeignKey('job.id'), nullable=False) name: str = Column(String(256), nullable=False) internal_name: str = Column(String(256), nullable=False) raw_value: bytes = Column('value', Binary) # associations job: 'model.job.Job' = relationship('Job', back_populates='values') @property def definition(self): return self.job.form @hybrid_property def value(self): return json.loads(self.raw_value.decode()) if self.raw_value else None @value.setter def value(self, value): self.raw_value = json.dumps(value).encode() class InputFileInstance(base.Base): """The InputFileInstance object contains data for a InputFileDefinition and is linked to a Job.""" __tablename__ = 'input_file_instance' __reprattrs__ = ['filename', 'size'] # columns id = base.id_builder.build() job_id = Column(base.id_builder.type, ForeignKey('job.id'), nullable=False) name: str = Column(String(256), nullable=False) internal_name: str = Column(String(256), nullable=False) filename: str = Column(String(512), nullable=False) size: int = Column(Integer, nullable=False) binary: bytes = deferred(Column(Binary)) # associations job: 'model.job.Job' = relationship('Job', back_populates='files') @property def content_type(self): if self.filename.endswith('.log'): # quick fix for missing .log type on unix systems return 'text/plain' return mimetypes.guess_type(self.filename)[0] @property def content_encoding(self): return mimetypes.guess_type(self.filename)[1] PK!~LLriberry/model/job/__init__.pyimport enum import functools import json import mimetypes from datetime import datetime from typing import List, Optional import pendulum from croniter import 
croniter from sqlalchemy import Column, String, ForeignKey, DateTime, Boolean, Integer, Binary, Index, Enum, desc, Float, asc, \ UniqueConstraint, select, func, sql from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship, deferred, validates, foreign, remote from riberry import model from riberry.model import base class ArtifactType(enum.Enum): output = 'output' error = 'error' report = 'report' def __repr__(self): return repr(self.value) class Job(base.Base): """ A Job is an object which represents a set of inputs provided to a form. A Job can have one or more JobExecutions which represent the execution of the linked ApplicationInterface on the linked ApplicationInstance for the given input stored against this Job. Jobs are immutable. If we require a different set of values, we'll need to create a new Job. """ __tablename__ = 'job' __reprattrs__ = ['name'] __table_args__ = ( Index('j__idx_form_id', 'form_id'), Index('j__idx_creator_id', 'creator_id'), ) # columns id = base.id_builder.build() form_id = Column(base.id_builder.type, ForeignKey('form.id'), nullable=False) creator_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=False) name: str = Column(String(64), nullable=False, unique=True, comment='The unique name of our job.') created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) # associations creator: 'model.auth.User' = relationship('User') form: 'model.interface.Form' = relationship('Form', back_populates='jobs') executions: List['JobExecution'] = relationship( 'JobExecution', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: desc(JobExecution.updated), back_populates='job') schedules: List['JobSchedule'] = relationship('JobSchedule', cascade='save-update, merge, delete, delete-orphan', back_populates='job') values: List['model.interface.InputValueInstance'] = relationship('InputValueInstance', cascade='save-update, merge, delete, delete-orphan', back_populates='job') files: List['model.interface.InputFileInstance'] = relationship('InputFileInstance', cascade='save-update, merge, delete, delete-orphan', back_populates='job') # proxies instance: 'model.application.ApplicationInstance' = association_proxy('form', 'instance') def execute(self, creator_id): model.conn.add(instance=JobExecution(job=self, creator_id=creator_id)) class JobSchedule(base.Base): """ A JobSchedule object defines a schedule which will trigger an execution for the linked Job. Note that a JobExecution will only be created for an ApplicationInstance which has a status of "online". Applications which are offline or inactive due to an ApplicationInstanceSchedule will not have JobExecutions created. 
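As an illustrative sketch (the job and user variables are assumed to exist already, and the cron expression is only an example), a schedule that triggers a new execution every weekday at 06:00 could be created as:

    from riberry import model

    schedule = model.job.JobSchedule(
        job=job,             # assumed: an existing Job
        creator=user,        # assumed: an existing User
        cron='0 6 * * 1-5',
        limit=0,             # a falsy limit leaves the schedule enabled indefinitely
    )
    model.conn.add(schedule)
    model.conn.commit()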
""" __tablename__ = 'sched_job' # columns id = base.id_builder.build() job_id = Column(base.id_builder.type, ForeignKey('job.id'), nullable=False) creator_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=False) enabled: bool = Column(Boolean(name='sched_job_enabled'), default=True, nullable=False, comment='Whether or not this schedule is active.') cron: str = Column(String(24), nullable=False, comment='The cron expression which defines our schedule.') created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False, comment='The time our schedule was created.') last_run: datetime = Column(DateTime(timezone=True), default=None, comment='The last time a job execution was created from our schedule.') limit: int = Column(Integer, default=0, comment='The amount of valid runs for this schedule.') total_runs: int = Column(Integer, default=0, comment='The total amount of runs for this schedule.') # associations job: 'Job' = relationship('Job', back_populates='schedules') creator: 'model.auth.User' = relationship('User') def run(self): if not self.enabled or self.job.instance.status != 'online': return ready_run = None for cron_time in croniter(self.cron, start_time=self.last_run or self.created, ret_type=pendulum.DateTime): cron_time = pendulum.instance(cron_time) if cron_time > base.utc_now(): break ready_run = cron_time if ready_run: self.last_run = ready_run self.total_runs += 1 if self.limit and self.total_runs >= self.limit: self.enabled = False self.job.execute(creator_id=self.creator_id) @property def next_run(self): if not self.enabled: return instance = croniter(self.cron, start_time=self.last_run or self.created, ret_type=pendulum.DateTime) return pendulum.instance(next(instance)) def memoize(func): result = [] @functools.wraps(func) def inner(*args, **kwargs): if result: return result[0] result.append(func(*args, **kwargs)) return result[0] return inner @memoize def _job_execution_select_latest_progress(): return select([ func.max(JobExecutionProgress.id).label('id'), JobExecutionProgress.job_execution_id ]).group_by( JobExecutionProgress.job_execution_id ).alias() class JobExecution(base.Base): """A JobExecution represent a single execution of our Job.""" __tablename__ = 'job_execution' __reprattrs__ = ['job_id', 'task_id', 'status'] __table_args__ = ( Index('j_e__idx_job_id', 'job_id'), Index('j_e__idx_creator_id', 'creator_id'), ) # columns id = base.id_builder.build() job_id = Column(base.id_builder.type, ForeignKey('job.id'), nullable=False) creator_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=False) task_id: str = Column(String(36), unique=True, comment='The internal identifier of our job execution. This is usually the Celery root ID.') status: str = Column(String(24), default='RECEIVED', comment='The current status of our job execution.') created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) started: datetime = Column(DateTime(timezone=True)) completed: datetime = Column(DateTime(timezone=True)) updated: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) priority = Column(Integer, default=64, nullable=False, comment='The priority of this execution. 
This only applies to tasks in the RECEIVED state.') parent_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), comment='The id of the execution which triggered this execution.') # associations creator: 'model.auth.User' = relationship('User') job: 'Job' = relationship('Job', back_populates='executions') streams: List['JobExecutionStream'] = relationship( 'JobExecutionStream', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionStream.id), back_populates='job_execution' ) artifacts: List['JobExecutionArtifact'] = relationship( 'JobExecutionArtifact', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionArtifact.id), back_populates='job_execution') external_tasks: List['JobExecutionExternalTask'] = relationship( 'JobExecutionExternalTask', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionExternalTask.id), back_populates='job_execution' ) reports: List['JobExecutionReport'] = relationship( 'JobExecutionReport', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionReport.id), back_populates='job_execution' ) progress: List['JobExecutionProgress'] = relationship( 'JobExecutionProgress', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: JobExecutionProgress.id.asc(), back_populates='job_execution' ) data: List['model.misc.ResourceData'] = model.misc.ResourceData.make_relationship( resource_id=id, resource_type=model.misc.ResourceType.job_execution, ) latest_progress: 'JobExecutionProgress' = relationship( 'JobExecutionProgress', secondary=lambda: _job_execution_select_latest_progress(), primaryjoin=lambda: JobExecution.id == _job_execution_select_latest_progress().c.job_execution_id, secondaryjoin=lambda: JobExecutionProgress.id == _job_execution_select_latest_progress().c.id, viewonly=True, uselist=False, ) parent_execution: 'JobExecution' = relationship('JobExecution', back_populates='child_executions', remote_side=[id]) child_executions: List['JobExecution'] = relationship('JobExecution', back_populates='parent_execution') # validations @validates('priority') def validate_priority(self, _, priority): assert isinstance(priority, int) and 255 >= priority >= 1, ( f'JobExecution.priority must be an integer between 1 and 255 (received {priority})') return priority @property def stream_status_summary(self): summary = model.conn.query( model.job.JobExecutionStream.status, func.count(model.job.JobExecutionStream.status) ).filter_by( job_execution=self, ).group_by( model.job.JobExecutionStream.status, ).all() return dict(summary) class JobExecutionProgress(base.Base): __tablename__ = 'job_progress' __reprattrs__ = ['message'] # columns id = base.id_builder.build() job_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), nullable=False) created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) message: str = Column(String(256), default=None, comment='Message describing the progress of the job execution.') progress_percentage = Column(Integer, default=None, nullable=True, comment='The progress of the job execution as a percentage (0-100).') # associations job_execution: 'JobExecution' = relationship('JobExecution', back_populates='progress') # validations @validates('progress_percentage') def validate_progress_percentage(self, _, progress_percentage): if progress_percentage is not None: progress_percentage = min(max(progress_percentage, 0), 100) return progress_percentage class 
JobExecutionStream(base.Base): __tablename__ = 'job_stream' __reprattrs__ = ['name', 'task_id', 'status'] __table_args__ = ( Index('j_s__idx_job_execution_id', 'job_execution_id'), ) # columns id = base.id_builder.build() job_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), nullable=False) task_id: str = Column(String(36), unique=True) parent_stream_id = Column(base.id_builder.type, ForeignKey('job_stream.id')) name: str = Column(String(64), nullable=False) category: str = Column(String(64), nullable=False, default='Overall') status: str = Column(String(24), default='QUEUED') created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) started: datetime = Column(DateTime(timezone=True)) completed: datetime = Column(DateTime(timezone=True)) updated: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) # associations job_execution: 'JobExecution' = relationship('JobExecution', back_populates='streams') steps: List['JobExecutionStreamStep'] = relationship( 'JobExecutionStreamStep', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionStreamStep.id), back_populates='stream', ) artifacts: List['JobExecutionArtifact'] = relationship('JobExecutionArtifact', back_populates='stream') external_tasks: List['JobExecutionExternalTask'] = relationship( 'JobExecutionExternalTask', cascade='save-update, merge, delete, delete-orphan', order_by=lambda: asc(JobExecutionExternalTask.id), back_populates='stream', ) parent_stream: 'JobExecutionStream' = relationship('JobExecutionStream', back_populates='child_streams', remote_side=[id]) child_streams: List['JobExecutionStream'] = relationship('JobExecutionStream', back_populates='parent_stream') class JobExecutionStreamStep(base.Base): __tablename__ = 'job_stream_step' __reprattrs__ = ['name', 'task_id', 'status'] __table_args__ = ( Index('j_s_s__idx_stream_id', 'stream_id'), Index('j_s_s__idx_task_id', 'task_id'), ) # columns id = base.id_builder.build() stream_id = Column(base.id_builder.type, ForeignKey('job_stream.id'), nullable=False) task_id: str = Column(String(36)) name: str = Column(String(64)) status: str = Column(String(24), default='RECEIVED') created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) started: datetime = Column(DateTime(timezone=True)) completed: datetime = Column(DateTime(timezone=True)) updated: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) # associations stream: 'JobExecutionStream' = relationship('JobExecutionStream', back_populates='steps') class JobExecutionArtifact(base.Base): __tablename__ = 'job_artifact' __reprattrs__ = ['name', 'filename'] __table_args__ = ( Index('j_a__idx_job_execution_id', 'job_execution_id'), Index('j_a__idx_stream_id', 'stream_id'), ) # columns id = base.id_builder.build() job_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), nullable=False) stream_id = Column(base.id_builder.type, ForeignKey('job_stream.id')) name: str = Column(String(128), nullable=False) type: str = Column(Enum(ArtifactType), nullable=False) category: str = Column(String(128), nullable=False, default='Default') filename: str = Column(String(512), nullable=False) created: datetime = Column(DateTime(timezone=True), default=base.utc_now, nullable=False) size: int = Column(Integer, nullable=False) # associations job_execution: 'JobExecution' = relationship('JobExecution', back_populates='artifacts') stream: 'JobExecutionStream' = 
relationship('JobExecutionStream', back_populates='artifacts') binary: 'JobExecutionArtifactBinary' = relationship( 'JobExecutionArtifactBinary', cascade='save-update, merge, delete, delete-orphan', back_populates='artifact', uselist=False) data: List['JobExecutionArtifactData'] = relationship( 'JobExecutionArtifactData', cascade='save-update, merge, delete, delete-orphan', back_populates='artifact') @property def content_type(self): if self.filename.endswith('.log'): # quick fix for missing .log type on unix systems return 'text/plain' return mimetypes.guess_type(self.filename)[0] @property def content_encoding(self): return mimetypes.guess_type(self.filename)[1] class JobExecutionArtifactData(base.Base): __tablename__ = 'job_artifact_data' # columns id = base.id_builder.build() title: str = Column(String(64), nullable=False) description: str = Column(String(512), nullable=False) artifact_id = Column(base.id_builder.type, ForeignKey('job_artifact.id'), nullable=False) # associations artifact: 'JobExecutionArtifact' = relationship('JobExecutionArtifact', back_populates='data') class JobExecutionArtifactBinary(base.Base): __tablename__ = 'job_artifact_binary' # columns id = base.id_builder.build() binary: bytes = deferred(Column(Binary, nullable=True)) artifact_id = Column(base.id_builder.type, ForeignKey('job_artifact.id'), nullable=False) # associations artifact: 'JobExecutionArtifact' = relationship('JobExecutionArtifact', back_populates='binary') class JobExecutionExternalTask(base.Base): __tablename__ = 'job_external_task' __reprattrs__ = ['name', 'type'] __table_args__ = ( Index('j_e__idx_job_execution_id', 'job_execution_id'), Index('j_e__idx_stream_id', 'stream_id'), ) # columns id = base.id_builder.build() job_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), nullable=False) stream_id = Column(base.id_builder.type, ForeignKey('job_stream.id')) user_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=True) group_id = Column(base.id_builder.type, ForeignKey('groups.id'), nullable=True) task_id: str = Column(String(64), unique=True) name: str = Column(String(128), nullable=False) type: str = Column(String(24), nullable=False) status: str = Column(String(24), default='WAITING', comment='The current status of the manual task.') input_data: Optional[bytes] = deferred(Column(Binary, nullable=True)) output_data: Optional[bytes] = deferred(Column(Binary, nullable=True)) # associations job_execution: 'JobExecution' = relationship('JobExecution', back_populates='external_tasks') stream: 'JobExecutionStream' = relationship('JobExecutionStream', back_populates='external_tasks') class JobExecutionReport(base.Base): __tablename__ = 'job_report' __reprattrs__ = ['internal_name'] # columns id = base.id_builder.build() job_execution_id = Column(base.id_builder.type, ForeignKey('job_execution.id'), nullable=False) name: str = Column(String(128), nullable=False) renderer: str = Column(String(24), nullable=False, default='unspecified') title: str = Column(String(128), nullable=True) category: str = Column(String(128), nullable=True) key: str = Column(String(128), nullable=True) raw_input_data: Optional[bytes] = deferred(Column('input_data', Binary, nullable=True)) report: Optional[bytes] = deferred(Column(Binary, nullable=True)) marked_for_refresh: bool = Column(Boolean(name='job_report_marked_for_refresh'), nullable=False, default=False) # associations job_execution: 'JobExecution' = relationship('JobExecution', back_populates='reports') @hybrid_property def 
input_data(self): return json.loads(self.raw_input_data.decode()) if self.raw_input_data else None @input_data.setter def input_data(self, value): self.raw_input_data = json.dumps(value).encode() PK!$<  riberry/model/misc/__init__.pyimport enum import json from datetime import datetime from typing import List, Optional from sqlalchemy import Binary, String, Column, Float, ForeignKey, Boolean, DateTime, Index, Enum, UniqueConstraint, sql from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm import relationship from riberry import model from riberry.model import base class ResourceType(enum.Enum): form = 'Form' user = 'User' job = 'Job' job_execution = 'JobExecution' user_interface = 'UserInterface' misc = 'Misc' def __repr__(self): return repr(self.value) class Document(base.Base): __tablename__ = 'document' __reprattrs__ = ['type'] id = base.id_builder.build() type: str = Column(String(24), nullable=False, default='markdown') content: bytes = Column(Binary, nullable=False) class Event(base.Base): __tablename__ = 'event' __reprattrs__ = ['name', 'root_id'] # columns id = base.id_builder.build() name: str = Column(String(64), nullable=False) time: float = Column(Float, nullable=False) root_id: str = Column(String(36), nullable=False) task_id: str = Column(String(36), nullable=False) data: str = Column(String(1024)) binary: bytes = Column(Binary) class NotificationType(enum.Enum): info = 'info' warning = 'warning' success = 'success' error = 'error' alert = 'alert' class UserNotification(base.Base): __tablename__ = 'notification_user' __table_args__ = ( Index('u_n__idx_job_id', 'user_id', 'read'), ) # columns id = base.id_builder.build() user_id = Column(base.id_builder.type, ForeignKey('users.id'), nullable=False) notification_id = Column(base.id_builder.type, ForeignKey('notification.id'), nullable=False) created: datetime = Column(DateTime(timezone=True), default=base.utc_now) read = Column(Boolean(name='notification_user_read'), nullable=False, default=False) # associations user: 'model.auth.User' = relationship('User', back_populates='notifications') notification: 'Notification' = relationship('Notification', back_populates='user_notifications') class Notification(base.Base): __tablename__ = 'notification' # columns id = base.id_builder.build() type = Column(Enum(NotificationType), nullable=False, default=NotificationType.info) message = Column(String(128), nullable=False) # associations user_notifications: List['UserNotification'] = relationship('UserNotification', back_populates='notification') targets: List['NotificationTarget'] = relationship('NotificationTarget', back_populates='notification') class NotificationTarget(base.Base): __tablename__ = 'notification_target' # columns id = base.id_builder.build() notification_id = Column(base.id_builder.type, ForeignKey('notification.id'), nullable=False) target = Column(String(128), nullable=False) target_id = Column(String(128), nullable=False) action = Column(String(32)) # associations notification: 'Notification' = relationship('Notification', back_populates='targets') class MenuItem(base.Base): __tablename__ = 'menu_item' # columns id = base.id_builder.build() parent_id = Column(base.id_builder.type, ForeignKey('menu_item.id')) menu_type = Column(String(128), nullable=False) type = Column(String(128), nullable=False) key = Column(String(128), nullable=False) label: Optional[str] = Column(String(128), nullable=True) # associations parent: 'MenuItem' = relationship('MenuItem', back_populates='children', remote_side=[id]) 
children: List['MenuItem'] = relationship('MenuItem', back_populates='parent') class ResourceData(base.Base): __tablename__ = 'resource_data' __reprattrs__ = ['name'] __table_args__ = ( UniqueConstraint('resource_id', 'resource_type', 'name'), ) # columns id = base.id_builder.build() resource_id = Column(base.id_builder.type, nullable=True) resource_type = Column(Enum(ResourceType), nullable=False) name: str = Column(String(256)) raw_value: bytes = Column('value', Binary, nullable=True) lock: str = Column(String(72), nullable=True) expiry: datetime = Column(DateTime(timezone=True), nullable=True) marked_for_refresh: bool = Column(Boolean(name='resource_data_marked_for_refresh'), nullable=False, default=False) @hybrid_property def value(self): return json.loads(self.raw_value.decode()) if self.raw_value else None @value.setter def value(self, value): self.raw_value = json.dumps(value).encode() @classmethod def make_relationship(cls, resource_id, resource_type): return relationship( 'ResourceData', primaryjoin=lambda: sql.and_( resource_id == ResourceData.resource_id, ResourceData.resource_type == resource_type ), order_by=lambda: ResourceData.id.asc(), foreign_keys=lambda: ResourceData.resource_id, cascade='save-update, merge, delete, delete-orphan', ) PK!Ariberry/plugins/__init__.pyfrom . import defaults import importlib import pkgutil from collections import defaultdict plugin_register = defaultdict(set) plugin_register['authentication'].add(defaults.authentication.DefaultAuthenticationProvider) plugin_register['policies'].add(defaults.policies.default_policies) ext_plugins = { name: importlib.import_module(name) for finder, name, ispkg in pkgutil.iter_modules() if name.startswith('riberry_') } PK!d''$riberry/plugins/defaults/__init__.pyfrom . import authentication, policies PK!h^<<*riberry/plugins/defaults/authentication.pyimport hashlib import os from riberry.model import auth from riberry.plugins.interfaces import AuthenticationProvider def make_hash(text) -> bytes: hash_ = hashlib.sha512() hash_.update(text) return hash_.hexdigest().encode() def salted_hash(plain: bytes, salt: bytes): plain_hash: bytes = make_hash(plain) return make_hash(plain_hash + salt) def check_password(input_password: bytes, hashed_password: bytes): size = len(hashed_password) // 2 password_hash, salt = hashed_password[:size], hashed_password[size:] input_hash = salted_hash(input_password, salt) + salt return hashed_password == input_hash def hash_password(password: bytes) -> bytes: salt = make_hash(os.urandom(1024)) return salted_hash(password, salt) + salt class DefaultAuthenticationProvider(AuthenticationProvider): @classmethod def name(cls) -> str: return 'default' def secure_password(self, password: str) -> bytes: return hash_password(password=(password or '').encode()) def authenticate(self, username: str, password: str) -> bool: user = auth.User.query().filter_by(username=username).first() if user: return check_password(input_password=(password or '').encode(), hashed_password=user.password.encode()) else: return False PK!KB^$riberry/plugins/defaults/policies.pyfrom riberry import policy, model from riberry.policy import AttributeContext class RootPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return context.subject is not None class UserPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: 
return True class ApplicationPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.application.Application) def condition(self, context: AttributeContext) -> bool: return True class ApplicationViewPolicy(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewApplicationUsingFormRelationshipRule(policy.Rule): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject application: model.application.Application = context.resource for instance in application.instances: for form in instance.forms: if set(user.groups) & set(form.groups): return True return False class GenericCreatePolicy(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'create' def condition(self, context: AttributeContext) -> bool: return True class RejectCreateRule(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return False class ApplicationInstancePolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.application.ApplicationInstance) def condition(self, context: AttributeContext) -> bool: return True class ApplicationInstanceViewPolicy(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewApplicationInstanceUsingFormRelationshipRule(policy.Rule): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject instance: model.application.ApplicationInstance = context.resource for form in instance.forms: if set(user.groups) & set(form.groups): return True return False class ApplicationFormPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.interface.Form) def condition(self, context: AttributeContext) -> bool: return True class ApplicationFormViewPolicy(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewFormRule(policy.Rule): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject form: model.interface.Form = context.resource if set(user.groups) & set(form.groups): return True return False class JobPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.job.Job) def condition(self, context: AttributeContext) -> bool: return True class JobViewPolicy(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewJobUsingFormRelationshipRule(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject job: model.job.Job = 
context.resource if set(user.groups) & set(job.form.groups): return True return False class JobExecutionPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.job.JobExecution) def condition(self, context: AttributeContext) -> bool: return True class JobExecutionViewPolicy(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewJobExecutionUsingFormRelationshipRule(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject execution: model.job.JobExecution = context.resource if set(user.groups) & set(execution.job.form.groups): return True return False class JobExecutionArtifactPolicySet(policy.PolicySet): def target_clause(self, context: AttributeContext) -> bool: return isinstance(context.resource, model.job.JobExecutionArtifact) def condition(self, context: AttributeContext) -> bool: return True class JobExecutionArtifactViewPolicy(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return context.action == 'view' def condition(self, context: AttributeContext) -> bool: return True class ViewJobExecutionArtifactUsingFormRelationshipRule(policy.Policy): def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: user: model.auth.User = context.subject execution: model.job.JobExecutionArtifact = context.resource if set(user.groups) & set(execution.job_execution.job.form.groups): return True return False default_policies = policy.AuthorizationEngine( 'default', RootPolicySet( UserPolicySet( ApplicationPolicySet( ApplicationViewPolicy( ViewApplicationUsingFormRelationshipRule() ), GenericCreatePolicy( RejectCreateRule() ) ), ApplicationInstancePolicySet( ApplicationInstanceViewPolicy( ViewApplicationInstanceUsingFormRelationshipRule() ), GenericCreatePolicy( RejectCreateRule() ) ), ApplicationFormPolicySet( ApplicationFormViewPolicy( ViewFormRule() ), GenericCreatePolicy( RejectCreateRule() ) ), JobPolicySet( JobViewPolicy( ViewJobUsingFormRelationshipRule() ) ), JobExecutionPolicySet( JobExecutionViewPolicy( ViewJobExecutionUsingFormRelationshipRule() ) ), JobExecutionArtifactPolicySet( JobExecutionArtifactViewPolicy( ViewJobExecutionArtifactUsingFormRelationshipRule() ) ) ) ) )PK! riberry/plugins/interfaces.pyimport abc class AuthenticationProvider(metaclass=abc.ABCMeta): def __init__(self, config_dict): self.raw_config = config_dict @classmethod def name(cls) -> str: raise NotImplementedError def authenticate(self, username: str, password: str) -> bool: raise NotImplementedError def secure_password(self, password: bytes) -> bytes: raise NotImplementedError def on_enabled(self): pass PK!j;ͬriberry/policy/__init__.pyfrom .engine import PolicyContext, AuthorizationEngine, PolicySet, Policy, Rule, AttributeContext from .helpers import policy_set, policy, rule context = PolicyContext() PK! 
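The following is an illustrative sketch (not part of the package itself) of how the module-level policy context above might be driven with the default authorization engine; the username and the queries are assumptions made for the example:

    from riberry import model, policy
    from riberry.plugins.defaults.policies import default_policies

    user = model.auth.User.query().filter_by(username='jsmith').one()   # assumed to exist
    application = model.application.Application.query().first()         # assumed to exist

    with policy.context.scope(subject=user, environment=None, policy_engine=default_policies):
        # Raises riberry.exc.AuthorizationError if the 'view' action is denied.
        policy.context.authorize(resource=application, action='view')
        # Or filter a collection down to the resources the subject may view.
        visible = policy.context.filter(model.application.Application.query().all(), action='view')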
Zriberry/policy/engine.pyimport abc import functools import inspect from contextlib import contextmanager from typing import Set, Union, Optional, Type, Callable from riberry.exc import AuthorizationError from .store import ThreadLocalPolicyContextStore, PolicyContextStore class NotApplicable(Exception): pass class AuthorizationEngine: def __init__(self, name, *policy): self.name = name self.policies: Set[Union[PolicySet, Policy]] = set(policy) def authorize(self, context): results = [] for policy_sey in self.policies: try: results.append(policy_sey.authorize(context=context)) except NotApplicable: pass return False if False in results else True class PolicyContext: store: PolicyContextStore = ThreadLocalPolicyContextStore() _no_default = object() def __getitem__(self, item): return self.store.get(item, default=None) def __setitem__(self, item, value): self.store.set(item, value) @property def enabled(self): return self['enabled'] @enabled.setter def enabled(self, value): self['enabled'] = value @property def subject(self): return self['subject'] @subject.setter def subject(self, value): self['subject'] = value @property def environment(self): return self['environment'] @environment.setter def environment(self, value): self['environment'] = value @property def engine(self) -> AuthorizationEngine: return self['policy_engine'] @engine.setter def engine(self, value): self['policy_engine'] = value @property def on_deny(self) -> Optional[Exception]: return self['on_deny'] @on_deny.setter def on_deny(self, value): self['on_deny'] = value @contextmanager def scope(self, subject, environment, policy_engine, on_deny: Optional[Union[Type, Callable]] = AuthorizationError): self.configure(subject=subject, environment=environment, policy_engine=policy_engine, on_deny=on_deny) yield self.reset() @contextmanager def disabled_scope(self): try: self.enabled = False yield finally: self.enabled = True def configure(self, subject, environment, policy_engine, on_deny: Optional[Union[Type, Callable]] = AuthorizationError): self.enabled = True self.subject = subject self.environment = environment self.engine = policy_engine self.on_deny = on_deny def reset(self): self.enabled = True self.subject = None self.environment = None self.engine = None self.on_deny = None @classmethod def current(cls): return cls() def authorize(self, resource, action, on_deny: Optional[Union[Type, Callable]] = _no_default): if not self.enabled: return True attr_context = AttributeContext( subject=self.subject, environment=self.environment, action=action, resource=resource ) result = self.engine.authorize(attr_context) if result is False: on_deny = self.on_deny if on_deny is self._no_default else on_deny if inspect.isclass(on_deny) and issubclass(on_deny, Exception): raise on_deny elif callable(on_deny): on_deny(attr_context) return result def filter(self, resources, action): return [resource for resource in resources if self.authorize(resource, action, on_deny=None)] def post_filter(self, action): def outer(func): @functools.wraps(func) def inner(*args, **kwargs): result = func(*args, **kwargs) return self.filter(resources=result, action=action) return inner return outer def post_authorize(self, action, on_deny: Optional[Union[Type, Callable]] = _no_default): def outer(func): @functools.wraps(func) def inner(*args, **kwargs): result = func(*args, **kwargs) self.authorize(resource=result, action=action, on_deny=on_deny) return result return inner return outer class AttributeContext: def __init__(self, subject, resource, action, 
environment): self.subject = subject self.resource = resource self.action = action self.environment = environment class AuthorizationElement(metaclass=abc.ABCMeta): def target_clause(self, context: AttributeContext) -> bool: raise NotImplementedError def condition(self, context: AttributeContext) -> bool: raise NotImplementedError def apply(self, context: AttributeContext): if not self.target_clause(context=context): raise NotApplicable return self.condition(context=context) def authorize(self, context): raise NotImplementedError class PolicyCollection(AuthorizationElement, metaclass=abc.ABCMeta): def __init__(self, *collection): self.collection: Set[AuthorizationElement] = set(collection) def authorize(self, context): if not self.apply(context=context): return False results = set() for policy in self.collection: try: results.add(policy.authorize(context=context)) except NotApplicable: pass if any(results): return True return False if False in results else None class PolicySet(PolicyCollection, metaclass=abc.ABCMeta): pass class Policy(PolicyCollection, metaclass=abc.ABCMeta): pass class Rule(AuthorizationElement, metaclass=abc.ABCMeta): def on_permit(self, context): pass def on_deny(self, context): pass def authorize(self, context): result = self.apply(context=context) self.on_permit(context=context) if result else self.on_deny(context=context) return result PK!?iMriberry/policy/helpers.pyfrom .engine import Rule, AttributeContext, Policy class ShorthandRule(Rule): def __init__(self, func): self.func = func def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return self.func(context) class ShorthandPolicy(Policy): def __init__(self, func, *collection): super(ShorthandPolicy, self).__init__(*collection) self.func = func def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return self.func(context) class ShorthandPolicySet(Policy): def __init__(self, func, *collection): super(ShorthandPolicySet, self).__init__(*collection) self.func = func def target_clause(self, context: AttributeContext) -> bool: return True def condition(self, context: AttributeContext) -> bool: return self.func(context) def rule(func): return ShorthandRule(func) def policy(func): def builder(*collection): return ShorthandPolicy(func, *collection) return builder def policy_set(func): def builder(*collection): return ShorthandPolicySet(func, *collection) return builderPK!5+Xriberry/policy/store.pyimport abc from threading import local class PolicyContextStore(abc.ABC): def get(self, item, default=None): raise NotImplementedError def set(self, item, value): raise NotImplementedError class ThreadLocalPolicyContextStore(PolicyContextStore): _local = local() def get(self, item, default=None): return getattr(self._local, item, default) def set(self, item, value): setattr(self._local, item, value) PK!Gdriberry/services/__init__.pyfrom riberry import policy as rib_policy from . 
import application, application_instance, form, auth, job, job_executions, self, policy def fetch_relationship(model_object, attribute, action): resource = getattr(model_object, attribute) if isinstance(resource, list): return rib_policy.context.filter(resources=resource, action=action) if rib_policy.context.authorize(resource=resource, action=action, on_deny=None) is not False: return resource return None PK!0lriberry/services/application.pyfrom typing import List from riberry import model, policy @policy.context.post_filter(action='view') def all_applications() -> List[model.application.Application]: return model.application.Application.query().all() @policy.context.post_authorize(action='view') def application_by_id(application_id) -> model.application.Application: return model.application.Application.query().filter_by(id=application_id).one() @policy.context.post_authorize(action='view') def application_by_internal_name(internal_name) -> model.application.Application: return model.application.Application.query().filter_by(internal_name=internal_name).one() def create_application(name, internal_name, description, type, document): app = model.application.Application( name=name, internal_name=internal_name, description=description, type=type, document=model.misc.Document(content=document) if document else None ) policy.context.authorize(app, action='create') model.conn.add(app) return app def update_application(application, attributes): for attr in {'name', 'description', 'type'} & set(attributes): setattr(application, attr, attributes[attr]) return application PK!<0(riberry/services/application_instance.pyfrom typing import List, Dict from riberry import model, policy, services @policy.context.post_filter(action='view') def all_application_instances() -> List[model.application.ApplicationInstance]: return model.application.ApplicationInstance.query().all() @policy.context.post_authorize(action='view') def application_instance_by_id(application_instance_id) -> model.application.ApplicationInstance: return model.application.ApplicationInstance.query().filter_by(id=application_instance_id).one() @policy.context.post_authorize(action='view') def application_instance_by_internal_name(internal_name) -> model.application.ApplicationInstance: return model.application.ApplicationInstance.query().filter_by(internal_name=internal_name).one() @policy.context.post_filter(action='view') def instances_by_application_id(application_id) -> List[model.application.ApplicationInstance]: application = services.application.application_by_id(application_id=application_id) return application.instances def create_application_instance(application, name, internal_name, schedules: List[Dict]) -> model.application.ApplicationInstance: application_instance = model.application.ApplicationInstance( application=application, name=name, internal_name=internal_name, schedules=create_application_instance_schedules(attributes_dict=schedules), ) policy.context.authorize(application_instance, action='create') model.conn.add(application_instance) return application_instance def create_application_instance_schedules(attributes_dict): return [ model.application.ApplicationInstanceSchedule( days=schedule['days'], start_time=schedule['start_time'], end_time=schedule['end_time'], timezone=schedule['timezone'], parameter=schedule['parameter'], value=schedule['value'], priority=schedule['priority'], ) for schedule in attributes_dict ] def update_application_instance(application_instance: model.application.ApplicationInstance, 
attributes: Dict): for attr in {'name'} & set(attributes): setattr(application_instance, attr, attributes[attr]) return application_instance PK!,Tbbriberry/services/auth.pyfrom riberry import model from datetime import timedelta from riberry.model.group import Group def authenticate_user(username: str, password: str) -> str: user = model.auth.User.authenticate(username=username, password=password) access_token = model.auth.AuthToken.create(user, expiry_delta=timedelta(days=5)) return access_token.decode() def all_groups(): return Group.query().all() def group_by_id(group_id): return Group.query().filter_by(id=group_id).first() def users_for_group_id(group_id): group = group_by_id(group_id=group_id) return group.users def forms_for_group_id(group_id): group = group_by_id(group_id=group_id) return group.forms def remove_user_from_group(group_id, user_id): _remove_resource_from_group(group_id, user_id, model.misc.ResourceType.user) def add_user_to_group(group_id, user_id): _add_resource_to_group(group_id, user_id, model.misc.ResourceType.user) def remove_form_from_group(group_id, form_id): _remove_resource_from_group(group_id, form_id, model.misc.ResourceType.form) def add_form_to_group(group_id, form_id): _add_resource_to_group(group_id, form_id, model.misc.ResourceType.form) def _find_group_association(group_id, resource_id, resource_type): return model.group.ResourceGroupAssociation.query().filter_by( group_id=group_id, resource_id=resource_id, resource_type=resource_type ).first() def _remove_resource_from_group(group_id, resource_id, resource_type): association = _find_group_association(group_id, resource_id, resource_type) if association: model.conn.delete(association) def _add_resource_to_group(group_id, resource_id, resource_type): association = _find_group_association(group_id, resource_id, resource_type) if not association: association = model.group.ResourceGroupAssociation( group_id=group_id, resource_id=resource_id, resource_type=resource_type ) model.conn.add(association) def create_group(name): group = model.group.Group(name=name) model.conn.add(group) return groupPK!M riberry/services/form.pyfrom typing import List, Dict from sqlalchemy import desc from riberry import model, policy from riberry import services @policy.context.post_filter(action='view') def all_forms() -> List[model.interface.Form]: return model.interface.Form.query().all() @policy.context.post_authorize(action='view') def form_by_id(form_id) -> model.interface.Form: return model.interface.Form.query().filter_by(id=form_id).one() @policy.context.post_authorize(action='view') def form_by_internal_name(internal_name) -> model.interface.Form: return model.interface.Form.query().filter_by( internal_name=internal_name, ).one() @policy.context.post_filter(action='view') def forms_by_application_id(application_id): application = services.application.application_by_id(application_id=application_id) return application.forms def create_form(application, instance, name, internal_name, version, description, input_files, input_values) -> model.interface.Form: form = model.interface.Form( application=application, instance=instance, name=name, internal_name=internal_name, version=version, description=description, input_file_definitions=[model.interface.InputFileDefinition(**d) for d in input_files], input_value_definitions=[model.interface.InputValueDefinition(**d) for d in input_values] ) policy.context.authorize(form, action='create') model.conn.add(form) return form def update_form(form: model.interface.Form, attributes: 
Dict): for attr in {'name', 'version', 'description'} & set(attributes): setattr(form, attr, attributes[attr]) return form @policy.context.post_authorize(action='view') def file_definition_by_internal_name(form, internal_name) -> model.interface.InputFileDefinition: return model.interface.InputFileDefinition.query().filter_by( form=form, internal_name=internal_name, ).one() @policy.context.post_authorize(action='view') def value_definition_by_internal_name(form, internal_name) -> model.interface.InputValueDefinition: return model.interface.InputValueDefinition.query().filter_by( form=form, internal_name=internal_name, ).one() def update_file_definition(definition: model.interface.InputFileDefinition, attributes: Dict): for attr in {'required', 'type', 'name', 'description', 'accept'} & set(attributes): setattr(definition, attr, attributes[attr]) return definition def update_value_definition(definition: model.interface.InputValueDefinition, attributes: Dict): for attr in {'required', 'type', 'name', 'description', 'default_binary'} & set(attributes): setattr(definition, attr, attributes[attr]) if 'allowed_binaries' in attributes: current = set(definition.allowed_binaries) present = set(attributes['allowed_binaries']) removed = current - present if current != present: definition.allowed_binaries = [enum for enum in attributes['allowed_binaries'] if enum not in removed] elif definition.allowed_binaries: definition.allowed_binaries = [] return definition def job_executions_by_id(form_id, limit=50): form = form_by_id(form_id=form_id) return model.job.JobExecution.query().filter( (model.job.JobExecution.job_id == model.job.Job.id) & (model.job.Job.form_id == form.id) ).order_by(desc(model.job.JobExecution.updated)).limit(limit).all() PK!F UÌriberry/services/job.pyimport json from io import BytesIO from typing import Dict from riberry import model, services, policy, exc class InputFileProxy(BytesIO): def __init__(self, obj, filename=None): self.filename = filename super().__init__(obj) @classmethod def from_object(cls, obj, filename='input'): if isinstance(obj, bytes): binary = obj elif isinstance(obj, str): binary = obj.encode() if not filename.endswith('.txt'): filename += '.txt' else: binary = json.dumps(obj).encode() if not filename.endswith('.json'): filename += '.json' return cls(binary, filename) def jobs_by_form_id(form_id): return model.job.Job.query().filter_by(form_id=form_id).all() def verify_inputs(input_value_definitions, input_file_definitions, input_values, input_files): value_map_definitions: Dict[str, 'model.interface.InputValueDefinition'] = {input_def.name: input_def for input_def in input_value_definitions} file_map_definitions: Dict[str, 'model.interface.InputValueDefinition'] = {input_def.name: input_def for input_def in input_file_definitions} value_mapping = {d.internal_name: d.name for d in value_map_definitions.values()} file_mapping = {d.internal_name: d.name for d in file_map_definitions.values()} input_values = {value_mapping.get(k, k): v for k, v in input_values.items()} input_files = {file_mapping.get(k, k): v for k, v in input_files.items()} input_value_mapping = {} input_file_mapping = {} errors = [] for name, definition in value_map_definitions.items(): if name in input_values: value = input_values.pop(name) else: value = definition.default_binary if definition.required and not value: err = exc.RequiredInputError(target='job', field=definition.name, internal_name=definition.internal_name) errors.append(err) continue if definition.allowed_binaries and value: 
values = value if isinstance(value, str): values = [value] if isinstance(values, list): values = [json.dumps(v).encode() if v else v for v in values] if set(values) - set(definition.allowed_binaries) or definition.type != 'text-multiple' and len(values) > 1: err = exc.InvalidEnumError( target='job', field=definition.name, allowed_values=definition.allowed_values, internal_name=definition.internal_name ) errors.append(err) continue input_value_mapping[definition] = value for name in list(input_values): if name in file_mapping and name not in input_files: input_files[file_mapping[name]] = input_values.pop(name) for name, definition in file_map_definitions.items(): if name in input_files: value = input_files.pop(name) else: value = None if definition.required and not value: err = exc.RequiredInputError(target='job', field=definition.name, internal_name=definition.internal_name) errors.append(err) continue input_file_mapping[definition] = value unexpected_inputs = set(input_values) | set(input_files) if unexpected_inputs: for input_ in unexpected_inputs: err = exc.UnknownInputError(target='job', field=input_) errors.append(err) if errors: raise exc.InputErrorGroup(*errors) return input_value_mapping, input_file_mapping def create_job(form_id, name, input_values, input_files, execute, parent_execution=None): form = services.form.form_by_id(form_id=form_id) policy.context.authorize(form, action='view') errors = [] if not name: err = exc.RequiredInputError(target='job', field='name') errors.append(err) else: if model.job.Job.query().filter_by(name=name).first(): err = exc.UniqueInputConstraintError(target='job', field='name', value=name) errors.append(err) try: values_mapping, files_mapping = verify_inputs( input_value_definitions=form.input_value_definitions, input_file_definitions=form.input_file_definitions, input_values=input_values, input_files=input_files ) except exc.InputErrorGroup as e: e.extend(errors) raise else: if errors: raise exc.InputErrorGroup(*errors) input_value_instances = [] input_file_instances = [] values_mapping = { k: (json.dumps(v).encode() if v and not isinstance(v, bytes) else v) for k, v in values_mapping.items() } for definition, value in values_mapping.items(): input_value_instance = model.interface.InputValueInstance( name=definition.name, internal_name=definition.internal_name, raw_value=value ) input_value_instances.append(input_value_instance) for definition, value in files_mapping.items(): filename = definition.internal_name if not hasattr(value, 'read'): value = InputFileProxy.from_object(obj=value, filename=filename) binary = value.read() if isinstance(binary, str): binary = binary.encode() if hasattr(value, 'filename'): filename = value.filename input_file_instance = model.interface.InputFileInstance( name=definition.name, internal_name=definition.internal_name, filename=filename, binary=binary, size=len(binary) if binary else 0 ) input_file_instances.append(input_file_instance) job = model.job.Job( form=form, name=name, files=input_file_instances, values=input_value_instances, creator=policy.context.subject ) policy.context.authorize(job, action='create') if execute: create_job_execution(job, parent_execution=parent_execution) model.conn.add(job) return job @policy.context.post_authorize(action='view') def job_by_id(job_id): return model.job.Job.query().filter_by(id=job_id).one() @policy.context.post_filter(action='view') def job_executions_by_id(job_id): return model.job.JobExecution.query().filter_by(job_id=job_id).all() def 
create_job_execution_by_job_id(job_id): job = job_by_id(job_id=job_id) return create_job_execution(job=job) def create_job_execution(job, parent_execution=None): execution = model.job.JobExecution(job=job, creator=policy.context.subject, parent_execution=parent_execution) policy.context.authorize(execution, action='create') model.conn.add(execution) return execution def input_file_instance_by_id(input_file_instance_id) -> model.interface.InputFileInstance: return model.interface.InputFileInstance.query().filter_by(id=input_file_instance_id).one() def delete_job_by_id(job_id): delete_job(job=job_by_id(job_id=job_id)) @policy.context.post_authorize(action='view') def delete_job(job): model.conn.delete(job) PK!:}}"riberry/services/job_executions.pyimport riberry from riberry import model, policy @policy.context.post_authorize(action='view') def job_execution_by_id(execution_id): return model.job.JobExecution.query().filter_by(id=execution_id).one() @policy.context.post_authorize(action='view') def job_artifact_by_id(artifact_id): return model.job.JobExecutionArtifact.query().filter_by(id=artifact_id).one() @policy.context.post_authorize(action='view') def job_stream_by_id(stream_id): return model.job.JobExecutionStream.query().filter_by(id=stream_id).one() @policy.context.post_authorize(action='view') def delete_job_execution_by_id(execution_id): delete_job_execution(execution=job_execution_by_id(execution_id=execution_id)) @policy.context.post_authorize(action='view') def delete_job_execution(execution): model.conn.delete(execution) @policy.context.post_authorize(action='view') def cancel_job_execution_by_id(execution_id): cancel_job_execution(execution=job_execution_by_id(execution_id=execution_id)) @policy.context.post_authorize(action='view') def cancel_job_execution(execution): if execution.status in ('SUCCESS', 'FAILURE'): return user = policy.context.subject message = 'The current execution was manually cancelled by user {} ({}).'.format( user.username, user.details.display_name ).encode() artifact = model.job.JobExecutionArtifact( name=f'Workflow cancelled by user {user.username}', job_execution=execution, type=model.job.ArtifactType.error, category='Fatal', filename='fatal.log', size=len(message), binary=model.job.JobExecutionArtifactBinary(binary=message), ) model.conn.add(artifact) riberry.app.actions.executions.execution_complete( task_id=execution.task_id, root_id=execution.task_id, status='FAILURE', stream=None, context=None, ) PK!緔riberry/services/policy.pyfrom contextlib import contextmanager from riberry import model, config, policy @contextmanager def policy_scope(user=None, environment=None): if isinstance(user, str): user = model.auth.User.query().filter_by(username=user).one() policy_provider = config.config.policies.provider with policy.context.scope(subject=user, environment=environment, policy_engine=policy_provider): yield PK! 
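# --- Example (not part of the package) --------------------------------------
# A minimal sketch pairing services.policy.policy_scope (above) with the
# policy-aware service helpers such as services.job.create_job. The username
# 'jsmith' and form_id=1 are assumptions, and a configured riberry database
# and policy provider are required for this to run.
from riberry import model, services

with services.policy.policy_scope(user='jsmith'):
    # Every service call inside the scope is authorized/filtered as 'jsmith'.
    job = services.job.create_job(
        form_id=1,
        name='example-job',
        input_values={},
        input_files={},
        execute=True,
    )
    model.conn.commit()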
riberry/services/self.py

from typing import List

from sqlalchemy import desc

from riberry import model, policy


def profile() -> model.auth.User:
    return policy.context.subject


def latest_notifications():
    user = policy.context.subject
    return model.misc.UserNotification.query().filter_by(user=user).order_by(
        desc(model.misc.UserNotification.created)
    ).limit(32).all()


def unread_notification_count():
    user = policy.context.subject
    return model.misc.UserNotification.query().filter_by(read=False, user=user).count()


def mark_notifications_as_read(notification_ids: List):
    user = policy.context.subject
    notifications: List[model.misc.UserNotification] = model.misc.UserNotification.query().filter(
        (model.misc.UserNotification.id.in_(notification_ids)) &
        (model.misc.UserNotification.user == user) &
        (model.misc.UserNotification.read == False)
    ).all()
    for notification in notifications:
        notification.read = True


def mark_all_notifications_as_read():
    user = policy.context.subject
    notifications: List[model.misc.UserNotification] = model.misc.UserNotification.query().filter_by(
        user=user,
        read=False
    ).all()
    for notification in notifications:
        notification.read = True


riberry/util/__init__.py


riberry/util/__main__.py

import json

import click

from . import user, config_importer, groups


@click.group()
def cli():
    pass


@cli.command('import')
@click.option('--config-path', '-p', prompt='Configuration path', help='YAML file containing application references.')
@click.option('--commit/--rollback', '-c/-r', prompt='Commit changes', is_flag=True, default=False,
              help='Commit the imported changes; with --rollback (the default), all database changes are gathered and rolled back as a dry run.')
@click.option('--app', '-a', 'apps', help='Restricts the import to the specified app', multiple=True)
def importer(config_path, commit, apps):
    changes = config_importer.import_from_file(config_path, dry_run=not commit, restrict_apps=apps)
    print(json.dumps(changes, indent=2))


@cli.command('add-user')
@click.option('--username', prompt='Username', help="User's username")
@click.option('--password', prompt='Password', help="User's password", hide_input=True, confirmation_prompt=True)
@click.option('--first-name', prompt='First name', help="User's first name")
@click.option('--last-name', prompt='Last name', help="User's last name")
@click.option('--display-name', prompt='Full name', help="User's display name")
@click.option('--department', prompt='Department', help="User's department")
@click.option('--email', prompt='Email', help="User's email")
def add_user(username, password, first_name, last_name, display_name, department, email):
    user_id = user.add_user(
        username=username,
        password=password,
        first_name=first_name,
        last_name=last_name,
        display_name=display_name,
        department=department,
        email=email,
    )
    print(f'Created user {username} (User ID: {user_id})')


@cli.command('user-groups')
@click.argument('action', type=click.Choice(['add', 'remove']))
@click.option('--username', '-u', prompt='Username', help="User's username")
@click.option('--group', '-g', prompt='Group', help="Group's name")
def modify_user_groups(action, username, group):
    try:
        if action == 'add':
            groups.add_user_to_group(username=username, group_name=group)
            print(f'Added {username} to group {group}')
        elif action == 'remove':
            groups.remove_user_from_group(username=username, group_name=group)
            print(f'Removed {username} from group {group}')
    except Exception as exc:
        print(str(exc))
        exit(1)


if __name__ == '__main__':
    cli()


riberry/util/common.py

import os
from string import Template


def variable_substitution(obj):
    if isinstance(obj, str):
        try:
return Template(template=obj).substitute(os.environ) except KeyError as exc: key = exc.args[0] raise ValueError(f'Environment variable substitution failed for {key!r}. ' f'Does the environment variable exist?') elif isinstance(obj, dict): return {k: variable_substitution(v) for k, v in obj.items()} elif isinstance(obj, (tuple, list, set)): return type(obj)(map(variable_substitution, obj)) else: return obj PK!Pb"?"?riberry/util/config_importer.pyimport json import os import uuid import yaml from sqlalchemy.inspection import inspect from sqlalchemy.orm.exc import NoResultFound from riberry import model, services, policy from riberry.util.common import variable_substitution class Loader(yaml.SafeLoader): """ https://stackoverflow.com/a/9577670 """ def __init__(self, stream): self._root = os.path.split(stream.name)[0] super(Loader, self).__init__(stream) def include(self, node): filename = variable_substitution(os.path.join(self._root, self.construct_scalar(node))) with open(filename, 'r') as f: if filename.endswith('.yaml') or filename.endswith('.yml'): return yaml.load(f, Loader) else: return f.read() Loader.add_constructor('!include', Loader.include) def collection_diff(obj, collection_name, loader): current_collection = set(getattr(obj, collection_name)) new_collection = set(loader()) for stale in current_collection - new_collection: model.conn.delete(stale) setattr(obj, collection_name, list(new_collection)) return obj def model_diff(obj): object_info = inspect(obj) return { name: attr.history for name, attr in object_info.attrs.items() if attr.history.has_changes() } def session_diff(): diff = {} for obj in sorted(model.conn.dirty | model.conn.new | model.conn.deleted, key=lambda o: str(o)): type_ = 'Modified' if obj in model.conn.dirty else 'Added' if obj in model.conn.new else 'Deleted' if obj in model.conn.deleted else 'REF' diff[obj] = type_, model_diff(obj) return diff def import_applications(applications, restrict=None): existing_apps = {a.internal_name: a for a in model.application.Application.query().all()} if not restrict: for stale in set(existing_apps) - set(applications): model.conn.delete(existing_apps[stale]) apps = [] for application, properties in applications.items(): if restrict and application not in restrict: continue app = import_application(internal_name=application, attributes=properties) apps.append(app) return apps def import_application(internal_name, attributes): try: app = services.application.application_by_internal_name(internal_name=internal_name) app = services.application.update_application(app, attributes) except NoResultFound: app = services.application.create_application( internal_name=internal_name, name=attributes.get('name'), description=attributes.get('description'), type=attributes.get('type'), document=None ) if attributes.get('document'): if not app.document: app.document = model.misc.Document() model.conn.add(app.document) app.document.content = attributes['document'].encode() else: if app.document: app.document = None import_instances(app, attributes.get('instances') or {}) import_forms(app, attributes.get('forms') or {}) return app def import_instances(app, instances): return collection_diff( obj=app, collection_name='instances', loader=lambda: { import_instance(app, name, attrs) for name, attrs in instances.items() } ) def import_forms(app, forms): return collection_diff( obj=app, collection_name='forms', loader=lambda: { import_form(app, name, attrs) for name, attrs in forms.items() } ) def import_form(app, internal_name, attributes): try: 
form = services.form.form_by_internal_name(internal_name=internal_name) form = services.form.update_form(form, attributes) except NoResultFound: instance_internal_name = attributes['instance'] instances = [instance for instance in app.instances if instance.internal_name == instance_internal_name] if not instances: raise ValueError(f'Could not find instance {instance_internal_name} for form {internal_name}') form = services.form.create_form( application=app, instance=instances[0], name=attributes.get('name'), internal_name=internal_name, version=attributes.get('version'), description=attributes.get('description'), input_files=[], input_values=[] ) if attributes.get('document'): if not form.document: form.document = model.misc.Document() model.conn.add(form.document) form.document.content = attributes['document'].encode() else: if form.document: form.document = None import_form_inputs( form=form, input_files=attributes.get('inputFiles') or {}, input_values=attributes.get('inputValues') or {}, ) return form def import_input_file_definition(form, internal_name, attributes): try: if not form.id: raise NoResultFound definition = services.form.file_definition_by_internal_name(form=form, internal_name=internal_name) definition = services.form.update_file_definition(definition, attributes) except NoResultFound: definition = model.interface.InputFileDefinition(internal_name=internal_name, **attributes) model.conn.add(definition) return definition def import_input_value_definition(form, internal_name, attributes): mapping = { 'enumerations': ('allowed_binaries', lambda values: [json.dumps(v).encode() for v in values]), 'default': ('default_binary', lambda v: json.dumps(v).encode()), } attributes = dict( (mapping[k][0], mapping[k][1](v)) if k in mapping else (k, v) for k, v in attributes.items() ) try: if not form.id: raise NoResultFound definition = services.form.value_definition_by_internal_name(form=form, internal_name=internal_name) definition = services.form.update_value_definition(definition, attributes) except NoResultFound: definition = model.interface.InputValueDefinition(internal_name=internal_name, **attributes) model.conn.add(definition) return definition def import_form_inputs(form, input_files, input_values): collection_diff( obj=form, collection_name='input_file_definitions', loader=lambda: { import_input_file_definition(form, name, attrs) for name, attrs in input_files.items() } ) collection_diff( obj=form, collection_name='input_value_definitions', loader=lambda: { import_input_value_definition(form, name, attrs) for name, attrs in input_values.items() } ) def import_instance(app, internal_name, attributes): try: instance = services.application_instance.application_instance_by_internal_name(internal_name=internal_name) instance = services.application_instance.update_application_instance(instance, attributes) except NoResultFound: instance = services.application_instance.create_application_instance( application=app, internal_name=internal_name, name=attributes.get('name'), schedules=[] ) current_schedules = {} for schedule in instance.schedules: current_schedules[( schedule.days, schedule.start_time, schedule.end_time, schedule.timezone, schedule.parameter, schedule.value, schedule.priority, )] = schedule loaded_schedules = set(( sched['days'].lower() if isinstance(sched.get('days'), str) else sched.get('days', '*'), model.application.ApplicationInstanceSchedule.cleanse_time( sched.get('startTime') or model.application.ApplicationInstanceSchedule.start_time.default.arg), 
model.application.ApplicationInstanceSchedule.cleanse_time( sched.get('endTime') or model.application.ApplicationInstanceSchedule.end_time.default.arg), sched.get('timeZone') or model.application.ApplicationInstanceSchedule.timezone.default.arg, sched['parameter'], str(sched['value']) if sched['value'] not in ('', None) else None, sched.get('priority') or model.application.ApplicationInstanceSchedule.priority.default.arg, ) for sched in attributes.get('schedules', [])) for stale in set(current_schedules) - loaded_schedules: model.conn.delete(current_schedules[stale]) new_schedules = [] for schedule in loaded_schedules: if schedule in current_schedules: continue days, start_time, end_time, timezone, parameter, value, priority = schedule new_schedules.append( dict( days=days, start_time=start_time, end_time=end_time, timezone=timezone, parameter=parameter, value=value, priority=priority, ) ) instance.schedules += services.application_instance.create_application_instance_schedules(new_schedules) return instance def import_groups(applications, restrict): created_groups = {} for application, properties in applications.items(): if restrict and application not in restrict: continue for form_internal_name, form_info in properties.get('forms', {}).items(): groups = form_info.get('groups') or [] form: model.interface.Form = model.interface.Form.query().filter_by( internal_name=form_internal_name, ).first() if not form: print(f'import_groups:: Form {form_internal_name} not found, skipping {application}') continue associations = {assoc.group.name: assoc for assoc in form.group_associations} stale_assocs = set(associations) - set(groups) for stale in stale_assocs: model.conn.delete(associations[stale]) for group_name in groups: if group_name in associations: continue group = model.group.Group.query().filter_by(name=group_name).first() if not group: group = created_groups.get(group_name) if not group: group = services.auth.create_group(name=group_name) created_groups[group_name] = group association = model.group.ResourceGroupAssociation( resource_id=form.id, resource_type=model.misc.ResourceType.form, group=group ) model.conn.add(association) def _convert_value(value): if isinstance(value, list): return [_convert_value(v) for v in value] if isinstance(value, model.base.Base): return dict( name=type(value).__name__, id=getattr(value, 'id', None) ) if isinstance(value, str) and len(value) > 512: return f'(trimmed) {value[:512]}' if isinstance(value, bytes): return f'bytes (size: {len(value)})' import enum if isinstance(value, enum.Enum): return value.name return value def json_diff(diff): output = {'Modified': [], 'Added': [], 'Deleted': []} for obj, (diff_type, changes) in diff.items(): if not changes and diff_type not in ('Added', 'Deleted'): continue entry = {} for k, v in changes.items(): attr = {} if v.deleted: attr['Deleted'] = _convert_value(v.deleted[0]) if v.added and v.added[0]: attr['Added'] = _convert_value(v.added[0]) entry[k] = attr obj_dict = _convert_value(obj) output[diff_type].append({ 'id': obj_dict['id'], 'type': obj_dict['name'], 'attributes': entry }) return output def import_menu_for_forms(menu): for item in model.misc.MenuItem.query().all(): model.conn.delete(item) for item in menu: menu_item = import_menu_item(item, menu_type='forms', parent=None) model.conn.add(menu_item) def import_menu_item(item, menu_type, parent=None): menu_item = model.misc.MenuItem(parent=parent, menu_type=menu_type) if isinstance(item, dict): menu_item.key = str(uuid.uuid4()) menu_item.type = 'branch' 
menu_item.label = list(item.keys())[0] menu_item.children = [ import_menu_item(child, menu_type, menu_item) for child in list(item.values())[0] ] elif isinstance(item, str): menu_item.key = item menu_item.type = 'leaf' return menu_item def import_config(config, formatter=None, restrict_apps=None): applications = config.get('applications') or {} import_applications(applications=applications, restrict=restrict_apps) if not restrict_apps: import_capacities(config.get('capacity-configuration') or {}) diff = session_diff() model.conn.flush() import_menu_for_forms(menu=config.get('menues', {}).get('forms', {})) import_groups(applications=applications, restrict=restrict_apps) for k, v in session_diff().items(): if k in diff: diff[k] = diff[k][0], {**diff[k][1], **v[1]} else: diff[k] = v model.conn.flush() return formatter(diff) if formatter else diff def import_capacities(capacities): existing = {c.weight_parameter: c for c in model.application.CapacityConfiguration.query().all()} for stale in set(existing) - set(capacities): model.conn.delete(existing[stale]) for weight_name, properties in capacities.items(): import_capacity(weight_name, properties) def import_capacity(weight_parameter, properties): capacity_config: model.application.CapacityConfiguration = \ model.application.CapacityConfiguration.query().filter_by(weight_parameter=weight_parameter).first() if not capacity_config: capacity_config = model.application.CapacityConfiguration(weight_parameter=weight_parameter) capacity_config.capacity_parameter = properties['parameters']['capacity'] capacity_config.producer_parameter = properties['parameters']['producer'] capacity_config.distribution_strategy = ( model.application.CapacityDistributionStrategy(properties['strategy']) if 'strategy' in properties else model.application.CapacityConfiguration.distribution_strategy.default.arg ) new_producers = {p['internalName']: p for p in properties['producers']} # Update existing, delete old for producer in capacity_config.producers: if producer.internal_name not in new_producers: model.conn.delete(producer) else: producer_config = new_producers.pop(producer.internal_name) producer.name = producer_config.get('name') or producer.internal_name producer.capacity = producer_config['capacity'] # Add new for internal_name, producer_config in new_producers.items(): capacity_config.producers.append( model.application.CapacityProducer( internal_name=internal_name, name=producer_config.get('name') or internal_name, capacity=producer_config['capacity'], ) ) model.conn.add(capacity_config) def import_from_file(config_path, dry_run=True, formatter=json_diff, restrict_apps=None): with open(config_path) as f: config = variable_substitution(yaml.load(f, Loader) or {}) with model.conn.no_autoflush, policy.context.disabled_scope(): output = import_config(config, formatter=formatter, restrict_apps=restrict_apps) if dry_run: model.conn.rollback() else: model.conn.commit() return output PK!+ ]==riberry/util/groups.pyfrom typing import Tuple from riberry import model, services from sqlalchemy.orm.exc import NoResultFound def _find_user_and_group(username: str, group_name: str) -> Tuple[model.auth.User, model.group.Group]: try: user = model.auth.User.query().filter_by(username=username).one() except NoResultFound: raise ValueError(f'Could not find user with username {username!r}') try: group = model.group.Group.query().filter_by(name=group_name).one() except NoResultFound: raise ValueError(f'Could not find group with name {group_name!r}') return user, group def 
add_user_to_group(username: str, group_name: str):
    user, group = _find_user_and_group(username=username, group_name=group_name)
    services.auth.add_user_to_group(group_id=group.id, user_id=user.id)
    model.conn.commit()


def remove_user_from_group(username: str, group_name: str):
    user, group = _find_user_and_group(username=username, group_name=group_name)
    services.auth.remove_user_from_group(group_id=group.id, user_id=user.id)
    model.conn.commit()


riberry/util/user.py

from riberry import model


def add_user(username, password, first_name, last_name, display_name, department, email):
    user = model.auth.User(
        username=username,
        password=model.auth.User.secure_password(password).decode(),
        auth_provider='default',
        details=model.auth.UserDetails(
            first_name=first_name,
            last_name=last_name,
            display_name=display_name,
            department=department,
            email=email
        )
    )
    model.conn.add(user)
    model.conn.commit()
    return user.id


riberry-0.10.14.dist-info/LICENSE

MIT License

Copyright (c) 2018 srafehi

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
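# --- Example (not part of the package) --------------------------------------
# A minimal sketch chaining the helpers above: create a user with
# riberry.util.user.add_user, create a group, then link them with
# riberry.util.groups.add_user_to_group. All concrete values are illustrative,
# and a configured riberry database connection is assumed. The same operations
# are exposed on the command line via `python -m riberry.util add-user` and
# `python -m riberry.util user-groups add`.
from riberry import model, services
from riberry.util import groups as groups_util, user as user_util

user_id = user_util.add_user(
    username='jsmith',
    password='change-me',
    first_name='Jane',
    last_name='Smith',
    display_name='Jane Smith',
    department='Analytics',
    email='jane.smith@example.com',
)

services.auth.create_group(name='analysts')
model.conn.commit()

groups_util.add_user_to_group(username='jsmith', group_name='analysts')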