PKUN"jtclambda/__init__.py"""AWS Lambda Library""" from .exceptions import RetryException from .function import LambdaFunction from .handlers import LambdaHandler __version__ = "0.0.10" __all__ = ("LambdaFunction", "LambdaHandler", "RetryException") PKMNqtclambda/auto_functions.pyimport os from functools import lru_cache from .function import LambdaFunction __path__ = None @lru_cache(128) def __getattr__(module) -> LambdaFunction: queue = os.getenv(f"TC_{module.upper()}_QUEUE") bucket = os.getenv(f"TC_{module.upper()}_BUCKET") print(f"Looking for lambda function {module}") if not queue: raise ImportError(f"Couldn't automatically create LambdaFunction {module}") return LambdaFunction(queue, bucket) PKJkNf**tclambda/exceptions.pyclass RetryException(Exception): pass PKQGNբtclambda/extras.pyimport time from contextlib import contextmanager from dataclasses import dataclass @dataclass class Value: value: float @contextmanager def timeme(): start_time = time.monotonic() value = Value(0) try: yield value finally: value.value = time.monotonic() - start_time def sentry_init(): # pragma: no cover try: import sentry_sdk from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration except ImportError: pass else: # The Sentry DSN is set in the SENTRY_DSN environmental variable sentry_sdk.init(integrations=[AwsLambdaIntegration()]) PKNS??tclambda/function.py# https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html from __future__ import annotations import asyncio import json import logging import os import time from dataclasses import dataclass from datetime import datetime from uuid import uuid4 import boto3 from botocore.exceptions import ClientError from .extras import timeme s3client = boto3.client("s3") sqsclient = boto3.client("sqs") TC_QUEUE = os.getenv("TC_THIS_QUEUE") TC_BUCKET = os.getenv("TC_THIS_BUCKET") class LambdaFunction: def __init__(self, queue_url=TC_QUEUE, s3_bucket=TC_BUCKET): self.queue_url = queue_url self.s3_bucket = s3_bucket def __getattr__(self, function_name): return LambdaWrapperFunction(self.queue_url, self.s3_bucket, function_name) @dataclass class Message: result_store: str message_body: str def build_message( function_name, args, kwargs, s3_bucket, force_upload=False ) -> Message: logger = logging.getLogger("tclambda.function.build_message") key = f"{function_name}/{datetime.utcnow():%Y/%m/%d/%H%M%S}/{uuid4()}.json" result_store = f"results/{key}" proxy_store = f"proxy/{key}" message_body = json.dumps( { "function": function_name, "args": args, "kwargs": kwargs, "result_store": result_store, } ) logger.info( f'Function "{function_name}", ' f'result_store: "{result_store}", ' f"message_body size: {sizeof_fmt(len(message_body))}" ) if len(message_body) > 250000 or force_upload: # current maximum is 262144 bytes logger.info("Uploading proxy for {function_name}") with timeme() as dt: s3client.put_object(Bucket=s3_bucket, Key=proxy_store, Body=message_body) logger.info(f"Uploaded proxy for {function_name} in {dt.value}s") message_body = json.dumps({"proxy": proxy_store}) return Message(result_store=result_store, message_body=message_body) class LambdaWrapperFunction: def __init__(self, queue_url, s3_bucket, function_name): self.logger = logging.getLogger("tclambda.function.LambdaFunction") self.queue_url = queue_url self.s3_bucket = s3_bucket self.function_name = function_name def __call__(self, *args, **kwargs) -> LambdaResult: message = build_message( function_name=self.function_name, args=args, kwargs=kwargs, s3_bucket=self.s3_bucket, ) sqsclient.send_message( QueueUrl=self.queue_url, MessageBody=message.message_body ) return LambdaResult(s3_bucket=self.s3_bucket, key=message.result_store) class LambdaResult: def __init__(self, s3_bucket, key): self.logger = logging.getLogger("tclambda.function.LambdaFunction") self.s3_bucket = s3_bucket self.key = key self.waited = False self._result = {} def _iter_wait(self, delay: float, max_attempts: int): if self.waited: return obj = None start_time = time.monotonic() for i in range(max_attempts): try: obj = s3client.get_object(Bucket=self.s3_bucket, Key=self.key) end_time = time.monotonic() self.logger.debug( f"Found key {self.key} on {i+1} attempts and {end_time - start_time} seconds" ) break except ClientError as e: if e.response["Error"]["Code"] == "NoSuchKey": yield i continue raise if not obj: raise TimeoutError( f"Result {self.key} not found within {delay*max_attempts} seconds" ) self._result = json.load(obj["Body"]) self.waited = True def wait(self, delay: int = 5, max_attempts=20): for _ in self._iter_wait(delay, max_attempts): time.sleep(delay) async def async_wait(self, delay=5, max_attempts: int = 20): for _ in self._iter_wait(delay, max_attempts): await asyncio.sleep(delay) def result(self, delay: int = 5, max_attempts: int = 20): self.wait(delay, max_attempts) try: return self._result["result"] except KeyError: raise Exception(self._result["exception"]) from None async def async_result(self, delay: int = 5, max_attempts: int = 20): await self.async_wait(delay, max_attempts) try: return self._result["result"] except KeyError: raise Exception(self._result["exception"]) from None def sizeof_fmt(num, suffix="B"): for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f%s%s" % (num, "Yi", suffix) PKgNNtclambda/handlers.py# https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html import asyncio import json import logging import os import traceback from io import StringIO import boto3 from .exceptions import RetryException from .extras import sentry_init sentry_init() logging.basicConfig(level=logging.INFO) s3client = boto3.client("s3") cloudwatch = boto3.client("cloudwatch") TC_THIS_BUCKET = os.getenv("TC_THIS_BUCKET") class LambdaHandler: def __init__(self, json_encoder_class=json.JSONEncoder): self.logger = logging.getLogger("tclambda.LambdaHandler") self.functions = {"ping": lambda: "pong"} self.json_encoder_class = json_encoder_class def register(self, name=None): def wrapper(func): nonlocal name if name is None: name = func.__name__ self.logger.debug(f"Registering function {func} with name {name}") self.functions[name] = func return func return wrapper def __call__(self, event, context): self.context = context self.logger.info(event) if "Records" in event: return asyncio.run(self.handle_sqs_event(event, context)) elif "function" in event or "proxy" in event: return asyncio.run(self.handle_message(event, context)) async def handle_sqs_event(self, event, context): handlers = [] for record in event["Records"]: body = record["body"] try: message = json.loads(body) except json.JSONDecodeError: self.logger.exception(f'Couldn\'t decode body "{body}"') else: future = asyncio.ensure_future(self.handle_message(message, context)) handlers.append(future) await asyncio.gather(*handlers) async def handle_message(self, message, context): if "proxy" in message: obj = s3client.get_object(Bucket=TC_THIS_BUCKET, Key=message["proxy"]) message = json.load(obj["Body"]) result_body = {} result_store = message.get("result_store") try: func_name = message.get("function") if not func_name: self.logger.error(f'Message does not contain key "function" {message}') raise TypeError(f'Message does not contain key "function" {message}') func = self.functions.get(func_name) if not func: available_functions = sorted(self.functions.keys()) self.logger.error( f"Function {func_name} is not a registered function in {available_functions}" ) raise TypeError(f"Function {func_name} does not exist") args = message.get("args", ()) kwargs = message.get("kwargs", {}) if asyncio.iscoroutinefunction(func): result_body["result"] = await func(*args, **kwargs) else: result_body["result"] = func(*args, **kwargs) except RetryException: raise except Exception as e: self.logger.exception(f"An exception occured while executing {message}") s = StringIO() traceback.print_exc(file=s) result_body["exception"] = repr(e) result_body["traceback"] = s.getvalue() finally: cloudwatch.put_metric_data( Namespace="tclambda", MetricData=[ { "MetricName": "Count", "Value": 1, "Unit": "Count", "Dimensions": [ {"Name": "TcFunctionName", "Value": str(func_name)}, { "Name": "LambdaFunctionName", "Value": context.function_name, }, ], }, { "MetricName": "RemainingMilliseconds", "Value": context.get_remaining_time_in_millis(), "Unit": "Milliseconds", "Dimensions": [ {"Name": "TcFunctionName", "Value": str(func_name)}, { "Name": "LambdaFunctionName", "Value": context.function_name, }, ], }, ], ) if result_store: self.store_result(result_store, result_body) def store_result(self, key, result): if TC_THIS_BUCKET: try: result_body = json.dumps(result, cls=self.json_encoder_class) except TypeError as e: self.logger.exception(f"Couldn't encode result {result}") s = StringIO() traceback.print_exc(file=s) result_body = json.dumps( {"exception": repr(e), "traceback": s.getvalue()}, cls=self.json_encoder_class, ) s3client.put_object(Bucket=TC_THIS_BUCKET, Key=key, Body=result_body) return result PK IN=..!tclambda-0.0.10.dist-info/LICENSEMIT License Copyright (c) 2019 Trustcruit AB Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HPOtclambda-0.0.10.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!Hk"tclambda-0.0.10.dist-info/METADATAUmo8_1_("=.w-Q [ LbIN)a^[}9̌gySCoLi..t6iƺ`fޅ{/HPdU.60 kFd̑4A1Rw[Up#*TMk}#k_OY TÈeoWQv!VT W<Lsm1߿zdt4Xs%rri+dLJwt~\ᝁI5[#/ĔAER-I{]z}VJ36߹rfKf {4K5;:zJ+a~GףR .A)9撦oB%rjPm>IR+RjĒ/CٻrRTHV!'g?L&a 'cg<VLpJ1jKc7C_p'41Q\ui+8AUs=w19LplmPK!HΤW0 tclambda-0.0.10.dist-info/RECORD}I@}$IHX!bp`C̓ _/sKqSߟVs ?eO+)L/N}ʵeRs૭ CPO+himqqYD#$5FC_L ͼJvyQy_'Kb/kVknsd=X,GݘkasX(I &x