# ============================================================
# telstar/__init__.py
# ============================================================
"""telstar - produce and consume messages through Redis streams."""

__version__ = "0.0.1"


class Message(object):
    IDFieldName = b"message_id"
    DataFieldName = b"data"

    def __init__(self, stream, msg_uuid, data):
        self.stream = stream
        self.msg_uuid = msg_uuid
        self.data = data
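
A quick usage sketch (not part of the package): a Message pairs a stream
name with a message UUID and a JSON-serialisable payload. The stream name
and payload below are made up for illustration.

    import uuid

    from telstar import Message

    # Hypothetical stream and payload - any JSON-serialisable dict works.
    msg = Message(stream="userSignUp", msg_uuid=uuid.uuid4(), data={"value": 42})
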
# ============================================================
# telstar/consumer.py
# ============================================================
import json
from functools import partial

import redis

from . import Message

# Redis stream consumer groups distribute messages across the consumers
# of a group, e.g.:
#
#   msg:1 -> cg-userSignUp.1
#   msg:2 -> cg-userSignUp.2
#   msg:3 -> cg-userSignUp.3
#   msg:4 -> cg-userSignUp.1
#   msg:5 -> cg-userSignUp.2
#   msg:6 -> cg-userSignUp.3
#   msg:7 -> cg-userSignUp.1
#
# Which basically means that inside a group a single consumer only gets
# messages the others have not yet seen. For a deep dive read
# https://redis.io/topics/streams-intro
# This allows us to create N consumers without needing to figure out which
# messages have already been processed.


class Consumer(object):
    @staticmethod
    def increment(id):
        # IDs are of the form "1509473251518-0" and comprise a millisecond
        # timestamp plus a sequence number to differentiate within the timestamp.
        time, _, sequence = id.decode("ascii").partition("-")
        if not sequence:
            raise ValueError(f"Argument error: {id} does not have the expected #-# format")
        next_sequence = int(sequence) + 1
        return bytes(f"{time}-{next_sequence}", "ascii")

    def __init__(self, link, group_name, consumer_name, stream_name, processor_fn):
        self.link = link
        self.stream_name = f"telstar:stream:{stream_name}"
        self.group_name = group_name
        self.processor_fn = processor_fn
        self.consumer_group = f"{self.stream_name}:{self.group_name}"
        self.consumer_name = f"cg:{self.consumer_group}:{consumer_name}"
        self.create_consumer()

    # Create a new consumer group for the given stream. If the stream does not
    # exist yet, create it as well (`mkstream`) - if it does, we want all
    # messages present (`id="0"`).
    def create_consumer(self):
        try:
            self.link.xgroup_create(self.stream_name, self.group_name, mkstream=True, id="0")
        except redis.exceptions.ResponseError:
            pass  # The group already exists

    # In consumer groups, consumers can disappear. When they do, they can leave
    # non-acked messages behind, which we want to claim and have delivered to us.
    def claim_message_from_the_dead(self):
        # Get information about all consumers in the group and how many
        # messages are pending, e.g.:
        #   {'pending': 10,
        #    'min': b'1560032216285-0',
        #    'max': b'1560032942270-0',
        #    'consumers': [{'name': b'cg-userSignUp.1', 'pending': 10}]}
        pending_info = self.link.xpending(self.stream_name, self.group_name)
        if pending_info["pending"] == 0:
            return  # Nothing to see here

        # Now get all message ids within that range, e.g.:
        #   [{'message_id': b'1560194528886-0',
        #     'consumer': b'cg-userSignUp.1',
        #     'time_since_delivered': 22020,
        #     'times_delivered': 1}, ...]
        pending_messages = self.link.xpending_range(
            self.stream_name, self.group_name,
            pending_info["min"], pending_info["max"], pending_info["pending"])

        # Select the ones we want to claim - everything that is not already our own.
        messages_to_claim = [p["message_id"] for p in pending_messages
                             if p["consumer"].decode("ascii") != self.consumer_name]
        if not messages_to_claim:
            return  # The pending messages are all our own, no need to claim anything

        # Claim them, but only those that have been pending for more than 20 secs.
        return self.link.xclaim(self.stream_name, self.group_name, self.consumer_name,
                                20 * 1000, messages_to_claim, justid=True)

    # We claim messages from other dead/non-responsive consumers. Newly claimed
    # messages are usually from the past, which means that in order to process
    # them we need to start processing from our history.
    def transfer_and_process_history(self):
        start = self.get_last_seen_id()
        stream_msg_ids = self.claim_message_from_the_dead()
        if stream_msg_ids:
            start = min([min(stream_msg_ids), self.increment(start)])
        # start = b"0-0" would mean we reprocess everything,
        # including what we have seen so far.
        self.catchup(start=start)

    # This is the main loop: process the history first, then alternate between
    # waiting for new messages and claiming/reprocessing history.
    def run(self):
        self.transfer_and_process_history()
        while True:
            self.subscribe()  # blocks for 2 secs.
            self.transfer_and_process_history()

    def get_last_seen_id(self):
        check_point_key = f"telstar:checkpoint:{self.consumer_name}"
        return self.link.get(check_point_key) or b"0-0"

    # Multiple things are happening here:
    # 1. Save the stream_msg_id as a checkpoint, so that we know where to
    #    start should the consumer be restarted.
    # 2. Each message has a UUID, and in order to process each message only
    #    once we remember the UUID for 14 days.
    # 3. Acknowledge the message, meaning that we have processed it.
    def acknowledge(self, msg: Message, stream_msg_id):
        check_point_key = f"telstar:checkpoint:{self.consumer_name}"
        seen_key = f"telstar:seen:{self.consumer_group}:{msg.msg_uuid}"
        # Execute the following statements in a transaction, in Redis speak a `pipeline`.
        pipe = self.link.pipeline()
        # If this key changes before we execute, the pipeline and therefore the
        # ack fails, and the processor reverts all the work - which is exactly
        # what we want, as the work has already been completed by another consumer.
        pipe.watch(seen_key)
        # Buffer the following commands so execute() runs them atomically.
        pipe.multi()
        # Mark the message as seen for 14 days; should it reappear after
        # 14 days we reprocess it.
        pipe.set(seen_key, 1, ex=14 * 24 * 60 * 60)  # 14 days
        # Set the checkpoint for this consumer so that it knows where to
        # start again once it restarts.
        pipe.set(check_point_key, stream_msg_id)
        # Acknowledge the actual message.
        pipe.xack(self.stream_name, self.group_name, stream_msg_id)
        pipe.execute()

    def work(self, stream_msg_id, record):
        msg = Message(self.stream_name,
                      record[Message.IDFieldName],
                      json.loads(record[Message.DataFieldName]))
        seen_key = f"telstar:seen:{self.consumer_group}:{msg.msg_uuid}"
        if self.link.get(seen_key):
            # This is a double send - acknowledge it without processing.
            self.acknowledge(msg, stream_msg_id)
            return
        self.processor_fn(self, msg, done=partial(self.acknowledge, msg, stream_msg_id))

    # Process all messages from `start` onwards.
    def catchup(self, start):
        return self._xread(start)

    # Wait for new messages and process them.
    def subscribe(self):
        return self._xread(">", block=2000)

    def _xread(self, start, block=0):
        value = self.link.xreadgroup(self.group_name, self.consumer_name,
                                     {self.stream_name: start}, block=block)
        if not value:
            return 0
        [[stream, records]] = value
        for stream_msg_id, record in records:
            self.work(stream_msg_id, record)
        return len(records)
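
A minimal sketch of wiring up a consumer against a local Redis. The group,
consumer and stream names and the `handle` callback are illustrative
assumptions, not part of the package.

    import redis

    from telstar import Message
    from telstar.consumer import Consumer

    def handle(consumer: Consumer, msg: Message, done) -> None:
        # Do the actual work, then call done() to checkpoint, mark the
        # message UUID as seen and XACK - all in one pipeline.
        print(msg.msg_uuid, msg.data)
        done()

    Consumer(link=redis.Redis(host="localhost", port=6379),
             group_name="userSignUp", consumer_name="1",
             stream_name="test", processor_fn=handle).run()
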
# ============================================================
# telstar/producer.py
# ============================================================
import json
from typing import Callable, List, Tuple

from . import Message


class Producer(object):
    def __init__(self, link,
                 puller_fn: Callable[[], Tuple[List[Message], Callable[[], None]]],
                 context_callable):
        self.link = link
        self.puller_fn = puller_fn
        self.context_callable = context_callable

    def run_once(self):
        records, done = self.puller_fn()
        for record in records:
            self.add(record)
        done()

    def run(self):
        while True:
            if callable(self.context_callable):
                with self.context_callable():
                    self.run_once()
            else:
                self.run_once()

    def add(self, msg: Message):
        self.link.xadd(f"telstar:stream:{msg.stream}", {
            Message.IDFieldName: str(msg.msg_uuid),
            Message.DataFieldName: json.dumps(msg.data)})
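
A matching producer sketch. The in-memory `queue` and `pull` function are
assumptions for illustration; in practice the puller would read from an
outbox table and `done()` would delete the rows it handed out, as the
tests below do.

    import uuid

    import redis

    from telstar import Message
    from telstar.producer import Producer

    queue = [Message("test", uuid.uuid4(), {"value": i}) for i in range(10)]

    def pull():
        batch, queue[:] = queue[:5], queue[5:]  # hand out up to 5 messages
        return batch, lambda: None              # done() is a no-op here

    # run() would loop forever; run_once() sends a single batch.
    Producer(redis.Redis(), pull, context_callable=None).run_once()
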
# ============================================================
# telstar/tests/_dbcontent.py
# ============================================================
import os

from playhouse.db_url import connect

db = connect(os.environ["DATABASE"])
result = db.execute_sql("SELECT number, group_name FROM test")
print("\n".join(map(str, list(result))))


# ============================================================
# telstar/tests/expected.txt
# ============================================================
(1, 'validation')
(1, 'validation2')
(2, 'validation')
(2, 'validation2')
(3, 'validation')
(3, 'validation2')
(4, 'validation')
(4, 'validation2')
(5, 'validation')
(5, 'validation2')
(6, 'validation')
(6, 'validation2')
(7, 'validation')
(7, 'validation2')
(8, 'validation')
(8, 'validation2')
(9, 'validation')
(9, 'validation2')
(10, 'validation')
(10, 'validation2')
(11, 'validation')
(11, 'validation2')
(12, 'validation')
(12, 'validation2')
(13, 'validation')
(13, 'validation2')
(14, 'validation')
(14, 'validation2')
(15, 'validation')
(15, 'validation2')
(16, 'validation')
(16, 'validation2')
(17, 'validation')
(17, 'validation2')
(18, 'validation')
(18, 'validation2')
(19, 'validation')
(19, 'validation2')
(20, 'validation')
(20, 'validation2')
(21, 'validation')
(21, 'validation2')
(22, 'validation')
(22, 'validation2')
(23, 'validation')
(23, 'validation2')
(24, 'validation')
(24, 'validation2')
(25, 'validation')
(25, 'validation2')
(26, 'validation')
(26, 'validation2')
(27, 'validation')
(27, 'validation2')
(28, 'validation')
(28, 'validation2')
(29, 'validation')
(29, 'validation2')
(30, 'validation')
(30, 'validation2')
(31, 'validation')
(31, 'validation2')
(32, 'validation')
(32, 'validation2')
(33, 'validation')
(33, 'validation2')
(34, 'validation')
(34, 'validation2')
(35, 'validation')
(35, 'validation2')
(36, 'validation')
(36, 'validation2')
(37, 'validation')
(37, 'validation2')
(38, 'validation')
(38, 'validation2')
(39, 'validation')
(39, 'validation2')
(40, 'validation')
(40, 'validation2')
(41, 'validation')
(41, 'validation2')
(42, 'validation')
(42, 'validation2')
(43, 'validation')
(43, 'validation2')
(44, 'validation')
(44, 'validation2')
(45, 'validation')
(45, 'validation2')
(46, 'validation')
(46, 'validation2')
(47, 'validation')
(47, 'validation2')
(48, 'validation')
(48, 'validation2')
(49, 'validation')
(49, 'validation2')
(50, 'validation')
(50, 'validation2')
(51, 'validation')
(51, 'validation2')
(52, 'validation')
(52, 'validation2')
(53, 'validation')
(53, 'validation2')
(54, 'validation')
(54, 'validation2')
(55, 'validation')
(55, 'validation2')
(56, 'validation')
(56, 'validation2')
(57, 'validation')
(57, 'validation2')
(58, 'validation')
(58, 'validation2')
(59, 'validation')
(59, 'validation2')
(60, 'validation')
(60, 'validation2')
(61, 'validation')
(61, 'validation2')
(62, 'validation')
(62, 'validation2')
(63, 'validation')
(63, 'validation2')
(64, 'validation')
(64, 'validation2')
(65, 'validation')
(65, 'validation2')
(66, 'validation')
(66, 'validation2')
(67, 'validation')
(67, 'validation2')
(68, 'validation')
(68, 'validation2')
(69, 'validation')
(69, 'validation2')
(70, 'validation')
(70, 'validation2')
(71, 'validation')
(71, 'validation2')
(72, 'validation')
(72, 'validation2')
(73, 'validation')
(73, 'validation2')
(74, 'validation')
(74, 'validation2')
(75, 'validation')
(75, 'validation2')
(76, 'validation')
(76, 'validation2')
(77, 'validation')
(77, 'validation2')
(78, 'validation')
(78, 'validation2')
(79, 'validation')
(79, 'validation2')
(80, 'validation')
(80, 'validation2')
(81, 'validation')
(81, 'validation2')
(82, 'validation')
(82, 'validation2')
(83, 'validation')
(83, 'validation2')
(84, 'validation')
(84, 'validation2')
(85, 'validation')
(85, 'validation2')
(86, 'validation')
(86, 'validation2')
(87, 'validation')
(87, 'validation2')
(88, 'validation')
(88, 'validation2')
(89, 'validation')
(89, 'validation2')
(90, 'validation')
(90, 'validation2')
(91, 'validation')
(91, 'validation2')
(92, 'validation')
(92, 'validation2')
(93, 'validation')
(93, 'validation2')
(94, 'validation')
(94, 'validation2')
(95, 'validation')
(95, 'validation2')
(96, 'validation')
(96, 'validation2')
(97, 'validation')
(97, 'validation2')
(98, 'validation')
(98, 'validation2')
(99, 'validation')
(99, 'validation2')
(100, 'validation')
(100, 'validation2')
(101, 'validation')
(101, 'validation2')
(102, 'validation')
(102, 'validation2')
(103, 'validation')
(103, 'validation2')
(104, 'validation')
(104, 'validation2')
(105, 'validation')
(105, 'validation2')
(106, 'validation')
(106, 'validation2')
(107, 'validation')
(107, 'validation2')
(108, 'validation')
(108, 'validation2')
(109, 'validation')
(109, 'validation2')
(110, 'validation')
(110, 'validation2')
(111, 'validation')
(111, 'validation2')
(112, 'validation')
(112, 'validation2')
(113, 'validation')
(113, 'validation2')
(114, 'validation')
(114, 'validation2')
(115, 'validation')
(115, 'validation2')
(116, 'validation')
(116, 'validation2')
(117, 'validation')
(117, 'validation2')
(118, 'validation')
(118, 'validation2')
(119, 'validation')
(119, 'validation2')
(120, 'validation')
(120, 'validation2')
(121, 'validation')
(121, 'validation2')
(122, 'validation')
(122, 'validation2')
(123, 'validation')
(123, 'validation2')
(124, 'validation')
(124, 'validation2')
(125, 'validation')
(125, 'validation2')
(126, 'validation')
(126, 'validation2')
(127, 'validation')
(127, 'validation2')
(128, 'validation')
(128, 'validation2')
(129, 'validation')
(129, 'validation2')
(130, 'validation')
(130, 'validation2')
(131, 'validation')
(131, 'validation2')
(132, 'validation')
(132, 'validation2')
(133, 'validation')
(133, 'validation2')
(134, 'validation')
(134, 'validation2')
(135, 'validation')
(135, 'validation2')
(136, 'validation')
(136, 'validation2')
(137, 'validation')
(137, 'validation2')
(138, 'validation')
(138, 'validation2')
(139, 'validation')
(139, 'validation2')
(140, 'validation')
(140, 'validation2')
(141, 'validation')
(141, 'validation2')
(142, 'validation')
(142, 'validation2')
(143, 'validation')
(143, 'validation2')
(144, 'validation')
(144, 'validation2')
(145, 'validation')
(145, 'validation2')
(146, 'validation')
(146, 'validation2')
(147, 'validation')
(147, 'validation2')
(148, 'validation')
(148, 'validation2')
(149, 'validation')
(149, 'validation2')
(150, 'validation')
(150, 'validation2')
(151, 'validation')
(151, 'validation2')
(152, 'validation')
(152, 'validation2')
(153, 'validation')
(153, 'validation2')
(154, 'validation')
(154, 'validation2')
(155, 'validation')
(155, 'validation2')
(156, 'validation')
(156, 'validation2')
(157, 'validation')
(157, 'validation2')
(158, 'validation')
(158, 'validation2')
(159, 'validation')
(159, 'validation2')
(160, 'validation')
(160, 'validation2')
(161, 'validation')
(161, 'validation2')
(162, 'validation')
(162, 'validation2')
(163, 'validation')
(163, 'validation2')
(164, 'validation')
(164, 'validation2')
(165, 'validation')
(165, 'validation2')
(166, 'validation')
(166, 'validation2')
(167, 'validation')
(167, 'validation2')
(168, 'validation')
(168, 'validation2')
(169, 'validation')
(169, 'validation2')
(170, 'validation')
(170, 'validation2')
(171, 'validation')
(171, 'validation2')
(172, 'validation')
(172, 'validation2')
(173, 'validation')
(173, 'validation2')
(174, 'validation')
(174, 'validation2')
(175, 'validation')
(175, 'validation2')
(176, 'validation')
(176, 'validation2')
(177, 'validation')
(177, 'validation2')
(178, 'validation')
(178, 'validation2')
(179, 'validation')
(179, 'validation2')
(180, 'validation')
(180, 'validation2')
(181, 'validation')
(181, 'validation2')
(182, 'validation')
(182, 'validation2')
(183, 'validation')
(183, 'validation2')
(184, 'validation')
(184, 'validation2')
(185, 'validation')
(185, 'validation2')
(186, 'validation')
(186, 'validation2')
(187, 'validation')
(187, 'validation2')
(188, 'validation')
(188, 'validation2')
(189, 'validation')
(189, 'validation2')
(190, 'validation')
(190, 'validation2')
(191, 'validation')
(191, 'validation2')
(192, 'validation')
(192, 'validation2')
(193, 'validation')
(193, 'validation2')
(194, 'validation')
(194, 'validation2')
(195, 'validation')
(195, 'validation2')
(196, 'validation')
(196, 'validation2')
(197, 'validation')
(197, 'validation2')
(198, 'validation')
(198, 'validation2')
(199, 'validation')
(199, 'validation2')
(200, 'validation')
(200, 'validation2')
# ============================================================
# telstar/tests/test_consumer.py
# ============================================================
import os
import random
import sys
from time import sleep

import peewee
import redis
from playhouse.db_url import connect

from telstar import Message
from telstar.consumer import Consumer

r = redis.Redis(host=os.environ.get("REDIS_HOST"),
                port=os.environ.get("REDIS_PORT"),
                password=os.environ.get("REDIS_PASSWORD"),
                db=int(os.environ.get("REDIS_DB")))

db = connect(os.environ["DATABASE"])
db.connect()


class Test(peewee.Model):
    number = peewee.IntegerField()
    group_name = peewee.CharField()

    class Meta:
        indexes = (
            (('number', 'group_name'), True),
        )
        database = db


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "setup":
        print("Recreating table in order to start from scratch")
        db.drop_tables([Test])
        db.create_tables([Test])

    def simple(consumer, record: Message, done):
        with db.atomic():
            Test.create(number=int(record.data["value"]),
                        group_name=consumer.group_name)
            sleep(random.randrange(int(os.environ.get("SLEEPINESS"))) / 100)
            done()

    Consumer(link=r,
             stream_name=os.environ.get("STREAM_NAME"),
             group_name=os.environ.get("GROUP_NAME"),
             consumer_name=os.environ.get("CONSUMER_NAME"),
             processor_fn=simple).run()
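
For reference, with the configuration the test script uses below
(STREAM_NAME=test, GROUP_NAME=validation, CONSUMER_NAME=1, REDIS_DB=10),
the consumer maintains keys of roughly this shape; a sketch, with the
stream id and UUID made up:

    import redis

    r = redis.Redis(db=10)
    # Hypothetical key layout:
    #   telstar:stream:test                                      - the stream itself
    #   telstar:checkpoint:cg:telstar:stream:test:validation:1   - last seen id, e.g. b"1560194528886-0"
    #   telstar:seen:telstar:stream:test:validation:<uuid>       - dedup marker, expires after 14 days
    print(r.keys("telstar:*"))
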
# ============================================================
# telstar/tests/test_producer.py
# ============================================================
import json
import os
import random
import sys
import uuid
from time import sleep
from typing import Callable, List, Tuple

import peewee
import redis
from playhouse.db_url import connect

from telstar.producer import Message, Producer


class JSONField(peewee.TextField):
    def db_value(self, value):
        return json.dumps(value)

    def python_value(self, value):
        if value is not None:
            return json.loads(value)


r = redis.Redis(host=os.environ.get("REDIS_HOST"),
                port=os.environ.get("REDIS_PORT"),
                password=os.environ.get("REDIS_PASSWORD"),
                db=int(os.environ["REDIS_DB"]))

db = connect(os.environ["DATABASE"])
db.connect()


class Events(peewee.Model):
    msg_uid = peewee.UUIDField(default=uuid.uuid4)
    topic = peewee.CharField()
    data = JSONField()

    class Meta:
        database = db


if __name__ == "__main__":
    if len(sys.argv) > 1:
        if sys.argv[1] == "setup":
            print("Recreating table in order to start from scratch")
            db.drop_tables([Events])
            db.create_tables([Events])
        if "create" in sys.argv[1:]:
            for i in range(int(os.environ["RANGE_FROM"]), int(os.environ["RANGE_TO"])):
                Events.create(topic="test", data=dict(value=i))

    def puller() -> Tuple[List[Message], Callable[[], None]]:
        qs = Events.select().order_by(peewee.fn.RAND()).limit(5)
        msgs = [Message(e.topic, e.msg_uid, e.data) for e in qs]

        def done():
            if not os.environ.get("KEEPEVENTS"):
                for event in qs:
                    event.delete_instance()
            sleep(random.randrange(int(os.environ.get("SLEEPINESS"))) / 10)
        return msgs, done

    print("starting")
    Producer(r, puller, context_callable=db.atomic).run()
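
To see what a producer actually writes, the stream can be inspected
directly; a sketch against the test configuration above, with a made-up
entry id and UUID in the sample output:

    import redis

    r = redis.Redis(db=10)
    for entry_id, fields in r.xrange("telstar:stream:test", count=2):
        print(entry_id, fields)
    # b'1560194528886-0' {b'message_id': b'3570c4e1-...', b'data': b'{"value": 1}'}
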
# ============================================================
# telstar/tests/test_telstar.sh
# ============================================================
#!/bin/bash
# This script is used to verify/test assumptions and guarantees about the system.
#
# Todos:
#  * kill redis in the process
#  * kill mysql in the process
#  * use more fuzzing - meaning kill and start processes more randomly to simulate failure
#  * use this inside a CI
#  * improve the sleeping, maybe look at how we can wait until all pending
#    messages are done and then make the final diff.

readonly SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )"

function finish() {
    jobs -p | xargs kill
}

# Upon exit kill all child processes
trap finish SIGINT
trap finish EXIT

# Configuration
export STREAM_NAME="${STREAM_NAME:-test}"
export GROUP_NAME="${GROUP_NAME:-validation}"
export SLEEPINESS="${SLEEPINESS:-10}"
export REDIS_PASSWORD="${REDIS_PASSWORD:-}"
export REDIS_PORT="${REDIS_PORT:-6379}"
export REDIS_HOST="${REDIS_HOST:-localhost}"
export REDIS_DB="${REDIS_DB:-10}"
export DATABASE="${DATABASE:-mysql://root:root@127.0.0.1:3306/test}"
export PYTHONPATH="${PYTHONPATH}:${SCRIPTPATH}/../"

if [ -x "$(command -v redis-cli)" ]; then
    # Clear redis, but only if redis-cli is available
    echo "flushing"
    redis-cli -n $REDIS_DB FLUSHDB
fi

# Start the first consumer with `setup`, which drops and creates the needed tables in mysql
CONSUMER_NAME=1 python $SCRIPTPATH/test_consumer.py setup &
CONSUMER_1=$! # This saves the PID of the last command - which we can use to `kill` the process later

# Wait for the table creation to be done
sleep 2

# Start more consumers
CONSUMER_NAME=2 python $SCRIPTPATH/test_consumer.py &
CONSUMER_2=$!
CONSUMER_NAME=3 python $SCRIPTPATH/test_consumer.py &
CONSUMER_3=$!

# Create a producer that double sends messages, because it does not delete them after sending.
KEEPEVENTS=1 RANGE_FROM=1 RANGE_TO=101 python $SCRIPTPATH/test_producer.py setup create &
PRODUCER_1=$!

sleep 2

# Create another producer that keeps double sending
KEEPEVENTS=1 python $SCRIPTPATH/test_producer.py &
PRODUCER_2=$!

# Since CONSUMER_1 has been running in the background it already has consumed
# some messages from the stream
kill $CONSUMER_1

# Wait some more for CONSUMER_2 and CONSUMER_3 to process more data.
sleep 2

# Let's kill the producers while they are sending
kill $PRODUCER_1
kill $PRODUCER_2

# Now we restart CONSUMER_1
CONSUMER_NAME=1 python $SCRIPTPATH/test_consumer.py &
CONSUMER_1=$!

# Let all consumers process a bit more data
sleep 5

# Kill more consumers
kill $CONSUMER_1
kill $CONSUMER_3

# Oops, all consumers are dead now
kill $CONSUMER_2

# Create another producer that generates the rest of the data,
# but now also deletes what was already sent
RANGE_FROM=101 RANGE_TO=201 python $SCRIPTPATH/test_producer.py create &
PRODUCER_3=$!

# Start a new consumer
CONSUMER_NAME=4 python $SCRIPTPATH/test_consumer.py &
CONSUMER_4=$!

# Wait for `CONSUMER_4` to process all data
sleep 30
kill $CONSUMER_4
kill $PRODUCER_3

# Restart `CONSUMER_4` and kill it later
CONSUMER_NAME=4 python $SCRIPTPATH/test_consumer.py &
CONSUMER_4=$!
sleep 30
kill $CONSUMER_4

# Create a new consumer group that reads everything from the beginning of time.
SLEEPINESS=1 CONSUMER_NAME=4 GROUP_NAME=validation2 python $SCRIPTPATH/test_consumer.py &
NEW_CONSUMER=$!
sleep 10
kill $NEW_CONSUMER

# Verify that the database holds the expected records
python $SCRIPTPATH/_dbcontent.py | diff $SCRIPTPATH/expected.txt - || exit 1


# ============================================================
# telstar-0.0.1.dist-info/LICENSE
# ============================================================
The MIT License (MIT)

Copyright (c) 2019 Bitspark

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.