PK!O/dramatiq_pg/__init__.pyfrom .broker import PostgresBroker from .results import PostgresBackend __all__ = [ "PostgresBackend", "PostgresBroker", ] PK!},cl#l#dramatiq_pg/broker.pyimport json import logging import select from random import randint from textwrap import dedent from dramatiq.broker import ( Broker, Consumer, MessageProxy, ) from dramatiq.common import current_millis, dq_name from dramatiq.message import Message from dramatiq.results import Results from psycopg2.extensions import ( ISOLATION_LEVEL_AUTOCOMMIT, Notify, quote_ident, ) from psycopg2.extras import Json from .utils import make_pool, transaction from .results import PostgresBackend logger = logging.getLogger(__name__) def purge(curs, max_age='30 days'): # Delete old messages. Returns deleted messages. curs.execute(dedent("""\ DELETE FROM dramatiq.queue WHERE "state" IN ('done', 'rejected') AND mtime <= (NOW() - interval %s); """), (max_age,)) return curs.rowcount class PostgresBroker(Broker): def __init__(self, *, pool=None, url="", results=True, **kw): super(PostgresBroker, self).__init__(**kw) if pool and url: raise ValueError("You can't set both pool and URL!") if not pool: self.pool = make_pool(url) else: # Receive a pool object to have an I/O less __init__. self.pool = pool self.backend = None if results: self.backend = PostgresBackend(pool=self.pool) self.add_middleware(Results(backend=self.backend)) def consume(self, queue_name, prefetch=1, timeout=30000): return PostgresConsumer( pool=self.pool, queue_name=queue_name, prefetch=prefetch, timeout=timeout, ) def declare_queue(self, queue_name): if queue_name not in self.queues: self.emit_before("declare_queue", queue_name) self.queues[queue_name] = True # Actually do nothing in Postgres since all queues are stored in # the same table. self.emit_after("declare_queue", queue_name) def enqueue(self, message, *, delay=None): if delay: message = message.copy(queue_name=dq_name(message.queue_name)) message.options['eta'] = current_millis() + delay q = message.queue_name insert = (dedent("""\ WITH enqueued AS ( INSERT INTO dramatiq.queue (queue_name, message_id, "state", message) VALUES (%s, %s, 'queued', %s) ON CONFLICT (message_id) DO UPDATE SET "state" = 'queued', message = EXCLUDED.message RETURNING queue_name, message ) SELECT pg_notify('dramatiq.' || queue_name || '.enqueue', message::text) FROM enqueued; """), (q, message.message_id, Json(message.asdict()))) with transaction(self.pool) as curs: logger.debug("Upserting %s in queue %s.", message.message_id, q) curs.execute(*insert) return message class PostgresConsumer(Consumer): def __init__(self, *, pool, queue_name, timeout, **kw): self.listen_conn = None self.notifies = [] self.pool = pool self.queue_name = queue_name self.timeout = timeout // 1000 def __next__(self): # First, open connexion and fetch missed notifies from table. if self.listen_conn is None: self.listen_conn = self.start_listening() # We may have received a notify between LISTEN and SELECT of # pending messages. That's not a problem because we are able to # skip spurious notifies. self.notifies = self.fetch_pending_notifies() logger.debug( "Found %s pending messages in queue %s.", len(self.notifies), self.queue_name) if not self.notifies: # Then, fetch notifies from Pg connexion. self.poll_for_notify() # If we have some notifies, loop to find one todo. while self.notifies: notify = self.notifies.pop(0) payload = json.loads(notify.payload) message = Message(**payload) mid = message.message_id if self.consume_one(message): return MessageProxy(message) else: logger.debug("Message %s already consumed. Skipping.", mid) # We have nothing to do, let's see if the queue needs some cleaning. self.auto_purge() def ack(self, message): with transaction(self.pool) as curs: channel = f"dramatiq.{message.queue_name}.ack" payload = Json(message.asdict()) logger.debug( "Notifying %s for ACK %s.", channel, message.message_id) # dramatiq always ack a message, even if it has been requeued by # the Retries middleware. Thus, only update message in state # `consumed`. curs.execute(dedent("""\ WITH updated AS ( UPDATE dramatiq.queue SET "state" = 'done', message = %s WHERE message_id = %s AND state = 'consumed' RETURNING message ) SELECT pg_notify(%s, message::text) FROM updated; """), (payload, message.message_id, channel)) def auto_purge(self): # Automatically purge messages every 100k iteration. Dramatiq defaults # to 1s. This mean about 1 purge for 28h idle. if randint(0, 100_000): return logger.debug("Randomly triggering garbage collector.") with self.listen_conn.cursor() as curs: deleted = purge(curs) logger.info("Purged %d messages in all queues.", deleted) def close(self): if self.listen_conn: self.pool.putconn(self.listen_conn) self.listen_conn = None def consume_one(self, message): # Race to process this message. with transaction(self.pool) as curs: curs.execute(dedent("""\ UPDATE dramatiq.queue SET "state" = 'consumed', mtime = (NOW() AT TIME ZONE 'UTC') WHERE message_id = %s AND "state" = 'queued'; """), (message.message_id,)) # If no row was updated, this mean another worker has consumed it. return 1 == curs.rowcount def nack(self, message): with transaction(self.pool) as curs: # Use the same channel as ack. Actually means done. channel = f"dramatiq.{message.queue_name}.ack" logger.debug( "Notifying %s for NACK %s.", channel, message.message_id) payload = Json(message.asdict()) curs.execute(dedent("""\ WITH updated AS ( UPDATE dramatiq.queue SET "state" = 'rejected', message = %s WHERE message_id = %s AND state <> 'rejected' RETURNING message ) SELECT pg_notify(%s, message::text) FROM updated; """), (payload, message.message_id, channel)) def fetch_pending_notifies(self): with self.listen_conn.cursor() as curs: curs.execute(dedent("""\ SELECT message::text FROM dramatiq.queue WHERE state = 'queued' AND queue_name IN %s; """), ((self.queue_name, dq_name(self.queue_name)),)) return [ Notify(pid=0, channel=None, payload=r[0]) for r in curs ] def requeue(self, messages): messages = list(messages) if not len(messages): return logger.debug("Batch update of messages for requeue.") with self.listen_conn.cursor() as curs: curs.execute(dedent("""\ UPDATE dramatiq.queue SET state = 'queued' WHERE message_id IN %s; """), (tuple(m.message_id for m in messages),)) def start_listening(self): # Opens listening connection with proper configuration. conn = self.pool.getconn() # This is for NOTIFY consistency, according to psycopg2 doc. conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) channel = quote_ident(f"dramatiq.{self.queue_name}.enqueue", conn) dq = dq_name(self.queue_name) dchannel = quote_ident(f"dramatiq.{dq}.enqueue", conn) with conn.cursor() as curs: logger.debug( "Listening on channels %s, %s.", channel, dchannel) curs.execute(f"LISTEN {channel}; LISTEN {dchannel};") return conn def poll_for_notify(self): rlist, *_ = select.select([self.listen_conn], [], [], self.timeout) self.listen_conn.poll() if self.listen_conn.notifies: self.notifies += self.listen_conn.notifies logger.debug( "Received %d Postgres notifies for queue %s.", len(self.listen_conn.notifies), self.queue_name, ) self.listen_conn.notifies[:] = [] PK!QcZZdramatiq_pg/cli.pyimport argparse import logging import pdb from contextlib import closing, contextmanager from pkg_resources import get_distribution from textwrap import dedent from dramatiq.cli import ( LOGFORMAT, VERBOSITY, ) from psycopg2 import connect from .broker import purge logger = logging.getLogger(__name__) def entrypoint(): logging.basicConfig(level=logging.INFO, format=LOGFORMAT) try: exit(main()) except (pdb.bdb.BdbQuit, KeyboardInterrupt): logger.info("Interrupted.") except Exception: logger.exception('Unhandled error:') logger.error( "Please file an issue at " "https://gitlab.com/dalibo/dramatiq-pg/issues/new with full log.", ) exit(1) def main(): parser = make_argument_parser() args = parser.parse_args() logging.getLogger().setLevel(VERBOSITY.get(args.verbose, logging.INFO)) if not hasattr(args, 'command'): logger.error("Missing command. See --help for usage.") return 1 return args.command(args) def make_argument_parser(): dist = get_distribution('dramatiq-pg') parser = argparse.ArgumentParser( prog="dramatiq-pg", description="Maintainance utility for task-queue in Postgres.", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--version", action="version", version=dist.version) parser.add_argument( "--verbose", "-v", default=0, action="count", help="turn on verbose log output", ) subparsers = parser.add_subparsers() subparser = subparsers.add_parser('flush') subparser.set_defaults(command=flush_command) subparser = subparsers.add_parser('purge') subparser.set_defaults(command=purge_command) subparser.add_argument( '--maxage', dest='purge_maxage', default='30 days', help=dedent("""\ Max age of done/rejected message to keep in queue. Format is Postgres interval. Default is %(default)r. """) ) subparser = subparsers.add_parser('recover') subparser.set_defaults(command=recover_command) subparser.add_argument( '--minage', dest='recover_minage', default='1 min', help=dedent("""\ Max age of consumed message to requere. Format is Postgres interval. Default is %(default)r. """) ) subparser = subparsers.add_parser('stats') subparser.set_defaults(command=stats_command) return parser def flush_command(args): with transaction() as curs: curs.execute(dedent("""\ DELETE FROM dramatiq.queue WHERE "state" IN ('queued', 'consumed'); """)) flushed = curs.rowcount logger.info("Flushed %d messages.", flushed) def purge_command(args): with transaction() as curs: deleted = purge(curs, args.purge_maxage) logger.info("Deleted %d messages.", deleted) def recover_command(args): with transaction() as curs: curs.execute(dedent("""\ UPDATE dramatiq.queue SET state = 'queued' WHERE state = 'consumed' AND mtime < (NOW() AT TIME ZONE 'UTC') - interval %s; """), (args.recover_minage,)) recovered = curs.rowcount logger.info("Recovered %s messages.", recovered) def stats_command(args): with transaction() as curs: curs.execute(dedent("""\ SELECT "state", count(1) FROM dramatiq.queue GROUP BY "state"; """)) stats = dict(curs.fetchall()) for state in 'queued', 'consumed', 'done', 'rejected': print(f'{state}: {stats.get(state, 0)}') @contextmanager def transaction(connstring=""): # Manager for connecting to psycopg2 for a single transaction. with closing(connect(connstring)) as conn: with conn: with conn.cursor() as curs: yield curs if '__main__' == __name__: entrypoint() PK![ [ dramatiq_pg/results.py# # R E S U L T S # # Implements a result backend using Postgres. See # https://dramatiq.io/cookbook.html#results. # import json import logging from textwrap import dedent from dramatiq.results import ResultBackend, ResultMissing, ResultTimeout from psycopg2.extras import Json from .utils import make_pool, transaction, wait_for_notifies logger = logging.getLogger(__name__) class PostgresBackend(ResultBackend): def __init__(self, *, url=None, pool=None, **kw): super().__init__(**kw) if url: self.pool = make_pool(url) else: # Receive a pool object to have an I/O less __init__. self.pool = pool def build_message_key(self, message): # Just use message_id, it's UNIQUE in table. return str(message.message_id) def get_result(self, message, *, block=False, timeout=None): key = self.build_message_key(message) # Ensure a timeout is set. timeout = timeout or 300_000 channel = f'dramatiq.{key}.results' with transaction(self.pool, listen=channel) as curs: # First, search result in table. curs.execute(dedent("""\ SELECT result FROM dramatiq.queue WHERE message_id = %s AND result IS NOT NULL; """), (key,)) if curs.rowcount: result, = curs.fetchone() return result elif not block: raise ResultMissing(message) # From here, we are in blocking mode. logger.debug("Waiting for result of %s.", key) notifies = wait_for_notifies(curs.connection, timeout=timeout) if not notifies: raise ResultTimeout(message) notify, = notifies # Don't query database, use NOTIFY payload. return json.loads(notify.payload) def _store(self, key, result, ttl): with transaction(self.pool) as curs: logger.debug("Storing result for %s.", key) curs.execute(dedent("""\ WITH stored AS ( INSERT INTO dramatiq.queue (queue_name, message_id, "state", result, result_ttl) VALUES ('__RQ__', %s, 'done', %s, (NOW() AT TIME ZONE 'UTC') + interval %s) ON CONFLICT (message_id) DO UPDATE SET mtime = (NOW() AT TIME ZONE 'UTC'), result = EXCLUDED.result, result_ttl = EXCLUDED.result_ttl RETURNING queue_name, message_id, result ) SELECT pg_notify('dramatiq.' || message_id || '.results', result::text) FROM stored; """), (key, Json(result), f"{ttl} ms",)) if 0 == curs.rowcount: raise Exception(f"Can't store result of message {key}.") PK!xoXdramatiq_pg/schema.sql\set ON_ERROR_STOP on CREATE SCHEMA dramatiq; CREATE TYPE dramatiq."state" AS ENUM ( 'queued', 'consumed', 'rejected', 'done' ); CREATE TABLE dramatiq.queue( id BIGSERIAL PRIMARY KEY, queue_name TEXT NOT NULL DEFAULT 'default', message_id uuid UNIQUE, "state" dramatiq."state", mtime TIMESTAMP WITH TIME ZONE DEFAULT (NOW() AT TIME ZONE 'UTC'), -- message as encoded by dramatiq. message JSONB, result_key TEXT UNIQUE, "result" JSONB, result_ttl TIMESTAMP WITH TIME ZONE ); -- Index state and mtime together to speed up deletion. This can also speed up -- statistics when VACUUM ANALYZE is recent enough. CREATE INDEX ON dramatiq.queue("state", mtime); PK!dramatiq_pg/utils.pyimport logging import select from contextlib import contextmanager from urllib.parse import ( parse_qsl, urlencode, urlparse, ) from psycopg2.extensions import ( ISOLATION_LEVEL_AUTOCOMMIT, quote_ident, ) from psycopg2.pool import ThreadedConnectionPool logger = logging.getLogger(__name__) def make_pool(url): parts = urlparse(url) qs = dict(parse_qsl(parts.query)) minconn = int(qs.pop('minconn', '0')) maxconn = int(qs.pop('maxconn', '16')) parts = parts._replace(query=urlencode(qs)) connstring = parts.geturl() if ":/?" in connstring or connstring.endswith(':/'): # geturl replaces :/// with :/. libpq does not accept that. connstring = connstring.replace(':/', ':///') return ThreadedConnectionPool(minconn, maxconn, connstring) @contextmanager def transaction(pool, listen=None): # Manage the connection, transaction and cursor from a connection pool. conn = pool.getconn() if listen: # This is for NOTIFY consistency, according to psycopg2 doc. conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) channel = quote_ident(listen, conn) try: with conn: # Wraps in a transaction. with conn.cursor() as curs: if listen: curs.execute(f"LISTEN {channel};") yield curs finally: pool.putconn(conn) def wait_for_notifies(conn, timeout=1000): rlist, *_ = select.select([conn], [], [], timeout / 1000.) conn.poll() notifies = conn.notifies[:] if notifies: logger.debug("Received %d Postgres notifies.", len(conn.notifies)) conn.notifies[:] = [] return notifies PK!HRs5:,dramatiq_pg-0.5.1.dist-info/entry_points.txtN+I/N.,()J)JM,,-H s2RJ* 3JPK!S#dramatiq_pg-0.5.1.dist-info/LICENSEPostgreSQL Licence Copyright (c) 2019, DALIBO Permission to use, copy, modify, and distribute this software and its documentation for any purpose, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and this paragraph and the following two paragraphs appear in all copies. IN NO EVENT SHALL DALIBO BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF DALIBO HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. DALIBO SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND DALIBO HAS NO OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. PK!HnHTU!dramatiq_pg-0.5.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hp $dramatiq_pg-0.5.1.dist-info/METADATAV[nFU\ J$#'Z PE4"/ɉ34g;hIW{IP$ )zH/'orʚFę,pI% ͤa VuQȪu>nVpt[x]cG[औͽ/l:͔rŶ&R'\!Rͭ7K;S{oq\V3//Wsq7"ECgGobs*UH~]j|㲬L\T6|2,jjG㻯x‰r~d8xO0z8بtMl쐍'GlDc'o.38q]˚-yv3-XM2 E]\Mx =jJ;ON6F81)uo}}H NF !`{0XT/d#ؔLnB^5 1U]R ]9 6"Woe)TfXPg/O[B1Jh+&:fKj{:<l4ۣKO8BE-gӳū_["@uġjMAًCRiȓwlT+|.qaot \'^dE+HZ*CPWZEMA+M5T.hn cZܙ(qHfBjBFQZFsW9A;ٲ]H{Ep w c-Sb ?)-fa4}\x+/Lk)HO@JZ'έ%&-wnFCJcd4ogӰ$ݏ3d{{`|=&tLzF!u?[s%ɏ$ fu_@JmEFaT{5v=Z}d9(CՁ44þBф?Quc \ݒM >Ӯvʅ4Վ[kA-һe5a2}G;b#Z9}wa5Q o ^ޙi pk {JLϪӇQ[8\N $̧$ '$ \J%4IF`'gЧIS=6]Rn2]n`wհN䃝!| Zb@7DE⣉mmZ)i{05SrZc16K1c#]?u ehEכS**P !bPK!HFgj"dramatiq_pg-0.5.1.dist-info/RECORD}GvPy^eAH^A\{ |Ka=%c$u_$ٍ.)A3_.ˠ Goert4olJJ1I{$?qyh 3[ULVG7"dOEx^(=i.1d'mez-4Yd?D#$Ob/\[ ǥQ ݚ=vjn⢿и42@c_8].݁Ns'}=XIZ N.[d}J3c! Ǽh7X4Zc]<7L~)I&Ff8:&Q}S-gj(w.D>[2u>ÿ; n 8h54]L&dPFUI1W J27a ! k*:$KhB*^Q]o/Ψ0=Yns,!a[ !Xzۊd2|PK!O/dramatiq_pg/__init__.pyPK!},cl#l#dramatiq_pg/broker.pyPK!QcZZX$dramatiq_pg/cli.pyPK![ [ 3dramatiq_pg/results.pyPK!xoXq?dramatiq_pg/schema.sqlPK!SBdramatiq_pg/utils.pyPK!HRs5:,&Idramatiq_pg-0.5.1.dist-info/entry_points.txtPK!S#Idramatiq_pg-0.5.1.dist-info/LICENSEPK!HnHTU!|Mdramatiq_pg-0.5.1.dist-info/WHEELPK!Hp $Ndramatiq_pg-0.5.1.dist-info/METADATAPK!HFgj"GSdramatiq_pg-0.5.1.dist-info/RECORDPK .U