PK! qfastlane/__init__.py__version__ = '0.1.0' PK!fastlane/api/__init__.pyPK!*//fastlane/api/app.py# Standard Library import logging import sys from json import loads # 3rd Party import rq_dashboard import structlog from flask import Flask from flask_redis import FlaskRedis from flask_redis_sentinel import SentinelExtension from flask_sockets import Sockets from gevent import pywsgi from geventwebsocket.handler import WebSocketHandler from structlog.processors import (JSONRenderer, StackInfoRenderer, TimeStamper, format_exc_info) from structlog.stdlib import add_log_level, add_logger_name, filter_by_level # Fastlane import fastlane.api.metrics as metrics import fastlane.api.rqb as rqb from fastlane.api.enqueue import bp as enqueue from fastlane.api.healthcheck import bp as healthcheck from fastlane.api.status import bp as status from fastlane.api.stream import bp as stream from fastlane.api.task import bp as task_api from fastlane.models import db class Application: def __init__(self, config, log_level, testing=False): self.config = config self.log_level = log_level self.create_app(testing) def create_app(self, testing): self.app = Flask("fastlane") self.app.testing = testing self.app.config.from_object(rq_dashboard.default_settings) self.app.error_handlers = [] for key in self.config.items.keys(): self.app.config[key] = self.config[key] self.app.config.DEBUG = self.config.DEBUG self.app.config.ENV = self.config.ENV self.app.original_config = self.config self.app.log_level = self.log_level self.configure_logging() self.connect_redis() self.connect_queue() self.connect_db() self.load_executor() self.load_error_handlers() metrics.init_app(self.app) self.app.register_blueprint(metrics.bp) self.app.register_blueprint(healthcheck) self.app.register_blueprint(enqueue) self.app.register_blueprint(task_api) self.app.register_blueprint(status) # self.app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq") sockets = Sockets(self.app) 
sockets.register_blueprint(stream) def configure_logging(self): if self.app.testing: structlog.reset_defaults() disabled = [ "docker.utils.config", "docker.auth", "docker.api.build", "docker.api.swarm", "docker.api.image", "rq.worker", "werkzeug", "requests", "urllib3", ] for logger in disabled: log = logging.getLogger(logger) log.setLevel(logging.ERROR) log.disabled = True self.app.logger.disabled = True logging.basicConfig( level=self.log_level, stream=sys.stdout, format="%(message)s" ) chain = [ filter_by_level, add_log_level, add_logger_name, TimeStamper(fmt="iso"), StackInfoRenderer(), format_exc_info, JSONRenderer(indent=1, sort_keys=True), ] log = structlog.wrap_logger( logging.getLogger(__name__), processors=chain, context_class=dict, wrapper_class=structlog.stdlib.BoundLogger, # cache_logger_on_first_use=True, ) self.logger = log self.app.logger = self.logger def connect_redis(self): self.logger.debug("Connecting to redis...") if self.app.testing: self.logger.info("Configuring Fake Redis...") import fakeredis self.app.redis = FlaskRedis.from_custom_provider(fakeredis.FakeStrictRedis) self.app.redis.connect = self._mock_redis(True) self.app.redis.disconnect = self._mock_redis(False) self.app.redis.init_app(self.app) elif self.app.config["REDIS_URL"].startswith("redis+sentinel"): self.logger.info( "Configuring Redis Sentinel...", redis_url=self.app.config["REDIS_URL"] ) redis_sentinel = SentinelExtension() redis_connection = redis_sentinel.default_connection redis_sentinel.init_app(self.app) self.app.redis = redis_connection else: self.logger.info( "Configuring Redis...", redis_url=self.app.config["REDIS_URL"] ) self.app.redis = FlaskRedis() self.app.redis.init_app(self.app) self.logger.info("Connection to redis successful") def connect_queue(self): self.app.queue = None self.app.register_blueprint(rqb.bp) rqb.init_app(self.app) def connect_db(self): self.app.config["MONGODB_SETTINGS"] = loads(self.app.config["MONGODB_CONFIG"]) self.logger.info( "Connecting 
to MongoDB...", mongo=self.app.config["MONGODB_SETTINGS"] ) db.init_app(self.app) self.logger.info( "Connected to MongoDB successfully.", mongo=self.app.config["MONGODB_SETTINGS"], ) def load_executor(self): name = self.config.EXECUTOR parts = name.split(".") executor_module = __import__(".".join(parts), None, None, [parts[-1]], 0) self.app.executor_module = executor_module bp = getattr(executor_module, "bp", None) if bp is not None: self.app.register_blueprint(bp) self.app.executor = self.app.executor_module.Executor(self.app) def load_error_handlers(self): self.app.error_handlers = [] for handler_name in self.app.config["ERROR_HANDLERS"]: parts = handler_name.split(".") obj = __import__(".".join(parts[:-1]), None, None, [parts[-1]], 0) obj = getattr(obj, parts[-1]) self.app.error_handlers.append(obj(self.app)) self.app.report_error = self.report_error def report_error(self, err, metadata=None): for handler in self.app.error_handlers: handler.report(err, metadata) def run(self, host, port): server = pywsgi.WSGIServer( (host, port), self.app, handler_class=WebSocketHandler ) server.serve_forever() def _mock_redis(self, connected): def handle(): self.app.redis._redis_client.connected = connected return handle PK!fastlane/api/enqueue.py# Standard Library from datetime import datetime, timezone # 3rd Party from flask import (Blueprint, current_app, g, jsonify, make_response, request, url_for) from rq_scheduler import Scheduler # Fastlane from fastlane.models.task import Task from fastlane.utils import parse_time from fastlane.worker.job import run_job try: from ujson import loads except ImportError: from json import loads bp = Blueprint("enqueue", __name__) def get_details(): details = request.get_json() if details is None and request.get_data(): details = loads(request.get_data()) return details def create_job(details, task, logger): logger.debug("Creating job...") retries = details.get("retries", 0) expiration = details.get("expiration") # people to notify when job 
succeeds, fails or finishes notify = details.get("notify", {"succeeds": [], "fails": [], "finishes": []}) hard_limit = current_app.config["HARD_EXECUTION_TIMEOUT_SECONDS"] timeout = details.get("timeout", hard_limit) timeout = min( timeout, hard_limit ) # ensure jobs can't specify more than hard limit j = task.create_job() j.metadata["retries"] = retries j.metadata["notify"] = notify j.metadata["retry_count"] = 0 j.metadata["expiration"] = expiration j.metadata["timeout"] = timeout j.metadata["envs"] = details.get("envs", {}) j.save() logger.debug("Job created successfully...", job_id=str(j.id)) return j def enqueue_job(task, job, image, command, start_at, start_in, cron, logger): scheduler = Scheduler("jobs", connection=current_app.redis) args = [task.task_id, str(job.id), image, command] queue_job_id = None if start_at is not None: dt = datetime.utcfromtimestamp(int(start_at)) logger.debug("Enqueuing job execution in the future...", start_at=dt) result = scheduler.enqueue_at(dt, run_job, *args) job.metadata["enqueued_id"] = str(result.id) queue_job_id = str(result.id) job.save() logger.info("Job execution enqueued successfully.", start_at=dt) elif start_in is not None: dt = datetime.now(tz=timezone.utc) + start_in logger.debug("Enqueuing job execution in the future...", start_at=dt) result = scheduler.enqueue_at(dt, run_job, *args) job.metadata["enqueued_id"] = str(result.id) queue_job_id = str(result.id) job.save() logger.info("Job execution enqueued successfully.", start_at=dt) elif cron is not None: logger.debug("Enqueuing job execution using cron...", cron=cron) result = scheduler.cron( cron, # A cron string (e.g. 
"0 0 * * 0") func=run_job, args=args, repeat=None, queue_name="jobs", ) job.metadata["enqueued_id"] = str(result.id) queue_job_id = str(result.id) job.metadata["cron"] = cron job.scheduled = True job.save() logger.info("Job execution enqueued successfully.", cron=cron) else: logger.debug("Enqueuing job execution...") result = current_app.job_queue.enqueue(run_job, *args, timeout=-1) queue_job_id = result.id job.metadata["enqueued_id"] = result.id job.save() logger.info("Job execution enqueued successfully.") return queue_job_id @bp.route("/tasks/", methods=("POST",)) def create_task(task_id): details = get_details() if details is None or details == "": msg = "Failed to enqueue task because JSON body could not be parsed." g.logger.warn(msg) return make_response(msg, 400) image = details.get("image", None) command = details.get("command", None) if image is None or command is None: return make_response("image and command must be filled in the request.", 400) logger = g.logger.bind(task_id=task_id, image=image, command=command) logger.debug("Creating task...") task = Task.objects(task_id=task_id).modify(task_id=task_id, upsert=True, new=True) logger.info("Task created successfully.") j = create_job(details, task, logger) job_id = str(j.id) queue_job_id = None start_at = details.get("startAt", None) start_in = parse_time(details.get("startIn", None)) cron = details.get("cron", None) if len(list(filter(lambda item: item is not None, (start_at, start_in, cron)))) > 1: return make_response( "Only ONE of 'startAt', 'startIn' and 'cron' should be in the request.", 400 ) queue_job_id = enqueue_job( task, j, image, command, start_at, start_in, cron, logger ) job_url = url_for("task.get_job", task_id=task_id, job_id=job_id, _external=True) task_url = url_for("task.get_task", task_id=task_id, _external=True) return jsonify( { "taskId": task_id, "jobId": job_id, "queueJobId": queue_job_id, "jobUrl": job_url, "taskUrl": task_url, } ) PK!fastlane/api/healthcheck.py# 3rd Party from 
flask import Blueprint, current_app, jsonify # Fastlane from fastlane.models import db bp = Blueprint("healthcheck", __name__, url_prefix="/healthcheck") @bp.route("/", methods=("GET",)) def healthcheck(): status = {"redis": True, "mongo": True, "errors": []} try: res = current_app.redis.ping() assert res, f"Connection to redis failed ({res})." except Exception as err: status["errors"].append({"source": "redis", "message": str(err)}) status["redis"] = False try: database = current_app.config["MONGODB_SETTINGS"]["db"] conn = getattr(db.connection, database) res = tuple(conn.jobs.find().limit(1)) assert isinstance(res, (tuple,)), f"Connection to mongoDB failed ({res})." except Exception as err: status["errors"].append({"source": "mongo", "message": str(err)}) status["mongo"] = False code = 200 if len(status["errors"]) > 0: code = 500 return jsonify(status), code PK!/fastlane/api/metrics.py# Standard Library from datetime import datetime from uuid import uuid4 # 3rd Party from flask import Blueprint, current_app, g, request bp = Blueprint('metrics', __name__) def init_app(app): @app.before_request def start_timer(): request_id = request.headers.get('X-Request-ID', str(uuid4())) g.logger = current_app.logger.bind(request_id=request_id) g.request_id = request_id g.start = datetime.now() @app.after_request def log_request(response): if request.path == '/favicon.ico': return response now = datetime.now() duration = int(round((now - g.start).microseconds / 1000, 2)) ip = request.headers.get('X-Forwarded-For', request.remote_addr) host = request.host.split(':', 1)[0] log_params = { 'method': request.method, 'path': request.path, 'status': response.status_code, 'duration': duration, 'ip': ip, 'host': host, } request_id = g.request_id if request_id: log_params['request_id'] = request_id if response.status_code < 400: current_app.logger.info('Request succeeded', **log_params) elif response.status_code < 500: current_app.logger.info('Bad Request', **log_params) else: 
current_app.logger.error('Internal Server Error', **log_params) return response PK!o/fastlane/api/rqb.py# 3rd Party from flask import Blueprint from rq import Queue from rq_scheduler import Scheduler bp = Blueprint("rq", __name__) class JobQueue: def __init__(self, queue_name, app): self.app = app self.queue_name = queue_name self._queue = None self._scheduler = None def enqueue_in(self, *args, **kw): self.app.logger.info("Scheduling execution for the future.", **kw) return self._scheduler.enqueue_in(*args, **kw) def enqueue_at(self, *args, **kw): self.app.logger.info("Scheduling execution for a specific timestamp.", **kw) return self._scheduler.enqueue_at(*args, **kw) def enqueue(self, *args, **kw): return self.queue.enqueue(*args, **kw) @property def queue(self): if self._queue is None: self._queue = Queue(self.queue_name, connection=self.app.redis) return self._queue @property def scheduler(self): if self._scheduler is None: self._scheduler = Scheduler(queue=self._queue) return self._scheduler def init_app(app): for qn in ["jobs", "monitor", "notify"]: key = qn.rstrip("s") setattr(app, "%s_queue" % key, JobQueue(qn, app)) PK!iq  fastlane/api/status.py# Standard Library from datetime import datetime # 3rd Party import croniter from flask import Blueprint, current_app, jsonify, url_for # Fastlane from fastlane.models.job import Job from fastlane.models.task import Task bp = Blueprint("status", __name__, url_prefix="/status") @bp.route("/", methods=("GET",)) def status(): executor = current_app.executor status = {"hosts": [], "containers": {"running": []}} containers = executor.get_running_containers() blacklist = executor.get_blacklisted_hosts() for host, port, container_id in containers["running"]: status["containers"]["running"].append( {"host": host, "port": port, "id": container_id} ) for host in containers["available"]: status["hosts"].append({"host": host, "blacklisted": host in blacklist}) status["queues"] = {"jobs": {}, "monitor": {}, "error": {}} for 
queue in ["jobs", "monitor", "error"]: jobs_queue_size = current_app.redis.llen(f"rq:queue:{queue}") status["queues"][queue]["length"] = jobs_queue_size status["tasks"] = {"count": Task.objects.count()} status["jobs"] = {"count": Job.objects.count()} status["jobs"]["scheduled"] = [] scheduled_jobs = Job.objects(scheduled=True).all() for job in scheduled_jobs: j = job.to_dict(include_executions=False) itr = croniter.croniter(job.metadata["cron"], datetime.utcnow()) j["nextScheduledAt"] = itr.get_next(datetime).isoformat() task_id = job.task.task_id job_url = url_for( "task.get_job", task_id=task_id, job_id=str(job.id), _external=True ) j["url"] = job_url stop_job_url = url_for( "task.stop_job", task_id=task_id, job_id=str(job.id), _external=True ) j["stopUrl"] = stop_job_url task_url = url_for("task.get_task", task_id=task_id, _external=True) del j["taskId"] j["task"] = {"id": task_id, "url": task_url} status["scheduled"].append(j) return jsonify(status), 200 PK!  fastlane/api/stream.py# Standard Library import time from multiprocessing import Process # 3rd Party from flask import Blueprint, current_app # Fastlane from fastlane.models.job import Job, JobExecution bp = Blueprint("stream", __name__) def stream_log(executor, task_id, job, ex, ws): if not ws.closed and ex.status == JobExecution.Status.done: ws.send("EXIT CODE: %s\n" % ex.exit_code) ws.send(ex.log) ws.close(message="wsdone") return if not ws.closed and ex.status == JobExecution.Status.failed: ws.send("EXIT CODE: %s\n" % ex.exit_code) ws.send(ex.error) ws.close(message="wsdone") return if not ws.closed and ex.status != JobExecution.Status.running: ws.close(message="wsretry") return for log in executor.get_streaming_logs(task_id, job, ex): ws.send(log) ws.close(message="wsdone") @bp.route("/tasks//jobs//ws") def ws(ws, task_id, job_id): executor = current_app.executor logger = current_app.logger.bind(task_id=task_id, job_id=job_id) job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: 
logger.error("Job not found in task.") ws.close() return ex = job.get_last_execution() if ex is None: logger.error("No executions found in job.") ws.close(message="wsretry") return p = Process(target=stream_log, args=(executor, task_id, job, ex, ws)) p.start() while not ws.closed: time.sleep(10) p.terminate() PK!9fastlane/api/task.py# Standard Library import time # 3rd Party from flask import (Blueprint, Response, abort, current_app, g, jsonify, render_template, request, url_for) from rq_scheduler import Scheduler # Fastlane from fastlane.models.job import Job, JobExecution from fastlane.models.task import Task from fastlane.worker.job import run_job bp = Blueprint("task", __name__) @bp.route("/tasks/", methods=("GET",)) def get_task(task_id): logger = g.logger.bind(operation="get_task", task_id=task_id) logger.debug("Getting job...") task = Task.get_by_task_id(task_id) if task is None: logger.error("Task not found.") abort(404) return logger.debug("Task retrieved successfully...") jobs = [] for job_id in task.jobs: url = url_for( "task.get_job", task_id=task_id, job_id=str(job_id.id), _external=True ) job = {"id": str(job_id.id), "url": url} jobs.append(job) return jsonify({"taskId": task_id, "jobs": jobs}) @bp.route("/tasks//jobs/", methods=("GET",)) def get_job(task_id, job_id): logger = g.logger.bind(operation="get_job", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: logger.error("Job not found in task.") abort(404) return logger.debug("Job retrieved successfully...") details = job.to_dict( include_log=True, include_error=True, blacklist=current_app.config["ENV_BLACKLISTED_WORDS"].lower().split(","), ) task_url = url_for("task.get_task", task_id=task_id, _external=True) return jsonify({"task": {"id": task_id, "url": task_url}, "job": details}) @bp.route("/tasks//jobs//stop", methods=("POST",)) def stop_job(task_id, job_id): logger = g.logger.bind(operation="stop", 
task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: logger.error("Job not found in task.") abort(404) return execution = job.get_last_execution() if execution is not None and execution.status == JobExecution.Status.running: logger.debug("Stopping current execution...") executor = current_app.executor executor.stop_job(job.task, job, execution) logger.debug("Current execution stopped.") scheduler = Scheduler("jobs", connection=current_app.redis) if "enqueued_id" in job.metadata and job.metadata["enqueued_id"] in scheduler: scheduler.cancel(job.metadata["enqueued_id"]) job.scheduled = False job.save() logger.debug("Job stopped.") return get_job_summary(task_id, job_id) @bp.route("/tasks//jobs//retry", methods=("POST",)) def retry_job(task_id, job_id): logger = g.logger.bind(operation="retry", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: logger.error("Job not found in task.") abort(404) return execution = job.get_last_execution() if execution is None: logger.error("No execution yet to retry.") abort(Response(response="No execution yet to retry.", status=400)) return scheduler = Scheduler("jobs", connection=current_app.redis) if "enqueued_id" in job.metadata and job.metadata["enqueued_id"] in scheduler: msg = "Can't retry a scheduled job." 
logger.error(msg) abort(Response(response=msg, status=400)) return if execution.status == JobExecution.Status.running: logger.debug("Stopping current execution...") executor = current_app.executor executor.stop_job(job.task, job, execution) logger.debug("Current execution stopped.") execution.status = JobExecution.Status.failed job.save() logger.debug("Enqueuing job execution...") args = [task_id, job_id, execution.image, execution.command] result = current_app.job_queue.enqueue(run_job, *args, timeout=-1) job.metadata["enqueued_id"] = result.id job.save() logger.info("Job execution enqueued successfully.") return get_job_summary(task_id, job_id) def get_job_summary(task_id, job_id): job_url = url_for("task.get_job", task_id=task_id, job_id=job_id, _external=True) task_url = url_for("task.get_task", task_id=task_id, _external=True) return jsonify( {"taskId": task_id, "jobId": job_id, "jobUrl": job_url, "taskUrl": task_url} ) @bp.route("/tasks//jobs//stream") def stream_job(task_id, job_id): if request.url.startswith("https"): protocol = "wss" else: protocol = "ws" url = url_for("task.stream_job", task_id=task_id, job_id=job_id, external=True) url = "/".join(url.split("/")[:-1]) ws_url = "%s://%s/%s/ws" % (protocol, request.host.rstrip("/"), url.lstrip("/")) return render_template("stream.html", task_id=task_id, job_id=job_id, ws_url=ws_url) def get_response(task_id, job_id, get_data_fn): logger = g.logger.bind(operation="get_response", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: logger.error("Job not found in task.") abort(404) return if not job.executions: logger.error("No executions found in job.") abort(400) return execution = job.get_last_execution() headers = {"Fastlane-Exit-Code": str(execution.exit_code)} return Response(headers=headers, response=get_data_fn(execution), status=200) @bp.route("/tasks//jobs//stdout") def stdout(task_id, job_id): return get_response(task_id, 
job_id, lambda execution: execution.log) @bp.route("/tasks//jobs//stderr") def stderr(task_id, job_id): return get_response(task_id, job_id, lambda execution: execution.error) PK!++fastlane/cli/__init__.pyfrom fastlane.cli.core import main # NOQA PK!(7fastlane/cli/api.py""" isort:skip_file """ # 3rd Party from gevent import monkey # Must be before other imports monkey.patch_all() # isort:skip # Fastlane from fastlane.api.app import Application # NOQA from fastlane.config import Config # NOQA class APIHandler: def __init__(self, click, host, port, config, log_level): self.config_path = config self.config = None self.click = click self.host = host self.port = port self.log_level = log_level self.load_config() def load_config(self): # self.click.echo(f'Loading configuration from {self.config_path}...') self.config = Config.load(self.config_path) def __call__(self): app = Application(self.config, self.log_level) app.logger.info( "fastlane is runnning.", host=self.host, port=self.port, environment=self.config.ENV, ) app.run(self.host, self.port) PK!3X fastlane/cli/core.py# Standard Library import sys from os.path import abspath, dirname, join # 3rd Party import click # Fastlane from fastlane.cli.api import APIHandler from fastlane.cli.prune import PruneHandler from fastlane.cli.worker import WorkerHandler ROOT_CONFIG = abspath(join(dirname(__file__), "../config/local.conf")) LEVELS = {0: "ERROR", 1: "WARN", 2: "INFO", 3: "DEBUG"} @click.group() def main(): pass @click.command() @click.option("-b", "--host", default="0.0.0.0") @click.option("-p", "--port", default=10000) @click.option("-v", "--verbose", default=0, count=True) @click.option( "-c", "--config", default=ROOT_CONFIG, help="configuration file to use with fastlane", ) def api(host, port, verbose, config): """Runs fastlane API in the specified host and port.""" log_level = LEVELS.get(verbose, "ERROR") handler = APIHandler(click, host, port, config, log_level) handler() @click.command() @click.option("-i", 
"--id", default=None, help="ID for this worker") @click.option( "-j", "--no-jobs", default=False, help="""Process the 'jobs' queue?""", is_flag=True ) @click.option( "-m", "--no-monitor", default=False, help="""Process the 'monitor' queue?""", is_flag=True, ) @click.option( "-n", "--no-notify", default=False, help="""Process the 'notify' queue?""", is_flag=True, ) @click.option("-v", "--verbose", default=0, count=True) @click.option( "-c", "--config", default=ROOT_CONFIG, help="configuration file to use with fastlane", ) def worker(id, no_jobs, no_monitor, no_notify, verbose, config): """Runs an fastlane Worker with the specified queue name and starts processing.""" jobs = not no_jobs monitor = not no_monitor notify = not no_notify if not jobs and not monitor and not notify: click.echo("Worker must monitor at least one queue: jobs, monitor or notify") sys.exit(1) log_level = LEVELS.get(verbose, "ERROR") handler = WorkerHandler(click, id, jobs, monitor, notify, config, log_level) handler() @click.command() def config(): """Prints the default config for fastlane""" from fastlane.config import Config print(Config.get_config_text()) @click.command() @click.option("-v", "--verbose", default=0, count=True) @click.option( "-c", "--config", default=ROOT_CONFIG, help="configuration file to use with fastlane", ) def prune(verbose, config): """Removes all containers that have already been processed by fastlane.""" log_level = LEVELS.get(verbose, "ERROR") handler = PruneHandler(click, config, log_level) handler() main.add_command(api) main.add_command(worker) main.add_command(config) main.add_command(prune) PK!"okfastlane/cli/prune.py# Fastlane from fastlane.api.app import Application from fastlane.config import Config class PruneHandler: def __init__(self, click, config, log_level): self.config_path = config self.config = None self.click = click self.log_level = log_level self.load_config() def load_config(self): self.config = Config.load(self.config_path) def __call__(self): 
app = Application(self.config, self.log_level).app app.logger.info(f"Running fastlane prune...") with app.app_context(): removed = app.executor.remove_done() app.logger.info(f"Prune done.", removed=removed) PK!*  fastlane/cli/worker.py# Standard Library import time from uuid import uuid4 # 3rd Party import rq from rq import Connection, Worker # Fastlane from fastlane.api.app import Application from fastlane.config import Config from fastlane.worker.scheduler import QueueScheduler class WorkerHandler: def __init__(self, click, worker_id, jobs, monitor, notify, config, log_level): self.config_path = config self.config = None self.click = click self.worker_id = worker_id self.log_level = log_level self.queues = [] if jobs: self.queues.append("jobs") if monitor: self.queues.append("monitor") if notify: self.queues.append("notify") self.load_config() def load_config(self): # self.click.echo(f'Loading configuration from {self.config_path}...') self.config = Config.load(self.config_path) def __call__(self): # self.click.echo( # f'Running fastlane worker processing queues {",".join(self.queues)}.') app = Application(self.config, self.log_level) app.logger.info( f'Running fastlane worker processing queues {",".join(self.queues)}.' ) interval = app.config["WORKER_SLEEP_TIME_MS"] / 1000.0 with app.app.app_context(): worker_kw = dict(connection=app.app.redis) if self.worker_id is None: app.logger.warn( "The worker id was not set for this worker and a random one will be used." 
) self.worker_id = str(uuid4()) app.logger = app.logger.bind(worker_id=self.worker_id, queues=self.queues) app.app.logger = app.logger worker_kw["name"] = self.worker_id worker = Worker(self.queues, **worker_kw) schedulers = {} with Connection(app.app.redis): for queue in self.queues: schedulers[queue] = QueueScheduler(queue, app=app.app) app.schedulers = schedulers # app.logger.debug('Processing enqueued items...') try: while True: for queue in self.queues: # app.logger.debug("Processing scheduler...", queue=queue) schedulers[queue].move_jobs() # app.logger.debug('Processing queues...') worker.work(burst=True) time.sleep(interval) except rq.worker.StopRequested: app.logger.info("Worker exiting gracefully.") return PK!IH H fastlane/config/__init__.pyfrom derpconf.config import Config Config.allow_environment_variables() Config.define("ENV", "development", "Environment application is running in", "General") Config.define( "SECRET_KEY", "OTNDN0VCRDAtRTMyMS00NUM0LUFFQUYtNEI4QUE4RkFCRjUzCg==", "Secret key to use in flask.", "General", ) Config.define( "REDIS_URL", "redis://localhost:10100/0", "Redis connection string", "Redis" ) Config.define( "WORKER_SLEEP_TIME_MS", 10, "Number of milliseconds that fastlane must sleep before getting the next job", "Worker", ) Config.define( "HARD_EXECUTION_TIMEOUT_SECONDS", 30 * 60, "Number of seconds that fastlane must wait before killing an execution", "Worker", ) Config.define( "EXPONENTIAL_BACKOFF_MIN_MS", 1000, "Number of milliseconds that fastlane must wait before the first retry in each job", "Worker", ) Config.define( "EXPONENTIAL_BACKOFF_FACTOR", 2, "Factor to multiply backoff by in each retry", "Worker", ) Config.define( "EXECUTOR", "fastlane.worker.docker_executor", "Module full name where to find the Executor class", "Worker", ) Config.define( "DOCKER_HOSTS", '[{"match": "", "hosts": ["localhost:2376"], "maxRunning":2}]', "Docker Hosts to add to pool", "Docker Executor", ) Config.define( "MONGODB_CONFIG", """{ "host": 
"mongodb://localhost:10101/fastlane", "db": "fastlane", "serverSelectionTimeoutMS": 100, "connect": false }""", "MongoDB configuration", "Models", ) Config.define( "ERROR_HANDLERS", ["fastlane.errors.sentry.SentryErrorHandler"], "List of configured error handlers", "Errors", ) Config.define("SENTRY_DSN", "", "Sentry DSN to send errors to", "Errors") Config.define( "ENV_BLACKLISTED_WORDS", "password,key,secret,client_id", "Words that if present in environment variables are redacted", "Errors", ) Config.define( "SMTP_USE_SSL", False, "Wheter the SMTP server used to send notifications uses SSL", "Email", ) Config.define( "SMTP_HOST", None, "Host of the SMTP server used to send notifications", "Email" ) Config.define( "SMTP_PORT", None, "Port of the SMTP server used to send notifications", "Email" ) Config.define( "SMTP_USER", None, "User of the SMTP server used to send notifications", "Email" ) Config.define( "SMTP_PASSWORD", None, "Password of the SMTP server used to send notifications", "Email", ) Config.define( "SMTP_FROM", None, "From E-mail of the SMTP server used to send notifications", "Email", ) PK!}wwfastlane/config/local.confDEBUG=True SERVER_NAME='localhost:10000' DOCKER_HOSTS='[{"match": "", "hosts": ["localhost:1234"], "maxRunning": 2}]' PK!fastlane/errors/__init__.pyclass ErrorReporter: def __init__(self, app): self.app = app def report(self, err, metadata=None): raise NotImplementedError() PK!t22fastlane/errors/sentry.py# import sentry_sdk # 3rd Party from raven import Client # Fastlane from fastlane.errors import ErrorReporter class SentryErrorHandler(ErrorReporter): def __init__(self, app): super(SentryErrorHandler, self).__init__(app) self.client = None self.send = True if app.config["SENTRY_DSN"] == "": self.send = False else: app.logger.info( "Sentry configured properly.", sentry_dsn=app.config["SENTRY_DSN"] ) self.client = Client(app.config["SENTRY_DSN"], auto_log_stacks=True) def report(self, err, metadata=None): if not self.send: return if 
metadata is None: metadata = {} exc_info = (err.__class__, err, err.__traceback__) self.client.captureException(exc_info=exc_info, extra=metadata) # with sentry_sdk.configure_scope() as scope: # scope.level = "error" # for key, val in metadata.items(): # scope.set_extra(key, val) # sentry_sdk.capture_exception(err) PK!EH>>fastlane/models/__init__.pyfrom flask_mongoengine import MongoEngine db = MongoEngine() PK!Nfastlane/models/job.py# Standard Library import datetime from uuid import uuid4 # 3rd Party import mongoengine.errors from mongoengine import (BooleanField, DateTimeField, DictField, EmbeddedDocumentField, IntField, ListField, ReferenceField, StringField) # Fastlane from fastlane.models import db class JobExecution(db.EmbeddedDocument): class Status: enqueued = "enqueued" pulling = "pulling" running = "running" done = "done" failed = "failed" timedout = "timedout" expired = "expired" stopped = "stopped" created_at = DateTimeField(required=True) started_at = DateTimeField(required=False) finished_at = DateTimeField(required=False) execution_id = StringField(required=True) image = StringField(required=True) command = StringField(required=True) status = StringField(required=True, default=Status.enqueued) log = StringField(required=False) error = StringField(required=False) exit_code = IntField(required=False) metadata = DictField(required=False) def to_dict(self, include_log=False, include_error=False): s_at = self.started_at.isoformat() if self.started_at is not None else None f_at = self.finished_at.isoformat() if self.finished_at is not None else None res = { "createdAt": self.created_at.isoformat(), "startedAt": s_at, "finishedAt": f_at, "image": self.image, "command": self.command, "metadata": self.metadata, "status": self.status, "exitCode": self.exit_code, } if self.finished_at is not None: res["finishedAt"] = self.finished_at.isoformat() if include_log: res["log"] = self.log if include_error: res["error"] = self.error return res class Job(db.Document): 
class Job(db.Document):
    """A unit of work belonging to a Task, with its execution history."""

    created_at = DateTimeField(required=True)
    # NOTE(review): the default uses naive local time (datetime.now) while
    # save() stamps utcnow — confirm which timezone convention is intended.
    last_modified_at = DateTimeField(required=True, default=datetime.datetime.now)
    job_id = StringField(required=True)
    executions = ListField(EmbeddedDocumentField(JobExecution))
    task = ReferenceField(
        "Task", required=True, reverse_delete_rule=mongoengine.CASCADE
    )
    metadata = DictField(required=False)
    scheduled = BooleanField(required=True, default=False)

    def save(self, *args, **kwargs):
        """Persist the job, stamping created/modified timestamps (UTC)."""
        if self.executions is None:
            self.executions = []

        if not self.created_at:
            self.created_at = datetime.datetime.utcnow()
        self.last_modified_at = datetime.datetime.utcnow()

        return super(Job, self).save(*args, **kwargs)

    def create_execution(self, image, command):
        """Append a new JobExecution for image/command, persist, return it."""
        ex_id = str(uuid4())
        ex = JobExecution(
            execution_id=ex_id,
            image=image,
            command=command,
            created_at=datetime.datetime.utcnow(),
        )
        self.executions.append(ex)
        self.save()

        return ex

    def get_metadata(self, blacklist):
        """Return job metadata with sensitive env values masked.

        Any env var whose (lowercased) key contains a blacklisted word has
        its value replaced by asterisks of the same length.
        """
        if "envs" in self.metadata:
            envs = {}

            for key, val in self.metadata["envs"].items():
                for word in blacklist:
                    if word in key.lower():
                        val = "*" * len(str(val))

                        break
                envs[key] = val

            self.metadata["envs"] = envs

        return self.metadata

    def to_dict(
        self,
        include_log=False,
        include_error=False,
        include_executions=True,
        blacklist=None,
    ):
        """Serialize the job (and optionally its executions) to a dict.

        :param blacklist: words used by get_metadata to mask env values.
        """
        if blacklist is None:
            blacklist = []

        meta = self.get_metadata(blacklist)

        res = {
            "createdAt": self.created_at.isoformat(),
            "lastModifiedAt": self.last_modified_at.isoformat(),
            "taskId": self.task.task_id,
            "scheduled": self.scheduled,
            "executionCount": len(self.executions),
            "metadata": meta,
        }

        if include_executions:
            executions = [
                ex.to_dict(include_log, include_error) for ex in self.executions
            ]
            res["executions"] = executions

        return res

    @classmethod
    def get_by_id(cls, task_id, job_id):
        """Fetch a job by its task id and job id; None when not found.

        :raises RuntimeError: when either id is None or empty.
        """
        from fastlane.models.task import Task

        if task_id is None or task_id == "" or job_id is None or job_id == "":
            raise RuntimeError(
                "Task ID and Job ID are required and can't be None or empty."
            )

        t = Task.objects(task_id=task_id).first()

        # fix: bail out when the task does not exist instead of querying
        # with task=None, which could match unrelated documents.
        if t is None:
            return None

        return cls.objects(task=t, job_id=job_id).first()

    def get_execution_by_id(self, execution_id):
        """Return the embedded execution with the given id, or None."""
        for job_execution in self.executions:
            if job_execution.execution_id == execution_id:
                return job_execution

        return None

    def get_last_execution(self):
        """Return the most recently appended execution, or None if empty."""
        if not self.executions:
            return None

        return self.executions[-1]


class Task(db.Document):
    """A named group of jobs, addressed by its external task_id."""

    created_at = DateTimeField(required=True)
    last_modified_at = DateTimeField(
        required=True, default=datetime.datetime.now)
    task_id = StringField(required=True)
    jobs = ListField(ReferenceField('Job'))

    def _validate(self):
        """Reject empty task_id with a mongoengine ValidationError."""
        errors = {}

        if self.task_id == "":
            errors["task_id"] = mongoengine.errors.ValidationError(
                'Field is required', field_name="task_id")

        if errors:
            message = 'ValidationError (%s:%s) ' % (self._class_name, self.pk)
            raise mongoengine.errors.ValidationError(message, errors=errors)

    def save(self, *args, **kwargs):
        """Validate, stamp timestamps (local time) and persist."""
        self._validate()

        if not self.created_at:
            self.created_at = datetime.datetime.now()
        self.last_modified_at = datetime.datetime.now()

        return super(Task, self).save(*args, **kwargs)

    @classmethod
    def create_task(cls, task_id):
        """Create and persist a task with the given id."""
        t = cls(task_id=task_id)
        t.save()

        return t

    @classmethod
    def get_by_task_id(cls, task_id):
        """Fetch a task by id without dereferencing its job references.

        :raises RuntimeError: when task_id is None or empty.
        """
        if task_id is None or task_id == "":
            raise RuntimeError(
                "Task ID is required and can't be None or empty.")

        t = cls.objects(task_id=task_id).no_dereference().first()

        return t

    def create_job(self):
        """Create a Job bound to this task, link it, persist both, return it."""
        from fastlane.models.job import Job

        job_id = ObjectId()
        j = Job(
            id=job_id,
            job_id=str(job_id),
        )
        j.task = self
        j.save()
        self.jobs.append(j)
        self.save()

        return j

regex = re.compile(
    # fix: the group names were garbled to "(?P\d+?)" in the source, which
    # is not even a valid pattern; the names must match timedelta's keyword
    # arguments so parse_time can expand the groupdict directly.
    r'((?P<hours>\d+?)h)?((?P<minutes>\d+?)m)?((?P<seconds>\d+?)s)?')


def parse_time(time_str):
    """Parse a compact duration string like "1h30m15s" into a timedelta.

    Each of the h/m/s parts is optional. Returns None for a None input;
    a string with no recognizable parts yields timedelta(0) because every
    group is optional and the match succeeds with zero length.
    """
    if time_str is None:
        return None

    parts = regex.match(time_str)

    if not parts:
        return None

    parts = parts.groupdict()
    time_params = {}

    for (name, param) in parts.items():
        if param:
            time_params[name] = int(param)

    return timedelta(**time_params)


class ExecutionResult:
    """Executor-agnostic snapshot of a container execution's outcome."""

    class Status:
        """String constants mirroring a container's coarse lifecycle."""

        created = 'created'
        running = 'running'
        failed = 'failed'
        done = 'done'

    def __init__(self, status):
        self.status = status
        self.exit_code = None
        self.started_at = None
        self.finished_at = None
        self.log = ''
        self.error = ''

    def set_log(self, log):
        """Replace the captured log output."""
        self.log = log
