PKUMYX[99mod_ngarn/__init__.py"""Simple async worker""" __version__ = "2.0" import click import asyncio from .worker import JobRunner @click.group() def script(): pass @click.command() def run(): job_runner = JobRunner() loop = asyncio.get_event_loop() loop.run_until_complete(job_runner.run()) script.add_command(run) PKUM  mod_ngarn/connection.pyimport json import os import asyncpg async def get_connection(): PGDBNAME = os.getenv('PGDBNAME') PGHOST = os.getenv('PGHOST') PGPASSWORD = os.getenv('PGPASSWORD') PGUSER = os.getenv('PGUSER') cnx = await asyncpg.connect(user=PGUSER, password=PGPASSWORD, database=PGDBNAME, host=PGHOST) await cnx.set_type_codec('jsonb', encoder=json.dumps, decoder=json.loads, schema='pg_catalog') await cnx.set_type_codec('json', encoder=json.dumps, decoder=json.loads, schema='pg_catalog') return cnx PKUM5M388mod_ngarn/utils.pyfrom typing import Callable class ImportNotFoundException(Exception): pass async def import_fn(fn_name) -> Callable: access_path = fn_name.split(".") module = None try: for index in range(1, len(access_path)): try: # import top level module module_name = ".".join(access_path[:-index]) module = __import__(module_name) except ImportError: continue else: for step in access_path[1:-1]: # walk down it module = getattr(module, step) break if module: return getattr(module, access_path[-1]) else: return globals()["__builtins__"][fn_name] except KeyError as e: raise ImportNotFoundException(e) PKUMKiD;W W mod_ngarn/worker.pyimport asyncio import functools import logging import sys import time import traceback from datetime import datetime, timedelta, timezone from decimal import Decimal from typing import Any, Callable, Dict, List import asyncpg from mod_ngarn.connection import get_connection from dataclasses import dataclass, field from .utils import import_fn logging.basicConfig( stream=sys.stdout, level=logging.INFO, format="[%(asctime)s] - %(name)s - %(levelname)s - %(message)s", ) log = logging.getLogger("mod_ngarn") @dataclass class Job: cnx: asyncpg.Connection id: str fn_name: str priority: int args: List[Any] = field(default_factory=list) kwargs: Dict = field(default_factory=dict) async def execute(self) -> Any: """ Execute the transaction """ try: start_time = time.time() func = await import_fn(self.fn_name) if asyncio.iscoroutinefunction(func): result = await func(*self.args, **self.kwargs) else: loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, functools.partial(func, *self.args, **self.kwargs) ) processing_time = str(Decimal(str(time.time() - start_time)).quantize(Decimal(".001"))) await self.success(result, processing_time) return result except Exception as e: log.error("Error#{}, {}".format(self.id, e.__repr__())) await self.failed(e.__repr__()) async def success(self, result: Dict, processing_time: Decimal) -> str: """ Success execution handler """ return await self.cnx.execute( "UPDATE modngarn_job SET result=$1, executed=NOW(), processed_time=$2 WHERE id=$3", result, processing_time, self.id, ) async def failed(self, error: str) -> str: """ Failed execution handler """ delay = 2 ** self.priority next_schedule = datetime.now(timezone.utc) + timedelta(seconds=delay) log.error( 'Rescheduled, delay for {} seconds ({}) '.format(delay, next_schedule.isoformat()) ) return await self.cnx.execute( "UPDATE modngarn_job SET priority=priority+1, reason=$2, scheduled=$3 WHERE id=$1", self.id, error, next_schedule, ) @dataclass class JobRunner: async def fetch_job(self, cnx: asyncpg.Connection): result = await cnx.fetchrow( """ SELECT id, fn_name, args, kwargs, priority FROM modngarn_job WHERE executed IS NULL AND (scheduled IS NULL OR scheduled < NOW()) AND canceled IS NULL ORDER BY priority FOR UPDATE SKIP LOCKED LIMIT 1 """ ) if result: return Job( cnx, result["id"], result["fn_name"], result["priority"], result["args"], result["kwargs"], ) else: log.info('404 Job not found') async def run(self): cnx = await get_connection() async with cnx.transaction(): job = await self.fetch_job(cnx) if job: await job.execute() await cnx.close() PKUM6mod_ngarn/Schema/migrations.txtenable-modules worker migrate PKUM&TT,mod_ngarn/Schema/worker/001-initial.blue.sqlBEGIN; CREATE FUNCTION url_safe(str text) RETURNS boolean AS $body$ BEGIN --- Disallow back slash, forward slash, fraction slash (2044), --- division slash (2215), reverse solidus operator (29f5), --- big solidus (29f8), big reverse solidus (29f9), --- small reverse solidus (fe68), fullwidth solidus (ff0f), --- full width reverse solidus (ff3c) RETURN str !~ '\\|/|\u2044|\u2215|\u29f5|\u29f8|\u29f9|\ufe68|\uff0f|\uff3c'; END $body$ LANGUAGE plpgsql; CREATE TABLE modngarn_job ( id TEXT NOT NULL CHECK (url_safe(id)), fn_name TEXT NOT NULL, args JSONB DEFAULT '[]', kwargs JSONB DEFAULT '{}', priority INTEGER DEFAULT 0, created TIMESTAMP WITH TIME ZONE DEFAULT NOW(), scheduled TIMESTAMP WITH TIME ZONE, executed TIMESTAMP WITH TIME ZONE, canceled TIMESTAMP WITH TIME ZONE, result JSONB, reason TEXT, processed_time TEXT, PRIMARY KEY (id) ); CREATE INDEX idx_kwargs ON modngarn_job USING gin (kwargs); CREATE INDEX idx_pending_jobs ON modngarn_job (executed) WHERE executed IS NULL; COMMIT; PK!H1'.(mod_ngarn-2.0.dist-info/entry_points.txtN+I/N.,()OKO,ʳ,+$PK;EM 55mod_ngarn-2.0.dist-info/LICENSEMIT License Copyright (c) 2018 Proteus Technologies Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H>*RQmod_ngarn-2.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,rzd&Y)r$[)T&UrPK!Hړ- mod_ngarn-2.0.dist-info/METADATATQo6~ׯ8$}EUggM"{hD${KvlM4O{xw8㎕1+L%EsZUȊ8Ok[f f{e6DD kMni83Nj:OA\Z 8u};W+ӟm6]2$)@O?% -ɶDu40B;dO&J:.Yn5rMe4ib-8Vс$E~c?.{Cũ.Ǡ4=78GnLLp\F4}"tapuGzk8)jӝP$!ޜagIU(_-=ynv/HE)뾗[U'})=Qx'J8Tc(:T9?uݩ">ZhZE/yEXk,f| W54A6yZ݃[ ,?Q !cM|`B8Att%? iQ8-CB>9?o<鋁 Q-8ΉRkQNHw7=@!k`8<><qh.; sd-R‹xv-O➏2꾸@䄼8rY#P$ :W˞~ŗ({(Q)1_𞹼fq"" SU}J l (O_uz/+_n,B;E7{az;efw>em#lm3I|6d>& Wے+Tb0;wi$}RtOTy(Z|]-_7#7e/Ek>ؑ$ '+8UEN[TvWU:a{5u=ODlqՁ,q6-;`͛F8 :t f5}1AYA1vU`kjw*`'Zd;,5q x8 LrDOLfaW0sb\(I en2U-{`uǥʷfN`>Ԃ'k6pq;4ƕAvG;p^qPo2CzBy^*]GDJG ߚ[3p`񈊠ge8l[Tݜ+X OO>?ݝq/?PKUMYX[99mod_ngarn/__init__.pyPKUM  lmod_ngarn/connection.pyPKUM5M388mod_ngarn/utils.pyPKUMKiD;W W mod_ngarn/worker.pyPKUM6mod_ngarn/Schema/migrations.txtPKUM&TT,mod_ngarn/Schema/worker/001-initial.blue.sqlPK!H1'.(mod_ngarn-2.0.dist-info/entry_points.txtPK;EM 55mod_ngarn-2.0.dist-info/LICENSEPK!H>*RQsmod_ngarn-2.0.dist-info/WHEELPK!Hړ- mod_ngarn-2.0.dist-info/METADATAPK!H ?+rk"mod_ngarn-2.0.dist-info/RECORDPK 8$