PK!ܓHHdramatiq_pg/__init__.pyfrom .broker import PostgresBroker __all__ = [ "PostgresBroker", ] PK!L%%dramatiq_pg/broker.pyimport json import logging import select from random import randint from contextlib import contextmanager from textwrap import dedent from urllib.parse import ( parse_qsl, urlencode, urlparse, ) from dramatiq.broker import ( Broker, Consumer, MessageProxy, ) from dramatiq.common import current_millis, dq_name from dramatiq.message import Message from psycopg2.extensions import ( ISOLATION_LEVEL_AUTOCOMMIT, Notify, quote_ident, ) from psycopg2.extras import Json from psycopg2.pool import ThreadedConnectionPool logger = logging.getLogger(__name__) @contextmanager def transaction(pool): # Manage the connection, transaction and cursor from a connection pool. conn = pool.getconn() try: with conn: # Wraps in a transaction. with conn.cursor() as curs: yield curs finally: pool.putconn(conn) def purge(curs, max_age='30 days'): # Delete old messages. Returns deleted messages. curs.execute(dedent("""\ DELETE FROM dramatiq.queue WHERE "state" IN ('done', 'rejected') AND mtime <= (NOW() - interval %s); """), (max_age,)) return curs.rowcount def make_pool(url): parts = urlparse(url) qs = dict(parse_qsl(parts.query)) minconn = int(qs.pop('minconn', '0')) maxconn = int(qs.pop('maxconn', '16')) parts = parts._replace(query=urlencode(qs)) connstring = parts.geturl() if ":/?" in connstring or connstring.endswith(':/'): # geturl replaces :/// with :/. libpq does not accept that. connstring = connstring.replace(':/', ':///') return ThreadedConnectionPool(minconn, maxconn, connstring) class PostgresBroker(Broker): def __init__(self, *, pool=None, url="", **kw): super(PostgresBroker, self).__init__(**kw) if pool and url: raise ValueError("You can't set both pool and URL!") if url: self.pool = make_pool(url) else: # Receive a pool object to have an I/O less __init__. self.pool = pool def consume(self, queue_name, prefetch=1, timeout=30000): return PostgresConsumer( pool=self.pool, queue_name=queue_name, prefetch=prefetch, timeout=timeout, ) def declare_queue(self, queue_name): if queue_name not in self.queues: self.emit_before("declare_queue", queue_name) self.queues[queue_name] = True # Actually do nothing in Postgres since all queues are stored in # the same table. self.emit_after("declare_queue", queue_name) def enqueue(self, message, *, delay=None): if delay: message = message.copy(queue_name=dq_name(message.queue_name)) message.options['eta'] = current_millis() + delay q = message.queue_name insert = (dedent("""\ WITH enqueued AS ( INSERT INTO dramatiq.queue (queue_name, message_id, "state", message) VALUES (%s, %s, 'queued', %s) ON CONFLICT (message_id) DO UPDATE SET "state" = 'queued', message = EXCLUDED.message RETURNING queue_name, message ) SELECT pg_notify('dramatiq.' || queue_name || '.enqueue', message::text) FROM enqueued; """), (q, message.message_id, Json(message.asdict()))) with transaction(self.pool) as curs: logger.debug("Upserting %s in queue %s.", message.message_id, q) curs.execute(*insert) class PostgresConsumer(Consumer): def __init__(self, *, pool, queue_name, timeout, **kw): self.listen_conn = None self.notifies = [] self.pool = pool self.queue_name = queue_name self.timeout = timeout // 1000 def __next__(self): # First, open connexion and fetch missed notifies from table. if self.listen_conn is None: self.listen_conn = self.start_listening() # We may have received a notify between LISTEN and SELECT of # pending messages. That's not a problem because we are able to # skip spurious notifies. self.notifies = self.fetch_pending_notifies() logger.debug( "Found %s pending messages in queue %s.", len(self.notifies), self.queue_name) if not self.notifies: # Then, fetch notifies from Pg connexion. self.poll_for_notify() # If we have some notifies, loop to find one todo. while self.notifies: notify = self.notifies.pop(0) payload = json.loads(notify.payload) message = Message(**payload) mid = message.message_id if self.consume_one(message): return MessageProxy(message) else: logger.debug("Message %s already consumed. Skipping.", mid) # We have nothing to do, let's see if the queue needs some cleaning. self.auto_purge() def ack(self, message): with transaction(self.pool) as curs: channel = f"dramatiq.{message.queue_name}.ack" payload = Json(message.asdict()) logger.debug( "Notifying %s for ACK %s.", channel, message.message_id) # dramatiq always ack a message, even if it has been requeued by # the Retries middleware. Thus, only update message in state # `consumed`. curs.execute(dedent("""\ WITH updated AS ( UPDATE dramatiq.queue SET "state" = 'done', message = %s WHERE message_id = %s AND state = 'consumed' RETURNING message ) SELECT pg_notify(%s, message::text) FROM updated; """), (payload, message.message_id, channel)) def auto_purge(self): # Automatically purge messages every 100k iteration. Dramatiq defaults # to 1s. This mean about 1 purge for 28h idle. if randint(0, 100_000): return logger.debug("Randomly triggering garbage collector.") with self.listen_conn.cursor() as curs: deleted = purge(curs) logger.info("Purged %d messages in all queues.", deleted) def close(self): if self.listen_conn: self.pool.putconn(self.listen_conn) self.listen_conn = None def consume_one(self, message): # Race to process this message. with transaction(self.pool) as curs: curs.execute(dedent("""\ UPDATE dramatiq.queue SET "state" = 'consumed', mtime = (NOW() AT TIME ZONE 'UTC') WHERE message_id = %s AND "state" = 'queued'; """), (message.message_id,)) # If no row was updated, this mean another worker has consumed it. return 1 == curs.rowcount def nack(self, message): with transaction(self.pool) as curs: # Use the same channel as ack. Actually means done. channel = f"dramatiq.{message.queue_name}.ack" logger.debug( "Notifying %s for NACK %s.", channel, message.message_id) payload = Json(message.asdict()) curs.execute(dedent("""\ WITH updated AS ( UPDATE dramatiq.queue SET "state" = 'rejected', message = %s WHERE message_id = %s AND state <> 'rejected' RETURNING message ) SELECT pg_notify(%s, message::text) FROM updated; """), (payload, message.message_id, channel)) def fetch_pending_notifies(self): with self.listen_conn.cursor() as curs: curs.execute(dedent("""\ SELECT message::text FROM dramatiq.queue WHERE state = 'queued' AND queue_name IN %s; """), ((self.queue_name, dq_name(self.queue_name)),)) return [ Notify(pid=0, channel=None, payload=r[0]) for r in curs ] def requeue(self, messages): messages = list(messages) if not len(messages): return logger.debug("Batch update of messages for requeue.") with self.listen_conn.cursor() as curs: curs.execute(dedent("""\ UPDATE dramatiq.queue SET state = 'queued' WHERE message_id IN %s; """), (tuple(m.message_id for m in messages),)) def start_listening(self): # Opens listening connection with proper configuration. conn = self.pool.getconn() # This is for NOTIFY consistency, according to psycopg2 doc. conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) channel = quote_ident(f"dramatiq.{self.queue_name}.enqueue", conn) dq = dq_name(self.queue_name) dchannel = quote_ident(f"dramatiq.{dq}.enqueue", conn) with conn.cursor() as curs: logger.debug( "Listening on channels %s, %s.", channel, dchannel) curs.execute(f"LISTEN {channel}; LISTEN {dchannel};") return conn def poll_for_notify(self): rlist, *_ = select.select([self.listen_conn], [], [], self.timeout) self.listen_conn.poll() if self.listen_conn.notifies: self.notifies += self.listen_conn.notifies logger.debug( "Received %d Postgres notifies for queue %s.", len(self.listen_conn.notifies), self.queue_name, ) self.listen_conn.notifies[:] = [] PK!J$ dramatiq_pg/cli.pyimport argparse import logging import pdb from contextlib import closing, contextmanager from pkg_resources import get_distribution from textwrap import dedent from dramatiq.cli import ( LOGFORMAT, VERBOSITY, ) from psycopg2 import connect from .broker import purge logger = logging.getLogger(__name__) def entrypoint(): logging.basicConfig(level=logging.INFO, format=LOGFORMAT) try: exit(main()) except (pdb.bdb.BdbQuit, KeyboardInterrupt): logger.info("Interrupted.") except Exception: logger.exception('Unhandled error:') logger.error( "Please file an issue at " "https://gitlab.com/dalibo/dramatiq-pg/issues/new with full log.", ) exit(1) def main(): parser = make_argument_parser() args = parser.parse_args() logging.getLogger().setLevel(VERBOSITY.get(args.verbose, logging.INFO)) if not hasattr(args, 'command'): logger.error("Missing command. See --help for usage.") return 1 return args.command(args) def make_argument_parser(): dist = get_distribution('dramatiq-pg') parser = argparse.ArgumentParser( prog="dramatiq-pg", description="Maintainance utility for task-queue in Postgres.", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--version", action="version", version=dist.version) parser.add_argument( "--verbose", "-v", default=0, action="count", help="turn on verbose log output", ) subparsers = parser.add_subparsers() subparser = subparsers.add_parser('purge') subparser.set_defaults(command=purge_command) subparser.add_argument( '--maxage', dest='purge_maxage', default='30 days', help=dedent("""\ Max age of done/rejected message to keep in queue. Format is Postgres interval. Default is %(default)r. """) ) subparser = subparsers.add_parser('recover') subparser.set_defaults(command=recover_command) subparser.add_argument( '--minage', dest='recover_minage', default='1 min', help=dedent("""\ Max age of consumed message to requere. Format is Postgres interval. Default is %(default)r. """) ) subparser = subparsers.add_parser('stats') subparser.set_defaults(command=stats_command) return parser def purge_command(args): with transaction() as curs: deleted = purge(curs, args.purge_maxage) logger.info("Deleted %d messages.", deleted) def recover_command(args): with transaction() as curs: curs.execute(dedent("""\ UPDATE dramatiq.queue SET state = 'queued' WHERE state = 'consumed' AND mtime < (NOW() AT TIME ZONE 'UTC') - interval %s; """), (args.recover_minage,)) recovered = curs.rowcount logger.info("Recovered %s messages.", recovered) def stats_command(args): with transaction() as curs: curs.execute(dedent("""\ SELECT "state", count(1) FROM dramatiq.queue GROUP BY "state"; """)) stats = dict(curs.fetchall()) for state in 'queued', 'consumed', 'done', 'rejected': print(f'{state}: {stats.get(state, 0)}') @contextmanager def transaction(connstring=""): # Manager for connecting to psycopg2 for a single transaction. with closing(connect(connstring)) as conn: with conn: with conn.cursor() as curs: yield curs if '__main__' == __name__: entrypoint() PK!y,lldramatiq_pg/schema.sql\set ON_ERROR_STOP on CREATE SCHEMA dramatiq; CREATE TYPE dramatiq."state" AS ENUM ( 'queued', 'consumed', 'rejected', 'done' ); CREATE TABLE dramatiq.queue( id BIGSERIAL PRIMARY KEY, queue_name TEXT NOT NULL DEFAULT 'default', message_id uuid UNIQUE, "state" dramatiq."state", mtime TIMESTAMP WITH TIME ZONE DEFAULT (NOW() AT TIME ZONE 'UTC'), -- message as encoded by dramatiq. message JSONB, "result" JSONB ); -- Index state and mtime together to speed up deletion. This can also speed up -- statistics when VACUUM ANALYZE is recent enough. CREATE INDEX ON dramatiq.queue("state", mtime); PK!HRs5:,dramatiq_pg-0.4.1.dist-info/entry_points.txtN+I/N.,()J)JM,,-H s2RJ* 3JPK!S#dramatiq_pg-0.4.1.dist-info/LICENSEPostgreSQL Licence Copyright (c) 2019, DALIBO Permission to use, copy, modify, and distribute this software and its documentation for any purpose, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and this paragraph and the following two paragraphs appear in all copies. IN NO EVENT SHALL DALIBO BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF DALIBO HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. DALIBO SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND DALIBO HAS NO OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. PK!HnHTU!dramatiq_pg-0.4.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HM $dramatiq_pg-0.4.1.dist-info/METADATAVNFS%UhK iUH@ ,T*;kN{g O&}~gFQ y^ʫ;ŔvGѱyJirո̢z;~ Yjtj*v\Vtѹr暣lRe0t$ިE|*vrNpdD'J04ͯlu]Z&A/C ^2rh+rOF7Ζ\P/+M'DYPGko=0(P;1r=fZ‰ 9PcgF]2ɷW wo\݆_ln!iKʛ)I_]$0t"UĪwvd5t1姙2xsӋ.V D-q>' tIg!.H]c|[6E0P!17^7N 'XL0쨛W{g#)[Z"sc?| 0<^W6dhrn6iFdU"a'_#&Z#7riRiu@`}S">IIJߑ2TƋj3YL6HI50J%5a!3-y9;h%T5HŽjؿCXJs`^. 3G`P*\aLL 'é4Q.*Bu¡0@"X"gX*o\mlnF,HW0 UgMEuFhy% 0.1H:lGj7I7 Q'HZ>tps'bg"ERr"zPK!HȾͷ"dramatiq_pg-0.4.1.dist-info/RECORD}I@@}(IE/ d ! )HWv P5FCEU_(" `?`uL҄نn|)c6N'pP 7Wl-S9<\sgv,8 8Bc+oZV/T'z9H-_-tU%#='|b!ú J(ݽQsRfݍY)'zRfNQ\v~Rs7`hGnVo#h$jFx;ѴEz4聞 z ns[nd5ܝdX Erc`Qrq"i:Ro+lW -J}.0Lo{Ƕm`)qf g>GtNISd6`{B=w"i#ne+PK!ܓHHdramatiq_pg/__init__.pyPK!L%%}dramatiq_pg/broker.pyPK!J$ &dramatiq_pg/cli.pyPK!y,ll4dramatiq_pg/schema.sqlPK!HRs5:,H7dramatiq_pg-0.4.1.dist-info/entry_points.txtPK!S#7dramatiq_pg-0.4.1.dist-info/LICENSEPK!HnHTU!;dramatiq_pg-0.4.1.dist-info/WHEELPK!HM $1<dramatiq_pg-0.4.1.dist-info/METADATAPK!HȾͷ"QBdramatiq_pg-0.4.1.dist-info/RECORDPK HD