def _get_host_from_payload(operation):
    """Parse the request JSON and extract the "host" attribute.

    Shared by the add/remove blacklist handlers (they previously duplicated
    this validation). Returns (host, None) on success or (None, response)
    with a 400 response on failure.
    """
    data = get_details()

    if data is None or data == "":
        msg = f"Failed to {operation} because JSON body could not be parsed."
        g.logger.warn(msg)

        return None, make_response(msg, 400)

    if "host" not in data:
        msg = (
            f"Failed to {operation} because 'host' attribute "
            "was not found in JSON body."
        )
        g.logger.warn(msg)

        return None, make_response(msg, 400)

    return data["host"], None


@bp.route("/blacklist", methods=["POST", "PUT"])
def add_to_blacklist():
    """Add a docker host (host:port) to the blacklist set in redis."""
    host, error = _get_host_from_payload("add host to blacklist")

    if error is not None:
        return error

    current_app.redis.sadd(blacklist_key, host)

    return ""


@bp.route("/blacklist", methods=["DEL", "DELETE"])
def remove_from_blacklist():
    """Remove a docker host (host:port) from the blacklist set in redis."""
    host, error = _get_host_from_payload("remove host from blacklist")

    if error is not None:
        return error

    current_app.redis.srem(blacklist_key, host)

    return ""


