PK!ܓHHdramatiq_pg/__init__.pyfrom .broker import PostgresBroker __all__ = [ "PostgresBroker", ] PK!łgdramatiq_pg/broker.pyimport logging import select from contextlib import contextmanager from textwrap import dedent from dramatiq.broker import ( Broker, Consumer, MessageProxy, ) from dramatiq.message import Message from psycopg2.extensions import ( ISOLATION_LEVEL_AUTOCOMMIT, quote_ident, ) logger = logging.getLogger(__name__) @contextmanager def transaction(pool): # Manage the connection, transaction and cursor from a connection pool. conn = pool.getconn() try: # This is for NOTIFY consistency, according to psycopg2 doc. conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) with conn: # Wraps in a transaction. with conn.cursor() as curs: yield curs finally: pool.putconn(conn) class PostgresBroker(Broker): def __init__(self, *, pool, **kw): super(PostgresBroker, self).__init__(**kw) # Receive a pool object to have an I/O less __init__. self.pool = pool def consume(self, queue_name, prefetch=1, timeout=30000): return PostgresConsumer( pool=self.pool, queue_name=queue_name, prefetch=prefetch, timeout=timeout, ) def declare_queue(self, queue_name): if queue_name not in self.queues: self.emit_before("declare_queue", queue_name) self.queues[queue_name] = True # Actually do nothing in Postgres since all queues are stored in # the same table. self.emit_after("declare_queue", queue_name) def enqueue(self, message, *, delay=None): q = message.queue_name payload = message.encode().decode('utf-8') insert = (dedent("""\ INSERT INTO dramatiq.queue (queue_name, message_id, "state", message) VALUES (%s, %s, %s, %s::jsonb); """), (q, message.message_id, 'queued', payload)) with transaction(self.pool) as curs: channel = quote_ident(f"dramatiq.{q}.enqueue", curs) logger.debug("Inserting %s in %s.", message.message_id, q) curs.execute(*insert) # Message must be shorter than 8ko. curs.execute(f"NOTIFY {channel}, %s;", (payload,)) class PostgresConsumer(Consumer): def __init__(self, *, pool, queue_name, **kw): self.pool = pool self.queue_name = queue_name self.notifies = [] def __next__(self): while True: # Start by processing already fetched notifies. while self.notifies: notify = self.notifies.pop(0) message = Message.decode(notify.payload.encode('utf-8')) mid = message.message_id if self.consume_one(message): logger.debug("Consumed message %s.", mid) return MessageProxy(message) else: logger.debug("Message %s already consumed.", mid) # Notify list is empty, listen for more. self.listen() def ack(self, message): with transaction(self.pool) as curs: curs.execute(dedent("""\ UPDATE dramatiq.queue SET "state" = 'done' WHERE message_id = %s AND "state" <> 'done' """), (message.message_id,)) def consume_one(self, message): # Race to process this message. with transaction(self.pool) as curs: curs.execute(dedent("""\ UPDATE dramatiq.queue SET "state" = 'consumed' WHERE message_id = %s AND "state" = 'queued'; """), (message.message_id,)) # If no row was updated, this mean another worker has consumed it. return 1 == curs.rowcount def listen(self): with transaction(self.pool) as curs: channel = quote_ident(f"dramatiq.{self.queue_name}.enqueue", curs) logger.debug("Listening on channel %s.", channel) curs.execute(f"LISTEN {channel};") while not self.notifies: fd_lists = select.select([curs.connection], [], [], 300) if any(fd_lists): curs.connection.poll() self.notifies += curs.connection.notifies PK!I⺠dramatiq_pg/schema.sql\set ON_ERROR_STOP on CREATE SCHEMA dramatiq; CREATE TYPE dramatiq."state" AS ENUM ( 'queued', 'consumed', 'rejected', 'done' ); CREATE TABLE dramatiq.queue( id SERIAL PRIMARY KEY, queue_name TEXT NOT NULL DEFAULT 'default', message_id uuid UNIQUE, "state" dramatiq."state", mtime TIMESTAMP WITH TIME ZONE DEFAULT NOW(), -- message as encoded by dramatiq. message JSONB, "result" JSONB ); PK!HnHTU!dramatiq_pg-0.1.0.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hk⟂$dramatiq_pg-0.1.0.dist-info/METADATAUrH}Gۅ/ɦJe\e㸖] '2H-1akFK ]cB@Rh9}z^& N"Je$̥WoDw|^bV,&DYc )̓tkTaWc`eF+וֹLy-al~"Z!᳊p1>=߱ޘ2qؖ)zlq_)#/(>NgC1ŷJQR0)j܄{蘒ΩT!E:Ii2U,J-495>|r/'g|wQ92 Aձ5]z7..`h 浥y߃y!|ġ _'B NV#̓ʐ\SõMx  K0uxٔ qq1_!7=)J5B=qb_JR;7ܱ#8PESm5.5rн87sd.^a.9#lqGO7F@Ƀqu06ZgMQ+qJ7ϗ/j$ J M?<hLcef㧥i|Q *K%#hu=\ج4j }ʦi<X f$/52zeL3r=uʴYJ6ʯh+n|d{-LGOIA\oU\0LeUHy Ŷ n=c 16@ɂV1 rY[}Hg5|햊h++nw/N2% W>w_쥩I.8pݙ1 "&{nV$$ḏ f˕*@6I$^;%h_ӂxU>L׭^S*Ӹo,),qw%e_75Q`c[0ȅ]O@mLX->G&ѫClN1y<5GY)0}2)?PK!ܓHHdramatiq_pg/__init__.pyPK!łg}dramatiq_pg/broker.pyPK!I⺠Wdramatiq_pg/schema.sqlPK!HnHTU!+dramatiq_pg-0.1.0.dist-info/WHEELPK!Hk⟂$dramatiq_pg-0.1.0.dist-info/METADATAPK!H"*"dramatiq_pg-0.1.0.dist-info/RECORDPK