PK! qfastlane/__init__.py__version__ = '0.1.0' PK!fastlane/api/__init__.pyPK!8fIIfastlane/api/app.py# Standard Library import logging import sys from json import loads # 3rd Party import rq_dashboard import structlog from flask import Flask from flask_redis import FlaskRedis from flask_redis_sentinel import SentinelExtension from flask_sockets import Sockets from gevent import pywsgi from geventwebsocket.handler import WebSocketHandler from structlog.processors import ( JSONRenderer, StackInfoRenderer, TimeStamper, format_exc_info, ) from structlog.stdlib import add_log_level, add_logger_name, filter_by_level # Fastlane import fastlane.api.gzipped as gzipped import fastlane.api.metrics as metrics import fastlane.api.rqb as rqb from fastlane.api.enqueue import bp as enqueue from fastlane.api.execution import bp as execution_api from fastlane.api.healthcheck import bp as healthcheck from fastlane.api.status import bp as status from fastlane.api.stream import bp as stream from fastlane.api.task import bp as task_api from fastlane.models import db class Application: def __init__(self, config, log_level, testing=False): self.config = config self.logger = None self.log_level = log_level self.create_app(testing) def create_app(self, testing): self.app = Flask("fastlane") self.testing = testing self.app.testing = testing self.app.config.from_object(rq_dashboard.default_settings) self.app.error_handlers = [] for key in self.config.items.keys(): self.app.config[key] = self.config[key] self.app.config.DEBUG = self.config.DEBUG self.app.config.ENV = self.config.ENV self.app.original_config = self.config self.app.log_level = self.log_level self.configure_logging() self.connect_redis() self.connect_queue() self.connect_db() self.load_executor() self.load_error_handlers() metrics.init_app(self.app) self.app.register_blueprint(metrics.bp) self.app.register_blueprint(healthcheck) self.app.register_blueprint(enqueue) self.app.register_blueprint(task_api) self.app.register_blueprint(execution_api) self.app.register_blueprint(status) self.app.register_blueprint(gzipped.bp) gzipped.init_app(self.app) sockets = Sockets(self.app) sockets.register_blueprint(stream) def configure_logging(self): if self.app.testing: structlog.reset_defaults() disabled = [ "docker.utils.config", "docker.auth", "docker.api.build", "docker.api.swarm", "docker.api.image", "rq.worker", "werkzeug", "requests", "urllib3", ] for logger in disabled: log = logging.getLogger(logger) log.setLevel(logging.ERROR) log.disabled = True self.app.logger.disabled = True logging.basicConfig( level=self.log_level, stream=sys.stdout, format="%(message)s" ) chain = [ filter_by_level, add_log_level, add_logger_name, TimeStamper(fmt="iso"), StackInfoRenderer(), format_exc_info, JSONRenderer(indent=1, sort_keys=True), ] logger = logging.getLogger(__name__) if self.testing: chain = [] logger = structlog.ReturnLogger() log = structlog.wrap_logger( logger, processors=chain, context_class=dict, wrapper_class=structlog.stdlib.BoundLogger, # cache_logger_on_first_use=True, ) self.logger = log self.app.logger = self.logger def connect_redis(self): self.logger.debug("Connecting to redis...") if self.app.testing: self.logger.info("Configuring Fake Redis...") import fakeredis self.app.redis = FlaskRedis.from_custom_provider(fakeredis.FakeStrictRedis) self.app.redis.connect = self._mock_redis(True) self.app.redis.disconnect = self._mock_redis(False) self.app.redis.init_app(self.app) elif self.app.config["REDIS_URL"].startswith("redis+sentinel"): self.logger.info( "Configuring Redis Sentinel...", redis_url=self.app.config["REDIS_URL"] ) redis_sentinel = SentinelExtension() redis_connection = redis_sentinel.default_connection redis_sentinel.init_app(self.app) self.app.redis = redis_connection else: self.logger.info( "Configuring Redis...", redis_url=self.app.config["REDIS_URL"] ) self.app.redis = FlaskRedis() self.app.redis.init_app(self.app) self.logger.info("Connection to redis successful") def connect_queue(self): self.app.queue = None self.app.register_blueprint(rqb.bp) rqb.init_app(self.app) def connect_db(self): settings = self.app.config["MONGODB_CONFIG"] if isinstance(settings, (dict,)): self.app.config["MONGODB_SETTINGS"] = settings else: self.app.config["MONGODB_SETTINGS"] = loads( self.app.config["MONGODB_CONFIG"] ) self.logger.info( "Connecting to MongoDB...", mongo=self.app.config["MONGODB_SETTINGS"] ) db.init_app(self.app) self.logger.info( "Connected to MongoDB successfully.", mongo=self.app.config["MONGODB_SETTINGS"], ) def load_executor(self): name = self.config.EXECUTOR parts = name.split(".") executor_module = __import__(".".join(parts), None, None, [parts[-1]], 0) self.app.executor_module = executor_module blueprint = getattr(executor_module, "bp", None) if blueprint is not None: self.app.register_blueprint(blueprint) self.app.executor = self.app.executor_module.Executor(self.app) def load_error_handlers(self): self.app.error_handlers = [] for handler_name in self.app.config["ERROR_HANDLERS"]: parts = handler_name.split(".") obj = __import__(".".join(parts[:-1]), None, None, [parts[-1]], 0) obj = getattr(obj, parts[-1]) self.app.error_handlers.append(obj(self.app)) self.app.report_error = self.report_error def report_error(self, err, metadata=None): for handler in self.app.error_handlers: handler.report(err, metadata) def run(self, host, port): server = pywsgi.WSGIServer( (host, port), self.app, handler_class=WebSocketHandler ) server.serve_forever() def _mock_redis(self, connected): def handle(): self.app.redis._redis_client.connected = connected return handle PK!v j fastlane/api/enqueue.py# Standard Library from datetime import datetime, timezone from uuid import UUID # 3rd Party from flask import Blueprint, current_app, g, jsonify, make_response, request, url_for from rq_scheduler import Scheduler # Fastlane from fastlane.helpers import loads from fastlane.models import JobExecution, Task from fastlane.utils import parse_time from fastlane.worker.job import run_job bp = Blueprint("enqueue", __name__) # pylint: disable=invalid-name def get_details(): try: details = request.get_json() except Exception: details = None if details is None and request.get_data(): details = loads(request.get_data()) return details def get_ip_addr(): if "X-Real-Ip" in request.headers: return request.headers["X-Real-Ip"] if "X-Forwarded-For" in request.headers: addresses = request.headers["X-Forwarded-For"].split(",") if addresses: return addresses[0] return request.remote_addr def create_job(details, task, logger, get_new_job_fn): logger.debug("Creating job...") retries = details.get("retries", 0) expiration = details.get("expiration") # people to notify when job succeeds, fails or finishes notify = details.get("notify", {"succeeds": [], "fails": [], "finishes": []}) # people to notify when job succeeds, fails or finishes webhooks = details.get("webhooks", {"succeeds": [], "fails": [], "finishes": []}) # additional metadata metadata = details.get("metadata", {}) if not isinstance(metadata, (dict,)): metadata = {} hard_limit = current_app.config["HARD_EXECUTION_TIMEOUT_SECONDS"] timeout = details.get("timeout", hard_limit) timeout = min( timeout, hard_limit ) # ensure jobs can't specify more than hard limit j = get_new_job_fn(task) j.metadata["retries"] = retries j.metadata["notify"] = notify j.metadata["webhooks"] = webhooks j.metadata["retry_count"] = 0 j.metadata["expiration"] = expiration j.metadata["timeout"] = timeout j.metadata["envs"] = details.get("envs", {}) j.request_ip = get_ip_addr() if metadata: j.metadata["custom"] = metadata j.save() logger.debug("Job created successfully...", job_id=str(j.job_id)) return j def enqueue_job(task, job, image, command, start_at, start_in, cron, logger): execution = None scheduler = Scheduler("jobs", connection=current_app.redis) args = [task.task_id, str(job.job_id), None, image, command] queue_job_id = None if start_at is not None: future_date = datetime.utcfromtimestamp(int(start_at)) logger.debug("Enqueuing job execution in the future...", start_at=future_date) result = scheduler.enqueue_at(future_date, run_job, *args) job.metadata["enqueued_id"] = str(result.id) queue_job_id = str(result.id) job.save() logger.info("Job execution enqueued successfully.", start_at=future_date) elif start_in is not None: future_date = datetime.now(tz=timezone.utc) + start_in logger.debug("Enqueuing job execution in the future...", start_at=future_date) result = scheduler.enqueue_at(future_date, run_job, *args) job.metadata["enqueued_id"] = str(result.id) queue_job_id = str(result.id) job.save() logger.info("Job execution enqueued successfully.", start_at=future_date) elif cron is not None: logger.debug("Enqueuing job execution using cron...", cron=cron) result = scheduler.cron( cron, # A cron string (e.g. "0 0 * * 0") func=run_job, args=args, repeat=None, queue_name="jobs", ) job.metadata["enqueued_id"] = str(result.id) queue_job_id = str(result.id) job.metadata["cron"] = cron job.scheduled = True job.save() logger.info("Job execution enqueued successfully.", cron=cron) else: logger.debug("Enqueuing job execution...") execution = job.create_execution(image, command) execution.request_ip = get_ip_addr() execution.status = JobExecution.Status.enqueued job.save() args = [ task.task_id, str(job.job_id), str(execution.execution_id), image, command, ] result = current_app.job_queue.enqueue(run_job, *args, timeout=-1) queue_job_id = result.id job.metadata["enqueued_id"] = result.id job.save() logger.info("Job execution enqueued successfully.") return queue_job_id, execution def get_task_and_details(task_id): details = get_details() if details is None or details == "": msg = "Failed to enqueue task because JSON body could not be parsed." g.logger.warn(msg) return None, None, None, make_response(msg, 400) image = details.get("image", None) command = details.get("command", None) if image is None or command is None: return ( None, None, None, make_response("image and command must be filled in the request.", 400), ) logger = g.logger.bind(task_id=task_id, image=image, command=command) logger.debug("Creating task...") task = Task.objects(task_id=task_id).modify(task_id=task_id, upsert=True, new=True) logger.info("Task created successfully.") return task, details, logger, None def validate_and_enqueue(details, task, job, logger): image = details.get("image", None) command = details.get("command", None) start_at = details.get("startAt", None) start_in = parse_time(details.get("startIn", None)) cron = details.get("cron", None) if len(list(filter(lambda item: item is not None, (start_at, start_in, cron)))) > 1: return ( None, None, make_response( "Only ONE of 'startAt', 'startIn' and 'cron' should be in the request.", 400, ), ) result, execution = enqueue_job( task, job, image, command, start_at, start_in, cron, logger ) return result, execution, None @bp.route("/tasks/", methods=("POST",)) def create_task(task_id): task, details, logger, response = get_task_and_details(task_id) if response is not None: return response job = create_job(details, task, logger, lambda task: task.create_job()) enqueued_id, execution, response = validate_and_enqueue(details, task, job, logger) if response is not None: return response return get_enqueue_response(task, job, execution, enqueued_id) @bp.route("/tasks//jobs/", methods=("PUT",)) def create_or_update_task(task_id, job_id): try: job_id = str(UUID(job_id)) except ValueError: return make_response( f"The job_id {job_id} is not a valid UUID4. All job IDs must be UUID4.", 400 ) task, details, logger, response = get_task_and_details(task_id) if response is not None: return response job = create_job( details, task, logger, lambda task: task.create_or_update_job(job_id) ) enqueued_id, execution, response = validate_and_enqueue(details, task, job, logger) if response is not None: return response return get_enqueue_response(task, job, execution, enqueued_id) def get_enqueue_response(task, job, execution, enqueued_id): task_id = str(task.task_id) job_id = str(job.job_id) task_url = task.get_url() job_url = url_for("task.get_job", task_id=task_id, job_id=job_id, _external=True) if execution is None: execution_url = None execution_id = None else: execution_url = url_for( "execution.get_job_execution", task_id=task_id, job_id=job_id, execution_id=str(execution.execution_id), _external=True, ) execution_id = str(execution.execution_id) return jsonify( { "taskId": task_id, "jobId": job_id, "executionId": execution_id, "executionUrl": execution_url, "queueJobId": enqueued_id, "jobUrl": job_url, "taskUrl": task_url, } ) PK!},9fastlane/api/execution.py# 3rd Party from flask import ( Blueprint, Response, current_app, g, jsonify, render_template, request, url_for, ) from rq_scheduler import Scheduler # Fastlane from fastlane.api.helpers import return_error from fastlane.models import Job, JobExecution bp = Blueprint("execution", __name__) # pylint: disable=invalid-name @bp.route("/tasks//jobs//executions//", methods=("GET",)) def get_job_execution(task_id, job_id, execution_id): logger = g.logger.bind( operation="get_job_execution", task_id=task_id, job_id=job_id, execution_id=execution_id, ) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: msg = f"Task ({task_id}) or Job ({job_id}) not found." return return_error(msg, "get_job_execution", status=404, logger=logger) execution = job.get_execution_by_id(execution_id) if execution is None: msg = f"Job Execution ({execution_id}) not found in job ({job_id})." return return_error(msg, "get_job_execution", status=404, logger=logger) logger.debug("Job execution retrieved successfully...") return format_execution_details(job.task, job, execution) def format_execution_details(task, job, execution, shallow=False): task_id = str(task.task_id) if shallow: execution_url = url_for( "execution.get_job_execution", task_id=task_id, job_id=str(job.job_id), execution_id=str(execution.execution_id), _external=True, ) details = {"id": str(execution.execution_id), "url": execution_url} else: details = execution.to_dict(include_log=True, include_error=True) job_url = url_for( "task.get_job", task_id=task_id, job_id=str(job.job_id), _external=True ) task_url = url_for("task.get_task", task_id=task_id, _external=True) return jsonify( { "task": {"id": task_id, "url": task_url}, "job": {"id": str(job.job_id), "url": job_url}, "execution": details, } ) def retrieve_execution_details(task_id, job_id, execution_id=None, get_data_fn=None): if get_data_fn is None: get_data_fn = lambda execution: execution.log # noqa: E731 logger = g.logger.bind(operation="get_response", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: msg = f"Task ({task_id}) or Job ({job_id}) not found." return return_error( msg, "retrieve_execution_details", status=404, logger=logger ) if not job.executions: msg = f"No executions found in job ({job_id})." return return_error( msg, "retrieve_execution_details", status=400, logger=logger ) if execution_id is None: execution = job.get_last_execution() else: execution = job.get_execution_by_id(execution_id) if not execution: msg = "No executions found in job with specified arguments." return return_error( msg, "retrieve_execution_details", status=400, logger=logger ) headers = {"Fastlane-Exit-Code": str(execution.exit_code)} if execution.status in [JobExecution.Status.running, JobExecution.Status.enqueued]: logs = "" else: logs = get_data_fn(execution) return Response(headers=headers, response=logs, status=200) @bp.route( "/tasks//jobs//executions//stdout/", methods=("GET",) ) def get_job_execution_stdout(task_id, job_id, execution_id): return retrieve_execution_details(task_id, job_id, execution_id) @bp.route( "/tasks//jobs//executions//stderr/", methods=("GET",) ) def get_job_execution_stderr(task_id, job_id, execution_id): return retrieve_execution_details( task_id, job_id, execution_id, lambda execution: execution.error ) @bp.route( "/tasks//jobs//executions//logs/", methods=("GET",) ) def get_job_execution_logs(task_id, job_id, execution_id): func = lambda execution: ( # NOQA: 731 f"{execution.log}\n-=-\n{execution.error}" if execution.status not in [JobExecution.Status.running, JobExecution.Status.enqueued] else "" ) return retrieve_execution_details(task_id, job_id, execution_id, func) def perform_stop_job_execution(job, execution, logger, stop_schedule=True): if execution is None: if not job.executions: msg = "No executions found in job." return ( False, return_error(msg, "stop_job_execution", status=400, logger=logger), ) execution = job.get_last_execution() if execution is not None and execution.status == JobExecution.Status.running: logger.debug("Stopping current execution...") executor = current_app.executor executor.stop_job(job.task, job, execution) logger.debug("Current execution stopped.") if "retries" in job.metadata: job.metadata["retry_count"] = job.metadata["retries"] + 1 job.save() scheduler = Scheduler("jobs", connection=current_app.redis) if ( stop_schedule and "enqueued_id" in job.metadata and job.metadata["enqueued_id"] in scheduler ): scheduler.cancel(job.metadata["enqueued_id"]) job.scheduled = False logger.debug("Job stopped.") return True, None @bp.route( "/tasks//jobs//executions//stop/", methods=("POST",) ) def stop_job_execution(task_id, job_id, execution_id): logger = g.logger.bind( operation="stop_job_execution", task_id=task_id, job_id=job_id, execution_id=execution_id, ) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: msg = f"Task ({task_id}) or Job ({job_id}) not found." return return_error(msg, "stop_job_execution", status=404, logger=logger) execution = job.get_execution_by_id(execution_id) if execution is None: msg = f"Job Execution ({execution_id}) not found in Job ({job_id})." return return_error(msg, "stop_job_execution", status=404, logger=logger) _, response = perform_stop_job_execution( job, execution=execution, logger=logger, stop_schedule=False ) if response is not None: return response return format_execution_details(job.task, job, execution, shallow=True) @bp.route("/tasks//jobs//executions//stream/") def stream_job(task_id, job_id, execution_id): if request.url.startswith("https"): protocol = "wss" else: protocol = "ws" url = url_for( "execution.stream_job", task_id=task_id, job_id=job_id, execution_id=execution_id, external=True, ) url = "/".join(url.split("/")[:-2]) ws_url = "%s://%s/%s/ws/" % (protocol, request.host.rstrip("/"), url.lstrip("/")) return render_template( "stream.html", task_id=task_id, job_id=job_id, execution_id=execution_id, ws_url=ws_url, ) PK!-D}}fastlane/api/gzipped.py# Standard Library import gzip from io import BytesIO as IO # 3rd Party from flask import Blueprint, request bp = Blueprint("gzip", __name__) # pylint: disable=invalid-name def init_app(app): def gzip_response(response): if response.headers["Content-Type"] != "application/json": return response accept_encoding = request.headers.get("Accept-Encoding", "") if "gzip" not in accept_encoding.lower(): return response response.direct_passthrough = False if ( response.status_code < 200 or response.status_code >= 300 or "Content-Encoding" in response.headers ): return response gzip_buffer = IO() gzip_file = gzip.GzipFile(mode="wb", fileobj=gzip_buffer) gzip_file.write(response.data) gzip_file.close() response.data = gzip_buffer.getvalue() response.headers["Content-Encoding"] = "gzip" response.headers["Vary"] = "Accept-Encoding" response.headers["Content-Length"] = len(response.data) return response app.after_request(gzip_response) PK!}jYYfastlane/api/healthcheck.py# 3rd Party from flask import Blueprint, current_app, jsonify # Fastlane from fastlane.models import db bp = Blueprint( # pylint: disable=invalid-name "healthcheck", __name__, url_prefix="/healthcheck" ) @bp.route("/", methods=("GET",)) def healthcheck(): status = {"redis": True, "mongo": True, "errors": []} try: res = current_app.redis.ping() if not res: raise RuntimeError(f"Connection to redis failed ({res}).") except Exception as err: status["errors"].append({"source": "redis", "message": str(err)}) status["redis"] = False try: database = current_app.config["MONGODB_SETTINGS"]["db"] conn = getattr(db.connection, database) res = tuple(conn.jobs.find().limit(1)) if not isinstance(res, (tuple,)): raise RuntimeError(f"Connection to mongoDB failed ({res}).") except Exception as err: status["errors"].append({"source": "mongo", "message": str(err)}) status["mongo"] = False code = 200 if status["errors"]: code = 500 return jsonify(status), code PK!Ժ[[fastlane/api/helpers.py# 3rd Party from flask import current_app, make_response # Fastlane from fastlane.helpers import dumps def return_error(msg, operation, status=500, logger=None): if logger is None: logger = current_app.bind(operation=operation) logger.error(msg) return make_response(dumps({"error": msg, "operation": operation}), status) PK!&Ghfastlane/api/metrics.py# Standard Library from datetime import datetime from uuid import uuid4 # 3rd Party from flask import Blueprint, current_app, g, request bp = Blueprint("metrics", __name__) # pylint: disable=invalid-name def init_app(app): def start_timer(): request_id = request.headers.get("X-Request-ID", str(uuid4())) g.logger = current_app.logger.bind(request_id=request_id) g.request_id = request_id g.start = datetime.now() app.before_request(start_timer) def log_request(response): if request.path == "/favicon.ico": return response now = datetime.now() duration = int(round((now - g.start).microseconds / 1000, 2)) user_ip = request.headers.get("X-Forwarded-For", request.remote_addr) host = request.host.split(":", 1)[0] log_params = { "method": request.method, "path": request.path, "status": response.status_code, "duration": duration, "ip": user_ip, "host": host, } request_id = g.request_id if request_id: log_params["request_id"] = request_id if response.status_code < 400: current_app.logger.info("Request succeeded", **log_params) elif response.status_code < 500: current_app.logger.info("Bad Request", **log_params) else: current_app.logger.error("Internal Server Error", **log_params) return response app.after_request(log_request) PK!pafastlane/api/rqb.py# 3rd Party from flask import Blueprint from rq import Queue from rq_scheduler import Scheduler bp = Blueprint("rq", __name__) # pylint: disable=invalid-name class JobQueue: def __init__(self, queue_name, app): self.app = app self.queue_name = queue_name self._queue = None self._scheduler = None def enqueue_in(self, *args, **kw): self.app.logger.info("Scheduling execution for the future.", **kw) return self._scheduler.enqueue_in(*args, **kw) def enqueue_at(self, *args, **kw): self.app.logger.info("Scheduling execution for a specific timestamp.", **kw) return self._scheduler.enqueue_at(*args, **kw) def enqueue(self, *args, **kw): return self.queue.enqueue(*args, **kw) @property def queue(self): if self._queue is None: self._queue = Queue(self.queue_name, connection=self.app.redis) return self._queue @property def scheduler(self): if self._scheduler is None: self._scheduler = Scheduler(queue=self._queue) return self._scheduler def init_app(app): for queue_name in ["jobs", "monitor", "notify", "webhooks"]: key = queue_name.rstrip("s") setattr(app, "%s_queue" % key, JobQueue(queue_name, app)) PK!:fastlane/api/status.py# Standard Library from datetime import datetime # 3rd Party import croniter import pkg_resources from flask import Blueprint, current_app, jsonify, url_for # Fastlane from fastlane.models import Job, Task bp = Blueprint("status", __name__, url_prefix="/status") # pylint: disable=invalid-name @bp.route("/", methods=("GET",)) def status(): executor = current_app.executor version = pkg_resources.get_distribution("fastlane").version metadata = {"hosts": [], "containers": {"running": []}} containers = executor.get_running_containers() for host, port, container_id in containers["running"]: metadata["containers"]["running"].append( {"host": host, "port": port, "id": container_id} ) metadata["hosts"] = [] + containers["available"] + containers["unavailable"] metadata["queues"] = {"jobs": {}, "monitor": {}, "error": {}} for queue in ["jobs", "monitor", "error"]: jobs_queue_size = current_app.redis.llen(f"rq:queue:{queue}") metadata["queues"][queue]["length"] = jobs_queue_size metadata["tasks"] = {"count": Task.objects.count()} metadata["jobs"] = {"count": Job.objects.count()} metadata["jobs"]["scheduled"] = [] scheduled_jobs = Job.objects(scheduled=True).all() metadata["fastlane"] = { "version": version, "executor": current_app.config["EXECUTOR"], } for job in scheduled_jobs: j = job.to_dict(include_executions=False) itr = croniter.croniter(job.metadata["cron"], datetime.utcnow()) j["nextScheduledAt"] = itr.get_next(datetime).isoformat() task_id = job.task.task_id job_url = url_for( "task.get_job", task_id=task_id, job_id=str(job.id), _external=True ) j["url"] = job_url stop_job_url = url_for( "task.stop_job", task_id=task_id, job_id=str(job.id), _external=True ) j["stopUrl"] = stop_job_url task_url = url_for("task.get_task", task_id=task_id, _external=True) del j["taskId"] j["task"] = {"id": task_id, "url": task_url} metadata["scheduled"].append(j) return jsonify(metadata), 200 PK!j fastlane/api/stream.py# Standard Library import time from multiprocessing import Process # 3rd Party from flask import Blueprint, current_app # Fastlane from fastlane.models import Job, JobExecution bp = Blueprint("stream", __name__) # pylint: disable=invalid-name def stream_log(executor, task_id, job, ex, websocket): try: if ( not websocket.closed and ex.status == JobExecution.Status.done or ex.status == JobExecution.Status.failed ): websocket.send("EXIT CODE: %s\n" % ex.exit_code) websocket.send(ex.log) websocket.send("\n-=-\n") websocket.send(ex.error) websocket.close(message="wsdone") return if not websocket.closed and ex.status != JobExecution.Status.running: websocket.close(message="wsretry") return for log in executor.get_streaming_logs(task_id, job, ex): if websocket.closed: return websocket.send(log) except BrokenPipeError: return websocket.close(message="wsdone") def process_job_execution_logs(websocket, task_id, job_id, execution_id, logger): job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: logger.error(f"Job ({job_id}) not found in task ({task_id}).") websocket.close() return if execution_id is None: execution = job.get_last_execution() else: execution = job.get_execution_by_id(execution_id) if execution is None: logger.error("No executions found in job ({execution_id}).") websocket.close(message="wsretry") return executor = current_app.executor process = Process( target=stream_log, args=(executor, task_id, job, execution, websocket) ) process.start() while not websocket.closed: time.sleep(10) process.terminate() @bp.route("/tasks//jobs//ws/") def websocket_listen(websocket, task_id, job_id): logger = current_app.logger.bind(task_id=task_id, job_id=job_id) process_job_execution_logs(websocket, task_id, job_id, None, logger) @bp.route("/tasks//jobs//executions//ws/") def websocket_execution_listen(websocket, task_id, job_id, execution_id): logger = current_app.logger.bind(task_id=task_id, job_id=job_id) process_job_execution_logs(websocket, task_id, job_id, execution_id, logger) PK!O=fastlane/api/task.py# 3rd Party from flask import ( Blueprint, abort, current_app, g, jsonify, render_template, request, url_for, ) from rq_scheduler import Scheduler # Fastlane from fastlane.api.execution import ( perform_stop_job_execution, retrieve_execution_details, ) from fastlane.api.helpers import return_error from fastlane.models import Job, JobExecution, Task from fastlane.worker.job import run_job bp = Blueprint("task", __name__) # pylint: disable=invalid-name @bp.route("/tasks/", methods=("GET",)) def get_tasks(): logger = g.logger.bind(operation="get_tasks") try: page = int(request.args.get("page", 1)) except ValueError: logger.error(f"Tasks pagination page param should be an integer.") abort(404) per_page = current_app.config["PAGINATION_PER_PAGE"] logger.debug(f"Getting tasks page={page} per_page={per_page}...") paginator = Task.get_tasks(page=page, per_page=per_page) logger.debug("Tasks retrieved successfully...") tasks_url = url_for("task.get_tasks", _external=True) next_url = None if paginator.has_next: next_url = f"{tasks_url}?page={paginator.next_num}" prev_url = None if paginator.has_prev: prev_url = f"{tasks_url}?page={paginator.prev_num}" data = { "items": [], "total": paginator.total, "page": paginator.page, "pages": paginator.pages, "perPage": paginator.per_page, "hasNext": paginator.has_next, "hasPrev": paginator.has_prev, "nextUrl": next_url, "prevUrl": prev_url, } for task in paginator.items: data["items"].append(task.to_dict()) return jsonify(data) @bp.route("/tasks//", methods=("GET",)) def get_task(task_id): logger = g.logger.bind(operation="get_task", task_id=task_id) logger.debug("Getting job...") task = Task.get_by_task_id(task_id) if task is None: return return_error("Task not found.", "get_task", status=404, logger=logger) logger.debug("Task retrieved successfully...") task_jobs = Job.objects(id__in=[str(job_id.id) for job_id in task.jobs]) jobs = [] for job in task_jobs: url = url_for( "task.get_job", task_id=task_id, job_id=str(job.job_id), _external=True ) job = {"id": str(job.job_id), "url": url} jobs.append(job) return jsonify({"taskId": task_id, "jobs": jobs}) @bp.route("/tasks//jobs//", methods=("GET",)) def get_job(task_id, job_id): logger = g.logger.bind(operation="get_job", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: return return_error( "Job not found in task.", "get_task", status=404, logger=logger ) logger.debug("Job retrieved successfully...") details = job.to_dict( include_log=True, include_error=True, blacklist=current_app.config["ENV_BLACKLISTED_WORDS"].lower().split(","), ) for execution in details["executions"]: exec_url = url_for( "execution.get_job_execution", task_id=task_id, job_id=job_id, execution_id=execution["executionId"], _external=True, ) execution["url"] = exec_url task_url = url_for("task.get_task", task_id=task_id, _external=True) return jsonify({"task": {"id": task_id, "url": task_url}, "job": details}) @bp.route("/tasks//jobs//stop/", methods=("POST",)) def stop_job(task_id, job_id): logger = g.logger.bind(operation="stop_job", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: return return_error( "Job not found in task.", "stop_job", status=404, logger=logger ) execution = job.get_last_execution() _, response = perform_stop_job_execution( job, execution=execution, logger=logger, stop_schedule=True ) if response is not None: return response return get_job_summary(task_id, job_id) @bp.route("/tasks//jobs//retry/", methods=("POST",)) def retry_job(task_id, job_id): logger = g.logger.bind(operation="retry", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: return return_error( "Job not found in task.", "retry_job", status=404, logger=logger ) execution = job.get_last_execution() if execution is None: return return_error( "No execution yet to retry.", "retry_job", status=400, logger=logger ) scheduler = Scheduler("jobs", connection=current_app.redis) if "enqueued_id" in job.metadata and job.metadata["enqueued_id"] in scheduler: msg = "Can't retry a scheduled job." return return_error(msg, "retry_job", status=400, logger=logger) if execution.status == JobExecution.Status.running: logger.debug("Stopping current execution...") executor = current_app.executor executor.stop_job(job.task, job, execution) logger.debug("Current execution stopped.") execution.status = JobExecution.Status.failed job.save() new_exec = job.create_execution(execution.image, execution.command) new_exec.status = JobExecution.Status.enqueued logger.debug("Enqueuing job execution...") args = [task_id, job_id, new_exec.execution_id, execution.image, execution.command] result = current_app.job_queue.enqueue(run_job, *args, timeout=-1) job.metadata["enqueued_id"] = result.id job.save() logger.info("Job execution enqueued successfully.") return get_job_summary(task_id, job_id) def get_job_summary(task_id, job_id): job_url = url_for("task.get_job", task_id=task_id, job_id=job_id, _external=True) task_url = url_for("task.get_task", task_id=task_id, _external=True) return jsonify( {"taskId": task_id, "jobId": job_id, "jobUrl": job_url, "taskUrl": task_url} ) @bp.route("/tasks//jobs//stream/") def stream_job(task_id, job_id): if request.url.startswith("https"): protocol = "wss" else: protocol = "ws" url = url_for("task.stream_job", task_id=task_id, job_id=job_id, external=True) url = "/".join(url.split("/")[:-2]) ws_url = "%s://%s/%s/ws/" % (protocol, request.host.rstrip("/"), url.lstrip("/")) return render_template("stream.html", task_id=task_id, job_id=job_id, ws_url=ws_url) @bp.route("/tasks//jobs//stdout/") def stdout(task_id, job_id): return retrieve_execution_details( task_id, job_id, get_data_fn=lambda execution: execution.log ) @bp.route("/tasks//jobs//stderr/") def stderr(task_id, job_id): return retrieve_execution_details( task_id, job_id, get_data_fn=lambda execution: execution.error ) @bp.route("/tasks//jobs//logs/") def logs(task_id, job_id): func = lambda execution: f"{execution.logs}\n{execution.error}" # NOQA: 731 return retrieve_execution_details(task_id, job_id, get_data_fn=func) PK!++fastlane/cli/__init__.pyfrom fastlane.cli.core import main # NOQA PK!ytfastlane/cli/api.py""" isort:skip_file """ # 3rd Party from gevent import monkey # Must be before other imports monkey.patch_all() # isort:skip # Fastlane from fastlane.api.app import Application # NOQA pylint: disable=wrong-import-position from fastlane.config import Config # NOQA pylint: disable=wrong-import-position class APIHandler: def __init__(self, click, host, port, config, log_level): self.config_path = config self.config = None self.click = click self.host = host self.port = port self.log_level = log_level self.load_config() def load_config(self): # self.click.echo(f'Loading configuration from {self.config_path}...') self.config = Config.load(self.config_path) def __call__(self): app = Application(self.config, self.log_level) app.logger.info( "fastlane is runnning.", host=self.host, port=self.port, environment=self.config.ENV, ) app.run(self.host, self.port) PK!W.aܮ fastlane/cli/core.py# Standard Library import sys from os.path import abspath, dirname, join # 3rd Party import click import pkg_resources # Fastlane from fastlane.cli.api import APIHandler from fastlane.cli.prune import PruneHandler from fastlane.cli.worker import WorkerHandler ROOT_CONFIG = abspath(join(dirname(__file__), "../config/local.conf")) LEVELS = {0: "ERROR", 1: "WARN", 2: "INFO", 3: "DEBUG"} @click.group() def main(): pass @click.command() def version(): """Returns fastlane version.""" click.echo(pkg_resources.get_distribution("fastlane").version) @click.command() @click.option("-b", "--host", default="0.0.0.0") @click.option("-p", "--port", default=10000) @click.option("-v", "--verbose", default=0, count=True) @click.option( "-c", "--config-file", default=ROOT_CONFIG, help="configuration file to use with fastlane", ) def api(host, port, verbose, config_file): """Runs fastlane API in the specified host and port.""" log_level = LEVELS.get(verbose, "ERROR") handler = APIHandler(click, host, port, config_file, log_level) handler() @click.command() @click.option("-i", "--worker-id", default=None, help="ID for this worker") @click.option( "-j", "--no-jobs", default=False, help="""Process the 'jobs' queue?""", is_flag=True ) @click.option( "-m", "--no-monitor", default=False, help="""Process the 'monitor' queue?""", is_flag=True, ) @click.option( "-n", "--no-notify", default=False, help="""Process the 'notify' queue?""", is_flag=True, ) @click.option( "-w", "--no-webhooks", default=False, help="""Process the 'webhooks' queue?""", is_flag=True, ) @click.option("-v", "--verbose", default=0, count=True) @click.option( "-c", "--config-file", default=ROOT_CONFIG, help="configuration file to use with fastlane", ) def worker( worker_id, no_jobs, no_monitor, no_notify, no_webhooks, verbose, config_file ): """Runs an fastlane Worker with the specified queue name and starts processing.""" jobs = not no_jobs monitor = not no_monitor notify = not no_notify webhooks = not no_webhooks if not jobs and not monitor and not notify and not webhooks: click.echo( "Worker must monitor at least one queue: jobs, monitor, notify or webhooks" ) sys.exit(1) log_level = LEVELS.get(verbose, "ERROR") handler = WorkerHandler( click, worker_id, jobs, monitor, notify, webhooks, config_file, log_level ) handler() @click.command() def config(): """Prints the default config for fastlane""" from fastlane.config import Config print(Config.get_config_text()) @click.command() @click.option("-v", "--verbose", default=0, count=True) @click.option( "-c", "--config-file", default=ROOT_CONFIG, help="configuration file to use with fastlane", ) def prune(verbose, config_file): """Removes all containers that have already been processed by fastlane.""" log_level = LEVELS.get(verbose, "ERROR") handler = PruneHandler(click, config_file, log_level) handler() main.add_command(version) main.add_command(api) main.add_command(worker) main.add_command(config) main.add_command(prune) PK!"okfastlane/cli/prune.py# Fastlane from fastlane.api.app import Application from fastlane.config import Config class PruneHandler: def __init__(self, click, config, log_level): self.config_path = config self.config = None self.click = click self.log_level = log_level self.load_config() def load_config(self): self.config = Config.load(self.config_path) def __call__(self): app = Application(self.config, self.log_level).app app.logger.info(f"Running fastlane prune...") with app.app_context(): removed = app.executor.remove_done() app.logger.info(f"Prune done.", removed=removed) PK!J fastlane/cli/worker.py# Standard Library import time from uuid import uuid4 # 3rd Party import rq from rq import Connection, Worker # Fastlane from fastlane.api.app import Application from fastlane.config import Config from fastlane.worker.scheduler import QueueScheduler class WorkerHandler: def __init__( self, click, worker_id, jobs, monitor, notify, webhooks, config, log_level ): self.config_path = config self.config = None self.click = click self.worker_id = worker_id self.log_level = log_level self.queues = [] if jobs: self.queues.append("jobs") if monitor: self.queues.append("monitor") if notify: self.queues.append("notify") if webhooks: self.queues.append("webhooks") self.load_config() def load_config(self): # self.click.echo(f'Loading configuration from {self.config_path}...') self.config = Config.load(self.config_path) def __call__(self): # self.click.echo( # f'Running fastlane worker processing queues {",".join(self.queues)}.') app = Application(self.config, self.log_level) app.logger.info( f'Running fastlane worker processing queues {",".join(self.queues)}.' ) interval = app.config["WORKER_SLEEP_TIME_MS"] / 1000.0 with app.app.app_context(): worker_kw = dict(connection=app.app.redis) if self.worker_id is None: app.logger.warn( "The worker id was not set for this worker and a random one will be used." ) self.worker_id = str(uuid4()) app.logger = app.logger.bind(worker_id=self.worker_id, queues=self.queues) app.app.logger = app.logger worker_kw["name"] = self.worker_id worker = Worker(self.queues, **worker_kw) schedulers = {} with Connection(app.app.redis): for queue in self.queues: schedulers[queue] = QueueScheduler(queue, app=app.app) app.schedulers = schedulers # app.logger.debug('Processing enqueued items...') try: while True: for queue in self.queues: # app.logger.debug("Processing scheduler...", queue=queue) schedulers[queue].move_jobs() # app.logger.debug('Processing queues...') worker.work(burst=True) time.sleep(interval) except rq.worker.StopRequested: app.logger.info("Worker exiting gracefully.") return PK!=3NNfastlane/config/__init__.py# 3rd Party from derpconf.config import Config Config.allow_environment_variables() Config.define( "ENV", "production", "This configuration details the environment fastlane is running in.", "General", ) Config.define( "SECRET_KEY", "OTNDN0VCRDAtRTMyMS00NUM0LUFFQUYtNEI4QUE4RkFCRjUzCg==", """This configuration specifies the `SECRET_KEY` for Fastlane API. This should be unique per environment.""", "General", ) Config.define( "REDIS_URL", "redis://localhost:10100/0", """Redis connection string in the form of 'redis://' protocol. If `redis+sentinel` is used as protocol, instead, fastlane will connect to sentinel to get redis host.""", "Redis", ) Config.define( "WORKER_SLEEP_TIME_MS", 10, "Number of milliseconds that fastlane must sleep before getting the next job", "Worker", ) Config.define( "HARD_EXECUTION_TIMEOUT_SECONDS", 30 * 60, "Number of seconds that fastlane must wait before killing an execution", "Worker", ) Config.define( "EXPONENTIAL_BACKOFF_MIN_MS", 1000, "Number of milliseconds that fastlane must wait before the first retry in each job", "Worker", ) Config.define( "EXPONENTIAL_BACKOFF_FACTOR", 2, "Factor to multiply backoff by in each retry", "Worker", ) Config.define( "WEBHOOKS_EXPONENTIAL_BACKOFF_MIN_MS", 5000, "Number of milliseconds that fastlane must wait before " "the first retry in each webhook dispatch", "Worker", ) Config.define( "WEBHOOKS_EXPONENTIAL_BACKOFF_FACTOR", 2, "Factor to multiply backoff by in each retry for webhook dispatch", "Worker", ) Config.define( "EXECUTOR", "fastlane.worker.docker_executor", "Module full name where to find the Executor class", "Worker", ) Config.define( "DOCKER_HOSTS", [{"match": "", "hosts": ["localhost:2375"], "maxRunning": 2}], """Docker cluster definitions. The `match` portion of each host definition specifies a regular expression that must be match in order for the job to execute in one of the docker hosts in the `hosts` key. The `maxRunning` portion, specifies the maximum number of concurrent jobs in this cluster. If the regex is empty ("") it means that it matches anything. The empty regex cluster definition should be the last one, otherwise it will work all jobs. """, "Docker Executor", ) Config.define( "DOCKER_CIRCUIT_BREAKER_MAX_FAILS", 5, "Maximum number of failures to docker host to stop sending new jobs", "Docker Executor", ) Config.define( "DOCKER_CIRCUIT_BREAKER_RESET_TIMEOUT_SECONDS", 60, "Number of seconds to reopen circuit and start sending new jobs to a docker host", "Docker Executor", ) Config.define( "MONGODB_CONFIG", { "host": "mongodb://localhost:10101/fastlane", "db": "fastlane", "serverSelectionTimeoutMS": 100, "connect": False, }, "MongoDB connection details.", "Models", ) Config.define( "ERROR_HANDLERS", ["fastlane.errors.sentry.SentryErrorHandler"], "List of configured error handlers", "Errors", ) Config.define("SENTRY_DSN", "", "Sentry DSN to send errors to", "Errors") Config.define( "ENV_BLACKLISTED_WORDS", "password,key,secret,client_id", "Words that if present in environment variables are redacted", "API", ) Config.define( "SMTP_USE_SSL", False, "Wheter the SMTP server used to send notifications uses SSL", "Email", ) Config.define( "SMTP_HOST", None, "Host of the SMTP server used to send notifications", "Email" ) Config.define( "SMTP_PORT", None, "Port of the SMTP server used to send notifications", "Email" ) Config.define( "SMTP_USER", None, "User of the SMTP server used to send notifications", "Email" ) Config.define( "SMTP_PASSWORD", None, "Password of the SMTP server used to send notifications", "Email", ) Config.define( "SMTP_FROM", None, "From E-mail of the SMTP server used to send notifications", "Email", ) Config.define( "PAGINATION_PER_PAGE", 10, "Total items per page to be used on api pagination methods", "API", ) PK!֠cuufastlane/config/local.confDEBUG=True SERVER_NAME='localhost:10000' DOCKER_HOSTS=[{"match": "", "hosts": ["localhost:1234"], "maxRunning": 2}] PK!fastlane/errors/__init__.pyclass ErrorReporter: def __init__(self, app): self.app = app def report(self, err, metadata=None): raise NotImplementedError() PK!t22fastlane/errors/sentry.py# import sentry_sdk # 3rd Party from raven import Client # Fastlane from fastlane.errors import ErrorReporter class SentryErrorHandler(ErrorReporter): def __init__(self, app): super(SentryErrorHandler, self).__init__(app) self.client = None self.send = True if app.config["SENTRY_DSN"] == "": self.send = False else: app.logger.info( "Sentry configured properly.", sentry_dsn=app.config["SENTRY_DSN"] ) self.client = Client(app.config["SENTRY_DSN"], auto_log_stacks=True) def report(self, err, metadata=None): if not self.send: return if metadata is None: metadata = {} exc_info = (err.__class__, err, err.__traceback__) self.client.captureException(exc_info=exc_info, extra=metadata) # with sentry_sdk.configure_scope() as scope: # scope.level = "error" # for key, val in metadata.items(): # scope.set_extra(key, val) # sentry_sdk.capture_exception(err) PK!,1 rfastlane/helpers.pytry: from ujson import loads, dumps # pylint: disable=unused-import except ImportError: from json import loads, dumps # pylint: disable=unused-import PK!|wwfastlane/models/__init__.py""" isort:skip_file """ # 3rd Party from flask_mongoengine import MongoEngine db = MongoEngine() # isort:skip pylint: disable=invalid-name from fastlane.models.task import ( # NOQA pylint: disable=unused-import,wrong-import-position Task, ) from fastlane.models.job import ( # NOQA pylint: disable=unused-import,wrong-import-position JobExecution, Job, ) PK!Vߗfastlane/models/job.py# Standard Library import datetime from uuid import uuid4 # 3rd Party import mongoengine.errors from mongoengine import ( BooleanField, DateTimeField, DictField, EmbeddedDocumentField, IntField, ListField, ReferenceField, StringField, ) # Fastlane from fastlane.models import db class JobExecution(db.EmbeddedDocument): # pylint: disable=no-member class Status: enqueued = "enqueued" pulling = "pulling" running = "running" done = "done" failed = "failed" timedout = "timedout" expired = "expired" stopped = "stopped" created_at = DateTimeField(required=True) started_at = DateTimeField(required=False) finished_at = DateTimeField(required=False) execution_id = StringField(required=True) image = StringField(required=True) command = StringField(required=True) request_ip = StringField(required=False) status = StringField(required=True, default=Status.enqueued) log = StringField(required=False) error = StringField(required=False) exit_code = IntField(required=False) metadata = DictField(required=False) def to_dict(self, include_log=False, include_error=False): s_at = self.started_at.isoformat() if self.started_at is not None else None f_at = self.finished_at.isoformat() if self.finished_at is not None else None res = { "executionId": str(self.execution_id), "createdAt": self.created_at.isoformat(), "requestIPAddress": self.request_ip, "startedAt": s_at, "finishedAt": f_at, "image": self.image, "command": self.command, "metadata": self.metadata, "status": self.status, "exitCode": self.exit_code, } if self.finished_at is not None: res["finishedAt"] = self.finished_at.isoformat() if include_log: res["log"] = self.log if include_error: res["error"] = self.error return res class Job(db.Document): created_at = DateTimeField(required=True) last_modified_at = DateTimeField(required=True, default=datetime.datetime.now) job_id = StringField(required=True) executions = ListField(EmbeddedDocumentField(JobExecution)) task = ReferenceField( "Task", required=True, reverse_delete_rule=mongoengine.CASCADE ) request_ip = StringField(required=False) metadata = DictField(required=False) scheduled = BooleanField(required=True, default=False) def save(self, *args, **kwargs): if self.executions is None: self.executions = [] if not self.created_at: self.created_at = datetime.datetime.utcnow() self.last_modified_at = datetime.datetime.utcnow() return super(Job, self).save(*args, **kwargs) def create_execution(self, image, command): ex_id = str(uuid4()) ex = JobExecution( execution_id=ex_id, image=image, command=command, created_at=datetime.datetime.utcnow(), ) self.executions.append(ex) self.save() return ex def get_metadata(self, blacklist): if "envs" in self.metadata: envs = {} for key, val in self.metadata["envs"].items(): for word in blacklist: if word in key.lower(): val = "*" * len(str(val)) break envs[key] = val self.metadata["envs"] = envs return self.metadata def to_dict( self, include_log=False, include_error=False, include_executions=True, blacklist=None, ): if blacklist is None: blacklist = [] meta = self.get_metadata(blacklist) res = { "createdAt": self.created_at.isoformat(), "lastModifiedAt": self.last_modified_at.isoformat(), "taskId": self.task.task_id, "scheduled": self.scheduled, "executionCount": len(self.executions), "requestIPAddress": self.request_ip, "metadata": meta, } if include_executions: executions = [ ex.to_dict(include_log, include_error) for ex in sorted(self.executions, key=lambda ex: ex.created_at)[-20:] ] res["executions"] = executions return res @classmethod def get_by_id(cls, task_id, job_id): from fastlane.models.task import Task if task_id is None or task_id == "" or job_id is None or job_id == "": raise RuntimeError( "Task ID and Job ID are required and can't be None or empty." ) task = Task.objects(task_id=task_id).first() job = cls.objects(task=task, job_id=job_id).first() return job def get_execution_by_id(self, execution_id): for job_execution in self.executions: if job_execution.execution_id == execution_id: return job_execution return None def get_last_execution(self): if not self.executions: return None return self.executions[-1] PK!X8#g fastlane/models/task.py# Standard Library import datetime from uuid import uuid4 # 3rd Party import mongoengine.errors from flask import url_for from mongoengine import DateTimeField, ListField, ReferenceField, StringField # Fastlane from fastlane.models import db class Task(db.Document): created_at = DateTimeField(required=True) last_modified_at = DateTimeField(required=True, default=datetime.datetime.now) task_id = StringField(required=True) jobs = ListField(ReferenceField("Job")) def _validate(self): errors = {} if self.task_id == "": errors["task_id"] = mongoengine.errors.ValidationError( "Field is required", field_name="task_id" ) if errors: message = "ValidationError (%s:%s) " % (self._class_name, self.pk) raise mongoengine.errors.ValidationError(message, errors=errors) def save(self, *args, **kwargs): self._validate() if not self.created_at: self.created_at = datetime.datetime.now() self.last_modified_at = datetime.datetime.now() return super(Task, self).save(*args, **kwargs) def get_url(self): return url_for("task.get_task", task_id=self.task_id, _external=True) def to_dict(self): res = { "taskId": self.task_id, "createdAt": self.created_at.timestamp(), "lastModifiedAt": self.last_modified_at.timestamp(), "url": self.get_url(), "jobsCount": len(self.jobs), } return res @classmethod def create_task(cls, task_id): new_task = cls(task_id=task_id) new_task.save() return new_task @classmethod def get_tasks(cls, page=1, per_page=20): return cls.objects.paginate(page, per_page) # pylint: disable=no-member @classmethod def get_by_task_id(cls, task_id): if task_id is None or task_id == "": raise RuntimeError("Task ID is required and can't be None or empty.") return cls.objects(task_id=task_id).no_dereference().first() def create_job(self): from fastlane.models.job import Job job_id = uuid4() j = Job(job_id=str(job_id)) j.task = self j.save() self.jobs.append(j) self.save() return j def create_or_update_job(self, job_id): from fastlane.models.job import Job jobs = list(filter(lambda job: str(job.job_id) == job_id, self.jobs)) if not jobs: j = Job(job_id=str(job_id)) j.task = self j.save() self.jobs.append(j) self.save() else: j = jobs[0] return j PK!-*fastlane/templates/stream.html {% if execution_id %} {{ task_id }} - {{ job_id }} - {{ execution_id }} - Fastlane {% else %} {{ task_id }} - {{ job_id }} - Fastlane {% endif %}

