# fastlane/__init__.py
__version__ = '0.1.0'


# fastlane/api/__init__.py
# (empty)


# fastlane/api/app.py
# Standard Library
import logging
import sys
import re
from json import loads

# 3rd Party
import redis_sentinel_url
import structlog
from flask import Flask
from flask_cors import CORS
from flask_sockets import Sockets
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
from structlog.processors import (
    JSONRenderer,
    StackInfoRenderer,
    TimeStamper,
    format_exc_info,
)
from structlog.stdlib import add_log_level, add_logger_name, filter_by_level

# Fastlane
import fastlane.api.gzipped as gzipped
import fastlane.api.metrics as metrics
from fastlane.api.enqueue import bp as enqueue
from fastlane.api.execution import bp as execution_api
from fastlane.api.healthcheck import bp as healthcheck
from fastlane.api.routes import bp as routes_api
from fastlane.api.status import bp as status
from fastlane.api.stream import bp as stream
from fastlane.api.task import bp as task_api
from fastlane.models import db
from fastlane.models.categories import QueueNames
from fastlane.queue import Queue, QueueGroup
from flask_basicauth import BasicAuth


class Application:
    def __init__(self, config, log_level, testing=False):
        self.config = config
        self.logger = None
        self.log_level = log_level
        self.create_app(testing)

    def create_app(self, testing):
        self.app = Flask("fastlane")
        self.testing = testing
        self.app.testing = testing
        self.app.error_handlers = []

        for key in self.config.items.keys():
            self.app.config[key] = self.config[key]

        self.app.config["ENV"] = self.config.ENV
        self.app.config["DEBUG"] = self.config.DEBUG
        self.app.original_config = self.config
        self.app.log_level = self.log_level

        self.configure_logging()
        self.connect_redis()
        self.configure_queue()
        # self.connect_queue()
        self.config_blacklist_words_fn()
        self.configure_basic_auth()
        self.connect_db()
        self.load_executor()
        self.load_error_handlers()

        enable_cors = self.app.config["ENABLE_CORS"]

        if (
            isinstance(enable_cors, (str, bytes)) and enable_cors.lower() == "true"
        ) or (isinstance(enable_cors, (bool)) and enable_cors):
            origin = self.app.config["CORS_ORIGINS"]
            self.app.logger.info(f"Configured CORS to allow access from '{origin}'.")
            CORS(self.app)

        metrics.init_app(self.app)
        self.app.register_blueprint(metrics.bp)
        self.app.register_blueprint(healthcheck)
        self.app.register_blueprint(enqueue)
        self.app.register_blueprint(task_api)
        self.app.register_blueprint(execution_api)
        self.app.register_blueprint(status)
        self.app.register_blueprint(routes_api)
        self.app.register_blueprint(gzipped.bp)
        gzipped.init_app(self.app)

        sockets = Sockets(self.app)
        sockets.register_blueprint(stream)

    def configure_basic_auth(self):
        self.basic_auth = None

        if (
            self.app.config["BASIC_AUTH_USERNAME"] is not None
            and self.app.config["BASIC_AUTH_PASSWORD"] is not None
        ):
            self.basic_auth = BasicAuth(self.app)
            self.app.config["BASIC_AUTH_FORCE"] = True

    def configure_logging(self):
        if self.app.testing:
            structlog.reset_defaults()

        disabled = [
            "docker.utils.config",
            "docker.auth",
            "docker.api.build",
            "docker.api.swarm",
            "docker.api.image",
            "werkzeug",
            "requests",
            "urllib3",
        ]

        for logger in disabled:
            log = logging.getLogger(logger)
            log.setLevel(logging.ERROR)
            log.disabled = True

        self.app.logger.disabled = True

        logging.basicConfig(
            level=self.log_level, stream=sys.stdout, format="%(message)s"
        )

        chain = [
            filter_by_level,
            add_log_level,
            add_logger_name,
            TimeStamper(fmt="iso"),
            StackInfoRenderer(),
            format_exc_info,
            JSONRenderer(indent=1, sort_keys=True),
        ]

        logger = logging.getLogger(__name__)

        if self.testing:
            chain = []
            logger = structlog.ReturnLogger()

        log = structlog.wrap_logger(
            logger,
            processors=chain,
            context_class=dict,
            wrapper_class=structlog.stdlib.BoundLogger,
            # cache_logger_on_first_use=True,
        )

        self.logger = log
        self.app.logger = self.logger

    def connect_redis(self):
        self.logger.debug("Connecting to redis...")
        redis_url = self.app.config["REDIS_URL"]
        self.logger.info("Configuring Redis...", redis_url=redis_url)
        sentinel, client = redis_sentinel_url.connect(redis_url)
        self.app.sentinel = sentinel
        self.app.redis = client
        self.logger.info("Connection to redis successful")

    def configure_queue(self):
        self.logger.debug("Configuring queue...")
        queues = []

        for queue_name in [
            QueueNames.Job,
            QueueNames.Monitor,
            QueueNames.Notify,
            QueueNames.Webhook,
        ]:
            queue = Queue(self.app.logger, self.app.redis, queue_name)
            setattr(self.app, f"{queue_name}_queue", queue)
            queues.append(queue)

        self.app.queue_group = QueueGroup(self.logger, self.app.redis, queues)

    def config_blacklist_words_fn(self):
        blacklist_words = map(
            str.strip, self.app.config["ENV_BLACKLISTED_WORDS"].split(",")
        )
        blacklist_pattern = r"(%s)" % "|".join(blacklist_words)
        re_blacklist = re.compile(blacklist_pattern, re.RegexFlag.IGNORECASE)
        self.app.blacklist_words_fn = re_blacklist.search

    def connect_db(self):
        settings = self.app.config["MONGODB_CONFIG"]

        if isinstance(settings, (dict,)):
            self.app.config["MONGODB_SETTINGS"] = settings
        else:
            self.app.config["MONGODB_SETTINGS"] = loads(
                self.app.config["MONGODB_CONFIG"]
            )

        self.logger.info(
            "Connecting to MongoDB...", mongo=self.app.config["MONGODB_SETTINGS"]
        )
        db.init_app(self.app)
        self.logger.info(
            "Connected to MongoDB successfully.",
            mongo=self.app.config["MONGODB_SETTINGS"],
        )

    def load_executor(self):
        name = self.config.EXECUTOR
        parts = name.split(".")
        executor_module = __import__(".".join(parts), None, None, [parts[-1]], 0)
        self.app.executor_module = executor_module

        blueprint = getattr(executor_module, "bp", None)

        if blueprint is not None:
            self.app.register_blueprint(blueprint)

        self.app.executor = self.app.executor_module.Executor(self.app)

    def load_error_handlers(self):
        self.app.error_handlers = []

        for handler_name in self.app.config["ERROR_HANDLERS"]:
            parts = handler_name.split(".")
            obj = __import__(".".join(parts[:-1]), None, None, [parts[-1]], 0)
            obj = getattr(obj, parts[-1])
            self.app.error_handlers.append(obj(self.app))

        self.app.report_error = self.report_error

    def report_error(self, err, metadata=None):
        for handler in self.app.error_handlers:
            handler.report(err, metadata)

    def run(self, host, port):
        server = pywsgi.WSGIServer(
            (host, port), self.app, handler_class=WebSocketHandler
        )
        server.serve_forever()

    def _mock_redis(self, connected):
        def handle():
            self.app.redis.connected = connected

        return handle


# fastlane/api/enqueue.py
# Standard Library
from uuid import UUID

# 3rd Party
from flask import Blueprint, current_app, g, jsonify, make_response, request, url_for

# Fastlane
from fastlane.helpers import loads
from fastlane.models import JobExecution, Task

bp = Blueprint("enqueue", __name__)  # pylint: disable=invalid-name


def get_details():
    try:
        details = request.get_json()
    except Exception:
        details = None

    if details is None and request.get_data():
        try:
            details = loads(request.get_data())
        except Exception:
            details = None

    return details


def get_ip_addr():
    if "X-Real-Ip" in request.headers:
        return request.headers["X-Real-Ip"]

    if "X-Forwarded-For" in request.headers:
        addresses = request.headers["X-Forwarded-For"].split(",")

        if addresses:
            return addresses[0]

    return request.remote_addr


def get_additional_dns_entries(details):
    additional_dns_entries = details.get("additionalDNSEntries")

    if not additional_dns_entries:
        return []

    return list(additional_dns_entries.items())


def create_job(details, task, logger, get_new_job_fn):
    logger.debug("Creating job...")
    retries = details.get("retries", 0)
    expiration = details.get("expiration")
    additional_dns_entries = get_additional_dns_entries(details)

    # people to notify when job succeeds, fails or finishes
    notify = details.get("notify", {"succeeds": [], "fails": [], "finishes": []})

    # people to notify when job succeeds, fails or finishes
    webhooks = details.get("webhooks", {"succeeds": [], "fails": [], "finishes": []})

    # additional metadata
    metadata = details.get("metadata", {})

    if not isinstance(metadata, (dict,)):
        metadata = {}

    hard_limit = current_app.config["HARD_EXECUTION_TIMEOUT_SECONDS"]
    timeout = details.get("timeout", hard_limit)
    timeout = min(
        timeout, hard_limit
    )  # ensure jobs can't specify more than hard limit

    j = get_new_job_fn(task, details.get("image"), details.get("command"))
    j.metadata["retries"] = retries
    j.metadata["notify"] = notify
    j.metadata["webhooks"] = webhooks
    j.metadata["retry_count"] = 0
    j.metadata["expiration"] = expiration
    j.metadata["timeout"] = timeout
    j.metadata["envs"] = details.get("envs", {})
    j.metadata["additional_dns_entries"] = additional_dns_entries
    j.request_ip = get_ip_addr()

    if metadata:
        j.metadata["custom"] = metadata

    j.save()
    logger.debug("Job created successfully...", job_id=str(j.job_id))

    return j


def enqueue_job(task, job, image, command, start_at, start_in, cron, logger):
    execution = None
    queue_job_id = None

    if any([start_at, start_in, cron]):
        queue_job_id = job.schedule_job(
            current_app, dict(startAt=start_at, startIn=start_in, cron=cron)
        )

        return queue_job_id, execution

    logger.debug("Enqueuing job execution...")
    execution = job.create_execution(image, command)
    execution.request_ip = get_ip_addr()
    execution.status = JobExecution.Status.enqueued
    queue_job_id = job.enqueue(current_app, execution.execution_id)
    job.metadata["enqueued_id"] = queue_job_id
    execution.save()
    job.save()
    logger.info("Job execution enqueued successfully.")

    return queue_job_id, execution


def get_task_and_details(task_id):
    details = get_details()

    if details is None or details == "":
        msg = "Failed to enqueue task because JSON body could not be parsed."
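        # For reference, a valid body is a JSON document along the lines of the
        # illustrative sketch below (values are hypothetical). Only "image" and
        # "command" are required; the remaining keys are optional and are read
        # by create_job/validate_and_enqueue, and "timeout" is capped at
        # HARD_EXECUTION_TIMEOUT_SECONDS. At most one of "startAt", "startIn"
        # and "cron" may be present.
        # {
        #     "image": "ubuntu:latest",
        #     "command": "echo 'hello'",
        #     "envs": {"GREETING": "hello"},
        #     "retries": 2,
        #     "timeout": 600,
        #     "cron": "*/10 * * * *",
        #     "notify": {"succeeds": [], "fails": [], "finishes": []},
        #     "webhooks": {"succeeds": [], "fails": [], "finishes": []},
        #     "metadata": {"team": "infra"}
        # }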
g.logger.warn(msg) return None, None, None, make_response(msg, 400) image = details.get("image", None) command = details.get("command", None) if image is None or command is None: return ( None, None, None, make_response("image and command must be filled in the request.", 400), ) logger = g.logger.bind(task_id=task_id, image=image, command=command) logger.debug("Creating task...") task = Task.objects(task_id=task_id).modify(task_id=task_id, upsert=True, new=True) logger.info("Task created successfully.") return task, details, logger, None def validate_and_enqueue(details, task, job, logger): image = details.get("image", None) command = details.get("command", None) start_at = details.get("startAt", None) start_in = details.get("startIn", None) cron = details.get("cron", None) if len(list(filter(lambda item: item is not None, (start_at, start_in, cron)))) > 1: return ( None, None, make_response( "Only ONE of 'startAt', 'startIn' and 'cron' should be in the request.", 400, ), ) result, execution = enqueue_job( task, job, image, command, start_at, start_in, cron, logger ) return result, execution, None @bp.route("/tasks//", methods=("POST",), strict_slashes=False) def create_task(task_id): task, details, logger, response = get_task_and_details(task_id) if response is not None: return response job = create_job( details, task, logger, lambda task, image, command: task.create_job(image, command), ) enqueued_id, execution, response = validate_and_enqueue(details, task, job, logger) if response is not None: return response return get_enqueue_response(task, job, execution, enqueued_id) @bp.route("/tasks//jobs//", methods=("PUT",), strict_slashes=False) def create_or_update_task(task_id, job_id): try: job_id = str(UUID(job_id)) except ValueError: return make_response( f"The job_id {job_id} is not a valid UUID4. 
All job IDs must be UUID4.", 400 ) task, details, logger, response = get_task_and_details(task_id) if response is not None: return response job = create_job( details, task, logger, lambda task, image, command: task.create_or_update_job(job_id, image, command), ) if "enqueued_id" in job.metadata: current_app.jobs_queue.deschedule(job.metadata["enqueued_id"]) job.scheduled = False enqueued_id, execution, response = validate_and_enqueue(details, task, job, logger) if response is not None: return response return get_enqueue_response(task, job, execution, enqueued_id) def get_enqueue_response(task, job, execution, enqueued_id): task_id = str(task.task_id) job_id = str(job.job_id) task_url = task.get_url() job_url = url_for("task.get_job", task_id=task_id, job_id=job_id, _external=True) if execution is None: execution_url = None execution_id = None else: execution_url = url_for( "execution.get_job_execution", task_id=task_id, job_id=job_id, execution_id=str(execution.execution_id), _external=True, ) execution_id = str(execution.execution_id) return jsonify( { "taskId": task_id, "jobId": job_id, "executionId": execution_id, "executionUrl": execution_url, "queueJobId": enqueued_id, "jobUrl": job_url, "taskUrl": task_url, } ) PK!+fastlane/api/execution.py# 3rd Party from flask import ( Blueprint, Response, current_app, g, jsonify, render_template, request, url_for, ) # Fastlane from fastlane.api.helpers import return_error from fastlane.models import Job, JobExecution bp = Blueprint("execution", __name__) # pylint: disable=invalid-name @bp.route("/tasks//jobs//executions//", methods=("GET",)) def get_job_execution(task_id, job_id, execution_id): logger = g.logger.bind( operation="get_job_execution", task_id=task_id, job_id=job_id, execution_id=execution_id, ) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: msg = f"Task ({task_id}) or Job ({job_id}) not found." return return_error(msg, "get_job_execution", status=404, logger=logger) execution = job.get_execution_by_id(execution_id) if execution is None: msg = f"Job Execution ({execution_id}) not found in job ({job_id})." return return_error(msg, "get_job_execution", status=404, logger=logger) logger.debug("Job execution retrieved successfully...") return format_execution_details(job.task, job, execution) def format_execution_details(task, job, execution, shallow=False): task_id = str(task.task_id) if shallow: execution_url = url_for( "execution.get_job_execution", task_id=task_id, job_id=str(job.job_id), execution_id=str(execution.execution_id), _external=True, ) details = {"id": str(execution.execution_id), "url": execution_url} else: details = execution.to_dict(include_log=True, include_error=True) job_url = url_for( "task.get_job", task_id=task_id, job_id=str(job.job_id), _external=True ) task_url = url_for("task.get_task", task_id=task_id, _external=True) return jsonify( { "task": {"id": task_id, "url": task_url}, "job": {"id": str(job.job_id), "url": job_url}, "execution": details, } ) def retrieve_execution_details(task_id, job_id, execution_id=None, get_data_fn=None): if get_data_fn is None: get_data_fn = lambda execution: execution.log # noqa: E731 logger = g.logger.bind(operation="get_response", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: msg = f"Task ({task_id}) or Job ({job_id}) not found." 
return return_error( msg, "retrieve_execution_details", status=404, logger=logger ) if not job.executions: msg = f"No executions found in job ({job_id})." return return_error( msg, "retrieve_execution_details", status=400, logger=logger ) if execution_id is None: execution = job.get_last_execution() else: execution = job.get_execution_by_id(execution_id) if not execution: msg = "No executions found in job with specified arguments." return return_error( msg, "retrieve_execution_details", status=400, logger=logger ) headers = {"Fastlane-Exit-Code": str(execution.exit_code)} if execution.status in [JobExecution.Status.running, JobExecution.Status.enqueued]: logs = "" else: logs = get_data_fn(execution) return Response(headers=headers, response=logs, status=200) def stdout_func(execution): return execution.log def stderr_func(execution): return execution.error def logs_func(execution): if execution.status not in [ JobExecution.Status.running, JobExecution.Status.enqueued, ]: return f"{execution.log}\n-=-\n{execution.error}" return "" @bp.route( "/tasks//jobs//executions//stdout/", methods=("GET",) ) def get_job_execution_stdout(task_id, job_id, execution_id): return retrieve_execution_details(task_id, job_id, execution_id, stdout_func) @bp.route( "/tasks//jobs//executions//stderr/", methods=("GET",) ) def get_job_execution_stderr(task_id, job_id, execution_id): return retrieve_execution_details(task_id, job_id, execution_id, stderr_func) @bp.route( "/tasks//jobs//executions//logs/", methods=("GET",) ) def get_job_execution_logs(task_id, job_id, execution_id): return retrieve_execution_details(task_id, job_id, execution_id, logs_func) def perform_stop_job_execution(job, execution, logger, stop_schedule=True): if "retries" in job.metadata: logger.info("Cleared any further job retries.") job.metadata["retry_count"] = job.metadata["retries"] + 1 job.save() if stop_schedule and "enqueued_id" in job.metadata: logger.info("Removed job from scheduling.") current_app.jobs_queue.deschedule(job.metadata["enqueued_id"]) job.scheduled = False if execution is not None: if execution.status == JobExecution.Status.running: logger.debug("Stopping current execution...") executor = current_app.executor executor.stop_job(job.task, job, execution) logger.debug("Current execution stopped.") if execution.error is None: execution.error = "" execution.error += "\nUser stopped job execution manually." execution.status = JobExecution.Status.failed job.save() logger.debug("Job stopped.") return True, None @bp.route( "/tasks//jobs//executions//stop/", methods=("POST",), strict_slashes=False, ) def stop_job_execution(task_id, job_id, execution_id): logger = g.logger.bind( operation="stop_job_execution", task_id=task_id, job_id=job_id, execution_id=execution_id, ) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: msg = f"Task ({task_id}) or Job ({job_id}) not found." return return_error(msg, "stop_job_execution", status=404, logger=logger) execution = job.get_execution_by_id(execution_id) if execution is None: msg = f"Job Execution ({execution_id}) not found in Job ({job_id})." 
return return_error(msg, "stop_job_execution", status=404, logger=logger) _, response = perform_stop_job_execution( job, execution=execution, logger=logger, stop_schedule=False ) if response is not None: return response return format_execution_details(job.task, job, execution, shallow=True) @bp.route("/tasks//jobs//executions//stream/") def stream_job(task_id, job_id, execution_id): if request.url.startswith("https"): protocol = "wss" else: protocol = "ws" url = url_for( "execution.stream_job", task_id=task_id, job_id=job_id, execution_id=execution_id, external=True, ) url = "/".join(url.split("/")[:-2]) ws_url = "%s://%s/%s/ws/" % (protocol, request.host.rstrip("/"), url.lstrip("/")) return render_template( "stream.html", task_id=task_id, job_id=job_id, execution_id=execution_id, ws_url=ws_url, ) PK!-D}}fastlane/api/gzipped.py# Standard Library import gzip from io import BytesIO as IO # 3rd Party from flask import Blueprint, request bp = Blueprint("gzip", __name__) # pylint: disable=invalid-name def init_app(app): def gzip_response(response): if response.headers["Content-Type"] != "application/json": return response accept_encoding = request.headers.get("Accept-Encoding", "") if "gzip" not in accept_encoding.lower(): return response response.direct_passthrough = False if ( response.status_code < 200 or response.status_code >= 300 or "Content-Encoding" in response.headers ): return response gzip_buffer = IO() gzip_file = gzip.GzipFile(mode="wb", fileobj=gzip_buffer) gzip_file.write(response.data) gzip_file.close() response.data = gzip_buffer.getvalue() response.headers["Content-Encoding"] = "gzip" response.headers["Vary"] = "Accept-Encoding" response.headers["Content-Length"] = len(response.data) return response app.after_request(gzip_response) PK!מl--fastlane/api/healthcheck.py# 3rd Party from flask import Blueprint, current_app, jsonify # Fastlane from fastlane.models import db bp = Blueprint("healthcheck", __name__) # pylint: disable=invalid-name @bp.route("/healthcheck", methods=("GET",)) def healthcheck(): return do_healthcheck() @bp.route("/", methods=("GET",)) def root_healthcheck(): return do_healthcheck() def do_healthcheck(): status = {"redis": True, "mongo": True, "errors": []} try: res = current_app.redis.ping() if not res: raise RuntimeError(f"ping returned {res}") except Exception as err: status["errors"].append( { "source": "redis", "message": f"Connection to redis failed (error: {str(err)}).", } ) status["redis"] = False try: database = current_app.config["MONGODB_SETTINGS"]["db"] conn = getattr(db.connection, database) res = tuple(conn.jobs.find().limit(1)) if not isinstance(res, (tuple,)): raise RuntimeError(f"Connection to mongoDB failed ({res}).") except Exception as err: status["errors"].append({"source": "mongo", "message": str(err)}) status["mongo"] = False code = 200 if status["errors"]: code = 500 return jsonify(status), code PK!Ժ[[fastlane/api/helpers.py# 3rd Party from flask import current_app, make_response # Fastlane from fastlane.helpers import dumps def return_error(msg, operation, status=500, logger=None): if logger is None: logger = current_app.bind(operation=operation) logger.error(msg) return make_response(dumps({"error": msg, "operation": operation}), status) PK!p\\fastlane/api/metrics.py# Standard Library from datetime import datetime from uuid import uuid4 # 3rd Party from flask import Blueprint, current_app, g, request bp = Blueprint("metrics", __name__) # pylint: disable=invalid-name def init_app(app): def start_timer(): request_id = 
request.headers.get("X-Request-ID", str(uuid4())) g.logger = current_app.logger.bind(request_id=request_id) g.request_id = request_id g.start = datetime.now() app.before_request(start_timer) def log_request(response): if request.path == "/favicon.ico": return response now = datetime.now() if hasattr(g, "start"): duration = int(round((now - g.start).microseconds / 1000, 2)) else: duration = -1 user_ip = request.headers.get("X-Forwarded-For", request.remote_addr) host = request.host.split(":", 1)[0] log_params = { "method": request.method, "path": request.path, "status": response.status_code, "duration": duration, "ip": user_ip, "host": host, } request_id = getattr(g, "request_id", None) if request_id: log_params["request_id"] = request_id if response.status_code < 400: current_app.logger.info("Request succeeded", **log_params) elif response.status_code < 500: current_app.logger.info("Bad Request", **log_params) else: current_app.logger.error("Internal Server Error", **log_params) return response app.after_request(log_request) PK!4{  fastlane/api/routes.py from flask import Blueprint, jsonify, current_app, abort bp = Blueprint('routes', __name__, url_prefix="/routes") # pylint: disable=invalid-name @bp.route('/', methods=['GET']) def routes(): # pragma: no cover """Print available functions.""" if not current_app.debug: abort(404) func_list = [] for rule in current_app.url_map.iter_rules(): endpoint = rule.rule methods = ", ".join(list(rule.methods)) doc = current_app.view_functions[rule.endpoint].__doc__ route = { "endpoint": endpoint, "methods": methods } if doc: route["doc"] = doc func_list.append(route) func_list = sorted(func_list, key=lambda k: k['endpoint']) return jsonify(func_list) PK!|ُւ fastlane/api/status.py# Standard Library from datetime import datetime # 3rd Party import croniter import pkg_resources from flask import Blueprint, current_app, jsonify, url_for # Fastlane from fastlane.models import Job, Task from fastlane.models.categories import QueueNames from fastlane.queue import Queue from fastlane.utils import from_unix bp = Blueprint("status", __name__, url_prefix="/status") # pylint: disable=invalid-name @bp.route("/", methods=("GET",)) def status(): executor = current_app.executor version = pkg_resources.get_distribution("fastlane").version metadata = {"hosts": [], "containers": {"running": []}} containers = executor.get_running_containers() for host, port, container_id in containers["running"]: metadata["containers"]["running"].append( {"host": host, "port": port, "id": container_id} ) metadata["hosts"] = [] + containers["available"] + containers["unavailable"] metadata["queues"] = {} for queue_name in [ QueueNames.Job, QueueNames.Monitor, QueueNames.Webhook, QueueNames.Notify, ]: queue = getattr(current_app, f"{queue_name}_queue") jobs_queue_size = current_app.redis.llen(queue.queue_name) metadata["queues"][queue_name] = {"length": jobs_queue_size} next_scheduled = current_app.redis.zrange( Queue.SCHEDULED_QUEUE_NAME, 0, 0, withscores=True ) if not next_scheduled: next_timestamp = None next_human = None else: next_timestamp = next_scheduled[0][1] next_human = from_unix(next_timestamp).isoformat() metadata["queues"]["scheduled"] = { "length": current_app.redis.zcard(Queue.SCHEDULED_QUEUE_NAME), "nextTimeStamp": next_timestamp, "nextHumanReadableDate": next_human, } metadata["tasks"] = {"count": Task.objects.count()} metadata["jobs"] = {"count": Job.objects.count()} metadata["jobs"]["scheduled"] = [] scheduled_jobs = Job.objects(scheduled=True).all() 
metadata["fastlane"] = { "version": version, "executor": current_app.config["EXECUTOR"], } for job in scheduled_jobs: j = job.to_dict(include_executions=False) itr = croniter.croniter(job.metadata["cron"], datetime.utcnow()) j["nextScheduledAt"] = itr.get_next(datetime).isoformat() task_id = job.task.task_id job_url = url_for( "task.get_job", task_id=task_id, job_id=str(job.id), _external=True ) j["url"] = job_url stop_job_url = url_for( "task.stop_job", task_id=task_id, job_id=str(job.id), _external=True ) j["stopUrl"] = stop_job_url task_url = url_for("task.get_task", task_id=task_id, _external=True) del j["taskId"] j["task"] = {"id": task_id, "url": task_url} metadata["jobs"]["scheduled"].append(j) return jsonify(metadata), 200 PK!J J fastlane/api/stream.py# Standard Library # import time # from multiprocessing import Process # 3rd Party import geventwebsocket from flask import Blueprint, current_app # Fastlane from fastlane.models import Job, JobExecution from fastlane.worker.errors import ContainerUnavailableError bp = Blueprint("stream", __name__) # pylint: disable=invalid-name def stream_log(executor, task_id, job, ex, websocket): try: if ( not websocket.closed and ex.status == JobExecution.Status.done or ex.status == JobExecution.Status.failed ): websocket.send("EXIT CODE: %s\n" % ex.exit_code) websocket.send(ex.log) websocket.send("\n-=-\n") websocket.send(ex.error) websocket.close(message="wsdone") return if not websocket.closed and ex.status != JobExecution.Status.running: websocket.close(message="wsretry") return try: for log in executor.get_streaming_logs(task_id, job, ex): if websocket.closed: return websocket.send(log) except geventwebsocket.exceptions.WebSocketError: return except BrokenPipeError: websocket.close(message="wsretry") return except ContainerUnavailableError as err: current_app.report_error( err, metadata=dict( operation="Job Execution Stream", task_id=task_id, job_id=job.job_id, execution_id=ex.execution_id, ), ) websocket.close(message="wsretry") return websocket.close(message="wsdone") def process_job_execution_logs(websocket, task_id, job_id, execution_id, logger): job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: logger.error(f"Job ({job_id}) not found in task ({task_id}).") websocket.close() return if execution_id is None: execution = job.get_last_execution() else: execution = job.get_execution_by_id(execution_id) if execution is None: logger.error("No executions found in job ({execution_id}).") websocket.close(message="wsretry") return executor = current_app.executor stream_log(executor, task_id, job, execution, websocket) # process = Process( # target=stream_log, args=(executor, task_id, job, execution, websocket) # ) # process.start() # while not websocket.closed: # time.sleep(10) # process.terminate() @bp.route("/tasks//jobs//ws/") def websocket_listen(websocket, task_id, job_id): logger = current_app.logger.bind(task_id=task_id, job_id=job_id) process_job_execution_logs(websocket, task_id, job_id, None, logger) @bp.route("/tasks//jobs//executions//ws/") def websocket_execution_listen(websocket, task_id, job_id, execution_id): logger = current_app.logger.bind(task_id=task_id, job_id=job_id) process_job_execution_logs(websocket, task_id, job_id, execution_id, logger) PK!JU""fastlane/api/task.py# 3rd Party from flask import Blueprint, current_app, g, jsonify, render_template, request, url_for # Fastlane from fastlane.api.execution import ( logs_func, perform_stop_job_execution, retrieve_execution_details, stderr_func, stdout_func, ) 
from fastlane.api.helpers import return_error from fastlane.models import Job, JobExecution, Task from fastlane.models.categories import Categories bp = Blueprint("task", __name__) # pylint: disable=invalid-name def get_current_page(logger): try: page = int(request.args.get("page", 1)) if page <= 0: raise ValueError() return page, None except ValueError: msg = "Tasks pagination page param should be a positive integer." return None, return_error(msg, "get_tasks", status=400, logger=logger) @bp.route("/tasks/", methods=("GET",)) def get_tasks(): logger = g.logger.bind(operation="get_tasks") page, error = get_current_page(logger) if error: return error per_page = current_app.config["PAGINATION_PER_PAGE"] logger.debug(f"Getting tasks page={page} per_page={per_page}...") paginator = Task.get_tasks(page=page, per_page=per_page) logger.debug("Tasks retrieved successfully...") next_url = None if paginator.has_next: next_url = url_for("task.get_tasks", page=paginator.next_num, _external=True) prev_url = None if paginator.has_prev: prev_url = url_for("task.get_tasks", page=paginator.prev_num, _external=True) data = { "items": [], "total": paginator.total, "page": paginator.page, "pages": paginator.pages, "perPage": paginator.per_page, "hasNext": paginator.has_next, "hasPrev": paginator.has_prev, "nextUrl": next_url, "prevUrl": prev_url, } for task in paginator.items: data["items"].append(task.to_dict()) return jsonify(data) @bp.route("/search/", methods=("GET",)) def search_tasks(): logger = g.logger.bind(operation="search_tasks") query = request.args.get("query") if not query: msg = "The query param is required." return return_error(msg, "search_tasks", status=400, logger=logger) page, error = get_current_page(logger) if error: return error per_page = current_app.config["PAGINATION_PER_PAGE"] logger.debug(f"Getting tasks page={page} per_page={per_page}...") paginator = Task.search_tasks(query=query, page=page, per_page=per_page) logger.debug("Tasks retrieved successfully...") next_url = None if paginator.has_next: next_url = url_for( "task.search_tasks", query=query, page=paginator.next_num, _external=True ) prev_url = None if paginator.has_prev: prev_url = url_for( "task.search_tasks", query=query, page=paginator.prev_num, _external=True ) data = { "items": [], "total": paginator.total, "page": paginator.page, "pages": paginator.pages, "perPage": paginator.per_page, "hasNext": paginator.has_next, "hasPrev": paginator.has_prev, "nextUrl": next_url, "prevUrl": prev_url, } for task in paginator.items: data["items"].append(task.to_dict()) return jsonify(data) @bp.route("/tasks//", methods=("GET",)) def get_task(task_id): logger = g.logger.bind(operation="get_task", task_id=task_id) logger.debug("Getting job...") task = Task.get_by_task_id(task_id) if task is None: return return_error("Task not found.", "get_task", status=404, logger=logger) logger.debug("Task retrieved successfully...") task_jobs = Job.objects(id__in=[str(job_id.id) for job_id in task.jobs]) jobs = [] for job in task_jobs: url = url_for( "task.get_job", task_id=task_id, job_id=str(job.job_id), _external=True ) job = { "id": str(job.job_id), "createdAt": job.created_at.isoformat(), "url": url } jobs.append(job) return jsonify({"taskId": task_id, "jobs": jobs}) @bp.route("/tasks//jobs//", methods=("GET",)) def get_job(task_id, job_id): logger = g.logger.bind(operation="get_job", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: return return_error( "Job not 
found in task.", "get_task", status=404, logger=logger ) logger.debug("Job retrieved successfully...") details = job.to_dict( include_log=True, include_error=True, blacklist_fn=current_app.blacklist_words_fn, ) for execution in details["executions"]: exec_url = url_for( "execution.get_job_execution", task_id=task_id, job_id=job_id, execution_id=execution["executionId"], _external=True, ) execution["url"] = exec_url task_url = url_for("task.get_task", task_id=task_id, _external=True) return jsonify({"task": {"id": task_id, "url": task_url}, "job": details}) @bp.route( "/tasks//jobs//stop/", methods=("POST",), strict_slashes=False ) def stop_job(task_id, job_id): logger = g.logger.bind(operation="stop_job", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: return return_error( "Job not found in task.", "stop_job", status=404, logger=logger ) execution = job.get_last_execution() _, response = perform_stop_job_execution( job, execution=execution, logger=logger, stop_schedule=True ) if response is not None: return response return get_job_summary(task_id, job_id) @bp.route( "/tasks//jobs//retry/", methods=("POST",), strict_slashes=False ) def retry_job(task_id, job_id): logger = g.logger.bind(operation="retry", task_id=task_id, job_id=job_id) logger.debug("Getting job...") job = Job.get_by_id(task_id=task_id, job_id=job_id) if job is None: return return_error( "Job not found in task.", "retry_job", status=404, logger=logger ) execution = job.get_last_execution() if execution is None: return return_error( "No execution yet to retry.", "retry_job", status=400, logger=logger ) if "enqueued_id" in job.metadata and current_app.jobs_queue.is_scheduled( job.metadata["enqueued_id"] ): msg = "Can't retry a scheduled job." 
return return_error(msg, "retry_job", status=400, logger=logger) if execution.status == JobExecution.Status.running: logger.debug("Stopping current execution...") executor = current_app.executor executor.stop_job(job.task, job, execution) logger.debug("Current execution stopped.") execution.status = JobExecution.Status.failed job.save() new_exec = job.create_execution(execution.image, execution.command) new_exec.status = JobExecution.Status.enqueued logger.debug("Enqueuing job execution...") args = [task_id, job_id, new_exec.execution_id, execution.image, execution.command] result = current_app.jobs_queue.enqueue(Categories.Job, *args) job.metadata["enqueued_id"] = result.id job.save() logger.info("Job execution enqueued successfully.") return get_job_summary(task_id, job_id) def get_job_summary(task_id, job_id): job_url = url_for("task.get_job", task_id=task_id, job_id=job_id, _external=True) task_url = url_for("task.get_task", task_id=task_id, _external=True) return jsonify( {"taskId": task_id, "jobId": job_id, "jobUrl": job_url, "taskUrl": task_url} ) @bp.route("/tasks//jobs//stream/") def stream_job(task_id, job_id): if request.url.startswith("https"): protocol = "wss" else: protocol = "ws" url = url_for("task.stream_job", task_id=task_id, job_id=job_id, external=True) url = "/".join(url.split("/")[:-2]) ws_url = "%s://%s/%s/ws/" % (protocol, request.host.rstrip("/"), url.lstrip("/")) return render_template("stream.html", task_id=task_id, job_id=job_id, ws_url=ws_url) @bp.route("/tasks//jobs//stdout/") def stdout(task_id, job_id): return retrieve_execution_details(task_id, job_id, get_data_fn=stdout_func) @bp.route("/tasks//jobs//stderr/") def stderr(task_id, job_id): return retrieve_execution_details(task_id, job_id, get_data_fn=stderr_func) @bp.route("/tasks//jobs//logs/") def logs(task_id, job_id): return retrieve_execution_details(task_id, job_id, get_data_fn=logs_func) PK!++fastlane/cli/__init__.pyfrom fastlane.cli.core import main # NOQA PK!ytfastlane/cli/api.py""" isort:skip_file """ # 3rd Party from gevent import monkey # Must be before other imports monkey.patch_all() # isort:skip # Fastlane from fastlane.api.app import Application # NOQA pylint: disable=wrong-import-position from fastlane.config import Config # NOQA pylint: disable=wrong-import-position class APIHandler: def __init__(self, click, host, port, config, log_level): self.config_path = config self.config = None self.click = click self.host = host self.port = port self.log_level = log_level self.load_config() def load_config(self): # self.click.echo(f'Loading configuration from {self.config_path}...') self.config = Config.load(self.config_path) def __call__(self): app = Application(self.config, self.log_level) app.logger.info( "fastlane is runnning.", host=self.host, port=self.port, environment=self.config.ENV, ) app.run(self.host, self.port) PK!W.aܮ fastlane/cli/core.py# Standard Library import sys from os.path import abspath, dirname, join # 3rd Party import click import pkg_resources # Fastlane from fastlane.cli.api import APIHandler from fastlane.cli.prune import PruneHandler from fastlane.cli.worker import WorkerHandler ROOT_CONFIG = abspath(join(dirname(__file__), "../config/local.conf")) LEVELS = {0: "ERROR", 1: "WARN", 2: "INFO", 3: "DEBUG"} @click.group() def main(): pass @click.command() def version(): """Returns fastlane version.""" click.echo(pkg_resources.get_distribution("fastlane").version) @click.command() @click.option("-b", "--host", default="0.0.0.0") @click.option("-p", "--port", default=10000) 
@click.option("-v", "--verbose", default=0, count=True) @click.option( "-c", "--config-file", default=ROOT_CONFIG, help="configuration file to use with fastlane", ) def api(host, port, verbose, config_file): """Runs fastlane API in the specified host and port.""" log_level = LEVELS.get(verbose, "ERROR") handler = APIHandler(click, host, port, config_file, log_level) handler() @click.command() @click.option("-i", "--worker-id", default=None, help="ID for this worker") @click.option( "-j", "--no-jobs", default=False, help="""Process the 'jobs' queue?""", is_flag=True ) @click.option( "-m", "--no-monitor", default=False, help="""Process the 'monitor' queue?""", is_flag=True, ) @click.option( "-n", "--no-notify", default=False, help="""Process the 'notify' queue?""", is_flag=True, ) @click.option( "-w", "--no-webhooks", default=False, help="""Process the 'webhooks' queue?""", is_flag=True, ) @click.option("-v", "--verbose", default=0, count=True) @click.option( "-c", "--config-file", default=ROOT_CONFIG, help="configuration file to use with fastlane", ) def worker( worker_id, no_jobs, no_monitor, no_notify, no_webhooks, verbose, config_file ): """Runs an fastlane Worker with the specified queue name and starts processing.""" jobs = not no_jobs monitor = not no_monitor notify = not no_notify webhooks = not no_webhooks if not jobs and not monitor and not notify and not webhooks: click.echo( "Worker must monitor at least one queue: jobs, monitor, notify or webhooks" ) sys.exit(1) log_level = LEVELS.get(verbose, "ERROR") handler = WorkerHandler( click, worker_id, jobs, monitor, notify, webhooks, config_file, log_level ) handler() @click.command() def config(): """Prints the default config for fastlane""" from fastlane.config import Config print(Config.get_config_text()) @click.command() @click.option("-v", "--verbose", default=0, count=True) @click.option( "-c", "--config-file", default=ROOT_CONFIG, help="configuration file to use with fastlane", ) def prune(verbose, config_file): """Removes all containers that have already been processed by fastlane.""" log_level = LEVELS.get(verbose, "ERROR") handler = PruneHandler(click, config_file, log_level) handler() main.add_command(version) main.add_command(api) main.add_command(worker) main.add_command(config) main.add_command(prune) PK!"okfastlane/cli/prune.py# Fastlane from fastlane.api.app import Application from fastlane.config import Config class PruneHandler: def __init__(self, click, config, log_level): self.config_path = config self.config = None self.click = click self.log_level = log_level self.load_config() def load_config(self): self.config = Config.load(self.config_path) def __call__(self): app = Application(self.config, self.log_level).app app.logger.info(f"Running fastlane prune...") with app.app_context(): removed = app.executor.remove_done() app.logger.info(f"Prune done.", removed=removed) PK!Xq fastlane/cli/worker.py# Standard Library import time import traceback from uuid import uuid4 # Fastlane from fastlane.api.app import Application from fastlane.config import Config from fastlane.models.categories import QueueNames from fastlane.worker.job import enqueue_missing_monitor_jobs class WorkerHandler: def __init__( self, click, worker_id, jobs, monitor, notify, webhooks, config, log_level, app=None, ): if isinstance(config, (str, bytes)): self.config_path = config self.config = None else: self.config_path = None self.config = config self.click = click self.worker_id = worker_id self.log_level = log_level self.queues = set() if jobs: 
self.queues.add(QueueNames.Job) if monitor: self.queues.add(QueueNames.Monitor) if notify: self.queues.add(QueueNames.Notify) if webhooks: self.queues.add(QueueNames.Webhook) self.load_config() self.app = app self.queue_group = None self.last_verified_missing_jobs = time.time() if self.app is not None: self.queue_group = self.app.app.queue_group def load_config(self): # self.click.echo(f'Loading configuration from {self.config_path}...') if self.config is None: self.config = Config.load(self.config_path) def loop_once(self): self.queue_group.move_jobs() if time.time() - self.last_verified_missing_jobs > 10: enqueue_missing_monitor_jobs(self.app.app) self.last_verified_missing_jobs = time.time() item = self.queue_group.dequeue(queues=self.queues, timeout=5) if item is None: return None return item.run() def __call__(self): # self.click.echo( # f'Running fastlane worker processing queues {",".join(self.queues)}.') self.app = app = Application(self.config, self.log_level) self.queue_group = self.app.app.queue_group app.logger.info( f'Running fastlane worker processing queues {",".join(self.queues)}.' ) interval = app.config["WORKER_SLEEP_TIME_MS"] / 1000.0 self.last_verified_missing_jobs = time.time() with app.app.app_context(): if self.worker_id is None: app.logger.warn( "The worker id was not set for this worker and a random one will be used." ) self.worker_id = str(uuid4()) app.logger = app.logger.bind(worker_id=self.worker_id, queues=self.queues) app.app.logger = app.logger while True: try: self.loop_once() time.sleep(interval) except Exception: error = traceback.format_exc() app.logger.error("Failed to process job.", error=error) PK!=fastlane/config/__init__.py# 3rd Party from derpconf.config import Config Config.allow_environment_variables() Config.define( "DEBUG", False, "This configuration details if fastlane is running in debug mode.", "General", ) Config.define( "ENV", "production", "This configuration details the environment fastlane is running in.", "General", ) Config.define( "SECRET_KEY", "OTNDN0VCRDAtRTMyMS00NUM0LUFFQUYtNEI4QUE4RkFCRjUzCg==", """This configuration specifies the `SECRET_KEY` for Fastlane API. This should be unique per environment.""", "General", ) Config.define( "ENABLE_CORS", True, "This configuration enabled CORS headers in the API. CORS is enabled " "on '*' by default in Fastlane. It's the administrator's job to secure" " it behind some gateway.", "General", ) Config.define( "CORS_ORIGINS", "*", "This configuration enabled CORS headers in the API.", "General", ) Config.define( "REDIS_URL", "redis://localhost:10100/0", """Redis connection string in the form of 'redis://' protocol. 
If `redis+sentinel` is used as protocol, instead, fastlane will connect to sentinel to get redis host.""", "Redis", ) Config.define( "WORKER_SLEEP_TIME_MS", 1000, "Number of milliseconds that fastlane must sleep before getting the next job", "Worker", ) Config.define( "HARD_EXECUTION_TIMEOUT_SECONDS", 30 * 60, "Number of seconds that fastlane must wait before killing an execution", "Worker", ) Config.define( "EXPONENTIAL_BACKOFF_MIN_MS", 1000, "Number of milliseconds that fastlane must wait before the first retry in each job", "Worker", ) Config.define( "EXPONENTIAL_BACKOFF_FACTOR", 2, "Factor to multiply backoff by in each retry", "Worker", ) Config.define( "WEBHOOKS_EXPONENTIAL_BACKOFF_MIN_MS", 5000, "Number of milliseconds that fastlane must wait before " "the first retry in each webhook dispatch", "Worker", ) Config.define( "WEBHOOKS_EXPONENTIAL_BACKOFF_FACTOR", 2, "Factor to multiply backoff by in each retry for webhook dispatch", "Worker", ) Config.define( "EXECUTOR", "fastlane.worker.docker", "Module full name where to find the Executor class", "Worker", ) Config.define( "DOCKER_HOSTS", [{"match": "", "hosts": ["localhost:2375"], "maxRunning": 2}], """Docker cluster definitions. The `match` portion of each host definition specifies a regular expression that must be match in order for the job to execute in one of the docker hosts in the `hosts` key. The `maxRunning` portion, specifies the maximum number of concurrent jobs in this cluster. If the regex is empty ("") it means that it matches anything. The empty regex cluster definition should be the last one, otherwise it will work all jobs. """, "Docker Executor", ) Config.define( "DOCKER_CIRCUIT_BREAKER_MAX_FAILS", 5, "Maximum number of failures to docker host to stop sending new jobs", "Docker Executor", ) Config.define( "DOCKER_CIRCUIT_BREAKER_RESET_TIMEOUT_SECONDS", 60, "Number of seconds to reopen circuit and start sending new jobs to a docker host", "Docker Executor", ) Config.define( "MONGODB_CONFIG", { "host": "mongodb://localhost:10101/fastlane", "db": "fastlane", "serverSelectionTimeoutMS": 100, "connect": False, }, "MongoDB connection details.", "Models", ) Config.define( "ERROR_HANDLERS", ["fastlane.errors.sentry.SentryErrorHandler"], "List of configured error handlers", "Errors", ) Config.define("SENTRY_DSN", "", "Sentry DSN to send errors to", "Errors") Config.define( "ENV_BLACKLISTED_WORDS", "password,key,secret,client_id", "Words that if present in environment variables are redacted", "API", ) Config.define( "SMTP_USE_SSL", False, "Wheter the SMTP server used to send notifications uses SSL", "Email", ) Config.define( "SMTP_HOST", None, "Host of the SMTP server used to send notifications", "Email" ) Config.define( "SMTP_PORT", None, "Port of the SMTP server used to send notifications", "Email" ) Config.define( "SMTP_USER", None, "User of the SMTP server used to send notifications", "Email" ) Config.define( "SMTP_PASSWORD", None, "Password of the SMTP server used to send notifications", "Email", ) Config.define( "SMTP_FROM", None, "From E-mail of the SMTP server used to send notifications", "Email", ) Config.define( "PAGINATION_PER_PAGE", 10, "Total items per page to be used on api pagination methods", "API", ) Config.define( "BASIC_AUTH_USERNAME", None, "Basic auth username required by fastlane. If set to 'None', no basic auth is enabled.", "Auth", ) Config.define( "BASIC_AUTH_PASSWORD", None, "Basic auth password required by fastlane. 
If set to 'None', no basic auth is enabled.", "Auth", ) Config.define("BASIC_AUTH_REALM", "", "Basic auth realm.", "Auth") PK!ӝ,vvfastlane/config/local.confDEBUG=True SERVER_NAME='localhost:10000' DOCKER_HOSTS=[{"match": "", "hosts": ["localhost:1234"], "maxRunning": 20}] PK!fastlane/errors/__init__.pyclass ErrorReporter: def __init__(self, app): self.app = app def report(self, err, metadata=None): raise NotImplementedError() PK!t22fastlane/errors/sentry.py# import sentry_sdk # 3rd Party from raven import Client # Fastlane from fastlane.errors import ErrorReporter class SentryErrorHandler(ErrorReporter): def __init__(self, app): super(SentryErrorHandler, self).__init__(app) self.client = None self.send = True if app.config["SENTRY_DSN"] == "": self.send = False else: app.logger.info( "Sentry configured properly.", sentry_dsn=app.config["SENTRY_DSN"] ) self.client = Client(app.config["SENTRY_DSN"], auto_log_stacks=True) def report(self, err, metadata=None): if not self.send: return if metadata is None: metadata = {} exc_info = (err.__class__, err, err.__traceback__) self.client.captureException(exc_info=exc_info, extra=metadata) # with sentry_sdk.configure_scope() as scope: # scope.level = "error" # for key, val in metadata.items(): # scope.set_extra(key, val) # sentry_sdk.capture_exception(err) PK!,1 rfastlane/helpers.pytry: from ujson import loads, dumps # pylint: disable=unused-import except ImportError: from json import loads, dumps # pylint: disable=unused-import PK!'fastlane/models/__init__.py""" isort:skip_file """ # 3rd Party from flask_mongoengine import MongoEngine db = MongoEngine() # isort:skip pylint: disable=invalid-name from fastlane.models.task import Task # NOQA pylint: disable=unused-import,wrong-import-position from fastlane.models.job import Job # NOQA pylint: disable=unused-import,wrong-import-position from fastlane.models.job_execution import JobExecution # NOQA pylint: disable=unused-import,wrong-import-position PK!б zfastlane/models/categories.pyclass Categories: Job = "job" Monitor = "monitor" Notify = "notify" Webhook = "webhook" class QueueNames: Job = "jobs" Monitor = "monitor" Notify = "notify" Webhook = "webhooks" Error = "error" PK!:Q] fastlane/models/job.py# Standard Library import datetime from uuid import uuid4 # 3rd Party import mongoengine.errors from mongoengine import ( BooleanField, DateTimeField, DictField, ListField, ReferenceField, StringField, ) # Fastlane from fastlane.models import db from fastlane.models.categories import Categories from fastlane.utils import words_redacted class Job(db.Document): created_at = DateTimeField(required=True) last_modified_at = DateTimeField(required=True, default=datetime.datetime.utcnow) task_id = StringField(required=True) job_id = StringField(required=True) executions = ListField(ReferenceField("JobExecution")) task = ReferenceField( "Task", required=True, reverse_delete_rule=mongoengine.CASCADE ) image = StringField(required=False) command = StringField(required=False) request_ip = StringField(required=False) metadata = DictField(required=False) scheduled = BooleanField(required=True, default=False) meta = { "ordering": ["-last_modified_at"], "indexes": ["last_modified_at", "job_id", {"fields": ["task_id", "job_id"]}], } def save(self, *args, **kwargs): if self.executions is None: self.executions = [] if not self.created_at: self.created_at = datetime.datetime.utcnow() self.last_modified_at = datetime.datetime.utcnow() return super(Job, self).save(*args, **kwargs) def create_execution(self, image, command): 
from fastlane.models.job_execution import JobExecution ex_id = str(uuid4()) ex = JobExecution( execution_id=ex_id, image=image, command=command, created_at=datetime.datetime.utcnow(), task=self.task, job=self, ) ex.save() self.executions.append(ex) self.save() return ex def get_metadata(self, blacklist_fn): if "envs" in self.metadata: envs = self.metadata["envs"] self.metadata["envs"] = words_redacted(envs, blacklist_fn) return self.metadata def to_dict( self, include_log=False, include_error=False, include_executions=True, blacklist_fn=None, ): if blacklist_fn is None: meta = self.metadata else: meta = self.get_metadata(blacklist_fn) res = { "createdAt": self.created_at.isoformat(), "image": self.image, "command": self.command, "lastModifiedAt": self.last_modified_at.isoformat(), "taskId": self.task.task_id, "scheduled": self.scheduled, "executionCount": len(self.executions), "requestIPAddress": self.request_ip, "metadata": meta, } if include_executions: executions = [ ex.to_dict(include_log, include_error) for ex in list( reversed(sorted(self.executions, key=lambda ex: ex.created_at)) )[:20] ] res["executions"] = executions return res @classmethod def get_by_id(cls, task_id, job_id): from fastlane.models.task import Task if task_id is None or task_id == "" or job_id is None or job_id == "": raise RuntimeError( "Task ID and Job ID are required and can't be None or empty." ) task = Task.objects(task_id=task_id).first() job = cls.objects(task=task, job_id=job_id).first() return job def get_execution_by_id(self, execution_id): for job_execution in self.executions: if job_execution.execution_id == execution_id: return job_execution return None def get_last_execution(self): if not self.executions: return None return self.executions[-1] @classmethod def get_unfinished_executions(cls, app): from fastlane.models.job_execution import JobExecution query = { "$or": [ {"status": JobExecution.Status.pulling}, {"status": JobExecution.Status.running}, ] } executions = JobExecution.objects(__raw__=query) execs = [] for execution in executions: enqueued_id = execution.job.metadata.get("enqueued_id") if enqueued_id is not None and ( app.jobs_queue.is_scheduled(enqueued_id) or app.jobs_queue.is_enqueued(enqueued_id) ): continue execs.append((execution.job, execution)) return execs @classmethod def get_unscheduled_jobs(cls, app): query = { "$or": [ {"metadata.startIn": {"$exists": True}}, {"metadata.startAt": {"$exists": True}}, {"metadata.cron": {"$exists": True}}, ] } jobs = Job.objects(__raw__=query) unscheduled = [] for job in jobs: if "enqueued_id" not in job.metadata: unscheduled.append(job) continue enqueued_id = job.metadata["enqueued_id"] if not app.jobs_queue.is_scheduled( enqueued_id ) and not app.jobs_queue.is_enqueued(enqueued_id): unscheduled.append(job) return list(unscheduled) def enqueue(self, app, execution_id, image=None, command=None): if image is None: image = self.image if command is None: command = self.command if image is None or command is None: raise RuntimeError("Can't enqueue job with no image or command.") logger = app.logger.bind( operation="enqueue_job", job_id=self.job_id, task_id=self.task.task_id, execution_id=execution_id, image=image, command=command, ) args = [self.task.task_id, str(self.job_id), str(execution_id), image, command] logger.info("Job enqueued.") return app.jobs_queue.enqueue(Categories.Job, *args) def schedule_job(self, app, details): logger = app.logger.bind(operation="schedule_job") if self.image is None or self.command is None: logger.warn("No image or 
command found in job.") return None start_at = details.get("startAt", None) start_in = details.get("startIn", None) cron = details.get("cron", None) args = [ str(self.task.task_id), str(self.job_id), None, self.image, self.command, ] queue_job_id = None if start_at is not None: logger.debug("Enqueuing job execution in the future...", start_at=start_at) enqueued_id = app.jobs_queue.enqueue_at( int(start_at), Categories.Job, *args ) # future_date = datetime.utcfromtimestamp(int(start_at)) # result = scheduler.enqueue_at(future_date, run_job, *args) self.metadata["enqueued_id"] = enqueued_id queue_job_id = enqueued_id self.save() logger.info("Job execution enqueued successfully.", start_at=start_at) elif start_in is not None: # future_date = datetime.now(tz=timezone.utc) + start_in logger.debug("Enqueuing job execution in the future...", start_in=start_in) enqueued_id = app.jobs_queue.enqueue_in(start_in, Categories.Job, *args) # result = scheduler.enqueue_at(future_date, run_job, *args) self.metadata["enqueued_id"] = enqueued_id queue_job_id = enqueued_id self.save() logger.info("Job execution enqueued successfully.", start_in=start_in) elif cron is not None: logger.debug("Enqueuing job execution using cron...", cron=cron) enqueued_id = app.jobs_queue.enqueue_cron(cron, Categories.Job, *args) self.metadata["enqueued_id"] = enqueued_id queue_job_id = enqueued_id self.metadata["cron"] = cron self.scheduled = True self.save() logger.info("Job execution enqueued successfully.", cron=cron) return queue_job_id PK! fastlane/models/job_execution.py# Standard Library import datetime # 3rd Party import mongoengine.errors from mongoengine import ( DateTimeField, DictField, IntField, ReferenceField, StringField, ) # Fastlane from fastlane.models import db class JobExecution(db.Document): # pylint: disable=no-member class Status: enqueued = "enqueued" pulling = "pulling" running = "running" done = "done" failed = "failed" timedout = "timedout" expired = "expired" stopped = "stopped" created_at = DateTimeField(required=True) last_modified_at = DateTimeField(required=True, default=datetime.datetime.utcnow) started_at = DateTimeField(required=False) finished_at = DateTimeField(required=False) execution_id = StringField(required=True) image = StringField(required=True) command = StringField(required=True) request_ip = StringField(required=False) status = StringField(required=True, default=Status.enqueued) task = ReferenceField( "Task", required=True, reverse_delete_rule=mongoengine.CASCADE ) job = ReferenceField( "Job", required=True, reverse_delete_rule=mongoengine.CASCADE ) log = StringField(required=False) error = StringField(required=False) exit_code = IntField(required=False) metadata = DictField(required=False) def to_dict(self, include_log=False, include_error=False): s_at = self.started_at.isoformat() if self.started_at is not None else None f_at = self.finished_at.isoformat() if self.finished_at is not None else None res = { "executionId": str(self.execution_id), "createdAt": self.created_at.isoformat(), "requestIPAddress": self.request_ip, "startedAt": s_at, "finishedAt": f_at, "image": self.image, "command": self.command, "metadata": self.metadata, "status": self.status, "exitCode": self.exit_code, } if self.finished_at is not None: res["finishedAt"] = self.finished_at.isoformat() if include_log: res["log"] = self.log if include_error: res["error"] = self.error return res def save(self, *args, **kwargs): if not self.created_at: self.created_at = datetime.datetime.utcnow() self.last_modified_at = 
datetime.datetime.utcnow() return super(JobExecution, self).save(*args, **kwargs) PK!uQ fastlane/models/task.py# Standard Library import datetime from uuid import uuid4 # 3rd Party import mongoengine.errors from flask import url_for from mongoengine import DateTimeField, ListField, ReferenceField, StringField # Fastlane from fastlane.models import db class Task(db.Document): task_id = StringField(required=True) jobs = ListField(ReferenceField("Job")) created_at = DateTimeField(required=True) last_modified_at = DateTimeField(required=True, default=datetime.datetime.utcnow) meta = { "indexes": ["task_id", {"fields": ["$task_id"], "default_language": "english"}] } def _validate(self): errors = {} if self.task_id == "": errors["task_id"] = mongoengine.errors.ValidationError( "Field is required", field_name="task_id" ) if errors: message = "ValidationError (%s:%s) " % (self._class_name, self.pk) raise mongoengine.errors.ValidationError(message, errors=errors) def save(self, *args, **kwargs): self._validate() if not self.created_at: self.created_at = datetime.datetime.utcnow() self.last_modified_at = datetime.datetime.utcnow() return super(Task, self).save(*args, **kwargs) def get_url(self): return url_for("task.get_task", task_id=self.task_id, _external=True) def to_dict(self): res = { "taskId": self.task_id, "createdAt": self.created_at.isoformat(), "lastModifiedAt": self.last_modified_at.isoformat(), "url": self.get_url(), "jobsCount": len(self.jobs), } return res @classmethod def create_task(cls, task_id): new_task = cls(task_id=task_id) new_task.save() return new_task @classmethod def get_tasks(cls, page=1, per_page=20): return cls.objects.paginate(page, per_page) # pylint: disable=no-member @classmethod def search_tasks(cls, query, page=1, per_page=20): return cls.objects.search_text(query).paginate(page, per_page) @classmethod def get_by_task_id(cls, task_id): if task_id is None or task_id == "": raise RuntimeError("Task ID is required and can't be None or empty.") return cls.objects(task_id=task_id).no_dereference().first() def create_job(self, image, command): from fastlane.models.job import Job job_id = uuid4() j = Job(task_id=str(self.task_id), job_id=str(job_id)) j.task = self j.image = image j.command = command j.save() self.jobs.append(j) self.save() return j def create_or_update_job(self, job_id, image, command): from fastlane.models.job import Job jobs = list(filter(lambda job: str(job.job_id) == job_id, self.jobs)) if not jobs: j = Job(task_id=str(self.task_id), job_id=str(job_id)) j.task = self j.image = image j.command = command j.save() self.jobs.append(j) self.save() else: j = jobs[0] j.image = image j.command = command j.save() return j PK!{l&&fastlane/queue.py# Standard Library from datetime import datetime, timezone from uuid import uuid4 # 3rd Party from flask import current_app # Fastlane from fastlane.models.categories import Categories from fastlane.utils import dumps, get_next_cron_timestamp, loads, parse_time, to_unix from fastlane.worker.job import monitor_job, run_job, send_email, send_webhook class Message: MESSAGE_DETAILS_NAME = "fastlane:scheduled-message" MESSAGE_HASH_NAME = "fastlane:message-queue-ids" def __init__(self, queue, category, cron_str, *args, **kw): self.queue = queue self.category = category self.cron_str = cron_str if "id" in kw: self.id = kw.pop("id") # pylint: disable=invalid-name else: self.id = str(uuid4()) self.args = args self.kwargs = kw def serialize(self): return dumps(self) @classmethod def deserialize(cls, data): instance = 
Message("", "", None) instance.__dict__ = loads( # pylint: disable=attribute-defined-outside-init data ) return instance def message_hash_key(self): return self.generate_message_hash_key(self.queue) def message_key(self): return self.generate_message_key(self.id) @classmethod def generate_message_key(cls, message_id): return f"{Message.MESSAGE_DETAILS_NAME}:{message_id}" @classmethod def generate_message_hash_key(cls, queue): return f"{Message.MESSAGE_HASH_NAME}:{queue}" def run(self): if self.category == Categories.Job: current_app.logger.debug("Running job...") run_job(*self.args, **self.kwargs) if self.category == Categories.Monitor: current_app.logger.debug("Running Monitoring...") monitor_job(*self.args, **self.kwargs) if self.category == Categories.Webhook: current_app.logger.debug("Running Webhook...") send_webhook(*self.args, **self.kwargs) if self.category == Categories.Notify: current_app.logger.debug("Running Notification...") send_email(*self.args, **self.kwargs) class QueueGroup: def __init__(self, logger, redis, queues): self.logger = logger self.queues = queues self.redis = redis def dequeue(self, queues=None, timeout=1): queues_to_pop = [] if queues is None: queues_to_pop = [q.queue_name for q in self.queues] else: queues_to_pop = [q.queue_name for q in self.queues if q.queue_id in queues] queue_executor = QueueExecutor(redis=self.redis, logger=self.logger) return queue_executor.dequeue_message(queues_to_pop, blocking=True, timeout=timeout) def move_jobs(self): logger = self.logger.bind(operation="move_jobs") lock = self.redis.lock( Queue.MOVE_JOBS_LOCK_NAME, timeout=5, sleep=0.1, blocking_timeout=500, thread_local=False, ) if not lock.acquire(): logger.info("Lock could not be acquired. Trying to move jobs again later.") return None moved_items = [] try: timestamp = to_unix(datetime.utcnow()) pipe = self.redis.pipeline() pipe.zrangebyscore(Queue.SCHEDULED_QUEUE_NAME, "-inf", timestamp) pipe.zremrangebyscore(Queue.SCHEDULED_QUEUE_NAME, "-inf", timestamp) items, _ = pipe.execute() if not items: logger.info("No jobs in need of moving right now.") return moved_items logger.info("Found jobs in need of moving.", job_count=len(items)) for message_id in items: key = Message.generate_message_key(message_id.decode("utf-8")) data = self.redis.get(key) message = Message.deserialize(data.decode("utf-8")) logger.info( "Moving job to queue.", queue_name=message.queue, job_id=message.id, msg=message.serialize(), ) queue_executor = QueueExecutor(redis=self.redis, logger=self.logger) queue_executor.enqueue_message(message) moved_items.append(message) logger.info("Moved jobs successfully.") return moved_items finally: lock.release() class Queue: QUEUE_NAME = "fastlane:message-queue" SCHEDULED_QUEUE_NAME = "fastlane:scheduled-messages:items" MOVE_JOBS_LOCK_NAME = "fastlane:move-jobs-lock" def __init__(self, logger, redis, queue_name="main"): self.redis = redis self.queue_id = queue_name self.queue_name = f"{Queue.QUEUE_NAME}:{queue_name}" self.logger = logger.bind(queue_id=self.queue_id, queue_name=self.queue_name) def enqueue(self, category, *args, **kw): message = Message(self.queue_name, category, None, *args, **kw) self.logger.info( "Sending message to queue.", operation="enqueue", category=category ) queue_executor = QueueExecutor(redis=self.redis, logger=self.logger) queue_executor.enqueue_message(message) return message.id def enqueue_at(self, timestamp, category, *args, **kw): return self.__enqueue_at_timestamp(timestamp, category, None, *args, **kw) def enqueue_in(self, incr, 
category, *args, **kw): start_in = parse_time(incr) future_date = datetime.now(tz=timezone.utc) + start_in timestamp = to_unix(future_date) return self.__enqueue_at_timestamp(timestamp, category, None, *args, **kw) def enqueue_cron(self, cron, category, *args, **kw): next_dt = get_next_cron_timestamp(cron) timestamp = to_unix(next_dt) return self.__enqueue_at_timestamp(timestamp, category, cron, *args, **kw) def dequeue(self, blocking=False, timeout=1): queue_executor = QueueExecutor(redis=self.redis, logger=self.logger) return queue_executor.dequeue_message(self.queue_name, blocking=blocking, timeout=timeout) def is_scheduled(self, message_id): return self.redis.zrank(Queue.SCHEDULED_QUEUE_NAME, message_id) is not None def is_enqueued(self, message_id): message_hash_key = Message.generate_message_hash_key(self.queue_name) return self.redis.zrank(message_hash_key, message_id) is not None def deschedule(self, message_id): logger = self.logger.bind(operation="deschedule", message_id=message_id) logger.info("Descheduling job from queue.") pipe = self.redis.pipeline() pipe.zrem(Queue.SCHEDULED_QUEUE_NAME, message_id) pipe.delete(Message.generate_message_key(message_id)) results = pipe.execute() if results[0] == 1: logger.info("Descheduling job successful.") else: logger.info("Descheduling job failed (maybe job was not found).") return results[0] == 1 def __enqueue_at_timestamp(self, timestamp, category, cron_str, *args, **kw): if not isinstance(timestamp, (int,)): raise RuntimeError( f"timestamp must be a UTC Unix Timestamp (integer), not {type(timestamp)}" ) message = Message(self.queue_name, category, cron_str, *args, **kw) queue_executor = QueueExecutor(redis=self.redis, logger=self.logger) return queue_executor.enqueue_at_timestamp(message, timestamp) class QueueExecutor: def __init__(self, redis, logger): self.redis = redis self.logger = logger def enqueue_message(self, message): timestamp = to_unix(datetime.utcnow()) pipe = self.redis.pipeline() pipe.set(message.message_key(), message.serialize()) pipe.zadd(message.message_hash_key(), {message.id: timestamp}) pipe.lpush(message.queue, message.id) if message.cron_str: next_dt = get_next_cron_timestamp(message.cron_str) timestamp = to_unix(next_dt) msg_schedule = Message(message.queue, message.category, message.cron_str, *message.args, **message.kwargs) self._pipe_at_timestamp(pipe, msg_schedule, timestamp) return pipe.execute() def enqueue_at_timestamp(self, message, timestamp): pipe = self.redis.pipeline() self._pipe_at_timestamp(pipe, message, timestamp) pipe.execute() return message.id def _pipe_at_timestamp(self, pipe, message, timestamp): self.logger.info( "Scheduling message to run at future time.", operation="schedule", timestamp=timestamp, category=message.category, ) pipe.set(message.message_key(), message.serialize()) pipe.zadd(Queue.SCHEDULED_QUEUE_NAME, {message.id: timestamp}) def dequeue_message(self, queue_names, blocking=False, timeout=1): queue = "" if isinstance(queue_names, (tuple, list)) or blocking: result = self.redis.blpop(queue_names, timeout=max(timeout, 1)) if result is None: return None queue, item = result else: item = self.redis.lpop(queue_names) queue = queue_names[0] if item is None: return None msg_id = item.decode("utf-8") pipe = self.redis.pipeline() message_key = Message.generate_message_key(msg_id) message_hash_key = Message.generate_message_hash_key(queue) pipe.zrem(message_hash_key, msg_id) pipe.get(message_key) pipe.delete(message_key) _, message_data, _ = pipe.execute() return 
Message.deserialize(message_data.decode("utf-8"))

PK! fastlane/templates/stream.html
{% if execution_id %}
  {{ task_id }} - {{ job_id }} - {{ execution_id }} - Fastlane
{% else %}
  {{ task_id }} - {{ job_id }} - Fastlane
{% endif %}
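# ---------------------------------------------------------------------------
# Usage sketch (not part of the packaged sources). A minimal, hedged
# illustration of how the Message/Queue/QueueGroup API from fastlane/queue.py
# above is driven. Assumptions: a Redis server reachable at localhost:6379,
# the fastlane package and its dependencies importable, and a bare structlog
# logger standing in for the Flask app logger; the task/job/image values are
# placeholders, not real fastlane data.
# ---------------------------------------------------------------------------
import redis
import structlog

from fastlane.models.categories import Categories, QueueNames
from fastlane.queue import Queue, QueueGroup

logger = structlog.get_logger()
client = redis.StrictRedis(host="localhost", port=6379)

# One queue per category, mirroring Application.configure_queue().
jobs_queue = Queue(logger, client, QueueNames.Job)

# Immediate enqueue; the args mirror run_job(task_id, job_id, execution_id,
# image, command) as used by Job.enqueue() in fastlane/models/job.py.
message_id = jobs_queue.enqueue(
    Categories.Job, "example-task", "example-job", None, "ubuntu", "ls -lah"
)

# Deferred variants: a relative time (parse_time format) or a cron expression.
jobs_queue.enqueue_in(
    "5m", Categories.Job, "example-task", "example-job", None, "ubuntu", "ls"
)
jobs_queue.enqueue_cron(
    "*/10 * * * *", Categories.Job, "example-task", "example-job", None, "ubuntu", "ls"
)

# A worker periodically moves due scheduled messages into the live queue and
# then pops them for processing.
group = QueueGroup(logger, client, [jobs_queue])
group.move_jobs()
message = group.dequeue(timeout=1)
if message is not None:
    # Inside a Flask application context a worker would call message.run(),
    # which dispatches to run_job/monitor_job/send_webhook/send_email by category.
    print(message.category, message.args)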

PK!tEzzfastlane/utils.py# Standard Library import calendar import re import copy from datetime import datetime, timedelta # 3rd Party import croniter REGEX = re.compile(r"((?P\d+?)h)?((?P\d+?)m)?((?P\d+?)s)?") try: from ujson import loads, dumps # NOQA pylint: disable=unused-import except ImportError: from json import loads, dumps # NOQA pylint: disable=unused-import def parse_time(time_str): if time_str is None: return None parts = REGEX.match(time_str) if not parts: return None parts = parts.groupdict() time_params = {} for (name, param) in parts.items(): if param: time_params[name] = int(param) return timedelta(**time_params) # from_unix from times.from_unix() def from_unix(string): """Convert a unix timestamp into a utc datetime""" return datetime.utcfromtimestamp(float(string)) # to_unix from times.to_unix() def to_unix(date): """Converts a datetime object to unixtime""" return calendar.timegm(date.utctimetuple()) def unix_now(): return to_unix(datetime.utcnow()) def get_next_cron_timestamp(cron): itr = croniter.croniter(cron, datetime.utcnow()) next_dt = itr.get_next(datetime) return next_dt def words_redacted(data, blacklist_fn, replacements="***"): new_data = copy.deepcopy(data) def redacted(data_redacted): for key, val in data_redacted.items(): if blacklist_fn(key): data_redacted[key] = replacements continue if isinstance(val, dict): redacted(val) return data_redacted return redacted(new_data) PK!Jߕfastlane/worker/__init__.pyclass ExecutionResult: class Status: created = 'created' running = 'running' failed = 'failed' done = 'done' def __init__(self, status): self.status = status self.exit_code = None self.started_at = None self.finished_at = None self.log = '' self.error = '' def set_log(self, log): self.log = log PK!G22"fastlane/worker/docker/__init__.py from fastlane.worker.docker.api import bp from fastlane.worker.docker.executor import Executor from fastlane.worker.docker.pool import DockerPool from fastlane.worker.docker.api import BLACKLIST_KEY from fastlane.worker.docker.executor import JOB_PREFIX from fastlane.worker.docker.executor import STATUS PK!,+fastlane/worker/docker/api.py# Standard Library from json import loads # 3rd Party from flask import Blueprint, current_app, g, make_response, request bp = Blueprint( # pylint: disable=invalid-name "docker", __name__, url_prefix="/docker-executor" ) BLACKLIST_KEY = "docker-executor::blacklisted-hosts" @bp.route("/blacklist", methods=["POST", "PUT"]) def add_to_blacklist(): redis = current_app.redis data = get_details() if data is None or data == "": msg = "Failed to add host to blacklist because JSON body could not be parsed." g.logger.warn(msg) return make_response(msg, 400) if "host" not in data: msg = "Failed to add host to blacklist because 'host' attribute was not found in JSON body." g.logger.warn(msg) return make_response(msg, 400) host = data["host"] redis.sadd(BLACKLIST_KEY, host) return "" @bp.route("/blacklist", methods=["DEL", "DELETE"]) def remove_from_blacklist(): redis = current_app.redis data = get_details() if data is None or data == "": msg = "Failed to remove host from blacklist because JSON body could not be parsed." g.logger.warn(msg) return make_response(msg, 400) if "host" not in data: msg = ( "Failed to remove host from blacklist because 'host'" " attribute was not found in JSON body." 
) g.logger.warn(msg) return make_response(msg, 400) host = data["host"] redis.srem(BLACKLIST_KEY, host) return "" def get_details(): details = request.get_json() if details is None and request.get_data(): details = loads(request.get_data()) return details PK!s]OO"fastlane/worker/docker/executor.py# # Standard Library # import random import re import traceback from json import loads # 3rd Party import pybreaker import requests from dateutil.parser import parse from flask import current_app # Fastlane from fastlane.worker import ExecutionResult from fastlane.worker.docker.api import BLACKLIST_KEY from fastlane.worker.docker.pool import DockerPool from fastlane.worker.errors import ( ContainerUnavailableError, HostUnavailableError, ) JOB_PREFIX = "fastlane-job" # http://docs.docker.com/engine/reference/commandline/ps/#examples # One of created, restarting, running, removing, paused, exited, or dead STATUS = { "created": ExecutionResult.Status.created, "exited": ExecutionResult.Status.done, "dead": ExecutionResult.Status.failed, "running": ExecutionResult.Status.running, } def convert_date(date_to_parse): return parse(date_to_parse) class Executor: def __init__(self, app, pool=None): self.app = app self.logger = app.logger self.pool = pool self.circuits = {} if pool is None: docker_hosts = [] hosts = self.app.config["DOCKER_HOSTS"] if isinstance(hosts, (tuple, list)): clusters = list(hosts) elif isinstance(hosts, (dict)): clusters = [hosts] else: clusters = loads(hosts) self.logger.debug("Initializing docker pool...", clusters=clusters) for i, cluster in enumerate(clusters): regex = cluster["match"] if not regex: regex = None if i != len(clusters): self.logger.warn( "Farm with no regex found before the end of the DOCKER_HOSTS " "definition. This means all the subsequent farms will never be" " used as this one will always match anything that reaches it. " "Please ensure that the farm with no regex match is the last one." 
) else: regex = re.compile(regex) hosts = cluster["hosts"] max_running = cluster.get("maxRunning", 10) self.logger.info( "Found farm definition.", regex=cluster["match"], hosts=hosts, max_running=max_running, ) docker_hosts.append((regex, hosts, max_running)) self.pool = DockerPool(docker_hosts) def get_container_by_id(self, container_id, host, port, client): logger = self.logger.bind( host=host, port=port, container_id=container_id, operation="docker_host.get_container_by_id", ) circuit = self.get_circuit(f"{host}:{port}") @circuit def run(logger): try: logger = logger.bind(container_id=container_id) logger.debug("Finding container...") container = client.containers.get(container_id) logger.info("Container found.") return container except requests.exceptions.ConnectionError as err: raise HostUnavailableError(host, port, err) from err try: return run(logger) except pybreaker.CircuitBreakerError as err: raise HostUnavailableError(host, port, err) from err def validate_max_running_executions(self, task_id): total_running = 0 max_running = 0 logger = self.logger.bind( task_id=task_id, operation="docker_host.validate_max_running_executions" ) for regex, _ in self.pool.clients_per_regex: if regex is not None and not regex.match(task_id): logger.debug("Farm does not match task_id.", regex=regex) continue running = self.get_running_containers(regex) total_running = len(running["running"]) max_running = self.pool.max_running[regex] logger.debug( "Found number of running containers.", total_running=total_running, max_running=max_running, ) break return total_running == 0 or total_running <= max_running def get_circuit(self, key): max_fails = int(current_app.config["DOCKER_CIRCUIT_BREAKER_MAX_FAILS"]) reset_timeout = int( current_app.config["DOCKER_CIRCUIT_BREAKER_RESET_TIMEOUT_SECONDS"] ) return self.circuits.setdefault( key, pybreaker.CircuitBreaker( fail_max=max_fails, reset_timeout=reset_timeout, state_storage=pybreaker.CircuitRedisStorage( pybreaker.STATE_CLOSED, current_app.redis, namespace=key ), ), ) def update_image(self, task, job, execution, image, tag, blacklisted_hosts=None): if blacklisted_hosts is None: blacklisted_hosts = self.get_blacklisted_hosts() logger = self.logger.bind( task_id=task.task_id, job_id=str(job.job_id), execution_id=str(execution.execution_id), image=image, tag=tag, blacklisted_hosts=blacklisted_hosts, operation="docker_executor.update_image", ) host, port, client = self.pool.get_client( self, task.task_id, blacklist=blacklisted_hosts ) circuit = self.get_circuit(f"{host}:{port}") logger = logger.bind(host=host, port=port) @circuit def run(logger): try: logger.debug("Updating image in docker host...") client.images.pull(image, tag=tag) execution.metadata["docker_host"] = host execution.metadata["docker_port"] = port logger.info( "Image updated successfully. Docker host and port " "stored in Job Execution for future reference." ) except requests.exceptions.ConnectionError as err: error = traceback.format_exc() logger.error( "Failed to connect to Docker Host. 
Will retry job later with a new host.", error=error, ) if "docker_host" in execution.metadata: del execution.metadata["docker_host"] if "docker_port" in execution.metadata: del execution.metadata["docker_port"] raise HostUnavailableError(host, port, err) from err try: run(logger) except pybreaker.CircuitBreakerError as err: raise HostUnavailableError(host, port, err) from err def run(self, task, job, execution, image, tag, command, blacklisted_hosts=None): logger = self.logger.bind( task_id=task.task_id, job_id=str(job.job_id), execution_id=str(execution.execution_id), image=image, tag=tag, command=command, blacklisted_hosts=blacklisted_hosts, operation="docker_executor.run", ) if "docker_host" not in execution.metadata: raise RuntimeError( "Can't run job without docker_host and docker_port in execution metadata." ) docker_host = execution.metadata["docker_host"] docker_port = execution.metadata["docker_port"] host, port, client = self.pool.get_client( self, task.task_id, docker_host, docker_port ) logger = logger.bind(host=host, port=port) circuit = self.get_circuit(f"{host}:{port}") @circuit def run(logger): try: container_name = f"{JOB_PREFIX}-{execution.execution_id}" envs = job.metadata.get("envs", {}) additional_dns_entries = dict( job.metadata.get("additional_dns_entries", []) ) logger = logger.bind( container_name=container_name, envs=envs, additional_dns_entries=additional_dns_entries, ) logger.debug("Running the Job in Docker Host...") container = client.containers.run( image=f"{image}:{tag}", name=container_name, command=command, detach=True, environment=envs, extra_hosts=additional_dns_entries, ) execution.metadata["container_id"] = container.id logger.info( "Container started successfully. Container ID " "stored as Job Execution metadata.", container_id=container.id, ) except (requests.exceptions.ConnectionError,) as err: error = traceback.format_exc() logger.error( "Failed to connect to Docker Host. Will retry job later with a new host.", error=error, ) if "docker_host" in execution.metadata: del execution.metadata["docker_host"] if "docker_port" in execution.metadata: del execution.metadata["docker_port"] raise HostUnavailableError(host, port, err) from err try: run(logger) except pybreaker.CircuitBreakerError as err: raise HostUnavailableError(host, port, err) from err return True def stop_job(self, task, job, execution): logger = self.logger.bind( task_id=task.task_id, job_id=str(job.job_id), execution_id=str(execution.execution_id), operation="docker_executor.stop_job", ) if "container_id" not in execution.metadata: logger.warn( "Can't stop Job Execution, since it has not been started. Aborting..." ) return False docker_host = execution.metadata["docker_host"] docker_port = execution.metadata["docker_port"] host, port, client = self.pool.get_client( self, task.task_id, docker_host, docker_port ) logger = logger.bind(host=host, port=port) circuit = self.get_circuit(f"{host}:{port}") container_id = execution.metadata["container_id"] logger = logger.bind(container_id=container_id) container = self.get_container_by_id(container_id, host, port, client) if container is None: logger.warn( "Can't stop Job Execution, since container was not found. Aborting..." 
) return False @circuit def run(logger): try: logger.info("Container found.") logger.debug("Stopping container...") container.stop() logger.info("Container stopped.") except requests.exceptions.ConnectionError as err: error = traceback.format_exc() logger.error("Failed to connect to Docker Host.", error=error) raise HostUnavailableError(host, port, err) from err try: run(logger) except pybreaker.CircuitBreakerError as err: raise HostUnavailableError(host, port, err) from err return True def get_result(self, task, job, execution): execution_host = execution.metadata["docker_host"] execution_port = execution.metadata["docker_port"] host, port, client = self.pool.get_client( self, task.task_id, execution_host, execution_port ) logger = self.logger.bind( task_id=task.task_id, job_id=str(job.job_id), execution_id=str(execution.execution_id), operation="docker_executor.get_result", ) container_id = execution.metadata["container_id"] container = self.get_container_by_id(container_id, host, port, client) if container is None: return None # container.attrs['State'] # {'Status': 'exited', 'Running': False, 'Paused': False, 'Restarting': False, # 'OOMKilled': False, 'Dead': False, 'Pid': 0, 'ExitCode': 0, 'Error': '', # 'StartedAt': '2018-08-27T17:14:14.1951232Z', 'FinishedAt': '2018-08-27T17:14:14.2707026Z'} logger = logger.bind(container_id=container_id) result = ExecutionResult( STATUS.get(container.status, ExecutionResult.Status.done) ) state = container.attrs["State"] result.exit_code = state["ExitCode"] result.error = state["Error"] result.started_at = convert_date(state["StartedAt"]) logger = logger.bind( status=container.status, state=state, exit_code=result.exit_code, error=result.error, ) logger.debug("Container result found.") if ( result.status == ExecutionResult.Status.done or result.status == ExecutionResult.Status.failed ): # TODO: Use circuit in this point result.finished_at = convert_date(state["FinishedAt"]) result.log = container.logs(stdout=True, stderr=False) if result.error != "": logs = container.logs(stdout=False, stderr=True).decode("utf-8") result.error += f"\n\nstderr:\n{logs}" else: result.error = container.logs(stdout=False, stderr=True) logger.info("Container finished executing.", finished_at=result.finished_at) return result def _get_all_clients(self, regex): clients = self.pool.clients.values() if regex is not None: for cluster_regex, cluster_clients in self.pool.clients_per_regex: if cluster_regex is not None and cluster_regex != regex: continue clients = cluster_clients break return [ (host, port, client, self.get_circuit(f"{host}:{port}")) for host, port, client in clients ] def _list_containers(self, host, port, client, circuit): @circuit def run(): running = [] containers = client.containers.list(sparse=False) for container in containers: if not container.name.startswith(JOB_PREFIX): continue running.append((host, port, container.id)) return running return run() def get_running_containers(self, regex=None, blacklisted_hosts=None): if blacklisted_hosts is None: blacklisted_hosts = self.get_blacklisted_hosts() running = [] unavailable_clients = [] unavailable_clients_set = set() clients = self._get_all_clients(regex) for (host, port, client, circuit) in clients: if f"{host}:{port}" in blacklisted_hosts: unavailable_clients_set.add(f"{host}:{port}") unavailable_clients.append( (host, port, RuntimeError("server is blacklisted")) ) continue try: running += self._list_containers(host, port, client, circuit) except Exception as err: 
unavailable_clients_set.add(f"{host}:{port}") unavailable_clients.append((host, port, err)) return { "available": [ { "host": host, "port": port, "available": True, "blacklisted": f"{host}:{port}" in blacklisted_hosts, "circuit": circuit.current_state, "error": None, } for (host, port, client, circuit) in clients if f"{host}:{port}" not in unavailable_clients_set ], "unavailable": [ { "host": host, "port": port, "available": False, "blacklisted": f"{host}:{port}" in blacklisted_hosts, "circuit": self.get_circuit(f"{host}:{port}").current_state, "error": str(err), } for (host, port, err) in unavailable_clients ], "running": running, } def get_streaming_logs(self, task_id, job, execution): execution_host = execution.metadata["docker_host"] execution_port = execution.metadata["docker_port"] host, port, client = self.pool.get_client( self, task_id, execution_host, execution_port ) container_id = execution.metadata["container_id"] logger = self.logger.bind( task_id=task_id, job=str(job.job_id), execution_id=str(execution.execution_id), operation="docker_host.get_streaming_logs", host=host, port=port, container_id=container_id, ) logger.debug("Getting container...") container = self.get_container_by_id( container_id, execution_host, execution_port, client ) if container is None: raise ContainerUnavailableError( f"Container {container_id} was not found in {execution_host}:{execution_port}!" ) logger.info("Container found successfully.") for log in container.logs(stdout=True, stderr=True, stream=True): yield log.decode("utf-8") def get_blacklisted_hosts(self): redis = self.app.redis hosts = redis.smembers(BLACKLIST_KEY) return {host.decode("utf-8") for host in hosts} def mark_as_done(self, task, job, execution): execution_host = execution.metadata["docker_host"] execution_port = execution.metadata["docker_port"] host, port, client = self.pool.get_client( self, task.task_id, execution_host, execution_port ) container_id = execution.metadata["container_id"] logger = self.logger.bind( task_id=task.task_id, job=str(job.job_id), execution_id=str(execution.execution_id), operation="docker_host.mark_as_done", host=host, port=port, container_id=container_id, ) container = self.get_container_by_id(container_id, host, port, client) if container is None: return False try: new_name = f"defunct-{container.name}" logger.debug("Renaming container...", new_name=new_name) container.rename(new_name) logger.debug("Container renamed.", new_name=new_name) return True except ( pybreaker.CircuitBreakerError, requests.exceptions.ConnectionError, ) as err: error = traceback.format_exc() logger.error("Failed to connect to Docker Host.", error=error) raise HostUnavailableError(host, port, err) from err def remove_done(self): removed_containers = [] clients = self.pool.clients.values() for (host, port, client) in clients: containers = client.containers.list( sparse=False, all=True, filters={"name": f"defunct-{JOB_PREFIX}"} ) for container in containers: removed_containers.append( { "host": f"{host}:{port}", "name": container.name, "id": container.id, "image": container.image.attrs["RepoTags"][0], } ) container.remove() self.logger.info( "Removed all defunct containers.", removed_containers=removed_containers ) return removed_containers PK!P fastlane/worker/docker/pool.py # Standard Library import random import traceback # 3rd Party import docker import pybreaker import requests from flask import current_app # Fastlane from fastlane.worker.errors import NoAvailableHostsError class DockerPool: def __init__(self, docker_hosts): 
self.docker_hosts = docker_hosts self.max_running = {} self.clients_per_regex = [] self.clients = {} self.__init_clients() def __init_clients(self): for regex, docker_hosts, max_running in self.docker_hosts: client_list = [] clients = (regex, client_list) self.clients_per_regex.append(clients) self.max_running[regex] = max_running for address in docker_hosts: host, port = address.split(":") docker_client = docker.DockerClient(base_url=address) self.clients[address] = (host, int(port), docker_client) client_list.append((host, int(port), docker_client)) @staticmethod def refresh_circuits(executor, clients, blacklisted_hosts, logger): def docker_ps(client): client.containers.list(sparse=False) for host, port, client in clients: if f"{host}:{port}" in blacklisted_hosts: continue try: logger.debug("Refreshing host...", host=host, port=port) circuit = executor.get_circuit(f"{host}:{port}") circuit.call(docker_ps, client) except (requests.exceptions.ConnectionError, pybreaker.CircuitBreakerError): error = traceback.format_exc() logger.error("Failed to refresh host.", error=error) def get_client(self, executor, task_id, host=None, port=None, blacklist=None): logger = current_app.logger.bind( task_id=task_id, host=host, port=port, blacklist=blacklist ) if host is not None and port is not None: logger.debug("Custom host returned.") docker_client = self.clients.get(f"{host}:{port}") if docker_client is None: return host, port, None return docker_client if blacklist is None: blacklist = set() for regex, clients in self.clients_per_regex: logger.debug("Trying to retrieve docker client...", regex=regex) if regex is not None and not regex.match(task_id): logger.debug("Task ID does not match regex.", regex=regex) continue DockerPool.refresh_circuits(executor, clients, blacklist, logger) filtered = [ (host, port, client) for (host, port, client) in clients if f"{host}:{port}" not in blacklist and executor.get_circuit(f"{host}:{port}").current_state == "closed" ] if not filtered: logger.debug( "No non-blacklisted and closed circuit clients found for farm.", regex=regex, ) continue logger.info( "Returning random choice out of the remaining clients.", clients=[f"{host}:{port}" for (host, port, client) in filtered], ) host, port, client = random.choice(filtered) return host, int(port), client msg = f"Failed to find a docker host for task id {task_id}." 
logger.error(msg) raise NoAvailableHostsError(msg) PK!U)fastlane/worker/errors.pyclass HostUnavailableError(RuntimeError): def __init__(self, host, port, error): msg = f"Connection to host {host}:{port} failed with error: {error}" super(HostUnavailableError, self).__init__(msg) self.host = host self.port = port self.error = error self.message = f"Connection to host {self.host}:{self.port} failed with error: {self.error}" class NoAvailableHostsError(RuntimeError): pass class ContainerUnavailableError(RuntimeError): pass PK!Lllfastlane/worker/job.py# Standard Library import math import smtplib import time import traceback from datetime import datetime, timedelta from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText # 3rd Party from flask import current_app, url_for # Fastlane from fastlane.helpers import dumps, loads from fastlane.models import Job, JobExecution from fastlane.models.categories import Categories from fastlane.utils import from_unix, to_unix from fastlane.worker import ExecutionResult from fastlane.worker.errors import HostUnavailableError from fastlane.worker.webhooks import WebhooksDispatcher, WebhooksDispatchError def validate_max_concurrent( executor, task_id, job, execution_id, image, command, logger ): if not executor.validate_max_running_executions(task_id): logger.debug( "Maximum number of global container executions reached. Enqueuing job execution..." ) enqueued_id = job.enqueue( current_app, execution_id, image=image, command=command ) job.metadata["enqueued_id"] = enqueued_id job.save() logger.info( "Job execution re-enqueued successfully due to max number of container executions." ) return False return True def validate_expiration(job, ex, logger): now = datetime.utcnow() unixtime = to_unix(now) if ( job.metadata.get("expiration") is not None and job.metadata["expiration"] < unixtime ): expiration_utc = from_unix(job.metadata["expiration"]) ex.status = JobExecution.Status.expired ex.error = ( f"Job was supposed to be done before {expiration_utc.isoformat()}, " f"but was started at {from_unix(unixtime).isoformat()}." 
) ex.finished_at = now job.save() logger.info( "Job execution canceled due to being expired.", job_expiration=job.metadata["expiration"], current_ts=unixtime, ) return False return True def reenqueue_job_due_to_break(task_id, job_id, execution_id, image, command): args = [task_id, job_id, execution_id, image, command] delta = timedelta(seconds=1.0) future_date = to_unix(datetime.utcnow() + delta) enqueued = current_app.jobs_queue.enqueue_at(future_date, Categories.Job, *args) return enqueued def download_image(executor, job, ex, image, tag, command, logger): try: logger.debug("Changing job status...", status=JobExecution.Status.pulling) ex.status = JobExecution.Status.pulling ex.save() logger.debug( "Job status changed successfully.", status=ex.status ) logger.debug("Downloading updated container image...", image=image, tag=tag) before = time.time() executor.update_image(job.task, job, ex, image, tag) ellapsed = time.time() - before logger.info( "Image downloaded successfully.", image=image, tag=tag, ellapsed=ellapsed ) except HostUnavailableError: error = traceback.format_exc() logger.error("Host is unavailable.", error=error) enqueued_id = reenqueue_job_due_to_break( job.task.task_id, str(job.job_id), ex.execution_id, image, command ) job.metadata["enqueued_id"] = enqueued_id job.save() logger.warn("Job execution re-enqueued successfully.") return False except Exception as err: error = traceback.format_exc() logger.error("Failed to download image.", error=error) ex.error = error ex.status = JobExecution.Status.failed ex.save() current_app.report_error( err, metadata=dict( operation="Downloading Image", task_id=job.task.task_id, job_id=job.id, execution_id=ex.execution_id, image=image, tag=tag, command=command, ), ) return False return True def run_container(executor, job, ex, image, tag, command, logger): logger.debug( "Running command in container...", image=image, tag=tag, command=command ) try: logger.debug("Changing job status...", status=JobExecution.Status.running) ex.started_at = datetime.utcnow() ex.status = JobExecution.Status.running ex.save() logger.debug( "Job status changed successfully.", status=ex.status ) before = time.time() executor.run(job.task, job, ex, image, tag, command) ellapsed = time.time() - before logger.info( "Container started successfully.", image=image, tag=tag, command=command, ellapsed=ellapsed, ) except HostUnavailableError: error = traceback.format_exc() logger.error("Host is unavailable.", error=error) enqueued_id = reenqueue_job_due_to_break( job.task.task_id, str(job.job_id), ex.execution_id, image, command ) job.metadata["enqueued_id"] = enqueued_id job.save() logger.warn("Job execution re-enqueued successfully.") return False except Exception as err: error = traceback.format_exc() logger.error("Failed to run command", error=error) ex.error = error ex.status = JobExecution.Status.failed ex.save() current_app.report_error( err, metadata=dict( operation="Running Container", task_id=job.task.task_id, job_id=job.id, execution_id=ex.execution_id, image=image, tag=tag, command=command, ), ) return False return True def run_job(task_id, job_id, execution_id, image, command): app = current_app logger = app.logger.bind( operation="run_job", task_id=task_id, job_id=job_id, image=image, command=command, ) try: executor = app.executor job = Job.get_by_id(task_id, job_id) if job is None: logger.error("Job was not found with task id and job id.") return False if not validate_max_concurrent( executor, task_id, job, execution_id, image, command, logger ): return 
False tag = "latest" if ":" in image: image, tag = image.split(":") logger = logger.bind(image=image, tag=tag) logger.debug("Changing job status...", status=JobExecution.Status.enqueued) if execution_id is None: ex = job.create_execution(image=image, command=command) ex.status = JobExecution.Status.enqueued ex.save() else: ex = job.get_execution_by_id(execution_id) logger.debug( "Job status changed successfully.", status=ex.status ) logger = logger.bind(execution_id=ex.execution_id) except Exception as err: error = traceback.format_exc() logger.error("Failed to create job execution. Skipping job...", error=error) current_app.report_error( err, metadata=dict(task_id=task_id, job_id=job_id, image=image, command=command), ) return False try: if not validate_expiration(job, ex, logger): return False logger.info("Started processing job.") if not download_image(executor, job, ex, image, tag, command, logger): return False if not run_container(executor, job, ex, image, tag, command, logger): return False logger.debug("Changing job status...", status=JobExecution.Status.running) ex.status = JobExecution.Status.running ex.save() job.save() logger.debug( "Job status changed successfully.", status=JobExecution.Status.running ) current_app.monitor_queue.enqueue_in( "1s", Categories.Monitor, task_id, job_id, ex.execution_id ) return True except Exception as err: error = traceback.format_exc() logger.error("Failed to run job", error=error) ex.status = JobExecution.Status.failed ex.error = "Job failed to run with error: %s" % error ex.save() job.save() current_app.report_error( err, metadata=dict( operation="Running Container", task_id=task_id, job_id=job_id, execution_id=ex.execution_id, image=image, tag=tag, command=command, ), ) def _webhook_dispatch(task, job, execution, collection, logger): task_id = task.task_id job_id = str(job.job_id) execution_id = str(execution.execution_id) for webhook in collection: method = "POST" url = webhook.get("url") if url is None: logger.warn("Webhook with empty URL found. 
Skipping...") continue headers = webhook.get("headers", {}) retries = webhook.get("retries", 0) hook_logger = logger.bind( method=method, url=url, headers=headers, retries=retries, retry_count=0 ) hook_logger.debug("Enqueueing webhook...") args = [task_id, job_id, execution_id, method, url, headers, retries, 0] current_app.webhooks_queue.enqueue(Categories.Webhook, *args) hook_logger.info("Webhook enqueued successfully.") def send_webhooks(task, job, execution, logger): if execution.status == JobExecution.Status.done: succeed = job.metadata.get("webhooks", {}).get("succeeds", []) logger.debug("Sending success webhooks...") _webhook_dispatch(task, job, execution, succeed, logger) if execution.status == JobExecution.Status.failed: fails = job.metadata.get("webhooks", {}).get("fails", []) logger.debug("Sending failed webhooks...") _webhook_dispatch(task, job, execution, fails, logger) finishes = job.metadata.get("webhooks", {}).get("finishes", []) logger.info("Sending completion webhooks...") _webhook_dispatch(task, job, execution, finishes, logger) def notify_users(task, job, execution, logger): task_id = task.task_id job_id = str(job.job_id) execution_id = str(execution.execution_id) succeed = job.metadata.get("notify", {}).get("succeeds", []) fails = job.metadata.get("notify", {}).get("fails", []) finishes = job.metadata.get("notify", {}).get("finishes", []) if execution.status == JobExecution.Status.done: logger.info("Notifying users of success...") for email in succeed: logger.info("Notifying user of success...", email=email) subject = "Job %s/%s succeeded!" % (task_id, job_id) args = [task_id, job_id, execution_id, subject, email] current_app.notify_queue.enqueue(Categories.Notify, *args) if execution.status == JobExecution.Status.failed: logger.info("Notifying users of failure...") for email in fails: logger.info( "Notifying user of failure...", email=email, exit_code=execution.exit_code, ) subject = "Job %s/%s failed with exit code %d!" % ( task_id, job_id, execution.exit_code, ) args = [task_id, job_id, execution_id, subject, email] current_app.notify_queue.enqueue(Categories.Notify, *args) logger.info("Notifying users of completion...") for email in finishes: logger.info( "Notifying user of completion...", email=email, exit_code=execution.exit_code, ) subject = "Job %s/%s finished with exit code %d!" % ( task_id, job_id, execution.exit_code, ) args = [task_id, job_id, execution_id, subject, email] current_app.notify_queue.enqueue(Categories.Notify, *args) def reenqueue_monitor_due_to_break(task_id, job_id, execution_id): args = [task_id, job_id, execution_id] enqueued_id = current_app.monitor_queue.enqueue_in("1s", Categories.Monitor, *args) return enqueued_id def monitor_job(task_id, job_id, execution_id): try: app = current_app executor = app.executor job = Job.get_by_id(task_id, job_id) logger = app.logger.bind( operation="monitor_job", task_id=task_id, job_id=job_id, execution_id=execution_id, ) if job is None: logger.error("Failed to retrieve task or job.") return False execution = job.get_execution_by_id(execution_id) if execution.status not in (JobExecution.Status.running,): logger.error("Execution result already retrieved. 
Skipping monitoring...") return False try: result = executor.get_result(job.task, job, execution) except HostUnavailableError as err: error = traceback.format_exc() logger.error("Failed to get results.", error=error) current_app.report_error( err, metadata=dict( operation="Monitoring Job", task_id=task_id, job_id=job_id, execution_id=execution_id, ), ) reenqueue_monitor_due_to_break(task_id, job_id, execution_id) logger.warn("Job monitor re-enqueued successfully.") return False if result is None: execution.finished_at = datetime.utcnow() execution.exit_code = result.exit_code execution.status = JobExecution.Status.failed execution.log = "" execution.error = ( "Job failed since container could not be found in docker host." ) logger.debug( "Job failed, since container could not be found in host.", status="failed", ) execution.save() job.save() send_webhooks(job.task, job, execution, logger) notify_users(job.task, job, execution, logger) return False logger.info( "Container result obtained.", container_status=result.status, container_exit_code=result.exit_code, ) if result.status in ( ExecutionResult.Status.created, ExecutionResult.Status.running, ): ellapsed = (datetime.utcnow() - execution.started_at).total_seconds() if ellapsed > job.metadata["timeout"]: execution.finished_at = datetime.utcnow() execution.status = JobExecution.Status.timedout execution.error = "Job execution timed out after %d seconds." % ellapsed try: executor.stop_job(job.task, job, execution) except HostUnavailableError as err: error = traceback.format_exc() logger.error("Failed to stop job.", error=error) current_app.report_error( err, metadata=dict( operation="Monitoring Job", task_id=task_id, job_id=job_id, execution_id=execution_id, ), ) reenqueue_monitor_due_to_break(task_id, job_id, execution_id) logger.warn("Job monitor re-enqueued successfully.") return False logger.debug( "Job execution timed out. Storing job details in mongo db.", status=execution.status, ellapsed=ellapsed, error=result.error, ) execution.save() job.save() logger.info("Job execution timed out.", status=execution.status) send_webhooks(job.task, job, execution, logger) notify_users(job.task, job, execution, logger) return False logger.info( "Job has not finished. Retrying monitoring in the future.", container_status=result.status, seconds=1, ) current_app.monitor_queue.enqueue_in( "5s", Categories.Monitor, task_id, job_id, execution_id ) return True if ( result.exit_code != 0 and "retry_count" in job.metadata and job.metadata["retry_count"] < job.metadata["retries"] ): retry_logger = logger.bind( exit_code=result.exit_code, retry_count=job.metadata["retry_count"], retries=job.metadata["retries"], ) retry_logger.debug("Job failed. 
Enqueuing job retry...") job.metadata["retry_count"] += 1 new_exec = job.create_execution(execution.image, execution.command) new_exec.status = JobExecution.Status.enqueued args = [ task_id, job_id, new_exec.execution_id, execution.image, execution.command, ] factor = app.config["EXPONENTIAL_BACKOFF_FACTOR"] min_backoff = app.config["EXPONENTIAL_BACKOFF_MIN_MS"] / 1000.0 delta = timedelta(seconds=min_backoff) if job.metadata["retries"] > 0: delta = timedelta( seconds=math.pow(factor, job.metadata["retry_count"]) * min_backoff ) future_date = datetime.utcnow() + delta enqueued_id = current_app.jobs_queue.enqueue_at( to_unix(future_date), Categories.Job, *args ) job.metadata["enqueued_id"] = enqueued_id job.save() retry_logger.info("Job execution enqueued successfully.") # still need to finish current execution as the retry # will be a new execution execution.finished_at = datetime.utcnow() execution.exit_code = result.exit_code execution.status = ( JobExecution.Status.done if execution.exit_code == 0 else JobExecution.Status.failed ) execution.log = result.log.decode("utf-8") execution.error = result.error.decode("utf-8") logger.debug( "Job finished. Storing job details in mongo db.", status=execution.status, log=result.log, error=result.error, ) execution.save() job.save() logger.info("Job details stored in mongo db.", status=execution.status) try: executor.mark_as_done(job.task, job, execution) except HostUnavailableError: error = traceback.format_exc() logger.error("Failed to mark job as done.", error=error) reenqueue_monitor_due_to_break(task_id, job_id, execution_id) logger.warn("Job monitor re-enqueued successfully.") return False send_webhooks(job.task, job, execution, logger) notify_users(job.task, job, execution, logger) return True except Exception as err: error = traceback.format_exc() logger.error("Failed to monitor job", error=error) current_app.report_error( err, metadata=dict( operation="Monitoring Job", task_id=task_id, job_id=job_id, execution_id=execution_id, ), ) raise err def send_email(task_id, job_id, execution_id, subject, to_email): job = Job.get_by_id(task_id, job_id) logger = current_app.logger.bind( operation="send_email", task_id=task_id, job_id=job_id, to_email=to_email, execution_id=execution_id, subject=subject, ) if job is None: logger.error("Failed to retrieve task or job.") return False execution = job.get_execution_by_id(execution_id) logger.info("Execution loaded successfully") smtp_host = current_app.config["SMTP_HOST"] smtp_port = current_app.config["SMTP_PORT"] smtp_from = current_app.config["SMTP_FROM"] if smtp_host is None or smtp_port is None or smtp_from is None: logger.error( "SMTP_HOST, SMTP_PORT and SMTP_FROM must be configured. Skipping sending e-mail." 
) return False try: smtp_port = int(smtp_port) logger = logger.bind(smtp_host=smtp_host, smtp_port=smtp_port) logger.info("Connecting to SMTP Server...") server = smtplib.SMTP(smtp_host, smtp_port) server.set_debuglevel(0) if current_app.config.get("SMTP_USE_SSL"): logger.info("Starting TLS...") server.starttls() smtp_user = current_app.config.get("SMTP_USER") smtp_password = current_app.config.get("SMTP_PASSWORD") if smtp_user and smtp_password: logger.info( "Authenticating with SMTP...", smtp_user=smtp_user, smtp_password=smtp_password, ) server.login(smtp_user, smtp_password) from_email = current_app.config["SMTP_FROM"] task_url = url_for("task.get_task", task_id=task_id, _external=True) job_url = url_for( "task.get_job", task_id=task_id, job_id=job_id, _external=True ) job_data = dumps( execution.to_dict(include_log=True, include_error=True), indent=4, sort_keys=True, ) body = ( """ Automatic message. Please do not reply to this! Job Details: %s """ % job_data ) subj = "[Fastlane] %s" % subject msg = MIMEMultipart("alternative") msg["Subject"] = subj msg["From"] = from_email msg["To"] = to_email part1 = MIMEText(body, "plain") html_body = """

Job Details:

%s

---

<a href="%s">[View Task Details]</a> | <a href="%s">[View Job Details]</a>

---

Automatic message. Please do not reply to this!

""" % ( job_data, task_url, job_url, ) part2 = MIMEText(html_body, "html") msg.attach(part1) msg.attach(part2) logger.info("Sending email...") server.sendmail(from_email, to_email, msg.as_string()) server.quit() logger.info("Email sent successfully.") except Exception as exc: logger.error( "Sending e-mail failed with exception!", error=traceback.format_exc() ) raise exc return True def send_webhook( task_id, job_id, execution_id, method, url, headers, retries, retry_count ): app = current_app job = Job.get_by_id(task_id, job_id) logger = app.logger.bind( operation="send_webhook", task_id=task_id, job_id=job_id, execution_id=execution_id, method=method, url=url, headers=headers, retries=retries, retry_count=retry_count, ) if job is None: logger.error("Failed to retrieve task or job.") return False execution = job.get_execution_by_id(execution_id) logger.info("Execution loaded successfully") data = execution.to_dict(include_log=True, include_error=True) data = loads(dumps(data)) if "webhookDispatch" in data["metadata"]: del data["metadata"]["webhookDispatch"] data["metadata"]["custom"] = job.metadata.get("custom", {}) data["job_id"] = job_id data = dumps(data) try: dispatcher = WebhooksDispatcher() response = dispatcher.dispatch(method, url, data, headers) execution.metadata.setdefault("webhookDispatch", []) execution.metadata["webhookDispatch"].append( { "timestamp": datetime.utcnow().isoformat(), "url": url, "statusCode": response.status_code, "body": response.body, "headers": response.headers, } ) execution.save() job.save() logger.info("Webhook dispatched successfully.") except WebhooksDispatchError as err: error = traceback.format_exc() execution.metadata.setdefault("webhookDispatch", []) execution.metadata["webhookDispatch"].append( { "timestamp": datetime.utcnow().isoformat(), "url": url, "statusCode": err.status_code, "body": err.body, "headers": err.headers, "error": error, } ) execution.metadata["webhookDispatch"] = execution.metadata["webhookDispatch"][ -3: ] execution.save() job.save() logger.error("Failed to dispatch webhook.", err=error) if retry_count < retries: logger.debug("Retrying...") args = [ task_id, job_id, execution_id, method, url, headers, retries, retry_count + 1, ] factor = app.config["WEBHOOKS_EXPONENTIAL_BACKOFF_FACTOR"] min_backoff = app.config["WEBHOOKS_EXPONENTIAL_BACKOFF_MIN_MS"] / 1000.0 delta = to_unix( datetime.utcnow() + timedelta(seconds=math.pow(factor, retry_count) * min_backoff) ) current_app.webhooks_queue.enqueue_at(delta, Categories.Webhook, *args) logger.info("Webhook dispatch retry scheduled.", date=delta) return True def enqueue_missing_monitor_jobs(app): lock = app.redis.lock( "EnqueueMissingMonitorJobs", timeout=5, sleep=0.1, blocking_timeout=500, thread_local=False, ) if not lock.acquire(): app.logger.info( "Lock could not be acquired. Trying to enqueue missing monitor jobs later." ) return try: # find running/created executions executions = Job.get_unfinished_executions(app) queue = app.monitor_queue executions_to_monitor = [] for (job, execution) in executions: if "enqueued_id" in job.metadata and queue.is_scheduled( job.metadata["enqueued_id"] ): continue executions_to_monitor.append((job, execution)) if not executions_to_monitor: return current_app.logger.info( "Found executions missing monitoring. 
Enqueueing monitor.",
            executions=len(executions_to_monitor),
        )

        # enqueue if execution not scheduled to be monitored
        for (job, execution) in executions_to_monitor:
            current_app.monitor_queue.enqueue_in(
                "5s",
                Categories.Monitor,
                job.task.task_id,
                job.job_id,
                execution.execution_id,
            )
    finally:
        lock.release()

PK! fastlane/worker/webhooks.py
# 3rd Party
from requests import Request, Session


class WebhooksDispatchError(RuntimeError):
    def __init__(self, status_code, method, url, body, headers, error=None):
        super(WebhooksDispatchError, self).__init__("Webhook dispatch error")
        self.status_code = status_code
        self.method = method
        self.url = url
        self.body = body
        self.headers = headers
        self.error = error

    def __str__(self):
        if self.error is None:
            return (
                f"The {self.method.upper()} request to {self.url} with body"
                f' of "{self.body[:10]}..." failed with status code of {self.status_code}.'
            )

        error = "{}: {}".format(type(self.error).__name__, self.error)

        return (
            f"The {self.method.upper()} request to {self.url} with body "
            f'of "{self.body[:10]}..." failed with exception of {error}.'
        )

    def __repr__(self):
        return str(self)


class Response:
    def __init__(self, status_code, body, headers):
        self.status_code = status_code
        self.body = body
        self.headers = headers


class WebhooksDispatcher:
    def dispatch(self, method, url, body, headers, timeout=1):
        try:
            session = Session()
            req = Request(method, url, data=body, headers=headers)
            prepped = session.prepare_request(req)
            prepped.body = body
            resp = session.send(prepped, timeout=timeout, verify=False)

            if resp.status_code > 399:
                raise WebhooksDispatchError(
                    resp.status_code, method, url, body, headers
                )

            return Response(resp.status_code, resp.text, resp.headers)
        except Exception as err:
            if isinstance(err, WebhooksDispatchError):
                raise err

            raise WebhooksDispatchError(500, method, url, body, headers, error=err)

PK! fastlane-2.2.1.dist-info/LICENSE
MIT License

Copyright (c) 2018 Bernardo Heynemann

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
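# ---------------------------------------------------------------------------
# Usage sketch (not part of the packaged sources). A hedged illustration of
# the WebhooksDispatcher defined in fastlane/worker/webhooks.py above. The
# endpoint URL and payload are placeholders. The dispatcher raises
# WebhooksDispatchError for any status code above 399 or for transport errors,
# which is what send_webhook() in fastlane/worker/job.py catches in order to
# record the failed attempt and schedule a retry with exponential backoff.
# ---------------------------------------------------------------------------
from fastlane.worker.webhooks import WebhooksDispatcher, WebhooksDispatchError

dispatcher = WebhooksDispatcher()
payload = '{"job_id": "example-job", "status": "done"}'

try:
    response = dispatcher.dispatch(
        "POST",
        "https://example.com/fastlane-hook",  # placeholder endpoint
        payload,
        {"Content-Type": "application/json"},
        timeout=2,
    )
    print(response.status_code, response.body[:80])
except WebhooksDispatchError as err:
    # err carries status_code, body and headers, so callers can persist the
    # failed attempt (see the "webhookDispatch" metadata in send_webhook).
    print("Webhook dispatch failed:", err)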