PKHN $wtclambda/__init__.py"""AWS Lambda Library""" from .function import LambdaFunction from .handlers import LambdaHandler __version__ = "0.0.1" __all__ = ("LambdaFunction", "LambdaHandler") PK4INU1rtclambda/auto_functions.pyimport os from .function import LambdaFunction def __getattr__(module) -> LambdaFunction: queue = os.getenv(f"TC_{module.upper()}_QUEUE") bucket = os.getenv(f"TC_{module.upper()}_BUCKET") print(f"Looking for lambda function {module}") if not queue: raise AttributeError(f"Couldn't automatically create LambdaFunction {module}") return LambdaFunction(queue, bucket) PK4IN]tclambda/function.py# https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html import asyncio import json import logging import os import time from datetime import datetime from uuid import uuid4 import boto3 from botocore.exceptions import ClientError s3client = boto3.client("s3") sqsclient = boto3.client("sqs") TC_QUEUE = os.getenv("TC_THIS_QUEUE") TC_BUCKET = os.getenv("TC_THIS_BUCKET") class LambdaFunction: def __init__(self, queue_url=TC_QUEUE, s3_bucket=TC_BUCKET): self.logger = logging.getLogger("tclambda.function.LambdaFunction") self.queue_url = queue_url self.s3_bucket = s3_bucket def __getattr__(self, function_name): def wrapper(*args, **kwargs) -> LambdaResult: key = f"results/{function_name}/{datetime.utcnow():%Y/%m/%d/%H%M%S}/{uuid4()}.json" message_body = json.dumps( { "function": function_name, "args": args, "kwargs": kwargs, "result_store": key, } ) self.logger.debug( f'Enqueing function "{function_name}", ' f'result_store: "{key}", ' f"message_body size: {sizeof_fmt(len(message_body))}" ) sqsclient.send_message(QueueUrl=self.queue_url, MessageBody=message_body) return LambdaResult(s3_bucket=self.s3_bucket, key=key) return wrapper class LambdaResult: def __init__(self, s3_bucket, key): self.s3_bucket = s3_bucket self.key = key self.waited = False self._result = {} def _iter_wait(self, delay: float, max_attempts: int): if self.waited: return obj = None start_time = time.monotonic() for i in range(max_attempts): try: obj = s3client.get_object(Bucket=self.s3_bucket, Key=self.key) end_time = time.monotonic() self.logger.debug( f"Found key {self.key} on {i+1} attempts and {end_time - start_time} seconds" ) break except ClientError as e: if e.response["Error"]["Code"] == "NoSuchKey": yield i continue raise if not obj: raise TimeoutError( f"Result {self.key} not found within {delay*max_attempts} seconds" ) self._result = json.load(obj["Body"]) self.waited = True def wait(self, delay: int = 5, max_attempts=20): for _ in self._iter_wait(delay, max_attempts): time.sleep(delay) async def async_wait(self, delay=5, max_attempts: int = 20): for _ in self._iter_wait(delay, max_attempts): await asyncio.sleep(delay) def result(self, delay: int = 5, max_attempts: int = 20): self.wait(delay, max_attempts) if self._result.get("result"): return self._result["result"] raise Exception(self._result["exception"]) async def async_result(self, delay: int = 5, max_attempts: int = 20): await self.async_wait(delay, max_attempts) if self._result.get("result"): return self._result["result"] raise Exception(self._result["exception"]) def sizeof_fmt(num, suffix="B"): for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f%s%s" % (num, "Yi", suffix) PK4IN>6 tclambda/handlers.py# https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html import asyncio import json import logging import os import traceback from io import StringIO import boto3 logging.basicConfig(level=logging.INFO) logger = logging.getLogger("app") logger.setLevel(logging.INFO) s3client = boto3.client("s3") TC_THIS_BUCKET = os.getenv("TC_THIS_BUCKET") class LambdaHandler: def __init__(self): self.logger = logging.getLogger("tclambda.LambdaHandler") self.functions = {"ping": lambda: "pong"} def register(self, name=None): def wrapper(func): nonlocal name if name is None: name = func.__name__ self.logger.debug(f"Registering function {func} with name {name}") self.functions[name] = func return func return wrapper def __call__(self, event, context): self.context = context self.logger.warning(event) if "Records" in event: return asyncio.run(self.handle_sqs_event(event, context)) elif "function" in event: return asyncio.run(self.handle_message(event, context)) async def handle_sqs_event(self, event, context): handlers = [] for record in event["Records"]: body = record["body"] try: message = json.loads(body) except json.JSONDecodeError: logger.exception(f'Couldn\'t decode body "{body}"') else: future = asyncio.ensure_future(self.handle_message(message, context)) handlers.append(future) await asyncio.gather(*handlers) async def handle_message(self, message, context): func_name = message.get("function") if not func_name: logger.error(f'Message does not contain key "function" {message}') return func = self.functions.get(func_name) if not func: available_functions = sorted(self.functions.keys()) self.logger.error( f"Function {func_name} is not a registered function in {available_functions}" ) return args = message.get("args", ()) kwargs = message.get("kwargs", {}) result_store = message.get("result_store") result_body = {} try: if asyncio.iscoroutinefunction(func): result_body["result"] = await func(*args, **kwargs) else: result_body["result"] = func(*args, **kwargs) except Exception as e: self.logger.exception("An exception occured while executing {func}") s = StringIO() traceback.print_exc(file=s) result_body["exception"] = repr(e) result_body["traceback"] = s.getvalue() if result_store: self.store_result(result_store, result_body) def store_result(self, key, result): if TC_THIS_BUCKET: try: result_body = json.dumps(result) except TypeError as e: self.logger.exception(f"Couldn't encode result {result}") s = StringIO() traceback.print_exc(file=s) result_body = json.dumps( {"exception": repr(e), "traceback": s.getvalue()} ) s3client.put_object(Bucket=TC_THIS_BUCKET, Key=key, Body=result_body) return result PK IN=.. tclambda-0.0.1.dist-info/LICENSEMIT License Copyright (c) 2019 Trustcruit AB Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HMuSatclambda-0.0.1.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UD"PK!H!tclambda-0.0.1.dist-info/METADATAM0DzIE"*煮ؤKI &Š8LE3j1ƍN(d0(~.KhKB6P~*PG&N)p:U4=p^2dFb4V\Jޱ8Z RBOwOkW7PK!Hvr2atclambda-0.0.1.dist-info/RECORDu˖c@} ibByt4$9|Lg璢φӴE-IøRSqGW`5T8m3MGXecL^CbEV6fTIpηG\qplJɱm (¯cOVȋ""0^zn_gQҒOGr^) OX1. 9]&?`ZZF-CyCG^Pc4s`l'B[ .8ȁyаlհ^U s4je$5[:;Xʹ ,dV窏er=z5h"F]cl kvg'ҷni.Jq:EPKHN $wtclambda/__init__.pyPK4INU1rtclambda/auto_functions.pyPK4IN]tclambda/function.pyPK4IN>6 tclambda/handlers.pyPK IN=.. tclambda-0.0.1.dist-info/LICENSEPK!HMuSa"tclambda-0.0.1.dist-info/WHEELPK!H!#tclambda-0.0.1.dist-info/METADATAPK!Hvr2a`$tclambda-0.0.1.dist-info/RECORDPKD!&