# --- mod_ngarn/__init__.py ---
"""Simple async worker"""

__version__ = "3.5"

import asyncio
import os
import sys

import click

from . import utils
from .worker import JobRunner

# NOTE(review): the original declared ``global script/run/create_table/delete_job``
# at module level; ``global`` is a no-op at module scope, so those statements
# have been dropped.

# Shared help text for the --queue-table option used by every subcommand.
QUEUE_TABLE_HELP = 'Queue table name (Default: os.getenv("DBTABLE", "public.modngarn_job"))'


def _split_queue_table(queue_table):
    """Normalize *queue_table* into an unquoted (schema, table) pair."""
    return utils.sql_table_name(queue_table).replace('"', "").split(".")


@click.group()
def script():
    """mod-ngarn command-line entry point."""
    pass


@click.command()
@click.option(
    "--queue-table",
    help=QUEUE_TABLE_HELP,
    default=os.getenv("DBTABLE", "public.modngarn_job"),
)
@click.option("--limit", default=300, help="Limit jobs (Default: 300)")
@click.option(
    "--max-delay",
    type=float,
    help="Max delay for failed jobs (seconds) (Default: None)",
)
def run(queue_table, limit, max_delay):
    """Run mod-ngarn job"""
    queue_table_schema, queue_table_name = _split_queue_table(queue_table)
    job_runner = JobRunner()
    if max_delay:
        max_delay = float(max_delay)
    # asyncio.run() owns loop creation/teardown; this is consistent with the
    # create_table/delete_job commands (the original mixed it with the
    # deprecated get_event_loop()/run_until_complete pattern).
    asyncio.run(job_runner.run(queue_table_schema, queue_table_name, limit, max_delay))


@click.command()
@click.option(
    "--queue-table",
    help=QUEUE_TABLE_HELP,
    default=os.getenv("DBTABLE", "public.modngarn_job"),
)
def create_table(queue_table):
    """Create mod-ngarn queue table"""
    queue_table_schema, queue_table_name = _split_queue_table(queue_table)
    asyncio.run(utils.create_table(queue_table_schema, queue_table_name))


@click.command()
@click.option(
    "--queue-table",
    help=QUEUE_TABLE_HELP,
    default=os.getenv("DBTABLE", "public.modngarn_job"),
)
def wait_for_notify(queue_table):
    """Wait and listening for NOTIFY"""
    queue_table_schema, queue_table_name = _split_queue_table(queue_table)
    loop = asyncio.get_event_loop()
    has_pending = loop.run_until_complete(
        utils.is_pending_job_exists(queue_table_schema, queue_table_name)
    )
    if not has_pending:
        # BUG FIX: the original used asyncio.Queue(loop=loop); the ``loop``
        # argument was deprecated in Python 3.8 and removed in 3.10 — the
        # queue now binds to the running loop automatically.
        notification_queue = asyncio.Queue()
        loop.create_task(
            utils.wait_for_notify(
                queue_table_schema, queue_table_name, notification_queue
            )
        )
        # utils.shutdown() blocks until a NOTIFY arrives, then calls sys.exit().
        loop.run_until_complete(utils.shutdown(notification_queue))
    # NOTE(review): reached only when pending jobs already exist (shutdown()
    # exits the process otherwise); the original kept the process alive here —
    # confirm this is intentional.
    loop.run_forever()


@click.command()
@click.option(
    "--queue-table",
    help=QUEUE_TABLE_HELP,
    default=os.getenv("DBTABLE", "public.modngarn_job"),
)
def delete_job(queue_table):
    """Delete executed task"""
    queue_table_schema, queue_table_name = _split_queue_table(queue_table)
    asyncio.run(utils.delete_executed_job(queue_table_schema, queue_table_name))


script.add_command(run)
script.add_command(create_table)
script.add_command(wait_for_notify)
script.add_command(delete_job)


# --- mod_ngarn/api.py ---
import os
from datetime import datetime
from typing import Callable, Optional, Union

import asyncpg