class DockerPool:
    """Pool of docker clients, grouped in clusters keyed by task-id regex."""

    def __init__(self, docker_hosts):
        # docker_hosts: iterable of (compiled_regex_or_None, [host:port], max_running)
        self.docker_hosts = docker_hosts

        self.max_running = {}
        self.clients_per_regex = []
        self.clients = {}

        self.__init_clients()

    def __init_clients(self):
        """Instantiate one DockerClient per configured host address."""
        for regex, docker_hosts, max_running in self.docker_hosts:
            clients = (regex, [])
            self.clients_per_regex.append(clients)
            self.max_running[regex] = max_running

            for address in docker_hosts:
                host, port = address.split(":")
                cl = docker.DockerClient(base_url=address)
                self.clients[address] = (host, port, cl)
                clients[1].append((host, port, cl))

    def get_client(self, task_id, host=None, port=None, blacklist=None):
        """Pick a (host, port, client) tuple to run task_id on.

        With an explicit host/port, looks up that exact address.
        Otherwise picks a random non-blacklisted host from the first
        cluster whose regex matches task_id (a None regex matches all).

        :raises RuntimeError: when no cluster can serve task_id.
        """
        if host is not None and port is not None:
            # NOTE(review): returns None for an unknown address, which makes
            # callers fail on tuple unpacking — confirm desired behavior.
            return self.clients.get(f"{host}:{port}")

        if blacklist is None:
            blacklist = set()

        for regex, clients in self.clients_per_regex:
            filtered = [
                (host, port, client)
                for (host, port, client) in clients
                if f"{host}:{port}" not in blacklist
            ]

            if not filtered or (regex is not None and not regex.match(task_id)):
                continue

            return random.choice(filtered)

        raise RuntimeError(f"Failed to find a docker host for task id {task_id}.")


