-- huey-pg.sql

\set ON_ERROR_STOP on

CREATE SCHEMA huey;

CREATE TYPE huey."state" AS ENUM (
    'queued',
    'consumed',
    'rejected',
    'done'
);

CREATE TABLE huey.queue(
    id SERIAL PRIMARY KEY,
    queue_name TEXT NOT NULL DEFAULT 'default',
    "state" huey."state",
    message bytea
);
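-- Loading the schema is a one-off setup step; a minimal sketch, assuming the
-- target database is reachable as "huey" (the database name is a placeholder,
-- not part of this package):
--
--   psql -d huey -f huey-pg.sql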
# huey_pg.py

import logging
import select
import threading
from contextlib import contextmanager
from textwrap import dedent

from huey.api import Huey
from huey.constants import EmptyData
from huey.storage import BaseStorage
from psycopg2.extensions import (
    ISOLATION_LEVEL_AUTOCOMMIT,
    quote_ident,
)


logger = logging.getLogger(__name__)


@contextmanager
def transaction(pool):
    # Manage the connection, transaction and cursor from a connection pool.
    conn = pool.getconn()
    try:
        with conn:  # Wraps in a transaction.
            with conn.cursor() as curs:
                yield curs
    finally:
        pool.putconn(conn)


class PostgresHuey(Huey):
    def get_storage(self, *, connection_pool):
        return PostgresStorage(huey=self, connection_pool=connection_pool)


class PostgresStorage(BaseStorage):
    def __init__(self, *, name='huey', huey, connection_pool):
        super(PostgresStorage, self).__init__(name=name)
        self.pool = connection_pool
        self.huey = huey
        self._local = threading.local()

    @property
    def listen_connection(self):
        # One dedicated LISTEN connection per thread.
        if not hasattr(self._local, 'listen_connection'):
            logger.info("New pg connection.")
            self._local.listen_connection = conn = self.pool.getconn()
            # This is for NOTIFY consistency, according to psycopg2 doc.
            conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            channel = quote_ident(f"huey.{self.name}.enqueue", conn)
            with conn.cursor() as curs:
                logger.debug("Listening on channel %s.", channel)
                curs.execute(f"LISTEN {channel};")
        return self._local.listen_connection

    @property
    def notifies(self):
        # Per-thread buffer of NOTIFY payloads not yet consumed.
        if not hasattr(self._local, 'notifies'):
            self._local.notifies = []
        return self._local.notifies

    def consume_one(self, mid):
        # Race to process this message.
        with transaction(self.pool) as curs:
            curs.execute(dedent("""\
            UPDATE huey.queue
            SET "state" = 'consumed'
            WHERE id = %s AND "state" = 'queued'
            RETURNING message;
            """), (mid,))
            # If no row was updated, this means another worker has consumed it.
            if 1 == curs.rowcount:
                message, = curs.fetchone()
                return message

    def dequeue(self):
        while True:
            # Start by processing already fetched notifies.
            while self.notifies:
                notify = self.notifies.pop(0)
                mid = int(notify.payload)
                message = self.consume_one(mid)
                if message:
                    logger.info("Consumed message %s.", mid)
                    return message
                else:
                    logger.info("Message %s already consumed.", mid)
            # Notify list is empty, listen for more (blocking).
            self.listen()

    def enqueue(self, data):
        insert = (dedent("""\
        INSERT INTO huey.queue("state", message)
        VALUES (%s, %s)
        RETURNING id;
        """), ("queued", data))
        with transaction(self.pool) as curs:
            curs.execute(*insert)
            id_, = curs.fetchone()
            channel = quote_ident(f"huey.{self.name}.enqueue", curs)
            curs.execute(f"""NOTIFY {channel}, %s;""", (str(id_),))

    def listen(self):
        # Block until at least one NOTIFY arrives on the LISTEN connection,
        # waking up every 300 seconds to retry select().
        with self.listen_connection.cursor() as curs:
            while not self.notifies:
                fd_lists = select.select([curs.connection], [], [], 300)
                if any(fd_lists):
                    curs.connection.poll()
                    self._local.notifies += curs.connection.notifies
                    logger.info("Got %s.", curs.connection.notifies)
                    curs.connection.notifies[:] = []

    def peek_data(self, key):
        return EmptyData

    def read_schedule(self, *_):
        return []
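# Usage sketch (not part of the packaged files): wires PostgresStorage to a
# psycopg2 connection pool and declares a task. The DSN, pool sizes and the
# task itself are placeholders, and it assumes Huey() forwards extra keyword
# arguments to get_storage(), as huey 2.x does with its storage kwargs.

from psycopg2.pool import ThreadedConnectionPool

from huey_pg import PostgresHuey

# Pool shared by workers; the LISTEN connection is taken from this same pool.
pool = ThreadedConnectionPool(1, 4, "dbname=huey")  # placeholder DSN

# results=False because this storage does not implement a result store
# (peek_data() always returns EmptyData).
huey = PostgresHuey(name='huey', results=False, connection_pool=pool)


@huey.task()
def add(a, b):
    # Executed by a consumer process, e.g. `huey_consumer <module>.huey`.
    return a + b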