from .utils import sql_table_name, get_fn_name


async def add_job(
    cnx: asyncpg.Connection,
    queue_table: str,
    job_id: str,
    func: Union[str, Callable],
    schedule_time: Optional[datetime] = None,
    priority: int = 0,
    args: Optional[list] = None,
    kwargs: Optional[dict] = None,
) -> asyncpg.Record:
    """Insert a job row into *queue_table* and return the inserted record.

    *func* may be a dotted-path string or a callable (resolved via
    get_fn_name). *schedule_time* of None means "run as soon as possible".

    BUG FIX: ``args=[]`` / ``kwargs={}`` mutable defaults are shared across
    calls; replaced with None sentinels (behaviorally identical for callers).
    """
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    fn_name = await get_fn_name(func)
    return await cnx.fetchrow(
        """
        INSERT INTO {queue_table} (id, fn_name, priority, scheduled, args, kwargs)
        VALUES ($1, $2, $3, $4, $5, $6)
        RETURNING *;
        """.format(queue_table=sql_table_name(queue_table)),
        job_id,
        fn_name,
        priority,
        schedule_time,
        args,
        kwargs,
    )


# --- mod_ngarn/connection.py ---
import json
import os

import asyncpg


async def get_connection():
    """Connect with PGUSER/PGPASSWORD/PGDBNAME/PGHOST env vars and register
    json/jsonb codecs so those columns round-trip as Python objects."""
    cnx = await asyncpg.connect(
        user=os.getenv("PGUSER"),
        password=os.getenv("PGPASSWORD"),
        database=os.getenv("PGDBNAME"),
        host=os.getenv("PGHOST"),
    )
    for typename in ("jsonb", "json"):
        await cnx.set_type_codec(
            typename, encoder=json.dumps, decoder=json.loads, schema="pg_catalog"
        )
    return cnx
# --- mod_ngarn/connection.py (continued) ---
import json
import os


class DBConnection:
    """Async context manager yielding an asyncpg connection with JSON codecs.

    Connection parameters come from the PGUSER/PGPASSWORD/PGDBNAME/PGHOST
    environment variables (same as get_connection()).
    """

    async def __aenter__(self) -> "asyncpg.Connection":
        self.cnx = await asyncpg.connect(
            user=os.getenv("PGUSER"),
            password=os.getenv("PGPASSWORD"),
            database=os.getenv("PGDBNAME"),
            host=os.getenv("PGHOST"),
        )
        # Encode/decode json and jsonb transparently as Python objects
        # (deduplicated from two copy-pasted set_type_codec calls).
        for typename in ("jsonb", "json"):
            await self.cnx.set_type_codec(
                typename, encoder=json.dumps, decoder=json.loads, schema="pg_catalog"
            )
        return self.cnx

    async def __aexit__(self, exc_type, exc, tb):
        await self.cnx.close()


# --- mod_ngarn/utils.py ---
import asyncio
import builtins
import os
import re
import sys
import uuid
from datetime import datetime, timedelta, timezone
from inspect import getmembers, getmodule, ismethod
from typing import Callable, Union

try:
    import asyncpg
except ImportError:  # NOTE(review): driver needed only at runtime, not import time
    asyncpg = None


class ImportNotFoundException(Exception):
    """Raised when a dotted function path cannot be resolved to a callable."""
    pass


class ModuleNotfoundException(Exception):
    """Raised when a callable's module/qualified name cannot be determined."""
    pass


def sql_table_name(queue_table: str) -> str:
    """Return *queue_table* as a fully quoted ``"schema"."table"`` identifier.

    A bare table name is qualified with "public"; any double quotes already
    present in the input are stripped before re-quoting.
    """
    quoted_parts = [f'"{part}"' for part in queue_table.replace('"', "").split(".")]
    if len(quoted_parts) == 1:
        quoted_parts = ['"public"'] + quoted_parts
    return ".".join(quoted_parts)