class Executor:
    """Docker-based job executor backed by a DockerPool."""

    def __init__(self, app, pool=None):
        self.app = app
        self.pool = pool

        if pool is None:
            # DOCKER_HOSTS is a JSON list of clusters:
            # {"match": "<regex or ''>", "hosts": [...], "maxRunning": N}
            docker_hosts = []
            clusters = loads(self.app.config["DOCKER_HOSTS"])

            for cluster in clusters:
                regex = cluster["match"]
                regex = re.compile(regex) if regex else None
                hosts = cluster["hosts"]
                max_running = cluster.get("maxRunning", 10)
                docker_hosts.append((regex, hosts, max_running))

            self.pool = DockerPool(docker_hosts)

    def validate_max_running_executions(self, task_id):
        """Return True when the cluster matching task_id may run one more job."""
        total_running = 0
        max_running = 0

        for regex, clients in self.pool.clients_per_regex:
            if regex is not None and not regex.match(task_id):
                continue
            total_running = len(self.get_running_containers(regex)["running"])
            max_running = self.pool.max_running[regex]

            break

        # The original `total_running == 0 or total_running <= max_running`
        # is equivalent for non-negative max_running; simplified.
        # NOTE(review): `<=` still allows one more job to start when the
        # cluster is exactly at max_running — confirm whether `<` was meant.
        return total_running <= max_running

    def update_image(self, task, job, execution, image, tag):
        """Pull image:tag on a chosen host and pin it in execution metadata."""
        host, port, cl = self.pool.get_client(
            task.task_id, blacklist=self.get_blacklisted_hosts()
        )
        cl.images.pull(image, tag=tag)
        execution.metadata["docker_host"] = host
        execution.metadata["docker_port"] = port

    def run(self, task, job, execution, image, tag, command):
        """Start a detached container for the execution; records its id."""
        host, port, cl = None, None, None

        if "docker_host" in execution.metadata:
            # Reuse the host selected at image-pull time.
            h = execution.metadata["docker_host"]
            p = execution.metadata["docker_port"]
            host, port, cl = self.pool.get_client(task.task_id, h, p)
        else:
            host, port, cl = self.pool.get_client(
                task.task_id, blacklist=self.get_blacklisted_hosts()
            )
            execution.metadata["docker_host"] = host
            execution.metadata["docker_port"] = port

        container = cl.containers.run(
            image=f"{image}:{tag}",
            name=f"{job_prefix}-{execution.execution_id}",
            command=command,
            detach=True,
            environment=job.metadata.get("envs", {}),
        )
        execution.metadata["container_id"] = container.id

        return True

    def stop_job(self, task, job, execution):
        """Stop the execution's container, if one was ever started."""
        if "container_id" not in execution.metadata:
            return

        h = execution.metadata["docker_host"]
        p = execution.metadata["docker_port"]
        host, port, cl = self.pool.get_client(task.task_id, h, p)
        container = cl.containers.get(execution.metadata["container_id"])
        container.stop()

    def convert_date(self, dt):
        """Parse a docker ISO timestamp string into a datetime."""
        return parse(dt)

    def get_result(self, task, job, execution):
        """Build an ExecutionResult from the container's current state."""
        h = execution.metadata["docker_host"]
        p = execution.metadata["docker_port"]
        host, port, cl = self.pool.get_client(task.task_id, h, p)
        container_id = execution.metadata["container_id"]
        container = cl.containers.get(container_id)

        # container.attrs['State'] example:
        # {'Status': 'exited', 'Running': False, 'Paused': False,
        #  'Restarting': False, 'OOMKilled': False, 'Dead': False, 'Pid': 0,
        #  'ExitCode': 0, 'Error': '',
        #  'StartedAt': '2018-08-27T17:14:14.1951232Z',
        #  'FinishedAt': '2018-08-27T17:14:14.2707026Z'}
        result = ExecutionResult(
            STATUS.get(container.status, ExecutionResult.Status.done)
        )

        state = container.attrs["State"]
        result.exit_code = state["ExitCode"]
        result.error = state["Error"]
        result.started_at = self.convert_date(state["StartedAt"])

        if (
            result.status == ExecutionResult.Status.done
            or result.status == ExecutionResult.Status.failed
        ):
            result.finished_at = self.convert_date(state["FinishedAt"])
            result.log = container.logs(stdout=True, stderr=False)

            if result.error != "":
                # NOTE(review): container.logs returns bytes; interpolating
                # it into an f-string yields the b'...' repr, and this branch
                # leaves result.error as str while the else branch leaves it
                # as bytes — confirm which the callers expect.
                result.error += (
                    f"\n\nstderr:\n{container.logs(stdout=False, stderr=True)}"
                )
            else:
                result.error = container.logs(stdout=False, stderr=True)

        return result

    def get_running_containers(self, regex=None):
        """List running fastlane containers, optionally for one cluster.

        :return: {"available": ["host:port", ...],
                  "running": [(host, port, container_id), ...]}
        """
        running = []
        clients = self.pool.clients.values()

        if regex is not None:
            for r, cl in self.pool.clients_per_regex:
                if r is not None and r != regex:
                    continue
                clients = cl

        for (host, port, client) in clients:
            containers = client.containers.list(
                sparse=False, filters={"status": "running"}
            )

            for container in containers:
                # Only count containers fastlane itself started.
                if not container.name.startswith(job_prefix):
                    continue
                running.append((host, port, container.id))

        return {
            "available": [f"{host}:{port}" for (host, port, client) in clients],
            "running": running,
        }

    def get_current_logs(self, task_id, job, execution):
        """Return the full combined stdout/stderr log as a str."""
        h = execution.metadata["docker_host"]
        p = execution.metadata["docker_port"]
        host, port, cl = self.pool.get_client(task_id, h, p)
        container_id = execution.metadata["container_id"]
        container = cl.containers.get(container_id)
        log = container.logs(stdout=True, stderr=True).decode("utf-8")

        return log

    def get_streaming_logs(self, task_id, job, execution):
        """Yield decoded log chunks as the container produces them."""
        h = execution.metadata["docker_host"]
        p = execution.metadata["docker_port"]
        host, port, cl = self.pool.get_client(task_id, h, p)
        container_id = execution.metadata["container_id"]
        container = cl.containers.get(container_id)

        for log in container.logs(stdout=True, stderr=True, stream=True):
            yield log.decode("utf-8")

    def get_blacklisted_hosts(self):
        """Return the set of blacklisted "host:port" strings from redis."""
        redis = current_app.redis
        hosts = redis.smembers(blacklist_key)

        return set([host.decode("utf-8") for host in hosts])

    def mark_as_done(self, task, job, execution):
        """Rename the finished container so remove_done can collect it."""
        h = execution.metadata["docker_host"]
        p = execution.metadata["docker_port"]
        host, port, cl = self.pool.get_client(task.task_id, h, p)
        container_id = execution.metadata["container_id"]
        container = cl.containers.get(container_id)
        container.rename(f"defunct-{container.name}")

    def remove_done(self):
        """Remove all defunct fastlane containers across every host.

        :return: list of dicts describing each removed container.
        """
        removed_containers = []
        clients = self.pool.clients.values()

        for (host, port, client) in clients:
            containers = client.containers.list(
                sparse=False, all=True, filters={"name": f"defunct-{job_prefix}"}
            )

            for container in containers:
                removed_containers.append(
                    {
                        "host": f"{host}:{port}",
                        "name": container.name,
                        "id": container.id,
                        "image": container.image.attrs["RepoTags"][0],
                    }
                )
                container.remove()

        return removed_containers