PK!$7fastlane/utils.py# Standard Library import re from datetime import timedelta REGEX = re.compile(r"((?P\d+?)h)?((?P\d+?)m)?((?P\d+?)s)?") def parse_time(time_str): if time_str is None: return None parts = REGEX.match(time_str) if not parts: return None parts = parts.groupdict() time_params = {} for (name, param) in parts.items(): if param: time_params[name] = int(param) return timedelta(**time_params) PK!Jߕfastlane/worker/__init__.pyclass ExecutionResult: class Status: created = 'created' running = 'running' failed = 'failed' done = 'done' def __init__(self, status): self.status = status self.exit_code = None self.started_at = None self.finished_at = None self.log = '' self.error = '' def set_log(self, log): self.log = log PK!EZڻaa"fastlane/worker/docker_executor.py# Standard Library import random import re import traceback from json import loads # 3rd Party import docker import pybreaker import requests from dateutil.parser import parse from flask import Blueprint, current_app, g, make_response, request # Fastlane from fastlane.worker import ExecutionResult from fastlane.worker.errors import HostUnavailableError, NoAvailableHostsError # http://docs.docker.com/engine/reference/commandline/ps/#examples # One of created, restarting, running, removing, paused, exited, or dead STATUS = { "created": ExecutionResult.Status.created, "exited": ExecutionResult.Status.done, "dead": ExecutionResult.Status.failed, "running": ExecutionResult.Status.running, } bp = Blueprint( # pylint: disable=invalid-name "docker", __name__, url_prefix="/docker-executor" ) BLACKLIST_KEY = "docker-executor::blacklisted-hosts" JOB_PREFIX = "fastlane-job" def convert_date(date_to_parse): return parse(date_to_parse) def get_details(): details = request.get_json() if details is None and request.get_data(): details = loads(request.get_data()) return details @bp.route("/blacklist", methods=["POST", "PUT"]) def add_to_blacklist(): redis = current_app.redis data = get_details() if data is None or data == "": msg = "Failed to add host to blacklist because JSON body could not be parsed." g.logger.warn(msg) return make_response(msg, 400) if "host" not in data: msg = "Failed to add host to blacklist because 'host' attribute was not found in JSON body." g.logger.warn(msg) return make_response(msg, 400) host = data["host"] redis.sadd(BLACKLIST_KEY, host) return "" @bp.route("/blacklist", methods=["DEL", "DELETE"]) def remove_from_blacklist(): redis = current_app.redis data = get_details() if data is None or data == "": msg = "Failed to remove host from blacklist because JSON body could not be parsed." g.logger.warn(msg) return make_response(msg, 400) if "host" not in data: msg = ( "Failed to remove host from blacklist because 'host'" " attribute was not found in JSON body." ) g.logger.warn(msg) return make_response(msg, 400) host = data["host"] redis.srem(BLACKLIST_KEY, host) return "" class DockerPool: def __init__(self, docker_hosts): self.docker_hosts = docker_hosts self.max_running = {} self.clients_per_regex = [] self.clients = {} self.__init_clients() def __init_clients(self): for regex, docker_hosts, max_running in self.docker_hosts: client_list = [] clients = (regex, client_list) self.clients_per_regex.append(clients) self.max_running[regex] = max_running for address in docker_hosts: host, port = address.split(":") docker_client = docker.DockerClient(base_url=address) self.clients[address] = (host, int(port), docker_client) client_list.append((host, int(port), docker_client)) @staticmethod def refresh_circuits(executor, clients, blacklisted_hosts, logger): def docker_ps(client): client.containers.list(sparse=False) for host, port, client in clients: if f"{host}:{port}" in blacklisted_hosts: continue try: logger.debug("Refreshing host...", host=host, port=port) circuit = executor.get_circuit(f"{host}:{port}") circuit.call(docker_ps, client) except (requests.exceptions.ConnectionError, pybreaker.CircuitBreakerError): error = traceback.format_exc() logger.error("Failed to refresh host.", error=error) def get_client(self, executor, task_id, host=None, port=None, blacklist=None): logger = current_app.logger.bind( task_id=task_id, host=host, port=port, blacklist=blacklist ) if host is not None and port is not None: logger.debug("Custom host returned.") docker_client = self.clients.get(f"{host}:{port}") if docker_client is None: return host, port, None return docker_client if blacklist is None: blacklist = set() for regex, clients in self.clients_per_regex: logger.debug("Trying to retrieve docker client...", regex=regex) if regex is not None and not regex.match(task_id): logger.debug("Task ID does not match regex.", regex=regex) continue DockerPool.refresh_circuits(executor, clients, blacklist, logger) filtered = [ (host, port, client) for (host, port, client) in clients if f"{host}:{port}" not in blacklist and executor.get_circuit(f"{host}:{port}").current_state == "closed" ] if not filtered: logger.debug( "No non-blacklisted and closed circuit clients found for farm.", regex=regex, ) continue logger.info( "Returning random choice out of the remaining clients.", clients=[f"{host}:{port}" for (host, port, client) in filtered], ) host, port, client = random.choice(filtered) return host, int(port), client msg = f"Failed to find a docker host for task id {task_id}." logger.error(msg) raise NoAvailableHostsError(msg) class Executor: def __init__(self, app, pool=None): self.app = app self.logger = app.logger self.pool = pool self.circuits = {} if pool is None: docker_hosts = [] hosts = self.app.config["DOCKER_HOSTS"] if isinstance(hosts, (tuple, list)): clusters = list(hosts) elif isinstance(hosts, (dict)): clusters = [hosts] else: clusters = loads(hosts) self.logger.debug("Initializing docker pool...", clusters=clusters) for i, cluster in enumerate(clusters): regex = cluster["match"] if not regex: regex = None if i != len(clusters): self.logger.warn( "Farm with no regex found before the end of the DOCKER_HOSTS " "definition. This means all the subsequent farms will never be" " used as this one will always match anything that reaches it. " "Please ensure that the farm with no regex match is the last one." ) else: regex = re.compile(regex) hosts = cluster["hosts"] max_running = cluster.get("maxRunning", 10) self.logger.info( "Found farm definition.", regex=cluster["match"], hosts=hosts, max_running=max_running, ) docker_hosts.append((regex, hosts, max_running)) self.pool = DockerPool(docker_hosts) def get_container_by_id(self, container_id, host, port, client): logger = self.logger.bind( host=host, port=port, container_id=container_id, operation="docker_host.get_container_by_id", ) circuit = self.get_circuit(f"{host}:{port}") @circuit def run(logger): try: logger = logger.bind(container_id=container_id) logger.debug("Finding container...") container = client.containers.get(container_id) logger.info("Container found.") return container except requests.exceptions.ConnectionError as err: raise HostUnavailableError(host, port, err) from err try: return run(logger) except pybreaker.CircuitBreakerError as err: raise HostUnavailableError(host, port, err) from err def validate_max_running_executions(self, task_id): total_running = 0 max_running = 0 logger = self.logger.bind( task_id=task_id, operation="docker_host.validate_max_running_executions" ) for regex, _ in self.pool.clients_per_regex: if regex is not None and not regex.match(task_id): logger.debug("Farm does not match task_id.", regex=regex) continue running = self.get_running_containers(regex) total_running = len(running["running"]) max_running = self.pool.max_running[regex] logger.debug( "Found number of running containers.", total_running=total_running, max_running=max_running, ) break return total_running == 0 or total_running <= max_running def get_circuit(self, key): max_fails = int(current_app.config["DOCKER_CIRCUIT_BREAKER_MAX_FAILS"]) reset_timeout = int( current_app.config["DOCKER_CIRCUIT_BREAKER_RESET_TIMEOUT_SECONDS"] ) return self.circuits.setdefault( key, pybreaker.CircuitBreaker( fail_max=max_fails, reset_timeout=reset_timeout, state_storage=pybreaker.CircuitRedisStorage( pybreaker.STATE_CLOSED, current_app.redis, namespace=key ), ), ) def update_image(self, task, job, execution, image, tag, blacklisted_hosts=None): if blacklisted_hosts is None: blacklisted_hosts = self.get_blacklisted_hosts() logger = self.logger.bind( task_id=task.task_id, job_id=str(job.job_id), execution_id=str(execution.execution_id), image=image, tag=tag, blacklisted_hosts=blacklisted_hosts, operation="docker_executor.update_image", ) host, port, client = self.pool.get_client( self, task.task_id, blacklist=blacklisted_hosts ) circuit = self.get_circuit(f"{host}:{port}") logger = logger.bind(host=host, port=port) @circuit def run(logger): try: logger.debug("Updating image in docker host...") client.images.pull(image, tag=tag) execution.metadata["docker_host"] = host execution.metadata["docker_port"] = port logger.info( "Image updated successfully. Docker host and port " "stored in Job Execution for future reference." ) except requests.exceptions.ConnectionError as err: error = traceback.format_exc() logger.error( "Failed to connect to Docker Host. Will retry job later with a new host.", error=error, ) if "docker_host" in execution.metadata: del execution.metadata["docker_host"] if "docker_port" in execution.metadata: del execution.metadata["docker_port"] raise HostUnavailableError(host, port, err) from err try: run(logger) except pybreaker.CircuitBreakerError as err: raise HostUnavailableError(host, port, err) from err def run(self, task, job, execution, image, tag, command, blacklisted_hosts=None): logger = self.logger.bind( task_id=task.task_id, job_id=str(job.job_id), execution_id=str(execution.execution_id), image=image, tag=tag, command=command, blacklisted_hosts=blacklisted_hosts, operation="docker_executor.run", ) if "docker_host" not in execution.metadata: raise RuntimeError( "Can't run job without docker_host and docker_port in execution metadata." ) docker_host = execution.metadata["docker_host"] docker_port = execution.metadata["docker_port"] host, port, client = self.pool.get_client( self, task.task_id, docker_host, docker_port ) logger = logger.bind(host=host, port=port) circuit = self.get_circuit(f"{host}:{port}") @circuit def run(logger): try: container_name = f"{JOB_PREFIX}-{execution.execution_id}" envs = job.metadata.get("envs", {}) logger = logger.bind(container_name=container_name, envs=envs) logger.debug("Running the Job in Docker Host...") container = client.containers.run( image=f"{image}:{tag}", name=container_name, command=command, detach=True, environment=envs, ) execution.metadata["container_id"] = container.id logger.info( "Container started successfully. Container ID " "stored as Job Execution metadata.", container_id=container.id, ) except (requests.exceptions.ConnectionError,) as err: error = traceback.format_exc() logger.error( "Failed to connect to Docker Host. Will retry job later with a new host.", error=error, ) if "docker_host" in execution.metadata: del execution.metadata["docker_host"] if "docker_port" in execution.metadata: del execution.metadata["docker_port"] raise HostUnavailableError(host, port, err) from err try: run(logger) except pybreaker.CircuitBreakerError as err: raise HostUnavailableError(host, port, err) from err return True def stop_job(self, task, job, execution): logger = self.logger.bind( task_id=task.task_id, job_id=str(job.job_id), execution_id=str(execution.execution_id), operation="docker_executor.stop_job", ) if "container_id" not in execution.metadata: logger.warn( "Can't stop Job Execution, since it has not been started. Aborting..." ) return False docker_host = execution.metadata["docker_host"] docker_port = execution.metadata["docker_port"] host, port, client = self.pool.get_client( self, task.task_id, docker_host, docker_port ) logger = logger.bind(host=host, port=port) circuit = self.get_circuit(f"{host}:{port}") container_id = execution.metadata["container_id"] logger = logger.bind(container_id=container_id) container = self.get_container_by_id(container_id, host, port, client) @circuit def run(logger): try: logger.info("Container found.") logger.debug("Stopping container...") container.stop() logger.info("Container stopped.") except requests.exceptions.ConnectionError as err: error = traceback.format_exc() logger.error("Failed to connect to Docker Host.", error=error) raise HostUnavailableError(host, port, err) from err try: run(logger) except pybreaker.CircuitBreakerError as err: raise HostUnavailableError(host, port, err) from err return True def get_result(self, task, job, execution): execution_host = execution.metadata["docker_host"] execution_port = execution.metadata["docker_port"] host, port, client = self.pool.get_client( self, task.task_id, execution_host, execution_port ) logger = self.logger.bind( task_id=task.task_id, job_id=str(job.job_id), execution_id=str(execution.execution_id), operation="docker_executor.get_result", ) container_id = execution.metadata["container_id"] container = self.get_container_by_id(container_id, host, port, client) # container.attrs['State'] # {'Status': 'exited', 'Running': False, 'Paused': False, 'Restarting': False, # 'OOMKilled': False, 'Dead': False, 'Pid': 0, 'ExitCode': 0, 'Error': '', # 'StartedAt': '2018-08-27T17:14:14.1951232Z', 'FinishedAt': '2018-08-27T17:14:14.2707026Z'} logger = logger.bind(container_id=container_id) result = ExecutionResult( STATUS.get(container.status, ExecutionResult.Status.done) ) state = container.attrs["State"] result.exit_code = state["ExitCode"] result.error = state["Error"] result.started_at = convert_date(state["StartedAt"]) logger = logger.bind( status=container.status, state=state, exit_code=result.exit_code, error=result.error, ) logger.debug("Container result found.") if ( result.status == ExecutionResult.Status.done or result.status == ExecutionResult.Status.failed ): # TODO: Use circuit in this point result.finished_at = convert_date(state["FinishedAt"]) result.log = container.logs(stdout=True, stderr=False) if result.error != "": logs = container.logs(stdout=False, stderr=True).decode("utf-8") result.error += f"\n\nstderr:\n{logs}" else: result.error = container.logs(stdout=False, stderr=True) logger.info("Container finished executing.", finished_at=result.finished_at) return result def _get_all_clients(self, regex): clients = self.pool.clients.values() if regex is not None: for cluster_regex, cluster_clients in self.pool.clients_per_regex: if cluster_regex is not None and cluster_regex != regex: continue clients = cluster_clients break return [ (host, port, client, self.get_circuit(f"{host}:{port}")) for host, port, client in clients ] def _list_containers(self, host, port, client, circuit): @circuit def run(): running = [] containers = client.containers.list( sparse=False, filters={"status": "running"} ) for container in containers: if not container.name.startswith(JOB_PREFIX): continue running.append((host, port, container.id)) return running return run() def get_running_containers(self, regex=None, blacklisted_hosts=None): if blacklisted_hosts is None: blacklisted_hosts = self.get_blacklisted_hosts() running = [] unavailable_clients = [] unavailable_clients_set = set() clients = self._get_all_clients(regex) for (host, port, client, circuit) in clients: if f"{host}:{port}" in blacklisted_hosts: unavailable_clients_set.add(f"{host}:{port}") unavailable_clients.append( (host, port, RuntimeError("server is blacklisted")) ) continue try: running += self._list_containers(host, port, client, circuit) except Exception as err: unavailable_clients_set.add(f"{host}:{port}") unavailable_clients.append((host, port, err)) return { "available": [ { "host": host, "port": port, "available": True, "blacklisted": f"{host}:{port}" in blacklisted_hosts, "circuit": circuit.current_state, "error": None, } for (host, port, client, circuit) in clients if f"{host}:{port}" not in unavailable_clients_set ], "unavailable": [ { "host": host, "port": port, "available": False, "blacklisted": f"{host}:{port}" in blacklisted_hosts, "circuit": self.get_circuit(f"{host}:{port}").current_state, "error": str(err), } for (host, port, err) in unavailable_clients ], "running": running, } def get_current_logs(self, task_id, execution): execution_host = execution.metadata["docker_host"] execution_port = execution.metadata["docker_port"] _, _, client = self.pool.get_client( self, task_id, execution_host, execution_port ) container_id = execution.metadata["container_id"] container = self.get_container_by_id( container_id, execution_host, execution_port, client ) log = container.logs(stdout=True, stderr=True).decode("utf-8") return log def get_streaming_logs(self, task_id, job, execution): execution_host = execution.metadata["docker_host"] execution_port = execution.metadata["docker_port"] host, port, client = self.pool.get_client( self, task_id, execution_host, execution_port ) container_id = execution.metadata["container_id"] logger = self.logger.bind( task_id=task_id, job=str(job.job_id), execution_id=str(execution.execution_id), operation="docker_host.get_streaming_logs", host=host, port=port, container_id=container_id, ) logger.debug("Getting container...") container = self.get_container_by_id( container_id, execution_host, execution_port, client ) logger.info("Container found successfully.") for log in container.logs(stdout=True, stderr=True, stream=True): yield log.decode("utf-8") def get_blacklisted_hosts(self): redis = self.app.redis hosts = redis.smembers(BLACKLIST_KEY) return {host.decode("utf-8") for host in hosts} def mark_as_done(self, task, job, execution): execution_host = execution.metadata["docker_host"] execution_port = execution.metadata["docker_port"] host, port, client = self.pool.get_client( self, task.task_id, execution_host, execution_port ) container_id = execution.metadata["container_id"] logger = self.logger.bind( task_id=task.task_id, job=str(job.job_id), execution_id=str(execution.execution_id), operation="docker_host.mark_as_done", host=host, port=port, container_id=container_id, ) container = self.get_container_by_id(container_id, host, port, client) try: new_name = f"defunct-{container.name}" logger.debug("Renaming container...", new_name=new_name) container.rename(new_name) logger.debug("Container renamed.", new_name=new_name) except ( pybreaker.CircuitBreakerError, requests.exceptions.ConnectionError, ) as err: error = traceback.format_exc() logger.error("Failed to connect to Docker Host.", error=error) raise HostUnavailableError(host, port, err) from err def remove_done(self): removed_containers = [] clients = self.pool.clients.values() for (host, port, client) in clients: containers = client.containers.list( sparse=False, all=True, filters={"name": f"defunct-{JOB_PREFIX}"} ) for container in containers: removed_containers.append( { "host": f"{host}:{port}", "name": container.name, "id": container.id, "image": container.image.attrs["RepoTags"][0], } ) container.remove() self.logger.info( "Removed all defunct containers.", removed_containers=removed_containers ) return removed_containers PK! fastlane/worker/errors.pyclass HostUnavailableError(RuntimeError): def __init__(self, host, port, error): msg = f"Connection to host {host}:{port} failed with error: {error}" super(HostUnavailableError, self).__init__(msg) self.host = host self.port = port self.error = error self.message = f"Connection to host {self.host}:{self.port} failed with error: {self.error}" class NoAvailableHostsError(RuntimeError): pass PK!R\\fastlane/worker/job.py# Standard Library import calendar import math import smtplib import time import traceback from datetime import datetime, timedelta from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText # 3rd Party from flask import current_app, url_for from rq_scheduler import Scheduler # Fastlane from fastlane.helpers import dumps, loads from fastlane.models import Job, JobExecution from fastlane.worker import ExecutionResult from fastlane.worker.errors import HostUnavailableError from fastlane.worker.webhooks import WebhooksDispatcher, WebhooksDispatchError def validate_max_concurrent( executor, task_id, job, execution_id, image, command, logger ): if not executor.validate_max_running_executions(task_id): logger.debug( "Maximum number of global container executions reached. Enqueuing job execution..." ) args = [task_id, job.id, execution_id, image, command] result = current_app.job_queue.enqueue(run_job, *args, timeout=-1) job.metadata["enqueued_id"] = result.id job.save() logger.info( "Job execution re-enqueued successfully due to max number of container executions." ) return False return True def validate_expiration(job, ex, logger): now = datetime.utcnow() unixtime = calendar.timegm(now.utctimetuple()) if ( job.metadata.get("expiration") is not None and job.metadata["expiration"] < unixtime ): expiration_utc = datetime.utcfromtimestamp(job.metadata["expiration"]) ex.status = JobExecution.Status.expired ex.error = "Job was supposed to be done before %s, but was started at %s." % ( expiration_utc.isoformat(), now.isoformat(), ) ex.finished_at = datetime.utcnow() job.save() logger.info( "Job execution canceled due to being expired.", job_expiration=job.metadata["expiration"], current_ts=unixtime, ) return False return True def reenqueue_job_due_to_break(task_id, job_id, execution_id, image, command): args = [task_id, job_id, execution_id, image, command] delta = timedelta(seconds=1.0) scheduler = Scheduler("jobs", connection=current_app.redis) future_date = datetime.utcnow() + delta enqueued = scheduler.enqueue_at(future_date, run_job, *args) return enqueued.id def download_image(executor, job, ex, image, tag, command, logger): try: logger.debug("Downloading updated container image...", image=image, tag=tag) before = time.time() executor.update_image(job.task, job, ex, image, tag) ellapsed = time.time() - before logger.info( "Image downloaded successfully.", image=image, tag=tag, ellapsed=ellapsed ) except HostUnavailableError: enqueued_id = reenqueue_job_due_to_break( job.task.task_id, str(job.job_id), image, command ) job.metadata["enqueued_id"] = enqueued_id job.save() logger.warn("Job execution re-enqueued successfully.") return False except Exception as err: error = traceback.format_exc() logger.error("Failed to download image.", error=error) ex.error = error ex.status = JobExecution.Status.failed job.save() current_app.report_error( err, metadata=dict( operation="Downloading Image", task_id=job.task.task_id, job_id=job.id, execution_id=ex.execution_id, image=image, tag=tag, command=command, ), ) return False return True def run_container(executor, job, ex, image, tag, command, logger): logger.debug( "Running command in container...", image=image, tag=tag, command=command ) try: ex.started_at = datetime.utcnow() job.save() before = time.time() executor.run(job.task, job, ex, image, tag, command) ellapsed = time.time() - before logger.info( "Container started successfully.", image=image, tag=tag, command=command, ellapsed=ellapsed, ) except HostUnavailableError: enqueued_id = reenqueue_job_due_to_break( job.task.task_id, str(job.job_id), image, command ) job.metadata["enqueued_id"] = enqueued_id job.save() logger.warn("Job execution re-enqueued successfully.") return False except Exception as err: error = traceback.format_exc() logger.error("Failed to run command", error=error) ex.error = error ex.status = JobExecution.Status.failed job.save() current_app.report_error( err, metadata=dict( operation="Running Container", task_id=job.task.task_id, job_id=job.id, execution_id=ex.execution_id, image=image, tag=tag, command=command, ), ) return False return True def run_job(task_id, job_id, execution_id, image, command): app = current_app logger = app.logger.bind( operation="run_job", task_id=task_id, job_id=job_id, image=image, command=command, ) try: executor = app.executor job = Job.get_by_id(task_id, job_id) if job is None: logger.error("Job was not found with task id and job id.") return False if not validate_max_concurrent( executor, task_id, job, execution_id, image, command, logger ): return False tag = "latest" if ":" in image: image, tag = image.split(":") logger = logger.bind(image=image, tag=tag) logger.debug("Changing job status...", status=JobExecution.Status.pulling) if execution_id is None: ex = job.create_execution(image=image, command=command) ex.status = JobExecution.Status.enqueued job.save() else: ex = job.get_execution_by_id(execution_id) logger.debug( "Job status changed successfully.", status=JobExecution.Status.pulling ) logger = logger.bind(execution_id=ex.execution_id) except Exception as err: error = traceback.format_exc() logger.error("Failed to create job execution. Skipping job...", error=error) current_app.report_error( err, metadata=dict(task_id=task_id, job_id=job_id, image=image, command=command), ) return False try: if not validate_expiration(job, ex, logger): return False logger.info("Started processing job.") if not download_image(executor, job, ex, image, tag, command, logger): return False if not run_container(executor, job, ex, image, tag, command, logger): return False logger.debug("Changing job status...", status=JobExecution.Status.running) ex.status = JobExecution.Status.running job.save() logger.debug( "Job status changed successfully.", status=JobExecution.Status.running ) app.monitor_queue.enqueue( monitor_job, task_id, job_id, ex.execution_id, timeout=-1 ) return True except Exception as err: error = traceback.format_exc() logger.error("Failed to run job", error=error) ex.status = JobExecution.Status.failed ex.error = "Job failed to run with error: %s" % error job.save() current_app.report_error( err, metadata=dict( operation="Running Container", task_id=task_id, job_id=job_id, execution_id=ex.execution_id, image=image, tag=tag, command=command, ), ) def _webhook_dispatch(task, job, execution, collection, logger): task_id = task.task_id job_id = str(job.id) execution_id = str(execution.execution_id) for webhook in collection: method = "POST" url = webhook.get("url") if url is None: logger.warn("Webhook with empty URL found. Skipping...") continue headers = webhook.get("headers", {}) retries = webhook.get("retries", 0) hook_logger = logger.bind( method=method, url=url, headers=headers, retries=retries, retry_count=0 ) hook_logger.debug("Enqueueing webhook...") args = [task_id, job_id, execution_id, method, url, headers, retries, 0] current_app.webhook_queue.enqueue(send_webhook, *args, timeout=-1) hook_logger.info("Webhook enqueued successfully.") def send_webhooks(task, job, execution, logger): if execution.status == JobExecution.Status.done: succeed = job.metadata.get("webhooks", {}).get("succeeds", []) logger.debug("Sending success webhooks...") _webhook_dispatch(task, job, execution, succeed, logger) if execution.status == JobExecution.Status.failed: fails = job.metadata.get("webhooks", {}).get("fails", []) logger.debug("Sending failed webhooks...") _webhook_dispatch(task, job, execution, fails, logger) finishes = job.metadata.get("webhooks", {}).get("finishes", []) logger.info("Sending completion webhooks...") _webhook_dispatch(task, job, execution, finishes, logger) def notify_users(task, job, execution, logger): task_id = task.task_id job_id = str(job.id) execution_id = str(execution.execution_id) succeed = job.metadata.get("notify", {}).get("succeeds", []) fails = job.metadata.get("notify", {}).get("fails", []) finishes = job.metadata.get("notify", {}).get("finishes", []) if execution.status == JobExecution.Status.done: logger.info("Notifying users of success...") for email in succeed: logger.info("Notifying user of success...", email=email) subject = "Job %s/%s succeeded!" % (task_id, job_id) args = [task_id, job_id, execution_id, subject, email] current_app.notify_queue.enqueue(send_email, *args, timeout=-1) if execution.status == JobExecution.Status.failed: logger.info("Notifying users of failure...") for email in fails: logger.info( "Notifying user of failure...", email=email, exit_code=execution.exit_code, ) subject = "Job %s/%s failed with exit code %d!" % ( task_id, job_id, execution.exit_code, ) args = [task_id, job_id, execution_id, subject, email] current_app.notify_queue.enqueue(send_email, *args, timeout=-1) logger.info("Notifying users of completion...") for email in finishes: logger.info( "Notifying user of completion...", email=email, exit_code=execution.exit_code, ) subject = "Job %s/%s finished with exit code %d!" % ( task_id, job_id, execution.exit_code, ) args = [task_id, job_id, execution_id, subject, email] current_app.notify_queue.enqueue(send_email, *args, timeout=-1) def reenqueue_monitor_due_to_break(task_id, job_id, execution_id): args = [task_id, job_id, execution_id] delta = timedelta(seconds=1.0) scheduler = Scheduler("monitor", connection=current_app.redis) future_date = datetime.utcnow() + delta enqueued = scheduler.enqueue_at(future_date, monitor_job, *args) return enqueued.id def monitor_job(task_id, job_id, execution_id): try: app = current_app executor = app.executor job = Job.get_by_id(task_id, job_id) logger = app.logger.bind( operation="monitor_job", task_id=task_id, job_id=job_id, execution_id=execution_id, ) if job is None: logger.error("Failed to retrieve task or job.") return False execution = job.get_execution_by_id(execution_id) try: result = executor.get_result(job.task, job, execution) except HostUnavailableError: reenqueue_monitor_due_to_break(task_id, job_id, execution_id) logger.warn("Job monitor re-enqueued successfully.") return False logger.info( "Container result obtained.", container_status=result.status, container_exit_code=result.exit_code, ) if result.status in ( ExecutionResult.Status.created, ExecutionResult.Status.running, ): ellapsed = (datetime.utcnow() - execution.started_at).total_seconds() if ellapsed > job.metadata["timeout"]: execution.finished_at = datetime.utcnow() execution.status = JobExecution.Status.timedout execution.error = "Job execution timed out after %d seconds." % ellapsed try: executor.stop_job(job.task, job, execution) except HostUnavailableError: reenqueue_monitor_due_to_break(task_id, job_id, execution_id) logger.warn("Job monitor re-enqueued successfully.") return False logger.debug( "Job execution timed out. Storing job details in mongo db.", status=execution.status, ellapsed=ellapsed, error=result.error, ) job.save() logger.info("Job execution timed out.", status=execution.status) return False scheduler = Scheduler("monitor", connection=app.redis) logger.info( "Job has not finished. Retrying monitoring in the future.", container_status=result.status, seconds=1, ) interval = timedelta(seconds=5) scheduler.enqueue_in(interval, monitor_job, task_id, job_id, execution_id) return True if ( result.exit_code != 0 and "retry_count" in job.metadata and job.metadata["retry_count"] < job.metadata["retries"] ): retry_logger = logger.bind( exit_code=result.exit_code, retry_count=job.metadata["retry_count"], retries=job.metadata["retries"], ) retry_logger.debug("Job failed. Enqueuing job retry...") job.metadata["retry_count"] += 1 scheduler = Scheduler("jobs", connection=current_app.redis) new_exec = job.create_execution(execution.image, execution.command) new_exec.status = JobExecution.Status.enqueued args = [ task_id, job_id, new_exec.execution_id, execution.image, execution.command, ] factor = app.config["EXPONENTIAL_BACKOFF_FACTOR"] min_backoff = app.config["EXPONENTIAL_BACKOFF_MIN_MS"] / 1000.0 delta = timedelta(seconds=min_backoff) if job.metadata["retries"] > 0: delta = timedelta( seconds=math.pow(factor, job.metadata["retry_count"]) * min_backoff ) future_date = datetime.utcnow() + delta enqueued = scheduler.enqueue_at(future_date, run_job, *args) job.metadata["enqueued_id"] = enqueued.id job.save() retry_logger.info("Job execution enqueued successfully.") # still need to finish current execution as the retry # will be a new execution execution.finished_at = datetime.utcnow() execution.exit_code = result.exit_code execution.status = ( JobExecution.Status.done if execution.exit_code == 0 else JobExecution.Status.failed ) execution.log = result.log.decode("utf-8") execution.error = result.error.decode("utf-8") logger.debug( "Job finished. Storing job details in mongo db.", status=execution.status, log=result.log, error=result.error, ) job.save() logger.info("Job details stored in mongo db.", status=execution.status) try: executor.mark_as_done(job.task, job, execution) except HostUnavailableError: reenqueue_monitor_due_to_break(task_id, job_id, execution_id) logger.warn("Job monitor re-enqueued successfully.") return False send_webhooks(job.task, job, execution, logger) notify_users(job.task, job, execution, logger) return True except Exception as err: error = traceback.format_exc() logger.error("Failed to monitor job", error=error) current_app.report_error( err, metadata=dict( operation="Monitoring Job", task_id=task_id, job_id=job_id, execution_id=execution_id, ), ) raise err def send_email(task_id, job_id, execution_id, subject, to_email): job = Job.get_by_id(task_id, job_id) logger = current_app.logger.bind( operation="send_email", task_id=task_id, job_id=job_id, to_email=to_email, execution_id=execution_id, subject=subject, ) if job is None: logger.error("Failed to retrieve task or job.") return False execution = job.get_execution_by_id(execution_id) logger.info("Execution loaded successfully") smtp_host = current_app.config["SMTP_HOST"] smtp_port = current_app.config["SMTP_PORT"] smtp_from = current_app.config["SMTP_FROM"] if smtp_host is None or smtp_port is None or smtp_from is None: logger.error( "SMTP_HOST, SMTP_PORT and SMTP_FROM must be configured. Skipping sending e-mail." ) return False try: smtp_port = int(smtp_port) logger = logger.bind(smtp_host=smtp_host, smtp_port=smtp_port) logger.info("Connecting to SMTP Server...") server = smtplib.SMTP(smtp_host, smtp_port) server.set_debuglevel(0) if current_app.config.get("SMTP_USE_SSL"): logger.info("Starting TLS...") server.starttls() smtp_user = current_app.config.get("SMTP_USER") smtp_password = current_app.config.get("SMTP_PASSWORD") if smtp_user and smtp_password: logger.info( "Authenticating with SMTP...", smtp_user=smtp_user, smtp_password=smtp_password, ) server.login(smtp_user, smtp_password) from_email = current_app.config["SMTP_FROM"] task_url = url_for("task.get_task", task_id=task_id, _external=True) job_url = url_for( "task.get_job", task_id=task_id, job_id=job_id, _external=True ) job_data = dumps( execution.to_dict(include_log=True, include_error=True), indent=4, sort_keys=True, ) body = ( """ Automatic message. Please do not reply to this! Job Details: %s """ % job_data ) subj = "[Fastlane] %s" % subject msg = MIMEMultipart("alternative") msg["Subject"] = subj msg["From"] = from_email msg["To"] = to_email part1 = MIMEText(body, "plain") html_body = """