async def get_fn_name(func: Union[str, Callable]) -> str:
    """Resolve *func* to its dotted-path name (inverse of import_fn).

    Strings are returned unchanged. Plain functions become
    "<module>.<name>". Raises ModuleNotfoundException when a name cannot
    be determined.
    """
    try:
        if isinstance(func, str):
            return func
        if ismethod(func):
            # BUG FIX: the original called this recursively WITHOUT ``await``,
            # so ``module_name`` was a coroutine object and the join below
            # raised TypeError.
            module_name = await get_fn_name(dict(getmembers(func))["__self__"])
            # NOTE(review): this path only succeeds when __self__ is a class
            # (i.e. classmethods); bound instance methods still fail because
            # the instance has no __name__ — confirm intended scope.
        else:
            module_name = getmodule(func).__name__
        name = func.__name__
        return ".".join([module_name, name])
    except AttributeError as e:
        raise ModuleNotfoundException(e)


async def import_fn(fn_name: str) -> Callable:
    """Import and return the callable named by the dotted path *fn_name*.

    Tries progressively shorter prefixes of the path as the importable
    module, then walks attributes down to the leaf. A bare name falls back
    to builtins. Raises ImportNotFoundException for unresolvable bare names.
    """
    access_path = fn_name.split(".")
    module = None
    for index in range(1, len(access_path)):
        try:
            # Try the longest candidate module name first.
            module_name = ".".join(access_path[:-index])
            module = __import__(module_name)
        except ImportError:
            continue
        else:
            # __import__ returns the top-level package; walk down to the leaf
            # module before the final attribute lookup.
            for step in access_path[1:-1]:
                module = getattr(module, step)
            break
    if module:
        return getattr(module, access_path[-1])
    # BUG FIX: the original indexed globals()["__builtins__"], which is a
    # module object (not a dict) when this file is executed as __main__;
    # getattr on the builtins module is reliable in both cases.
    try:
        return getattr(builtins, fn_name)
    except AttributeError as e:
        raise ImportNotFoundException(e)


async def is_migration_executed(
    cnx: "asyncpg.Connection",
    migrate_file: str,
    queue_table_schema: str,
    queue_table_name: str,
) -> bool:
    """Return True if *migrate_file* has already been applied to this queue table."""
    migration_table = await cnx.fetchval(
        """SELECT EXISTS (
            SELECT 1 FROM pg_tables
            WHERE schemaname || '.' || tablename = 'public.mod_ngarn_migration');
        """
    )
    if not migration_table:
        # First run: the bookkeeping table itself does not exist yet.
        return False
    return await cnx.fetchval(
        """SELECT EXISTS (
            SELECT 1 FROM public.mod_ngarn_migration
            WHERE migrate_file = $1 AND queue_table = $2);
        """,
        migrate_file,
        f"{queue_table_schema}.{queue_table_name}",
    )


async def migrate(
    cnx: "asyncpg.Connection",
    migrate_file: str,
    queue_table_schema: str,
    queue_table_name: str,
) -> None:
    """Apply *migrate_file* (with table placeholders substituted) and record it."""
    with open(migrate_file) as f:
        sql = f.read()
    sql = sql.replace("{queue_table_schema}", queue_table_schema)
    sql = sql.replace("{queue_table_name}", queue_table_name)
    await cnx.execute(sql)
    await cnx.execute(
        """
        INSERT INTO public.mod_ngarn_migration(migrate_file, queue_table)
        VALUES ($1, $2);""",
        os.path.basename(migrate_file),
        f"{queue_table_schema}.{queue_table_name}",
    )