def validate_max_concurrent(executor, task_id, job, image, command, logger):
    """Check the global concurrency limit; re-enqueue the job if reached.

    :return: True when the job may run now, False when it was re-enqueued.
    """
    if not executor.validate_max_running_executions(task_id):
        logger.debug(
            "Maximum number of global container executions reached. Enqueuing job execution..."
        )
        args = [task_id, job.id, image, command]
        result = current_app.job_queue.enqueue(run_job, *args, timeout=-1)
        job.metadata["enqueued_id"] = result.id
        job.save()
        logger.info(
            "Job execution re-enqueued successfully due to max number of container executions."
        )

        return False

    return True


def validate_expiration(job, ex, logger):
    """Cancel the execution when the job's expiration timestamp has passed.

    job.metadata["expiration"] is a unix timestamp (UTC seconds).
    :return: True when the job may still run, False when it expired.
    """
    d = datetime.utcnow()
    unixtime = calendar.timegm(d.utctimetuple())

    if (
        job.metadata.get("expiration") is not None
        and job.metadata["expiration"] < unixtime
    ):
        expiration_utc = datetime.utcfromtimestamp(job.metadata["expiration"])
        ex.status = JobExecution.Status.expired
        ex.error = "Job was supposed to be done before %s, but was started at %s." % (
            expiration_utc.isoformat(),
            d.isoformat(),
        )
        ex.finished_at = datetime.utcnow()
        job.save()
        logger.info(
            "Job execution canceled due to being expired.",
            job_expiration=job.metadata["expiration"],
            current_ts=unixtime,
        )

        return False

    return True


