# fastlane/__init__.py
__version__ = '0.1.0'


# fastlane/api/__init__.py
# (empty)


# fastlane/api/app.py
import logging
import sys
from json import loads

import rq_dashboard
import structlog
from flask import Flask
from flask_redis import FlaskRedis
from flask_redis_sentinel import SentinelExtension
from structlog.processors import (
    JSONRenderer,
    StackInfoRenderer,
    TimeStamper,
    format_exc_info,
)
from structlog.stdlib import add_log_level, add_logger_name, filter_by_level

import fastlane.api.metrics as metrics
import fastlane.api.rqb as rqb
from fastlane.api.enqueue import bp as enqueue
from fastlane.api.healthcheck import bp as healthcheck
from fastlane.api.status import bp as status
from fastlane.api.task import bp as task_api
from fastlane.models import db


class Application:
    def __init__(self, config, log_level, testing=False):
        self.config = config
        self.log_level = log_level
        self.create_app(testing)

    def create_app(self, testing):
        self.app = Flask("fastlane")
        self.app.testing = testing
        self.app.config.from_object(rq_dashboard.default_settings)
        self.app.error_handlers = []

        for key in self.config.items.keys():
            self.app.config[key] = self.config[key]

        self.app.config.DEBUG = self.config.DEBUG
        self.app.config.ENV = self.config.ENV
        self.app.original_config = self.config
        self.app.log_level = self.log_level

        self.configure_logging()
        self.connect_redis()
        self.connect_queue()
        self.connect_db()
        self.load_executor()
        self.load_error_handlers()

        metrics.init_app(self.app)
        self.app.register_blueprint(metrics.bp)
        self.app.register_blueprint(healthcheck)
        self.app.register_blueprint(enqueue)
        self.app.register_blueprint(task_api)
        self.app.register_blueprint(status)
        # self.app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq")

    def configure_logging(self):
        if self.app.testing:
            structlog.reset_defaults()

        disabled = [
            "docker.utils.config",
            "docker.auth",
            "docker.api.build",
            "docker.api.swarm",
            "docker.api.image",
            "rq.worker",
            "werkzeug",
            "requests",
            "urllib3",
        ]

        for logger in disabled:
            log = logging.getLogger(logger)
            log.setLevel(logging.ERROR)
            log.disabled = True

        self.app.logger.disabled = True

        logging.basicConfig(
            level=self.log_level, stream=sys.stdout, format="%(message)s"
        )

        chain = [
            filter_by_level,
            add_log_level,
            add_logger_name,
            TimeStamper(fmt="iso"),
            StackInfoRenderer(),
            format_exc_info,
            JSONRenderer(indent=1, sort_keys=True),
        ]

        log = structlog.wrap_logger(
            logging.getLogger(__name__),
            processors=chain,
            context_class=dict,
            wrapper_class=structlog.stdlib.BoundLogger,
            # cache_logger_on_first_use=True,
        )

        self.logger = log
        self.app.logger = self.logger

    def connect_redis(self):
        self.logger.debug("Connecting to redis...")

        if self.app.testing:
            self.logger.info("Configuring Fake Redis...")
            import fakeredis

            self.app.redis = FlaskRedis.from_custom_provider(fakeredis.FakeStrictRedis)
            self.app.redis.connect = self._mock_redis(True)
            self.app.redis.disconnect = self._mock_redis(False)
            self.app.redis.init_app(self.app)
        elif self.app.config["REDIS_URL"].startswith("redis+sentinel"):
            self.logger.info(
                "Configuring Redis Sentinel...", redis_url=self.app.config["REDIS_URL"]
            )
            redis_sentinel = SentinelExtension()
            redis_connection = redis_sentinel.default_connection
            redis_sentinel.init_app(self.app)
            self.app.redis = redis_connection
        else:
            self.logger.info(
                "Configuring Redis...", redis_url=self.app.config["REDIS_URL"]
            )
            self.app.redis = FlaskRedis()
            self.app.redis.init_app(self.app)

        self.logger.info("Connection to redis successful")

    def connect_queue(self):
        self.app.queue = None
        self.app.register_blueprint(rqb.bp)
        rqb.init_app(self.app)

    def connect_db(self):
        self.app.config["MONGODB_SETTINGS"] = loads(self.app.config["MONGODB_CONFIG"])
        self.logger.info(
            "Connecting to MongoDB...", mongo=self.app.config["MONGODB_SETTINGS"]
        )
        db.init_app(self.app)
        self.logger.info(
            "Connected to MongoDB successfully.",
            mongo=self.app.config["MONGODB_SETTINGS"],
        )

    def load_executor(self):
        """Can't be loaded eagerly due to fork of jobs"""

        def _loads():
            if getattr(self.app, "executor_module", None) is None:
                executor_module = __import__(self.config.EXECUTOR)
                if "." in self.config.EXECUTOR:
                    for part in self.config.EXECUTOR.split(".")[1:]:
                        executor_module = getattr(executor_module, part)
                self.app.executor_module = executor_module
            return self.app.executor_module.Executor(self.app)

        self.app.load_executor = _loads

    def load_error_handlers(self):
        self.app.error_handlers = []

        for handler_name in self.app.config["ERROR_HANDLERS"]:
            parts = handler_name.split(".")
            obj = __import__(".".join(parts[:-1]), None, None, [parts[-1]], 0)
            obj = getattr(obj, parts[-1])
            self.app.error_handlers.append(obj(self.app))

        self.app.report_error = self.report_error

    def report_error(self, err, metadata=None):
        for handler in self.app.error_handlers:
            handler.report(err, metadata)

    def run(self, host, port):
        self.app.run(host, port)

    def _mock_redis(self, connected):
        def handle():
            self.app.redis._redis_client.connected = connected

        return handle
# fastlane/api/enqueue.py
from datetime import datetime, timezone

from flask import Blueprint, current_app, g, make_response, request
from rq_scheduler import Scheduler

from fastlane.models.task import Task
from fastlane.utils import parse_time
from fastlane.worker.job import run_job

try:
    from ujson import dumps, loads
except ImportError:
    from json import dumps, loads

bp = Blueprint("enqueue", __name__)


def get_details():
    details = request.get_json()
    if details is None and request.get_data():
        details = loads(request.get_data())
    return details


@bp.route("/tasks/<task_id>", methods=("POST",))
def create_task(task_id):
    details = get_details()
    if details is None or details == "":
        msg = "Failed to enqueue task because JSON body could not be parsed."
        g.logger.warn(msg)
        return make_response(msg, 400)

    image = details.get("image", None)
    command = details.get("command", None)

    if image is None or command is None:
        return make_response("image and command must be filled in the request.", 400)

    logger = g.logger.bind(task_id=task_id, image=image, command=command)

    logger.debug("Creating task...")
    task = Task.objects(task_id=task_id).modify(task_id=task_id, upsert=True, new=True)
    logger.info("Task created successfully.")

    logger.debug("Creating job...")
    retries = details.get("retries", 0)
    expiration = details.get("expiration")

    hard_limit = current_app.config["HARD_EXECUTION_TIMEOUT_SECONDS"]
    timeout = details.get("timeout", hard_limit)
    timeout = min(timeout, hard_limit)  # ensure jobs can't specify more than hard limit

    j = task.create_job()
    j.metadata["retries"] = retries
    j.metadata["retry_count"] = 0
    j.metadata["expiration"] = expiration
    j.metadata["timeout"] = timeout
    j.metadata["envs"] = details.get("envs", {})
    j.save()
    job_id = str(j.id)
    logger.debug("Job created successfully...", job_id=job_id)

    queue_job_id = None
    start_at = details.get("startAt", None)
    start_in = parse_time(details.get("startIn", None))
    cron = details.get("cron", None)

    scheduler = Scheduler("jobs", connection=current_app.redis)

    args = [task_id, job_id, image, command]

    if start_at is not None:
        dt = datetime.utcfromtimestamp(int(start_at))
        logger.debug("Enqueuing job execution in the future...", start_at=dt)
        result = scheduler.enqueue_at(dt, run_job, *args)
        j.metadata["enqueued_id"] = result.id
        j.save()
        logger.info("Job execution enqueued successfully.", start_at=dt)
    elif start_in is not None:
        dt = datetime.now(tz=timezone.utc) + start_in
        logger.debug("Enqueuing job execution in the future...", start_at=dt)
        result = scheduler.enqueue_at(dt, run_job, *args)
        j.metadata["enqueued_id"] = result.id
        j.save()
        logger.info("Job execution enqueued successfully.", start_at=dt)
    elif cron is not None:
        logger.debug("Enqueuing job execution using cron...", cron=cron)
        result = scheduler.cron(
            cron,  # A cron string (e.g. "0 0 * * 0")
            func=run_job,
            args=args,
            repeat=None,
            queue_name="jobs",
        )
        j.metadata["enqueued_id"] = result.id
        j.metadata["cron"] = cron
        j.scheduled = True
        j.save()
        logger.info("Job execution enqueued successfully.", cron=cron)
    else:
        logger.debug("Enqueuing job execution...")
        result = current_app.job_queue.enqueue(run_job, *args, timeout=-1)
        queue_job_id = result.id
        j.metadata["enqueued_id"] = result.id
        j.save()
        logger.info("Job execution enqueued successfully.")

    return dumps({"taskId": task_id, "jobId": job_id, "queueJobId": queue_job_id})
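The create_task endpoint above accepts a JSON body describing the container image, the command to run, optional environment variables, retries and scheduling hints (startAt, startIn, cron). A minimal client-side sketch, not part of the package, assuming the API runs on the CLI default of localhost:10000 and using an arbitrary task id:

# Hypothetical client sketch: enqueue a job against a running fastlane API.
import requests

payload = {
    "image": "ubuntu:latest",       # docker image to run
    "command": "ls -la",            # command executed inside the container
    "envs": {"SOME_FLAG": "1"},     # optional environment variables
    "retries": 2,                   # optional retry count
    "startIn": "10m",               # optional delay, parsed by fastlane.utils.parse_time
}

response = requests.post(
    "http://localhost:10000/tasks/clear-caches", json=payload, timeout=5
)
print(response.json())  # {"taskId": ..., "jobId": ..., "queueJobId": ...}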
# fastlane/api/healthcheck.py
from flask import Blueprint, current_app, jsonify

from fastlane.models import db

bp = Blueprint("healthcheck", __name__, url_prefix="/healthcheck")


@bp.route("/", methods=("GET",))
def healthcheck():
    status = {"redis": True, "mongo": True, "errors": []}

    try:
        res = current_app.redis.ping()
        assert res, f"Connection to redis failed ({res})."
    except Exception as err:
        status["errors"].append({"source": "redis", "message": str(err)})
        status["redis"] = False

    try:
        database = current_app.config["MONGODB_SETTINGS"]["db"]
        conn = getattr(db.connection, database)
        res = tuple(conn.jobs.find().limit(1))
        assert isinstance(res, (tuple,)), f"Connection to mongoDB failed ({res})."
    except Exception as err:
        status["errors"].append({"source": "mongo", "message": str(err)})
        status["mongo"] = False

    code = 200
    if len(status["errors"]) > 0:
        code = 500

    return jsonify(status), code
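The healthcheck endpoint answers 200 when both Redis and MongoDB respond and 500 otherwise, always returning the per-backend flags and the error list. A hypothetical probe sketch (base URL assumed, not part of the package):

# Hypothetical monitoring probe against the healthcheck blueprint above.
import requests

res = requests.get("http://localhost:10000/healthcheck/", timeout=2)
body = res.json()
if res.status_code != 200:
    raise RuntimeError(f"fastlane unhealthy: {body['errors']}")
print("redis ok:", body["redis"], "mongo ok:", body["mongo"])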
# fastlane/api/metrics.py
from datetime import datetime
from uuid import uuid4

from flask import Blueprint, current_app, g, request

bp = Blueprint('metrics', __name__)


def init_app(app):
    @app.before_request
    def start_timer():
        request_id = request.headers.get('X-Request-ID', str(uuid4()))
        g.logger = current_app.logger.bind(request_id=request_id)
        g.request_id = request_id
        g.start = datetime.now()

    @app.after_request
    def log_request(response):
        if request.path == '/favicon.ico':
            return response

        now = datetime.now()
        duration = int(round((now - g.start).microseconds / 1000, 2))
        ip = request.headers.get('X-Forwarded-For', request.remote_addr)
        host = request.host.split(':', 1)[0]

        log_params = {
            'method': request.method,
            'path': request.path,
            'status': response.status_code,
            'duration': duration,
            'ip': ip,
            'host': host,
        }

        request_id = g.request_id
        if request_id:
            log_params['request_id'] = request_id

        if response.status_code < 400:
            current_app.logger.info('Request succeeded', **log_params)
        elif response.status_code < 500:
            current_app.logger.info('Bad Request', **log_params)
        else:
            current_app.logger.error('Internal Server Error', **log_params)

        return response


# fastlane/api/rqb.py
from flask import Blueprint
from rq import Queue
from rq_scheduler import Scheduler

bp = Blueprint('rq', __name__)


class JobQueue:
    def __init__(self, queue_name, app):
        self.app = app
        self.queue_name = queue_name
        self._queue = None
        self._scheduler = None

    def enqueue_in(self, *args, **kw):
        self.app.logger.info('Scheduling execution for the future.', **kw)
        return self._scheduler.enqueue_in(*args, **kw)

    def enqueue_at(self, *args, **kw):
        self.app.logger.info('Scheduling execution for a specific timestamp.', **kw)
        return self._scheduler.enqueue_at(*args, **kw)

    def enqueue(self, *args, **kw):
        return self.queue.enqueue(*args, **kw)

    @property
    def queue(self):
        if self._queue is None:
            self._queue = Queue(self.queue_name, connection=self.app.redis)
        return self._queue

    @property
    def scheduler(self):
        if self._scheduler is None:
            self._scheduler = Scheduler(queue=self._queue)
        return self._scheduler


def init_app(app):
    for qn in ['jobs', 'monitor']:
        key = f"{qn.rstrip('s')}"
        setattr(app, f'{key}_queue', JobQueue(qn, app))


# fastlane/api/status.py
from datetime import datetime

import croniter
from flask import Blueprint, current_app, jsonify, url_for

from fastlane.models.job import Job

bp = Blueprint("status", __name__, url_prefix="/status")


@bp.route("/", methods=("GET",))
def status():
    executor = current_app.load_executor()

    status = {"hosts": [], "containers": {"running": []}}

    containers = executor.get_running_containers()
    for host, port, container_id in containers["running"]:
        status["containers"]["running"].append(
            {"host": host, "port": port, "id": container_id}
        )

    status["hosts"] = containers["available"]

    status["queues"] = {"jobs": {}, "monitor": {}, "error": {}}
    for queue in ["jobs", "monitor", "error"]:
        jobs_queue_size = current_app.redis.llen(f"rq:queue:{queue}")
        status["queues"][queue]["length"] = jobs_queue_size

    status["scheduled"] = []
    scheduled_jobs = Job.objects(scheduled=True).all()
    for job in scheduled_jobs:
        j = job.to_dict(include_executions=False)
        itr = croniter.croniter(job.metadata["cron"], datetime.utcnow())
        j["nextScheduledAt"] = itr.get_next(datetime).isoformat()

        task_id = job.task.task_id
        job_url = url_for(
            "task.get_job", task_id=task_id, job_id=str(job.id), _external=True
        )
        j["url"] = job_url
        stop_job_url = url_for(
            "task.stop_job", task_id=task_id, job_id=str(job.id), _external=True
        )
        j["stopUrl"] = stop_job_url

        task_url = url_for("task.get_task", task_id=task_id, _external=True)
        del j["taskId"]
        j["task"] = {"id": task_id, "url": task_url}

        status["scheduled"].append(j)

    return jsonify(status), 200
# fastlane/api/task.py
from flask import Blueprint, abort, current_app, g, jsonify, url_for
from rq_scheduler import Scheduler

from fastlane.models.job import Job
from fastlane.models.task import Task

bp = Blueprint("task", __name__)


@bp.route("/tasks/<task_id>", methods=("GET",))
def get_task(task_id):
    logger = g.logger.bind(task_id=task_id)

    logger.debug("Getting job...")
    task = Task.get_by_task_id(task_id)

    if task is None:
        logger.error("Task not found.")
        abort(404)
        return

    logger.debug("Task retrieved successfully...")

    jobs = []
    for job_id in task.jobs:
        url = url_for(
            "task.get_job", task_id=task_id, job_id=str(job_id.id), _external=True
        )
        job = {"id": str(job_id.id), "url": url}
        jobs.append(job)

    return jsonify({"taskId": task_id, "jobs": jobs})


@bp.route("/tasks/<task_id>/jobs/<job_id>", methods=("GET",))
def get_job(task_id, job_id):
    logger = g.logger.bind(task_id=task_id, job_id=job_id)

    logger.debug("Getting job...")
    job = Job.get_by_id(task_id=task_id, job_id=job_id)

    if job is None:
        logger.error("Job not found in task.")
        abort(404)
        return

    logger.debug("Job retrieved successfully...")

    details = job.to_dict(
        include_log=True,
        include_error=True,
        blacklist=current_app.config["ENV_BLACKLISTED_WORDS"].lower().split(","),
    )

    task_url = url_for("task.get_task", task_id=task_id, _external=True)

    return jsonify({"task": {"id": task_id, "url": task_url}, "job": details})


@bp.route("/tasks/<task_id>/jobs/<job_id>/stop", methods=("POST",))
def stop_job(task_id, job_id):
    logger = g.logger.bind(task_id=task_id, job_id=job_id)

    logger.debug("Getting job...")
    job = Job.get_by_id(task_id=task_id, job_id=job_id)

    if job is None:
        logger.error("Job not found in task.")
        abort(404)
        return

    scheduler = Scheduler("jobs", connection=current_app.redis)

    if (
        "enqueued_id" not in job.metadata
        or job.metadata["enqueued_id"] not in scheduler
    ):
        msg = "Could not stop job since it's not recurring."
        logger.error(msg)
        abort(400)
        return msg

    scheduler.cancel(job.metadata["enqueued_id"])
    job.scheduled = False
    job.save()

    return jsonify({"taskId": task_id, "job": {"id": job_id}})
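A hypothetical client sketch for the task blueprint above, retrieving job details and stopping a recurring job; the base URL and ids are placeholders, not part of the package:

# Hypothetical client sketch for the task endpoints above.
import requests

base = "http://localhost:10000"
task_id, job_id = "clear-caches", "<some-job-id>"  # placeholder ids

details = requests.get(f"{base}/tasks/{task_id}/jobs/{job_id}").json()
print(details["job"]["executionCount"], details["task"]["url"])

# Stopping only works for scheduled (cron/recurring) jobs; otherwise a 400 is returned.
requests.post(f"{base}/tasks/{task_id}/jobs/{job_id}/stop")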
# fastlane/cli/__init__.py
from fastlane.cli.core import main  # NOQA


# fastlane/cli/api.py
from fastlane.api.app import Application
from fastlane.config import Config


class APIHandler:
    def __init__(self, click, host, port, config, log_level):
        self.config_path = config
        self.config = None
        self.click = click
        self.host = host
        self.port = port
        self.log_level = log_level

        self.load_config()

    def load_config(self):
        # self.click.echo(f'Loading configuration from {self.config_path}...')
        self.config = Config.load(self.config_path)

    def __call__(self):
        app = Application(self.config, self.log_level)
        app.logger.info(
            "fastlane is running.",
            host=self.host,
            port=self.port,
            environment=self.config.ENV,
        )
        app.run(self.host, self.port)


# fastlane/cli/core.py
import sys
from os.path import abspath, dirname, join

import click

from fastlane.cli.api import APIHandler
from fastlane.cli.worker import WorkerHandler

ROOT_CONFIG = abspath(join(dirname(__file__), '../config/local.conf'))

LEVELS = {
    0: 'ERROR',
    1: 'WARN',
    2: 'INFO',
    3: 'DEBUG',
}


@click.group()
def main():
    pass


@click.command()
@click.option('-b', '--host', default='0.0.0.0')
@click.option('-p', '--port', default=10000)
@click.option('-v', '--verbose', default=0, count=True)
@click.option(
    '-c', '--config', default=ROOT_CONFIG, help='configuration file to use with fastlane')
def api(host, port, verbose, config):
    """Runs fastlane API in the specified host and port."""
    log_level = LEVELS.get(verbose, 'ERROR')
    handler = APIHandler(click, host, port, config, log_level)
    handler()


@click.command()
@click.option('-i', '--id', default=None, help='ID for this worker')
@click.option(
    '-j', '--no-jobs', default=False, help='''Process the 'jobs' queue?''', is_flag=True)
@click.option(
    '-m', '--no-monitor', default=False, help='''Process the 'monitor' queue?''', is_flag=True)
@click.option('-v', '--verbose', default=0, count=True)
@click.option(
    '-c', '--config', default=ROOT_CONFIG, help='configuration file to use with fastlane')
def worker(id, no_jobs, no_monitor, verbose, config):
    """Runs a fastlane Worker with the specified queue name and starts processing."""
    jobs = not no_jobs
    monitor = not no_monitor

    if not jobs and not monitor:
        click.echo('Worker must monitor at least one queue: jobs or monitor')
        sys.exit(1)

    log_level = LEVELS.get(verbose, 'ERROR')
    handler = WorkerHandler(click, id, jobs, monitor, config, log_level)
    handler()


main.add_command(api)
main.add_command(worker)


# fastlane/cli/worker.py
import time
from uuid import uuid4

import rq
from rq import Connection, Worker

from fastlane.api.app import Application
from fastlane.config import Config
from fastlane.worker.scheduler import QueueScheduler


class WorkerHandler:
    def __init__(self, click, worker_id, jobs, monitor, config, log_level):
        self.config_path = config
        self.config = None
        self.click = click
        self.worker_id = worker_id
        self.log_level = log_level
        self.queues = []

        if jobs:
            self.queues.append("jobs")
        if monitor:
            self.queues.append("monitor")

        self.load_config()

    def load_config(self):
        # self.click.echo(f'Loading configuration from {self.config_path}...')
        self.config = Config.load(self.config_path)

    def __call__(self):
        # self.click.echo(
        #     f'Running fastlane worker processing queues {",".join(self.queues)}.')
        app = Application(self.config, self.log_level)
        interval = app.config["WORKER_SLEEP_TIME_MS"] / 1000.0

        with app.app.app_context():
            worker_kw = dict(connection=app.app.redis)

            if self.worker_id is None:
                app.logger.warn(
                    "The worker id was not set for this worker and a random one will be used."
                )
                self.worker_id = str(uuid4())

            app.logger = app.logger.bind(worker_id=self.worker_id, queues=self.queues)
            app.app.logger = app.logger

            worker_kw["name"] = self.worker_id
            worker = Worker(self.queues, **worker_kw)

            schedulers = {}
            with Connection(app.app.redis):
                for queue in self.queues:
                    schedulers[queue] = QueueScheduler(queue, app=app.app)
                app.schedulers = schedulers

                # app.logger.debug('Processing enqueued items...')
                try:
                    while True:
                        for queue in self.queues:
                            # app.logger.debug("Processing scheduler...", queue=queue)
                            schedulers[queue].move_jobs()

                        # app.logger.debug('Processing queues...')
                        worker.work(burst=True)
                        time.sleep(interval)
                except rq.worker.StopRequested:
                    app.logger.info("Worker exiting gracefully.")
                    return
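The console commands above can also be exercised programmatically through click's test runner, which is handy for smoke-testing the option wiring; an illustrative sketch, not part of the package:

# Hypothetical sketch: drive the click group defined above with CliRunner.
from click.testing import CliRunner

from fastlane.cli import main

runner = CliRunner()
result = runner.invoke(main, ["api", "--help"])
print(result.output)  # shows the -b/-p/-v/-c options declared above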
# fastlane/config/__init__.py
from derpconf.config import Config

Config.allow_environment_variables()

Config.define("ENV", "development", "Environment application is running in", "General")
Config.define(
    "SECRET_KEY",
    "OTNDN0VCRDAtRTMyMS00NUM0LUFFQUYtNEI4QUE4RkFCRjUzCg==",
    "Secret key to use in flask.",
    "General",
)
Config.define(
    "REDIS_URL", "redis://localhost:10100/0", "Redis connection string", "Redis"
)
Config.define(
    "WORKER_SLEEP_TIME_MS",
    10,
    "Number of milliseconds that fastlane must sleep before getting the next job",
    "Worker",
)
Config.define(
    "HARD_EXECUTION_TIMEOUT_SECONDS",
    30 * 60,
    "Number of seconds that fastlane must wait before killing an execution",
    "Worker",
)
Config.define(
    "EXPONENTIAL_BACKOFF_MIN_MS",
    1000,
    "Number of milliseconds that fastlane must wait before the first retry in each job",
    "Worker",
)
Config.define(
    "EXPONENTIAL_BACKOFF_FACTOR",
    2,
    "Factor to multiply backoff by in each retry",
    "Worker",
)
Config.define(
    "EXECUTOR",
    "fastlane.worker.docker_executor",
    "Module full name where to find the Executor class",
    "Worker",
)
Config.define(
    "DOCKER_HOSTS",
    '[{"match": "", "hosts": ["localhost:2376"], "maxRunning": 2}]',
    "Docker Hosts to add to pool",
    "Docker Executor",
)
Config.define(
    "MONGODB_CONFIG",
    """{
    "host": "localhost",
    "port": 10101,
    "db": "fastlane",
    "serverSelectionTimeoutMS": 100,
    "connect": false
}""",
    "MongoDB configuration",
    "Models",
)
Config.define(
    "ERROR_HANDLERS",
    ["fastlane.errors.sentry.SentryErrorHandler"],
    "List of configured error handlers",
    "Errors",
)
Config.define("SENTRY_DSN", "", "Sentry DSN to send errors to", "Errors")
Config.define(
    "ENV_BLACKLISTED_WORDS",
    "password,key,secret,client_id",
    "Words that if present in environment variables are redacted",
    "Errors",
)


# fastlane/config/local.conf
DEBUG=True
DOCKER_HOSTS='[{"match": "", "hosts": ["localhost:1234"], "maxRunning": 2}]'


# fastlane/errors/__init__.py
class ErrorReporter:
    def __init__(self, app):
        self.app = app

    def report(self, err, metadata=None):
        raise NotImplementedError()


# fastlane/errors/sentry.py
# import sentry_sdk
from raven import Client

from fastlane.errors import ErrorReporter


class SentryErrorHandler(ErrorReporter):
    def __init__(self, app):
        super(SentryErrorHandler, self).__init__(app)
        self.client = None
        self.send = True

        if app.config["SENTRY_DSN"] == "":
            self.send = False
        else:
            app.logger.info(
                "Sentry configured properly.", sentry_dsn=app.config["SENTRY_DSN"]
            )
            self.client = Client(app.config["SENTRY_DSN"], auto_log_stacks=True)

    def report(self, err, metadata=None):
        if not self.send:
            return

        if metadata is None:
            metadata = {}

        exc_info = (err.__class__, err, err.__traceback__)
        self.client.captureException(exc_info=exc_info, extra=metadata)

        # with sentry_sdk.configure_scope() as scope:
        #     scope.level = "error"
        #     for key, val in metadata.items():
        #         scope.set_extra(key, val)
        #     sentry_sdk.capture_exception(err)
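Any class listed in the ERROR_HANDLERS setting only has to implement the ErrorReporter interface defined above. A hypothetical logging-only handler, for illustration (not part of the package):

# Hypothetical alternative error handler built on the ErrorReporter interface.
from fastlane.errors import ErrorReporter


class LoggingErrorHandler(ErrorReporter):
    def report(self, err, metadata=None):
        # self.app.logger is the structlog logger configured by Application
        self.app.logger.error(
            "Unhandled error reported.", error=str(err), **(metadata or {})
        )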
# fastlane/models/__init__.py
from flask_mongoengine import MongoEngine

db = MongoEngine()


# fastlane/models/job.py
import datetime
from uuid import uuid4

import mongoengine.errors
from mongoengine import (
    BooleanField,
    DateTimeField,
    DictField,
    EmbeddedDocumentField,
    IntField,
    ListField,
    ReferenceField,
    StringField,
)

from fastlane.models import db


class JobExecution(db.EmbeddedDocument):
    class Status:
        enqueued = "enqueued"
        pulling = "pulling"
        running = "running"
        done = "done"
        failed = "failed"
        timedout = "timedout"
        expired = "expired"
        stopped = "stopped"

    created_at = DateTimeField(required=True)
    started_at = DateTimeField(required=False)
    finished_at = DateTimeField(required=False)
    execution_id = StringField(required=True)
    image = StringField(required=True)
    command = StringField(required=True)
    status = StringField(required=True, default=Status.enqueued)
    log = StringField(required=False)
    error = StringField(required=False)
    exit_code = IntField(required=False)
    metadata = DictField(required=False)

    def to_dict(self, include_log=False, include_error=False):
        s_at = self.started_at.isoformat() if self.started_at is not None else None
        f_at = self.finished_at.isoformat() if self.finished_at is not None else None
        res = {
            "createdAt": self.created_at.isoformat(),
            "startedAt": s_at,
            "finishedAt": f_at,
            "image": self.image,
            "command": self.command,
            "metadata": self.metadata,
            "status": self.status,
            "exitCode": self.exit_code,
        }

        if self.finished_at is not None:
            res["finishedAt"] = self.finished_at.isoformat()

        if include_log:
            res["log"] = self.log

        if include_error:
            res["error"] = self.error

        return res


class Job(db.Document):
    created_at = DateTimeField(required=True)
    last_modified_at = DateTimeField(required=True, default=datetime.datetime.now)
    job_id = StringField(required=True)
    executions = ListField(EmbeddedDocumentField(JobExecution))
    task = ReferenceField(
        "Task", required=True, reverse_delete_rule=mongoengine.CASCADE
    )
    metadata = DictField(required=False)
    scheduled = BooleanField(required=True, default=False)

    def save(self, *args, **kwargs):
        if self.executions is None:
            self.executions = []

        if not self.created_at:
            self.created_at = datetime.datetime.utcnow()
        self.last_modified_at = datetime.datetime.utcnow()

        return super(Job, self).save(*args, **kwargs)

    def create_execution(self, image, command):
        ex_id = str(uuid4())
        ex = JobExecution(
            execution_id=ex_id,
            image=image,
            command=command,
            created_at=datetime.datetime.utcnow(),
        )
        self.executions.append(ex)
        self.save()
        return ex

    def get_metadata(self, blacklist):
        if "envs" in self.metadata:
            envs = {}
            for key, val in self.metadata["envs"].items():
                for word in blacklist:
                    if word in key.lower():
                        val = "*" * len(str(val))
                        break
                envs[key] = val

            self.metadata["envs"] = envs

        return self.metadata

    def to_dict(
        self,
        include_log=False,
        include_error=False,
        include_executions=True,
        blacklist=None,
    ):
        if blacklist is None:
            blacklist = []

        meta = self.get_metadata(blacklist)

        res = {
            "createdAt": self.created_at.isoformat(),
            "lastModifiedAt": self.last_modified_at.isoformat(),
            "taskId": self.task.task_id,
            "scheduled": self.scheduled,
            "executionCount": len(self.executions),
            "metadata": meta,
        }

        if include_executions:
            executions = [
                ex.to_dict(include_log, include_error) for ex in self.executions
            ]
            res["executions"] = executions

        return res

    @classmethod
    def get_by_id(cls, task_id, job_id):
        from fastlane.models.task import Task

        if task_id is None or task_id == "" or job_id is None or job_id == "":
            raise RuntimeError(
                "Task ID and Job ID are required and can't be None or empty."
            )

        t = Task.objects(task_id=task_id).first()
        j = cls.objects(task=t, job_id=job_id).first()
        return j

    def get_execution_by_id(self, execution_id):
        for job_execution in self.executions:
            if job_execution.execution_id == execution_id:
                return job_execution
        return None
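A small sketch of how the blacklist passed to Job.get_metadata redacts environment values; the Job document is built in memory only, so no MongoDB connection or save() is involved (the env names are made up):

# Hypothetical sketch of the env blacklisting performed by Job.get_metadata.
from fastlane.models.job import Job

job = Job(metadata={"envs": {"API_PASSWORD": "hunter2", "REGION": "us-east-1"}})
print(job.get_metadata(blacklist=["password", "secret"]))
# -> {'envs': {'API_PASSWORD': '*******', 'REGION': 'us-east-1'}}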
# fastlane/models/task.py
import datetime

import mongoengine.errors
from bson.objectid import ObjectId
from mongoengine import (BooleanField, DateTimeField, ListField,
                         ReferenceField, StringField)

from fastlane.models import db


class Task(db.Document):
    created_at = DateTimeField(required=True)
    last_modified_at = DateTimeField(
        required=True, default=datetime.datetime.now)
    task_id = StringField(required=True)
    jobs = ListField(ReferenceField('Job'))

    def _validate(self):
        errors = {}
        if self.task_id == "":
            errors["task_id"] = mongoengine.errors.ValidationError(
                'Field is required', field_name="task_id")

        if errors:
            message = 'ValidationError (%s:%s) ' % (self._class_name, self.pk)
            raise mongoengine.errors.ValidationError(message, errors=errors)

    def save(self, *args, **kwargs):
        self._validate()

        if not self.created_at:
            self.created_at = datetime.datetime.now()
        self.last_modified_at = datetime.datetime.now()

        return super(Task, self).save(*args, **kwargs)

    @classmethod
    def create_task(cls, task_id):
        t = cls(task_id=task_id)
        t.save()
        return t

    @classmethod
    def get_by_task_id(cls, task_id):
        if task_id is None or task_id == "":
            raise RuntimeError(
                "Task ID is required and can't be None or empty.")

        t = cls.objects(task_id=task_id).no_dereference().first()
        return t

    def create_job(self):
        from fastlane.models.job import Job

        job_id = ObjectId()
        j = Job(
            id=job_id,
            job_id=str(job_id),
        )
        j.task = self
        j.save()
        self.jobs.append(j)
        self.save()
        return j


# fastlane/utils.py
import re
from datetime import timedelta

regex = re.compile(
    r'((?P<hours>\d+?)h)?((?P<minutes>\d+?)m)?((?P<seconds>\d+?)s)?')


def parse_time(time_str):
    if time_str is None:
        return None

    parts = regex.match(time_str)
    if not parts:
        return None

    parts = parts.groupdict()
    time_params = {}
    for (name, param) in parts.items():
        if param:
            time_params[name] = int(param)

    return timedelta(**time_params)
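parse_time converts the "startIn" shorthand accepted by the enqueue endpoint into a timedelta; a quick illustrative sketch:

# Hypothetical usage sketch for fastlane.utils.parse_time.
from fastlane.utils import parse_time

print(parse_time("2h30m"))   # 2:30:00
print(parse_time("45s"))     # 0:00:45
print(parse_time(None))      # None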
# fastlane/worker/__init__.py
class ExecutionResult:
    class Status:
        created = 'created'
        running = 'running'
        failed = 'failed'
        done = 'done'

    def __init__(self, status):
        self.status = status
        self.exit_code = None
        self.started_at = None
        self.finished_at = None
        self.log = ''
        self.error = ''

    def set_log(self, log):
        self.log = log


# fastlane/worker/docker_executor.py
import random
import re
from json import loads

import docker
from dateutil.parser import parse

from fastlane.worker import ExecutionResult

# https://docs.docker.com/engine/reference/commandline/ps/#examples
# One of created, restarting, running, removing, paused, exited, or dead
STATUS = {
    "created": ExecutionResult.Status.created,
    "exited": ExecutionResult.Status.done,
    "dead": ExecutionResult.Status.failed,
    "running": ExecutionResult.Status.running,
}


class DockerPool:
    def __init__(self, docker_hosts):
        self.docker_hosts = docker_hosts

        self.max_running = {}
        self.clients_per_regex = []
        self.clients = {}

        self.__init_clients()

    def __init_clients(self):
        for regex, docker_hosts, max_running in self.docker_hosts:
            clients = (regex, [])
            self.clients_per_regex.append(clients)
            self.max_running[regex] = max_running

            for address in docker_hosts:
                host, port = address.split(":")
                cl = docker.DockerClient(base_url=address)
                self.clients[address] = (host, port, cl)
                clients[1].append((host, port, cl))

    def get_client(self, task_id, host=None, port=None):
        if host is not None or port is not None:
            return self.clients.get(f"{host}:{port}")

        for regex, clients in self.clients_per_regex:
            if regex is not None and not regex.match(task_id):
                continue
            return random.choice(clients)

        raise RuntimeError(f"Failed to find a docker host for task id {task_id}.")


class Executor:
    def __init__(self, app, pool=None):
        self.app = app
        self.pool = pool

        if pool is None:
            docker_hosts = []
            clusters = loads(self.app.config["DOCKER_HOSTS"])

            for cluster in clusters:
                regex = cluster["match"]
                if not regex:
                    regex = None
                else:
                    regex = re.compile(regex)

                hosts = cluster["hosts"]
                max_running = cluster.get("maxRunning", 10)

                docker_hosts.append((regex, hosts, max_running))

            self.pool = DockerPool(docker_hosts)

    def validate_max_running_executions(self, task_id):
        total_running = 0
        max_running = 0

        for regex, clients in self.pool.clients_per_regex:
            if regex is not None and not regex.match(task_id):
                continue
            total_running = len(self.get_running_containers(regex)["running"])
            max_running = self.pool.max_running[regex]
            break

        return total_running == 0 or total_running <= max_running

    def update_image(self, task, job, execution, image, tag):
        host, port, cl = self.pool.get_client(task.task_id)
        cl.images.pull(image, tag=tag)
        execution.metadata["docker_host"] = host
        execution.metadata["docker_port"] = port

    def run(self, task, job, execution, image, tag, command):
        host, port, cl = None, None, None

        if "docker_host" in execution.metadata:
            h = execution.metadata["docker_host"]
            p = execution.metadata["docker_port"]
            host, port, cl = self.pool.get_client(task.task_id, h, p)
        else:
            host, port, cl = self.pool.get_client(task.task_id)
            execution.metadata["docker_host"] = host
            execution.metadata["docker_port"] = port

        container = cl.containers.run(
            image=f"{image}:{tag}",
            name=f"fastlane_worker_{execution.execution_id}",
            command=command,
            detach=True,
            environment=job.metadata.get("envs", {}),
        )
        execution.metadata["container_id"] = container.id

        return True

    def stop_job(self, task, job, execution):
        if "container_id" not in execution.metadata:
            return

        h = execution.metadata["docker_host"]
        p = execution.metadata["docker_port"]
        host, port, cl = self.pool.get_client(task.task_id, h, p)
        container = cl.containers.get(execution.metadata["container_id"])
        container.stop()

    def convert_date(self, dt):
        return parse(dt)

    def get_result(self, task, job, execution):
        h = execution.metadata["docker_host"]
        p = execution.metadata["docker_port"]
        host, port, cl = self.pool.get_client(task.task_id, h, p)

        container_id = execution.metadata["container_id"]
        container = cl.containers.get(container_id)

        # container.attrs['State']
        # {'Status': 'exited', 'Running': False, 'Paused': False, 'Restarting': False,
        #  'OOMKilled': False, 'Dead': False, 'Pid': 0, 'ExitCode': 0, 'Error': '',
        #  'StartedAt': '2018-08-27T17:14:14.1951232Z', 'FinishedAt': '2018-08-27T17:14:14.2707026Z'}
        result = ExecutionResult(
            STATUS.get(container.status, ExecutionResult.Status.done)
        )

        state = container.attrs["State"]
        result.exit_code = state["ExitCode"]
        result.error = state["Error"]
        result.started_at = self.convert_date(state["StartedAt"])

        if (
            result.status == ExecutionResult.Status.done
            or result.status == ExecutionResult.Status.failed
        ):
            result.finished_at = self.convert_date(state["FinishedAt"])
            result.log = container.logs(stdout=True, stderr=False)

            if result.error != "":
                result.error += (
                    f"\n\nstderr:\n{container.logs(stdout=False, stderr=True)}"
                )
            else:
                result.error = container.logs(stdout=False, stderr=True)

        return result

    def get_running_containers(self, regex=None):
        running = []

        clients = self.pool.clients.values()
        if regex is not None:
            for r, cl in self.pool.clients_per_regex:
                if r is not None and r != regex:
                    continue
                clients = cl

        for (host, port, client) in clients:
            containers = client.containers.list(
                sparse=False, filters={"status": "running"}
            )
            for container in containers:
                if not container.name.startswith("fastlane_worker_"):
                    continue
                running.append((host, port, container.id))

        return {
            "available": [f"{host}:{port}" for (host, port, client) in clients],
            "running": running,
        }
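A sketch of how the DOCKER_HOSTS setting is interpreted by Executor.__init__ above: each cluster becomes a (regex, hosts, max_running) tuple and task ids are matched against the regex to pick a cluster. The hostnames here are made up and no Docker daemon is contacted:

# Hypothetical sketch of the DOCKER_HOSTS parsing done before DockerPool is built.
import re
from json import loads

DOCKER_HOSTS = (
    '[{"match": "sre-.*", "hosts": ["docker1:2376"], "maxRunning": 5},'
    ' {"match": "", "hosts": ["docker2:2376"]}]'
)

clusters = []
for cluster in loads(DOCKER_HOSTS):
    match = re.compile(cluster["match"]) if cluster["match"] else None
    clusters.append((match, cluster["hosts"], cluster.get("maxRunning", 10)))

for regex, hosts, max_running in clusters:
    print(regex.pattern if regex else "<catch-all>", hosts, max_running)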
# fastlane/worker/job.py
import calendar
import math
import time
from datetime import datetime, timedelta

from flask import current_app
from rq_scheduler import Scheduler

from fastlane.models.job import Job, JobExecution
from fastlane.worker import ExecutionResult


def validate_max_concurrent(executor, task_id, job, image, command, logger):
    if not executor.validate_max_running_executions(task_id):
        logger.debug(
            "Maximum number of global container executions reached. Enqueuing job execution..."
        )
        args = [task_id, job.id, image, command]
        result = current_app.job_queue.enqueue(run_job, *args, timeout=-1)
        job.metadata["enqueued_id"] = result.id
        job.save()
        logger.info(
            "Job execution re-enqueued successfully due to max number of container executions."
        )
        return False

    return True


def validate_expiration(job, ex, logger):
    d = datetime.utcnow()
    unixtime = calendar.timegm(d.utctimetuple())

    if (
        job.metadata.get("expiration") is not None
        and job.metadata["expiration"] < unixtime
    ):
        expiration_utc = datetime.utcfromtimestamp(job.metadata["expiration"])
        ex.status = JobExecution.Status.expired
        ex.error = f"Job was supposed to be done before {expiration_utc.isoformat()}, but was started at {d.isoformat()}."
        ex.finished_at = datetime.utcnow()
        job.save()
        logger.info(
            "Job execution canceled due to being expired.",
            job_expiration=job.metadata["expiration"],
            current_ts=unixtime,
        )
        return False

    return True
def download_image(executor, job, ex, image, tag, command, logger):
    try:
        logger.debug("Downloading updated container image...", image=image, tag=tag)
        before = time.time()
        executor.update_image(job.task, job, ex, image, tag)
        ellapsed = time.time() - before
        logger.info(
            "Image downloaded successfully.", image=image, tag=tag, ellapsed=ellapsed
        )
    except Exception as err:
        logger.error("Failed to download image.", error=err)
        ex.error = str(err)
        ex.status = JobExecution.Status.failed
        job.save()
        current_app.report_error(
            err,
            metadata=dict(
                operation="Downloading Image",
                task_id=job.task.task_id,
                job_id=job.id,
                execution_id=ex.execution_id,
                image=image,
                tag=tag,
                command=command,
            ),
        )
        return False

    return True


def run_container(executor, job, ex, image, tag, command, logger):
    logger.debug(
        "Running command in container...", image=image, tag=tag, command=command
    )
    try:
        ex.started_at = datetime.utcnow()
        job.save()
        before = time.time()
        executor.run(job.task, job, ex, image, tag, command)
        ellapsed = time.time() - before
        logger.info(
            "Container started successfully.",
            image=image,
            tag=tag,
            command=command,
            ellapsed=ellapsed,
        )
    except Exception as err:
        logger.error("Failed to run command", error=err)
        ex.error = str(err)
        ex.status = JobExecution.Status.failed
        job.save()
        current_app.report_error(
            err,
            metadata=dict(
                operation="Running Container",
                task_id=job.task.task_id,
                job_id=job.id,
                execution_id=ex.execution_id,
                image=image,
                tag=tag,
                command=command,
            ),
        )
        return False

    return True
def run_job(task_id, job_id, image, command):
    app = current_app
    logger = app.logger.bind(
        task_id=task_id, job_id=job_id, image=image, command=command
    )

    try:
        executor = app.load_executor()

        job = Job.get_by_id(task_id, job_id)
        if job is None:
            logger.error("Job was not found with task id and job id.")
            return False

        if not validate_max_concurrent(executor, task_id, job, image, command, logger):
            return False

        tag = "latest"
        if ":" in image:
            image, tag = image.split(":")

        logger = logger.bind(image=image, tag=tag)

        logger.debug("Changing job status...", status=JobExecution.Status.pulling)
        ex = job.create_execution(image=image, command=command)
        ex.status = JobExecution.Status.enqueued
        job.save()
        logger.debug(
            "Job status changed successfully.", status=JobExecution.Status.pulling
        )
        logger = logger.bind(execution_id=ex.execution_id)
    except Exception as err:
        logger.error("Failed to create job execution. Skipping job...", error=err)
        current_app.report_error(
            err,
            metadata=dict(task_id=task_id, job_id=job_id, image=image, command=command),
        )
        return False

    try:
        if not validate_expiration(job, ex, logger):
            return False

        logger.info("Started processing job.")

        if not download_image(executor, job, ex, image, tag, command, logger):
            return False

        if not run_container(executor, job, ex, image, tag, command, logger):
            return False

        logger.debug("Changing job status...", status=JobExecution.Status.running)
        ex.status = JobExecution.Status.running
        job.save()
        logger.debug(
            "Job status changed successfully.", status=JobExecution.Status.running
        )

        app.monitor_queue.enqueue(
            monitor_job, task_id, job_id, ex.execution_id, timeout=-1
        )

        return True
    except Exception as err:
        logger.error("Failed to run job", error=err)
        ex.status = JobExecution.Status.failed
        ex.error = f"Job failed to run with error: {str(err)}"
        job.save()
        current_app.report_error(
            err,
            metadata=dict(
                operation="Running Container",
                task_id=task_id,
                job_id=job_id,
                execution_id=ex.execution_id,
                image=image,
                tag=tag,
                command=command,
            ),
        )
def monitor_job(task_id, job_id, execution_id):
    try:
        app = current_app
        executor = app.load_executor()
        job = Job.get_by_id(task_id, job_id)
        logger = app.logger.bind(task_id=task_id, job_id=job_id)

        if job is None:
            logger.error("Failed to retrieve task or job.")
            return False

        execution = job.get_execution_by_id(execution_id)
        result = executor.get_result(job.task, job, execution)
        logger.info(
            "Container result obtained.",
            container_status=result.status,
            container_exit_code=result.exit_code,
        )

        if result.status in (
            ExecutionResult.Status.created,
            ExecutionResult.Status.running,
        ):
            ellapsed = (datetime.utcnow() - execution.started_at).total_seconds()
            if ellapsed > job.metadata["timeout"]:
                execution.finished_at = datetime.utcnow()
                execution.status = JobExecution.Status.timedout
                execution.error = f"Job execution timed out after {ellapsed} seconds."
                executor.stop_job(job.task, job, execution)
                logger.debug(
                    "Job execution timed out. Storing job details in mongo db.",
                    status=execution.status,
                    ellapsed=ellapsed,
                    error=result.error,
                )
                job.save()
                logger.info("Job execution timed out.", status=execution.status)
                return False

            scheduler = Scheduler("monitor", connection=app.redis)
            logger.info(
                "Job has not finished. Retrying monitoring in the future.",
                container_status=result.status,
                seconds=1,
            )

            interval = timedelta(seconds=5)
            scheduler.enqueue_in(interval, monitor_job, task_id, job_id, execution_id)
            return True

        if (
            result.exit_code != 0
            and "retry_count" in job.metadata
            and job.metadata["retry_count"] < job.metadata["retries"]
        ):
            retry_logger = logger.bind(
                exit_code=result.exit_code,
                retry_count=job.metadata["retry_count"],
                retries=job.metadata["retries"],
            )
            retry_logger.debug("Job failed. Enqueuing job retry...")
            job.metadata["retry_count"] += 1

            scheduler = Scheduler("jobs", connection=current_app.redis)

            args = [task_id, job_id, execution.image, execution.command]
            factor = app.config["EXPONENTIAL_BACKOFF_FACTOR"]
            min_backoff = app.config["EXPONENTIAL_BACKOFF_MIN_MS"] / 1000.0
            delta = timedelta(seconds=min_backoff)
            if job.metadata["retries"] > 0:
                delta = timedelta(
                    seconds=math.pow(factor, job.metadata["retry_count"]) * min_backoff
                )
            dt = datetime.utcnow() + delta
            enqueued = scheduler.enqueue_at(dt, run_job, *args)
            job.metadata["enqueued_id"] = enqueued.id
            job.save()
            retry_logger.info("Job execution enqueued successfully.")

            # still need to finish current execution as the retry
            # will be a new execution

        execution.finished_at = datetime.utcnow()
        execution.status = JobExecution.Status.done
        execution.exit_code = result.exit_code
        execution.log = result.log.decode("utf-8")
        execution.error = result.error.decode("utf-8")
        logger.debug(
            "Job finished. Storing job details in mongo db.",
            status=execution.status,
            log=result.log,
            error=result.error,
        )
        job.save()
        logger.info("Job details stored in mongo db.", status=execution.status)

        return True
    except Exception as err:
        logger.error("Failed to monitor job", error=err)
        current_app.report_error(
            err,
            metadata=dict(
                operation="Monitoring Job",
                task_id=task_id,
                job_id=job_id,
                execution_id=execution_id,
            ),
        )
        raise err


# fastlane/worker/scheduler.py
from rq_scheduler import Scheduler


class QueueScheduler:
    def __init__(self, queue_name, app):
        self.app = app
        self.logger = self.app.logger.bind(queue_name=queue_name)
        self.scheduler = Scheduler(queue_name=queue_name, connection=app.redis)

    def move_jobs(self):
        if self.scheduler.acquire_lock():
            try:
                jobs = self.scheduler.get_jobs()
                self.logger.debug(
                    "Lock acquired. Enqueuing scheduled jobs...", jobs=jobs
                )
                self.scheduler.enqueue_jobs()
            finally:
                self.scheduler.remove_lock()
        else:
            self.logger.debug(
                "Lock could not be acquired. Enqueuing scheduled jobs skipped. Trying again next cycle."
            )


# fastlane-0.1.9.dist-info/LICENSE
MIT License

Copyright (c) 2018 Bernardo Heynemann

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.