# --- mod_ngarn/utils.py (continued) ---
async def create_table(queue_table_schema: str, queue_table_name: str) -> None:
    """Create/upgrade the queue tables by applying every pending *.sql file
    from the package's schema/ directory, inside one transaction.

    Already-applied migrations (per is_migration_executed) are skipped.
    """
    print(f"/* Creating table {queue_table_schema}.{queue_table_name}... */")
    async with DBConnection() as cnx:
        dir_path = os.path.realpath(__file__)
        # (typo fix: the original local was named ``schma_path``)
        schema_path = os.path.join(os.path.dirname(dir_path), "schema")
        # Apply migrations in deterministic filename order.
        migrate_files = sorted(
            os.path.join(schema_path, filepath)
            for filepath in os.listdir(schema_path)
            if filepath.endswith(".sql")
        )
        async with cnx.transaction():
            for migrate_file in migrate_files:
                if not await is_migration_executed(
                    cnx,
                    os.path.basename(migrate_file),
                    queue_table_schema,
                    queue_table_name,
                ):
                    await migrate(
                        cnx, migrate_file, queue_table_schema, queue_table_name
                    )
    print("/* Done */")


async def wait_for_notify(
    queue_table_schema: str, queue_table_name: str, q: asyncio.Queue
):
    """
    Wait for notification and put channel to the Queue
    """

    def notified(cnx: "Connection", pid: int, channel: str, payload: str):
        # asyncpg invokes this callback synchronously; schedule the
        # connection close and queue put on the running loop instead of
        # awaiting here (fire-and-forget by design).
        asyncio.gather(cnx.close(), q.put(channel))

    cnx = await get_connection()
    await cnx.add_listener(f"{queue_table_schema}_{queue_table_name}", notified)


async def shutdown(q: asyncio.Queue):
    """
    Gracefully shutdown when something put to the Queue

    Blocks until any item arrives on *q*, then terminates the process via
    sys.exit() (raises SystemExit).
    """
    await q.get()
    sys.exit()


async def delete_executed_job(
    queue_table_schema: str,
    queue_table_name: str,
    repeat: bool = False,
    scheduled_day: int = 1,
    keep_period_day: int = 0,
    batch_size: int = 0,
) -> str:
    """Delete executed jobs older than *keep_period_day* days.

    With *batch_size* set, only one batch is deleted per call; if more
    deletable rows remain, an immediate follow-up cleanup job is enqueued.
    Otherwise, when *repeat* is True, the next periodic run is scheduled
    *scheduled_day* days out. Returns the DELETE command tag.

    NOTE: keep_period_day/batch_size are interpolated into SQL text; they
    are ints supplied by this module, not user input.
    """
    async with DBConnection() as cnx:
        async with cnx.transaction():
            queue_table = f'"{queue_table_schema}"."{queue_table_name}"'
            select_executed_job_sql = """
                SELECT id FROM {queue_table}
                WHERE executed IS NOT NULL
                AND executed < NOW() - INTERVAL '{keep_period_day} days'
                ORDER BY executed
            """.format(queue_table=queue_table, keep_period_day=keep_period_day)
            executed_job = 0
            if batch_size:
                select_executed_job_sql = select_executed_job_sql + f" LIMIT {batch_size}"
                # Count how many deletable rows remain beyond this batch.
                executed_job = await cnx.fetchval(
                    """
                    SELECT COUNT(*) - $1 FROM "{}"."{}"
                    WHERE executed IS NOT NULL
                    AND executed < NOW() - INTERVAL '{} days'
                    """.format(queue_table_schema, queue_table_name, keep_period_day),
                    batch_size,
                )
            deleted = await cnx.execute(
                """DELETE FROM {queue_table} WHERE id IN ({select_sql});""".format(
                    queue_table=queue_table,
                    select_sql=select_executed_job_sql,
                )
            )
            kwargs = {
                "repeat": repeat,
                "scheduled_day": scheduled_day,
                "keep_period_day": keep_period_day,
                "batch_size": batch_size,
            }
            if executed_job > 0:
                # More old rows remain: enqueue an immediate follow-up batch.
                await cnx.execute(
                    """INSERT INTO "{}"."{}" (id, fn_name, args, kwargs)
                    VALUES ($1, 'mod_ngarn.utils.delete_executed_job', $2, $3)
                    """.format(queue_table_schema, queue_table_name),
                    str(uuid.uuid4()),
                    [queue_table_schema, queue_table_name],
                    kwargs,
                )
            elif repeat:
                # Nothing left: schedule the next periodic cleanup.
                # (idiom fix: datetime.now(timezone.utc) instead of
                # utcnow().replace(tzinfo=...) — same aware UTC timestamp)
                next_scheduled = datetime.now(timezone.utc) + timedelta(
                    days=scheduled_day
                )
                await cnx.execute(
                    """INSERT INTO "{}"."{}" (id, fn_name, args, kwargs, scheduled)
                    VALUES ($1, 'mod_ngarn.utils.delete_executed_job', $2, $3, $4)
                    """.format(queue_table_schema, queue_table_name),
                    str(uuid.uuid4()),
                    [queue_table_schema, queue_table_name],
                    kwargs,
                    next_scheduled,
                )
            return deleted