def download_image(executor, job, ex, image, tag, command, logger):
    """Pull the container image; mark the execution failed on error.

    :return: True on success, False on failure (error already reported).
    """
    try:
        logger.debug("Downloading updated container image...", image=image, tag=tag)
        before = time.time()
        executor.update_image(job.task, job, ex, image, tag)
        ellapsed = time.time() - before
        logger.info(
            "Image downloaded successfully.", image=image, tag=tag, ellapsed=ellapsed
        )
    except Exception as err:
        error = traceback.format_exc()
        logger.error("Failed to download image.", error=error)
        ex.error = error
        ex.status = JobExecution.Status.failed
        job.save()

        current_app.report_error(
            err,
            metadata=dict(
                operation="Downloading Image",
                task_id=job.task.task_id,
                job_id=job.id,
                execution_id=ex.execution_id,
                image=image,
                tag=tag,
                command=command,
            ),
        )

        return False

    return True


def run_container(executor, job, ex, image, tag, command, logger):
    """Start the container; mark the execution failed on error.

    :return: True on success, False on failure (error already reported).
    """
    logger.debug(
        "Running command in container...", image=image, tag=tag, command=command
    )

    try:
        ex.started_at = datetime.utcnow()
        job.save()
        before = time.time()
        executor.run(job.task, job, ex, image, tag, command)
        ellapsed = time.time() - before
        logger.info(
            "Container started successfully.",
            image=image,
            tag=tag,
            command=command,
            ellapsed=ellapsed,
        )
    except Exception as err:
        error = traceback.format_exc()
        logger.error("Failed to run command", error=error)
        ex.error = error
        ex.status = JobExecution.Status.failed
        job.save()

        current_app.report_error(
            err,
            metadata=dict(
                operation="Running Container",
                task_id=job.task.task_id,
                job_id=job.id,
                execution_id=ex.execution_id,
                image=image,
                tag=tag,
                command=command,
            ),
        )

        return False

    return True