Job Details:

%s
---

[View Task Details] | [View Job Details]

---

Automatic message. Please do not reply to this!

""" % ( job_data, task_url, job_url, ) part2 = MIMEText(html_body, "html") msg.attach(part1) msg.attach(part2) logger.info("Sending email...") server.sendmail(from_email, to_email, msg.as_string()) server.quit() logger.info("Email sent successfully.") except Exception as exc: logger.error( "Sending e-mail failed with exception!", error=traceback.format_exc() ) raise exc return True def send_webhook( task_id, job_id, execution_id, method, url, headers, retries, retry_count ): app = current_app job = Job.get_by_id(task_id, job_id) logger = app.logger.bind( operation="send_webhook", task_id=task_id, job_id=job_id, execution_id=execution_id, method=method, url=url, headers=headers, retries=retries, retry_count=retry_count, ) if job is None: logger.error("Failed to retrieve task or job.") return False execution = job.get_execution_by_id(execution_id) logger.info("Execution loaded successfully") data = execution.to_dict(include_log=True, include_error=True) data = loads(dumps(data)) if "webhookDispatch" in data["metadata"]: del data["metadata"]["webhookDispatch"] data["metadata"]["custom"] = job.metadata.get("custom", {}) data = dumps(data) try: dispatcher = WebhooksDispatcher() response = dispatcher.dispatch(method, url, data, headers) execution.metadata.setdefault("webhookDispatch", []) execution.metadata["webhookDispatch"].append( { "timestamp": datetime.utcnow().isoformat(), "url": url, "statusCode": response.status_code, "body": response.body, "headers": response.headers, } ) job.save() logger.info("Webhook dispatched successfully.") except WebhooksDispatchError as err: error = traceback.format_exc() execution.metadata.setdefault("webhookDispatch", []) execution.metadata["webhookDispatch"].append( { "timestamp": datetime.utcnow().isoformat(), "url": url, "statusCode": err.status_code, "body": err.body, "headers": err.headers, "error": error, } ) job.save() logger.error("Failed to dispatch webhook.", err=error) if retry_count < retries: logger.debug("Retrying...") args = [ task_id, job_id, execution_id, method, url, headers, retries, retry_count + 1, ] scheduler = Scheduler("webhooks", connection=current_app.redis) factor = app.config["WEBHOOKS_EXPONENTIAL_BACKOFF_FACTOR"] min_backoff = app.config["WEBHOOKS_EXPONENTIAL_BACKOFF_MIN_MS"] / 1000.0 delta = timedelta(seconds=math.pow(factor, retry_count) * min_backoff) scheduler.enqueue_in(delta, send_webhook, *args) logger.info("Webhook dispatch retry scheduled.", date=delta) return True PK!W}GGfastlane/worker/scheduler.py# 3rd Party from rq_scheduler import Scheduler class QueueScheduler: def __init__(self, queue_name, app): self.app = app self.logger = self.app.logger.bind(queue_name=queue_name) self.scheduler = Scheduler(queue_name=queue_name, connection=app.redis) def move_jobs(self): if self.scheduler.acquire_lock(): try: jobs = self.scheduler.get_jobs() self.logger.debug( "Lock acquired. Enqueuing scheduled jobs...", jobs=jobs ) self.scheduler.enqueue_jobs() finally: self.scheduler.remove_lock() else: self.logger.debug( "Lock could not be acquired. Enqueuing scheduled " "jobs skipped. Trying again next cycle." ) PK!Lfastlane/worker/webhooks.py# 3rd Party from requests import Request, Session class WebhooksDispatchError(RuntimeError): def __init__(self, status_code, method, url, body, headers, error=None): super(WebhooksDispatchError, self).__init__("Webhook dispatch error") self.status_code = status_code self.method = method self.url = url self.body = body self.headers = headers self.error = error def __str__(self): if self.error is None: return ( f"The {self.method.upper()} request to {self.url} with body" f' of "{self.body[:10]}..." failed with status code of {self.status_code}.' ) error = "{}: {}".format(type(self.error).__name__, self.error) return ( f"The {self.method.upper()} request to {self.url} with body " f'of "{self.body[:10]}..." failed with exception of {error}.' ) def __repr__(self): return str(self) class Response: def __init__(self, status_code, body, headers): self.status_code = status_code self.body = body self.headers = headers class WebhooksDispatcher: def dispatch(self, method, url, body, headers, timeout=1): try: session = Session() req = Request(method, url, data=body, headers=headers) prepped = session.prepare_request(req) prepped.body = body resp = session.send(prepped, timeout=timeout, verify=False) if resp.status_code > 399: raise WebhooksDispatchError( resp.status_code, method, url, body, headers ) return Response(resp.status_code, resp.text, resp.headers) except Exception as err: if isinstance(err, WebhooksDispatchError): raise err raise WebhooksDispatchError(500, method, url, body, headers, error=err) PK!H+l.C*fastlane-0.5.12.dist-info/entry_points.txtN+I/N.,()JK,.IK1s2r3rrPK!DE"'33!fastlane-0.5.12.dist-info/LICENSEMIT License Copyright (c) 2018 Bernardo Heynemann Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HnHTUfastlane-0.5.12.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HI]*"fastlane-0.5.12.dist-info/METADATAMo0 :v@$r>벢Eb3[J$})YU>zD|IF Ze$ڊ2Yf,уkaWHK1P_UBG0d',T%`e,5Г(mt䚃6 F ShrZ(5Q#.\=4`]( ޤl>YOY}r/]Q^ݭPCM$xw7zJd&8{o'}HpO)?!d~Ĝ-&E(rْ:f؏;6{a'b$P(p,_-?;d *< B[s|>3Řs[{Ӓ0NsX1 W][tli,.i-o-7.Yy%\ժ߮t^?Vy$г~;s_PK!Hz4 fastlane-0.5.12.dist-info/RECORD}ǒH}? Tb! !VxGw:qW 8yOf47ae[ATNGPZy-WvG݉#d^RɓSWI};#ϜWd`` E6]ԬwZ.}¾?@ru Fkmݨǝgu4 h jKz1@ϐ) ӹ+6ݑw05@woKe. +|m5=YGkXe"a')tJvjG k }aSz(TB ']- ʽ;HcNJ RԀN.#ӈN< v`VZ܍Ǚb jl%{38Fg3C9k_~\ڣsQi rR;5:BM̆bk7xp8/R`hZͼ=N${٘Hz`OBں6+QDIUd9bl}RXpMƚqhM `00._qޜ>&3|(o'> S_t᱄kNzKoC&?D^KC|VNֶw [cUv(Dhj]^7s3zFpܑGCwOM2OG;N5)-RꢖE@uW̏@腎;q'4#0<<=I7%#R7nPz0~mD30yrͺAߕ<}cSsw=GIDBRx0H|x,jq^Ĉq^GN;f޴i(Gq{ iGdq,t!il/b[<& aڍO̙7Fw<˯;סxW^u\DFPɱy PK! qfastlane/__init__.pyPK!Hfastlane/api/__init__.pyPK!8fII~fastlane/api/app.pyPK!v j fastlane/api/enqueue.pyPK!},9<fastlane/api/execution.pyPK!-D}}Yfastlane/api/gzipped.pyPK!}jYY^fastlane/api/healthcheck.pyPK!Ժ[[8cfastlane/api/helpers.pyPK!&Ghdfastlane/api/metrics.pyPK!pajfastlane/api/rqb.pyPK!:@pfastlane/api/status.pyPK!j xfastlane/api/stream.pyPK!O=΂fastlane/api/task.pyPK!++fastlane/cli/__init__.pyPK!ytfastlane/cli/api.pyPK!W.aܮ 8fastlane/cli/core.pyPK!"okfastlane/cli/prune.pyPK!J fastlane/cli/worker.pyPK!=3NNfastlane/config/__init__.pyPK!֠cuu|fastlane/config/local.confPK!)fastlane/errors/__init__.pyPK!t22fastlane/errors/sentry.pyPK!,1 rcfastlane/helpers.pyPK!|ww4fastlane/models/__init__.pyPK!Vߗfastlane/models/job.pyPK!X8#g fastlane/models/task.pyPK!-*fastlane/templates/stream.htmlPK!$7dfastlane/utils.pyPK!Jߕwfastlane/worker/__init__.pyPK!EZڻaa"?fastlane/worker/docker_executor.pyPK! `bfastlane/worker/errors.pyPK!R\\Ydfastlane/worker/job.pyPK!W}GGfastlane/worker/scheduler.pyPK!Lfastlane/worker/webhooks.pyPK!H+l.C*lfastlane-0.5.12.dist-info/entry_points.txtPK!DE"'33!fastlane-0.5.12.dist-info/LICENSEPK!HnHTUTfastlane-0.5.12.dist-info/WHEELPK!HI]*"fastlane-0.5.12.dist-info/METADATAPK!Hz4 fastlane-0.5.12.dist-info/RECORDPK'' A