async def is_pending_job_exists(queue_table_schema: str, queue_table_name: str) -> bool:
    """Return True if any job is runnable now (not executed, not canceled,
    and either unscheduled or scheduled in the past)."""
    async with DBConnection() as cnx:
        return await cnx.fetchval(
            f"""
            SELECT EXISTS(
                SELECT 1 FROM "{queue_table_schema}"."{queue_table_name}"
                WHERE executed IS NULL
                AND (scheduled IS NULL OR scheduled < NOW())
                AND canceled IS NULL
            )
            """
        )
# --- mod_ngarn/worker.py ---
import asyncio
import functools
import logging
import math
import os
import sys
import time
import traceback
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Any, Callable, Dict, List, Optional

try:
    import asyncpg
    from .connection import get_connection
    from .utils import import_fn, sql_table_name
except ImportError:  # NOTE(review): runtime-only deps, guarded for tooling imports
    asyncpg = get_connection = import_fn = sql_table_name = None

logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="[%(asctime)s] - %(name)s - %(levelname)s - %(message)s",
)
log = logging.getLogger("mod_ngarn")


@dataclass
class Job:
    """One queued unit of work plus the connection used to persist its outcome."""

    cnx: "asyncpg.Connection"  # open connection, also used to record the result
    table_schema: str
    table_name: str
    id: str
    fn_name: str  # dotted path, resolved via import_fn
    priority: int  # doubles as retry counter: bumped by 1 on each failure
    args: List[Any] = field(default_factory=list)
    kwargs: Dict = field(default_factory=dict)
    max_delay: Optional[float] = None  # cap (seconds) on retry backoff; None = uncapped

    async def execute(self) -> Any:
        """Resolve self.fn_name, run it, and persist success or failure.

        Returns the function's result, or None when execution failed (the
        error is logged, recorded, and the job rescheduled via failed()).
        """
        # monotonic clock: duration is immune to wall-clock adjustments
        start_time = time.monotonic()
        try:
            func = await import_fn(self.fn_name)
            if asyncio.iscoroutinefunction(func):
                result = await func(*self.args, **self.kwargs)
            else:
                # Run blocking callables in the default thread-pool executor.
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    None, functools.partial(func, *self.args, **self.kwargs)
                )
            processing_time = str(
                Decimal(str(time.monotonic() - start_time)).quantize(Decimal(".001"))
            )
            await self.success(result, processing_time)
            return result
        except Exception as e:
            stack_trace = traceback.format_exc()
            error_msg = "{}\n{}".format(e.__repr__(), stack_trace)
            log.error("Error#{}, {}".format(self.id, error_msg))
            processing_time = str(
                Decimal(str(time.monotonic() - start_time)).quantize(Decimal(".001"))
            )
            await self.failed(error_msg, processing_time)

    async def success(self, result: Dict, processing_time: str) -> str:
        """Record *result* and mark the job executed (clears any failure reason)."""
        return await self.cnx.execute(
            f'UPDATE "{self.table_schema}"."{self.table_name}" SET result=$1, executed=NOW(), processed_time=$2, reason=NULL WHERE id=$3',
            result,
            processing_time,
            self.id,
        )

    async def failed(self, error: str, processing_time: str) -> str:
        """Log the failure into the *_error table and reschedule with backoff."""
        delay = await self.delay()
        next_schedule = datetime.now(timezone.utc) + timedelta(seconds=delay)
        log.error(
            "Rescheduled, delay for {} seconds ({}) ".format(
                delay, next_schedule.isoformat()
            )
        )
        await self.cnx.execute(
            f'INSERT INTO "{self.table_schema}"."{self.table_name}_error" (id, fn_name, args, kwargs, message, processed_time) VALUES ($1, $2, $3, $4, $5, $6)',
            self.id,
            self.fn_name,
            self.args,
            self.kwargs,
            error,
            processing_time,
        )
        # Bumping priority both deprioritizes the job and grows the backoff.
        return await self.cnx.execute(
            f'UPDATE "{self.table_schema}"."{self.table_name}" SET priority=priority+1, reason=$2, scheduled=$3 WHERE id=$1',
            self.id,
            error,
            next_schedule,
        )

    async def delay(self) -> float:
        """Exponential backoff in seconds: e**priority, capped.

        priority is clamped to 21 because max int > e^21 and max int < e^22;
        when max_delay is set, the result never exceeds it (via log/exp).
        """
        priority = min(self.priority, 21)
        if self.max_delay:
            priority = min(priority, math.log(self.max_delay))
        return math.exp(priority)