def run_job(task_id, job_id, image, command):
    """RQ entry point: create a new execution for the job and start it.

    On success, enqueues monitor_job to poll the container's progress.
    :return: True when the container was started, False otherwise.
    """
    app = current_app
    logger = app.logger.bind(
        operation="run_job",
        task_id=task_id,
        job_id=job_id,
        image=image,
        command=command,
    )

    try:
        executor = app.executor
        job = Job.get_by_id(task_id, job_id)

        if job is None:
            logger.error("Job was not found with task id and job id.")

            return False

        if not validate_max_concurrent(executor, task_id, job, image, command, logger):
            return False

        tag = "latest"

        if ":" in image:
            image, tag = image.split(":")

        logger = logger.bind(image=image, tag=tag)

        logger.debug("Changing job status...", status=JobExecution.Status.pulling)
        ex = job.create_execution(image=image, command=command)
        ex.status = JobExecution.Status.enqueued
        job.save()
        logger.debug(
            "Job status changed successfully.", status=JobExecution.Status.pulling
        )
        logger = logger.bind(execution_id=ex.execution_id)
    except Exception as err:
        error = traceback.format_exc()
        logger.error("Failed to create job execution. Skipping job...", error=error)
        current_app.report_error(
            err,
            metadata=dict(task_id=task_id, job_id=job_id, image=image, command=command),
        )

        return False

    try:
        if not validate_expiration(job, ex, logger):
            return False

        logger.info("Started processing job.")

        if not download_image(executor, job, ex, image, tag, command, logger):
            return False

        if not run_container(executor, job, ex, image, tag, command, logger):
            return False

        logger.debug("Changing job status...", status=JobExecution.Status.running)
        ex.status = JobExecution.Status.running
        job.save()
        logger.debug(
            "Job status changed successfully.", status=JobExecution.Status.running
        )

        app.monitor_queue.enqueue(
            monitor_job, task_id, job_id, ex.execution_id, timeout=-1
        )

        return True
    except Exception as err:
        error = traceback.format_exc()
        logger.error("Failed to run job", error=error)
        ex.status = JobExecution.Status.failed
        ex.error = "Job failed to run with error: %s" % error
        job.save()

        current_app.report_error(
            err,
            metadata=dict(
                operation="Running Container",
                task_id=task_id,
                job_id=job_id,
                execution_id=ex.execution_id,
                image=image,
                tag=tag,
                command=command,
            ),
        )


def notify_users(task, job, execution, logger):
    """Enqueue notification e-mails based on the execution outcome.

    job.metadata["notify"] may carry "succeeds"/"fails"/"finishes" lists
    of e-mail addresses; "finishes" is notified regardless of outcome.
    """
    task_id = task.task_id
    job_id = str(job.id)
    execution_id = str(execution.execution_id)

    succeed = job.metadata.get("notify", {}).get("succeeds", [])
    fails = job.metadata.get("notify", {}).get("fails", [])
    finishes = job.metadata.get("notify", {}).get("finishes", [])

    if execution.status == JobExecution.Status.done:
        logger.info("Notifying users of success...")

        for email in succeed:
            logger.info("Notifying user of success...", email=email)
            subject = "Job %s/%s succeeded!" % (task_id, job_id)
            args = [task_id, job_id, execution_id, subject, email]
            current_app.notify_queue.enqueue(send_email, *args, timeout=-1)

    if execution.status == JobExecution.Status.failed:
        logger.info("Notifying users of failure...")

        for email in fails:
            logger.info(
                "Notifying user of failure...",
                email=email,
                exit_code=execution.exit_code,
            )
            subject = "Job %s/%s failed with exit code %d!" % (
                task_id,
                job_id,
                execution.exit_code,
            )
            args = [task_id, job_id, execution_id, subject, email]
            current_app.notify_queue.enqueue(send_email, *args, timeout=-1)

    logger.info("Notifying users of completion...")

    for email in finishes:
        logger.info(
            "Notifying user of completion...",
            email=email,
            exit_code=execution.exit_code,
        )
        subject = "Job %s/%s finished with exit code %d!" % (
            task_id,
            job_id,
            execution.exit_code,
        )
        args = [task_id, job_id, execution_id, subject, email]
        # fix: "finishes" notifications were enqueued on job_queue while the
        # succeeds/fails branches use notify_queue; use notify_queue for all.
        current_app.notify_queue.enqueue(send_email, *args, timeout=-1)


def monitor_job(task_id, job_id, execution_id):
    """Poll the executor for an execution's result and finalize it.

    Re-schedules itself while the container is still running, enforces the
    job timeout, schedules retries with exponential backoff on failure, and
    stores the final result on the job.
    """
    # fix: bind the logger before the try block; it used to be assigned
    # inside the try, so any failure before that assignment raised a
    # NameError in the except handler instead of being logged/reported.
    logger = current_app.logger.bind(
        operation="monitor_job",
        task_id=task_id,
        job_id=job_id,
        execution_id=execution_id,
    )

    try:
        app = current_app
        executor = app.executor
        job = Job.get_by_id(task_id, job_id)

        if job is None:
            logger.error("Failed to retrieve task or job.")

            return False

        execution = job.get_execution_by_id(execution_id)
        result = executor.get_result(job.task, job, execution)
        logger.info(
            "Container result obtained.",
            container_status=result.status,
            container_exit_code=result.exit_code,
        )

        if result.status in (
            ExecutionResult.Status.created,
            ExecutionResult.Status.running,
        ):
            # NOTE(review): assumes job.metadata always carries "timeout" —
            # confirm it is set at enqueue time.
            ellapsed = (datetime.utcnow() - execution.started_at).total_seconds()

            if ellapsed > job.metadata["timeout"]:
                execution.finished_at = datetime.utcnow()
                execution.status = JobExecution.Status.timedout
                execution.error = "Job execution timed out after %d seconds." % ellapsed
                executor.stop_job(job.task, job, execution)
                logger.debug(
                    "Job execution timed out. Storing job details in mongo db.",
                    status=execution.status,
                    ellapsed=ellapsed,
                    error=result.error,
                )
                job.save()
                logger.info("Job execution timed out.", status=execution.status)

                return False

            scheduler = Scheduler("monitor", connection=app.redis)
            logger.info(
                "Job has not finished. Retrying monitoring in the future.",
                container_status=result.status,
                # fix: this logged seconds=1 while the actual retry interval
                # below is 5 seconds; keep the log in sync with behavior.
                seconds=5,
            )

            interval = timedelta(seconds=5)
            scheduler.enqueue_in(interval, monitor_job, task_id, job_id, execution_id)

            return True

        if (
            result.exit_code != 0
            and "retry_count" in job.metadata
            and job.metadata["retry_count"] < job.metadata["retries"]
        ):
            retry_logger = logger.bind(
                exit_code=result.exit_code,
                retry_count=job.metadata["retry_count"],
                retries=job.metadata["retries"],
            )
            retry_logger.debug("Job failed. Enqueuing job retry...")
            job.metadata["retry_count"] += 1

            scheduler = Scheduler("jobs", connection=current_app.redis)

            args = [task_id, job_id, execution.image, execution.command]

            # Exponential backoff: factor ^ retry_count * min_backoff seconds.
            factor = app.config["EXPONENTIAL_BACKOFF_FACTOR"]
            min_backoff = app.config["EXPONENTIAL_BACKOFF_MIN_MS"] / 1000.0
            delta = timedelta(seconds=min_backoff)

            if job.metadata["retries"] > 0:
                delta = timedelta(
                    seconds=math.pow(factor, job.metadata["retry_count"]) * min_backoff
                )
            dt = datetime.utcnow() + delta
            enqueued = scheduler.enqueue_at(dt, run_job, *args)
            job.metadata["enqueued_id"] = enqueued.id
            job.save()
            retry_logger.info("Job execution enqueued successfully.")

            # still need to finish current execution as the retry
            # will be a new execution

        execution.finished_at = datetime.utcnow()
        execution.exit_code = result.exit_code
        execution.status = (
            JobExecution.Status.done
            if execution.exit_code == 0
            else JobExecution.Status.failed
        )
        # NOTE(review): result.log/result.error are assumed to be bytes here,
        # but Executor.get_result can leave error as a str in one branch —
        # confirm the decode never receives a str.
        execution.log = result.log.decode("utf-8")
        execution.error = result.error.decode("utf-8")

        logger.debug(
            "Job finished. Storing job details in mongo db.",
            status=execution.status,
            log=result.log,
            error=result.error,
        )
        job.save()
        logger.info("Job details stored in mongo db.", status=execution.status)

        executor.mark_as_done(job.task, job, execution)
        notify_users(job.task, job, execution, logger)

        return True
    except Exception as err:
        error = traceback.format_exc()
        logger.error("Failed to monitor job", error=error)
        current_app.report_error(
            err,
            metadata=dict(
                operation="Monitoring Job",
                task_id=task_id,
                job_id=job_id,
                execution_id=execution_id,
            ),
        )
        raise err