@dataclass
class JobRunner:
    """Fetches runnable jobs one at a time and executes up to *limit* of them."""

    async def fetch_job(
        self,
        cnx: "asyncpg.Connection",
        queue_table_schema: str,
        queue_table_name: str,
        max_delay: float,
    ):
        """Lock and return the next runnable Job, or None when the queue is empty.

        FOR UPDATE SKIP LOCKED lets concurrent workers pick disjoint rows.
        """
        result = await cnx.fetchrow(
            f"""SELECT id, fn_name, args, kwargs, priority
            FROM "{queue_table_schema}"."{queue_table_name}"
            WHERE executed IS NULL
                AND (scheduled IS NULL OR scheduled < NOW())
                AND canceled IS NULL
            ORDER BY priority
            FOR UPDATE SKIP LOCKED
            LIMIT 1
            """
        )
        if result:
            return Job(
                cnx,
                queue_table_schema,
                queue_table_name,
                result["id"],
                result["fn_name"],
                result["priority"],
                result["args"],
                result["kwargs"],
                max_delay=max_delay,
            )

    async def run(
        self, queue_table_schema: str, queue_table_name: str, limit: int, max_delay: int
    ):
        """Execute up to *limit* jobs, one transaction each, then disconnect."""
        cnx = await get_connection()
        for job_number in range(1, limit + 1):
            # We can reduce isolation to Read Committed
            # because we are using SKIP LOCK FOR UPDATE
            async with cnx.transaction(isolation="read_committed"):
                job = await self.fetch_job(
                    cnx, queue_table_schema, queue_table_name, max_delay
                )
                if job:
                    log.info(f"Executing#{job_number}: \t{job.id}")
                    result = await job.execute()
                    log.info(f"Executed#{job_number}: \t{result}")
                else:
                    break
        await cnx.close()
/* mod_ngarn/schema/0001_init.sql
   Bootstrap schema. {queue_table_schema} / {queue_table_name} placeholders
   are substituted by mod_ngarn.utils.migrate() before execution. */

/* Tracks which migration files were applied to which queue table. */
CREATE TABLE IF NOT EXISTS public.mod_ngarn_migration (
    migrate_file TEXT NOT NULL,
    queue_table TEXT NOT NULL,
    PRIMARY KEY (migrate_file, queue_table)
);

/* Job table */
CREATE TABLE IF NOT EXISTS "{queue_table_schema}"."{queue_table_name}" (
    /* ids must not contain slash-like characters */
    id TEXT NOT NULL CHECK (id !~ '\\|/|\u2044|\u2215|\u29f5|\u29f8|\u29f9|\ufe68|\uff0f|\uff3c'),
    fn_name TEXT NOT NULL,
    args JSON DEFAULT '[]',
    kwargs JSON DEFAULT '{}',
    priority INTEGER DEFAULT 0,
    created TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    scheduled TIMESTAMP WITH TIME ZONE,
    executed TIMESTAMP WITH TIME ZONE,
    canceled TIMESTAMP WITH TIME ZONE,
    result JSON,
    reason TEXT,
    processed_time TEXT,
    PRIMARY KEY (id)
);

/* Workers filter on executed IS NULL; index keeps that fast. */
CREATE INDEX IF NOT EXISTS "{queue_table_schema}_{queue_table_name}_executed_idx"
    ON "{queue_table_schema}"."{queue_table_name}" (executed);

/* NOTIFY listeners (wait_for_notify) whenever a job row is inserted. */
CREATE OR REPLACE FUNCTION "{queue_table_schema}_{queue_table_name}_notify_job"()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    NOTIFY "{queue_table_schema}_{queue_table_name}";
    RETURN NEW;
END;
$$;

DROP TRIGGER IF EXISTS "{queue_table_schema}_{queue_table_name}_notify_job_inserted"
    ON "{queue_table_schema}"."{queue_table_name}";
CREATE TRIGGER "{queue_table_schema}_{queue_table_name}_notify_job_inserted"
    AFTER INSERT ON "{queue_table_schema}"."{queue_table_name}"
    FOR EACH ROW
    EXECUTE PROCEDURE "{queue_table_schema}_{queue_table_name}_notify_job"();

/* Error log table */
CREATE TABLE IF NOT EXISTS "{queue_table_schema}"."{queue_table_name}_error" (
    id TEXT NOT NULL CHECK (id !~ '\\|/|\u2044|\u2215|\u29f5|\u29f8|\u29f9|\ufe68|\uff0f|\uff3c'),
    fn_name TEXT NOT NULL,
    args JSON DEFAULT '[]',
    kwargs JSON DEFAULT '{}',
    message TEXT NOT NULL,
    posted TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    processed_time TEXT,
    PRIMARY KEY (id, posted)
);
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HMuSamod_ngarn-3.5.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UD"PK!HW:o2 mod_ngarn-3.5.dist-info/METADATAVsF~b`:ՁCt4$ej1iǓi.H:YwVw\}^@{oޯXh.<'l$RBY$ov)i* S U*XU^."2uՊ9Tښ`=A"^\i2y7e4~?bgUх,ex t9%@"m]K)eڛTIw G27Sx*XL Y6cDh-):va<Yj6ۈ}M!|O㝇۽DKwr^b-?'?%V}hԦP? 6h/`bžEwAZQ58 rk5ae+nؓֆ#]="H7p!j$tTG*Q*A5y( aU7\EYT\V^Gz\Iu;Mtbn+d678眱ecLcu &R,$ĥ!I (PSs7%NCL?9ԫPWKl=) i͇Lޓei0Y lwii#Dت09ZÖ:V7tm3v(}&kffo(AFceBWGQ:n_^08y97~ЍfLdJ G>wR;:v;NKT||F)…ZbA]5q4R6w(>G ["ݣǥ'Gǝ#o8>[ka1ph;θكR "c~g1h+i%PW_;)4# wdE?4+[;d,bݕW䪈ZiV(Di8}{׹IAǪLBƴ{cIѦhG&ztWv"*֟*My_^oԪPK!H#Ymod_ngarn-3.5.dist-info/RECORDuɒ@}= iAʸ (,!QPD黣#f |k|-iwxL)񿶎#l5'ɥ Ip+u5~$_?-iԕVqvp tȉKv|6'#Aǵ Obgޫ++2TgdYmoN|_UHսsB>ln.Œpۙ5ztҞGDܝ%#0V')F9eȟ)\$+:iyVocwљ*=>H[Iv2ժ*σ/mE,R芿sԵSudэ{ ź{Jb"F*Ջlj7bm렼`_T.z!X$D(6+!bhA;E15VM~'h fEI<: zOLL*䌒>䕿zkk"0~bWF籸" P(۱