def send_email(task_id, job_id, execution_id, subject, to_email):
    """Send a notification e-mail about an execution via the configured SMTP.

    Builds a multipart (plain + HTML) message with the execution's JSON
    details and links back to the task/job pages.
    :return: False when the job or the SMTP config is missing.
    :raises Exception: re-raises any failure while talking to the server.
    """
    app = current_app
    job = Job.get_by_id(task_id, job_id)

    logger = app.logger.bind(
        operation="send_email",
        task_id=task_id,
        job_id=job_id,
        to_email=to_email,
        execution_id=execution_id,
        subject=subject,
    )

    if job is None:
        logger.error("Failed to retrieve task or job.")

        return False

    execution = job.get_execution_by_id(execution_id)
    logger.info("Execution loaded successfully")

    smtp_host = app.config["SMTP_HOST"]
    smtp_port = app.config["SMTP_PORT"]
    smtp_from = app.config["SMTP_FROM"]

    if smtp_host is None or smtp_port is None or smtp_from is None:
        logger.error(
            "SMTP_HOST, SMTP_PORT and SMTP_FROM must be configured. Skipping sending e-mail."
        )

        return False

    try:
        smtp_port = int(smtp_port)
        logger = logger.bind(smtp_host=smtp_host, smtp_port=smtp_port)

        logger.info("Connecting to SMTP Server...")
        server = smtplib.SMTP(smtp_host, smtp_port)
        server.set_debuglevel(0)

        try:
            if app.config.get("SMTP_USE_SSL"):
                # NOTE(review): this issues STARTTLS (plain connection
                # upgraded to TLS), not implicit SSL — the config key name
                # is misleading; confirm the intended behavior.
                logger.info("Starting TLS...")
                server.starttls()

            smtp_user = app.config.get("SMTP_USER")
            smtp_password = app.config.get("SMTP_PASSWORD")

            if smtp_user and smtp_password:
                # fix: the SMTP password used to be written to the log as a
                # structured field; never log credentials.
                logger.info("Authenticating with SMTP...", smtp_user=smtp_user)
                server.login(smtp_user, smtp_password)

            from_email = app.config["SMTP_FROM"]
            task_url = url_for("task.get_task", task_id=task_id, _external=True)
            job_url = url_for(
                "task.get_job", task_id=task_id, job_id=job_id, _external=True
            )
            job_data = json.dumps(
                execution.to_dict(include_log=True, include_error=True),
                indent=4,
                sort_keys=True,
            )

            body = (
                """
Automatic message. Please do not reply to this!

Job Details:
%s
"""
                % job_data
            )

            subj = "[Fastlane] %s" % subject

            msg = MIMEMultipart("alternative")
            msg["Subject"] = subj
            msg["From"] = from_email
            msg["To"] = to_email

            part1 = MIMEText(body, "plain")

            # NOTE(review): the HTML markup below was reconstructed — the
            # original template's tags were stripped in the source; the
            # placeholder order (job_data, task_url, job_url) is preserved.
            html_body = (
                """
<p>Job Details:</p>
<pre>%s</pre>
<p>---</p>
<p><a href="%s">[View Task Details]</a> | <a href="%s">[View Job Details]</a></p>
<p>---</p>
<p>Automatic message. Please do not reply to this!</p>
"""
                % (job_data, task_url, job_url)
            )
            part2 = MIMEText(html_body, "html")

            msg.attach(part1)
            msg.attach(part2)

            logger.info("Sending email...")
            server.sendmail(from_email, to_email, msg.as_string())
            logger.info("Email sent successfully.")
        finally:
            # fix: always close the SMTP connection; previously an exception
            # while sending skipped server.quit() and leaked the socket.
            server.quit()
    except Exception as exc:
        error = traceback.format_exc()
        logger.error("Sending e-mail failed with exception!", error=error)
        raise exc


class QueueScheduler:
    """Thin wrapper around rq_scheduler.Scheduler for one named queue.

    move_jobs is meant to be called periodically to push due scheduled
    jobs onto the work queue, guarded by the scheduler's lock.
    """

    def __init__(self, queue_name, app):
        self.app = app
        self.logger = self.app.logger.bind(queue_name=queue_name)
        self.scheduler = Scheduler(queue_name=queue_name, connection=app.redis)

    def move_jobs(self):
        """Enqueue all due scheduled jobs while holding the scheduler lock.

        Skips the cycle quietly when another process holds the lock.
        """
        if self.scheduler.acquire_lock():
            try:
                jobs = self.scheduler.get_jobs()
                self.logger.debug(
                    "Lock acquired. Enqueuing scheduled jobs...", jobs=jobs
                )
                self.scheduler.enqueue_jobs()
            finally:
                self.scheduler.remove_lock()
        else:
            self.logger.debug(
                "Lock could not be acquired. Enqueuing scheduled jobs skipped. Trying again next cycle."
            )
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HWYfastlane-0.4.3.dist-info/WHEEL A н#Z;/"b&F]xzwC;dhfCSTֻ0*Ri.4œh6-]{H, JPK!Hf$h-!fastlane-0.4.3.dist-info/METADATAn0 ܀X!#vP`]6vʌŖ:GY64_š>~R_YmhmY1c[&!s%ͣ{uLDˆʚԊ@Gf&'l$8FuY-_l[<;gJW=ht %tL(?9U}h1b' |ӪѬ뼂[&}I3V+rfF>2LQ[OX ВWo3,69M^LOhؔ#D ȕy(W4[lÒ!v-6dy"z)"l凶kZi2s_ܿ)xV5 g_~*1\pV k2x|5+.OAPmN(>gcl C0C?|jWKcSȄdvܶ}KjFɰy1 <,267PK!HSL fastlane-0.4.3.dist-info/RECORD}ǖV< aDh! "IاhGAU,:l?h)>?SYttzV TA9ţ*GI},g%% lqA\fHͪS/Wͭw] ~ 8f=;ю\V'9R$q9VVtN$Tx}`L e,@}x4Auv[jI,ÂD8b*&IXua6l,B((8_pG#ܫz.;ǀfLdrAX(|<TIMFݰ`ҵ|s#LdS&x;\h+!1뇹k'zAԝ;DN9񡒜A=;1:BK̆ޱG7Tnx k蜤IQ1/S4bɈ2^/e\bl ] wUz@xVMb%xЎ)tB&Bi; ~5L=]dMG$,x,,עlaWB.Iot衪TɃS+KuE)x5">YX7oRsPf%gdRSք/LӸhy'/-ii4m¢.c+Ы)K)m:3`?Y]̪7fOE\\授ӉἋy*}}@^Ɩ=p u3l|1&CX &53^ 4`Cs>ox.] *BZJ&]3ob=E"dSY]ХZ!V-c<bw[!M*Z7?\04.uKyxD|ױ[^' R̩w(clGBM>Џ'h'転Ə9}>`{v".Bu ąI'@Z>xo_N_yNd,qU8nLb<}|@գ֣q{ps "ǩ#}JƟe<>)GOyoգlE1/艳>Z9m>fastlane/models/__init__.pyPK!N!fastlane/models/job.pyPK!#IBBԤfastlane/models/task.pyPK!OKfastlane/templates/stream.htmlPK!bGfastlane/utils.pyPK!Jߕfastlane/worker/__init__.pyPK!)w**"_fastlane/worker/docker_executor.pyPK!&Gl@l@fastlane/worker/job.pyPK!D44Ofastlane/worker/scheduler.pyPK!H+l.C)"fastlane-0.4.3.dist-info/entry_points.txtPK!DE"'33 2#fastlane-0.4.3.dist-info/LICENSEPK!HWY'fastlane-0.4.3.dist-info/WHEELPK!Hf$h-!6(fastlane-0.4.3.dist-info/METADATAPK!HSL O*fastlane-0.4.3.dist-info/RECORDPK!!) n0