PKMyN{==telstar/__init__.py""" Telstar is a package to write producer and consumers groups against redis streams. """ from .com import StagedMessage __version__ = "0.1.0" def stage(topic, data): e = StagedMessage.create(topic=topic, data=data) return e.msg_uid def staged(): return [e.to_msg() for e in StagedMessage.unsent()] PKMyNWR$$telstar/consumer.pyimport json import uuid from functools import partial import redis from .com import Message # An important concept to understand here is the consumer group which give us the following consumer properties: # msg -> consumer # # msg:1 -> cg-userSignUp.1 # msg:2 -> cg-userSignUp.2 # msg:3 -> cg-userSignUp.3 # msg:4 -> cg-userSignUp.1 # msg:5 -> cg-userSignUp.2 # msg:6 -> cg-userSignUp.3 # msg:7 -> cg-userSignUp.1 # # Which basically means that inside a group a single consumer will only get message the others have not yet seen. # For a deep dive read to this -> https://redis.io/topics/streams-intro # This allows us to create N consumers w/o needing to figure out which message already has been processed. class MultiConsumer(object): @staticmethod def increment(id): # IDs are of the form "1509473251518-0" and comprise a millisecond # timestamp plus a sequence number to differentiate within the timestamp. time, sequence = id.decode("ascii").split("-") if not sequence: raise Exception("Argument error, {id} has wrong format not #-#") next_sequence = int(sequence) + 1 return bytes(f"{time}-{next_sequence}", "ascii") @staticmethod def decrement(id): time, sequence = id.decode("ascii").split("-") if not sequence: raise Exception("Argument error, {id} has wrong format not #-#") sequence = int(sequence) time = int(time) if sequence == 0: time = time - 1 else: sequence = int(sequence) - 1 return bytes(f"{time}-{sequence}", "ascii") def __init__(self, link: redis.Redis, group_name: str, consumer_name: str, config: dict, block=2000, claim_the_dead_after=20 * 1000): self.link = link self.block = block self.claim_the_dead_after = claim_the_dead_after self.consumer_name = consumer_name self.group_name = group_name self.processors = {f"telstar:stream:{stream_name}": fn for stream_name, fn in config.items()} self.streams = self.processors.keys() for stream_name in self.streams: self.create_consumer_group(stream_name) def get_consumer_name(self, stream): return f"cg:{self.group_name}:{self.consumer_name}" def _seen_key(self, msg: Message): return f"telstar:seen:{msg.stream}:{self.group_name}:{msg.msg_uuid}" def _checkpoint_key(self, stream: str): return f"telstar:checkpoint:{stream}:{self.get_consumer_name(stream)}" # A new consumer group for the given stream, if the stream does not exist yet # create one (`mkstream`) - if it does we want all messages present `id=0` def create_consumer_group(self, stream_name: str): try: self.link.xgroup_create(stream_name, self.group_name, mkstream=True, id="0") except redis.exceptions.ResponseError: pass # In consumer groups, consumers can disappear, when they do they can leave non ack'ed message # which we want to claim and be delivered to a new consumer def claim_message_from_the_dead(self, stream_name: str): # Get information about all consumers in the group and how many messages are pending pending_info = self.link.xpending(stream_name, self.group_name) # {'pending': 10, # 'min': b'1560032216285-0', # 'max': b'1560032942270-0', # 'consumers': [{'name': b'cg-userSignUp.1', 'pending': 10}]} # Nothing to do if pending_info["pending"] == 0: return # Get all messages ids within that range and select the ones we want to claim and claim them # But only if they are pending for more than 20secs. pending_messages = self.link.xpending_range(stream_name, self.group_name, pending_info["min"], pending_info["max"], pending_info["pending"]) # [ # {'message_id': b'1560194528886-0', # 'consumer': b'cg-userSignUp.1', # 'time_since_delivered': 22020, # 'times_delivered': 1} # ...] messages_to_claim = [p["message_id"] for p in pending_messages] if not messages_to_claim: # The pending messages are all our own no need to claim anything # This can happen when we simply restart a consumer with the same name return # It might be cheaper to claim *and* receive the message so we can work on them directly # w/o catching up through the history with the potential of a lot of already seen keys. claimed_messages = self.link.xclaim(stream_name, self.group_name, self.consumer_name, self.claim_the_dead_after, messages_to_claim, justid=True) return claimed_messages # We claim the message from other dead/non-responsive consumers. # When new message have been claimed they are usually from the past # which means in order to process them we need to start processing our history. def transfer_and_process_stream_history(self, streams: list): last_seen = dict() for stream_name in streams: last_seen[stream_name] = self.get_last_seen_id(stream_name) stream_msg_ids = self.claim_message_from_the_dead(stream_name) if stream_msg_ids: # if there are message that we have claimed we need to determine where to start processing # because we can't just wait for new message to arrive. before_earliest = self.decrement(min(stream_msg_ids)) next_after_seen = self.increment(last_seen[stream_name]) last_seen[stream_name] = min([before_earliest, next_after_seen]) # Read all message for the past up until now. self.catchup(last_seen) # This is the main loop where we start from the history # and claim message and reprocess our history. # We also loop the transfer_and_process_history as other consumers might have died while we waited def run(self): while True: self._once() def _once(self): self.transfer_and_process_stream_history(self.streams) # With our history processes we can now start waiting for new message to arrive `>` config = {k: ">" for k in self.streams} self.read(config, block=self.block) def get_last_seen_id(self, stream_name: str): check_point_key = self._checkpoint_key(stream_name) return self.link.get(check_point_key) or b"0-0" # Multiple things are happening here. # 1. Save the stream_msg_id as checkpoint, which means # that we know where to start should the consumer be restarted # 2. Each message has a UUID and in order to process each meassage only once we remember # the UUID for 14 days # 3. Acknowledge the message to meaning that we have processed it def acknowledge(self, msg: Message, stream_msg_id): check_point_key = self._checkpoint_key(f"telstar:stream:{msg.stream}") seen_key = self._seen_key(msg) # Execute the following statments in a transaction e.g. redis speak `pipeline` pipe = self.link.pipeline() # If this key changes before we execute the pipeline than the ack fails and this the processor reverts all the work. # Which is exactly what we want in this case as the work has already been completed by another consumer. pipe.watch(seen_key) # Mark this as a seen key for 14 Days meaning if the message reappears after 14 days we reprocess it pipe.set(seen_key, 1, ex=14 * 24 * 60 * 60) # 14 days # Set the checkpoint for this consumer so that it knows where to start agains once it restarts. pipe.set(check_point_key, stream_msg_id) # Acknowledge the actual message pipe.xack(f"telstar:stream:{msg.stream}", self.group_name, stream_msg_id) pipe.execute() def work(self, stream_name, stream_msg_id, record): msg = Message(stream_name, uuid.UUID(record[Message.IDFieldName].decode("ascii")), json.loads(record[Message.DataFieldName])) done = partial(self.acknowledge, msg, stream_msg_id) key = self._seen_key(msg) if self.link.get(key): # This is a double send return done() self.processors[stream_name.decode("ascii")](self, msg, done) # Process all message from `start` def catchup(self, streams): return self._xreadgroup(streams) # Process wait for new messages def read(self, streams, block): return self._xreadgroup(streams, block=block) def _xreadgroup(self, streams, block=0): processed = 0 value = self.link.xreadgroup(self.group_name, self.consumer_name, streams, block=block) if not value: return 0 for stream_name, records in value: for record in records: stream_msg_id, record = record self.work(stream_name, stream_msg_id, record) processed = processed + 1 return processed class Consumer(MultiConsumer): def __init__(self, link, group_name, consumer_name, stream_name, processor_fn): super().__init__(link, group_name, consumer_name, {stream_name: processor_fn}) PKMyNntelstar/producer.pyimport json from time import sleep from typing import Callable, List, Tuple from .com import Message from .com import StagedMessage class Producer(object): def __init__(self, link, get_records: Callable[[], Tuple[List[Message], Callable[[], None]]], context_callable=None): self.link = link self.get_records = get_records self.context_callable = context_callable def run_once(self): records, done = self.get_records() for record in records: self.send(record) done() def run(self): while True: if callable(self.context_callable): with self.context_callable(): self.run_once() else: self.run_once() def send(self, msg: Message): self.link.xadd(f"telstar:stream:{msg.stream}", { Message.IDFieldName: str(msg.msg_uuid), Message.DataFieldName: json.dumps(msg.data)}) class StagedProducer(Producer): def __init__(self, link, database, batch_size=5, wait=0.5): self.batch_size = batch_size self.wait = wait StagedMessage.bind(database) super().__init__(link, self.create_puller(), StagedMessage._meta.database.atomic) def create_puller(self): def puller() -> Tuple[List[Message], Callable[[], None]]: qs = StagedMessage.unsent().limit(self.batch_size) msgs = [e.to_msg() for e in qs] def done(): ids = list(map(lambda l: l.id, qs)) if ids: StagedMessage.update(sent=True).where(StagedMessage.id in ids).execute() sleep(self.wait) return msgs, done return puller PKMyNtelstar/com/__init__.pyimport json import uuid import peewee from datetime import datetime class TelstarEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, datetime): return o.isoformat() if isinstance(o, uuid.UUID): return str(o) return json.JSONEncoder.default(self, o) class JSONField(peewee.TextField): def db_value(self, value): return json.dumps(value, cls=TelstarEncoder) def python_value(self, value): if value is not None: return json.loads(value) class StagedMessage(peewee.Model): msg_uid = peewee.UUIDField(default=uuid.uuid4, index=True) topic = peewee.CharField(index=True) data = JSONField() sent = peewee.BooleanField(default=False, index=True) created_at = peewee.TimestampField(resolution=10**3) @classmethod def unsent(cls): return cls.select().where(cls.sent == False) # noqa def to_msg(self): return Message(self.topic, self.msg_uid, self.data) class Message(object): IDFieldName = b"message_id" DataFieldName = b"data" def __init__(self, stream: str, msg_uuid: uuid.UUID, data: dict): if not isinstance(msg_uuid, uuid.UUID): raise TypeError(f"msg_uuid needs to be uuid.UUID not {type(msg_uuid)}") if isinstance(stream, bytes): stream = stream.decode("ascii") self.stream = stream.replace("telstar:stream:", "") self.msg_uuid = msg_uuid self.data = data PKMyNtelstar/tests/_dbcontent.pyimport os from playhouse.db_url import connect db = connect(os.environ["DATABASE"]) result = db.execute_sql("SELECT number, group_name, topic FROM test") print("\n".join(map(str, list(result)))) PKMyNj`DKDKtelstar/tests/expected.txt(1, 'validation', 'mystream') (1, 'validation', 'mystream2') (1, 'validation2', 'mystream') (1, 'validation2', 'mystream2') (2, 'validation', 'mystream') (2, 'validation', 'mystream2') (2, 'validation2', 'mystream') (2, 'validation2', 'mystream2') (3, 'validation', 'mystream') (3, 'validation', 'mystream2') (3, 'validation2', 'mystream') (3, 'validation2', 'mystream2') (4, 'validation', 'mystream') (4, 'validation', 'mystream2') (4, 'validation2', 'mystream') (4, 'validation2', 'mystream2') (5, 'validation', 'mystream') (5, 'validation', 'mystream2') (5, 'validation2', 'mystream') (5, 'validation2', 'mystream2') (6, 'validation', 'mystream') (6, 'validation', 'mystream2') (6, 'validation2', 'mystream') (6, 'validation2', 'mystream2') (7, 'validation', 'mystream') (7, 'validation', 'mystream2') (7, 'validation2', 'mystream') (7, 'validation2', 'mystream2') (8, 'validation', 'mystream') (8, 'validation', 'mystream2') (8, 'validation2', 'mystream') (8, 'validation2', 'mystream2') (9, 'validation', 'mystream') (9, 'validation', 'mystream2') (9, 'validation2', 'mystream') (9, 'validation2', 'mystream2') (10, 'validation', 'mystream') (10, 'validation', 'mystream2') (10, 'validation2', 'mystream') (10, 'validation2', 'mystream2') (11, 'validation', 'mystream') (11, 'validation', 'mystream2') (11, 'validation2', 'mystream') (11, 'validation2', 'mystream2') (12, 'validation', 'mystream') (12, 'validation', 'mystream2') (12, 'validation2', 'mystream') (12, 'validation2', 'mystream2') (13, 'validation', 'mystream') (13, 'validation', 'mystream2') (13, 'validation2', 'mystream') (13, 'validation2', 'mystream2') (14, 'validation', 'mystream') (14, 'validation', 'mystream2') (14, 'validation2', 'mystream') (14, 'validation2', 'mystream2') (15, 'validation', 'mystream') (15, 'validation', 'mystream2') (15, 'validation2', 'mystream') (15, 'validation2', 'mystream2') (16, 'validation', 'mystream') (16, 'validation', 'mystream2') (16, 'validation2', 'mystream') (16, 'validation2', 'mystream2') (17, 'validation', 'mystream') (17, 'validation', 'mystream2') (17, 'validation2', 'mystream') (17, 'validation2', 'mystream2') (18, 'validation', 'mystream') (18, 'validation', 'mystream2') (18, 'validation2', 'mystream') (18, 'validation2', 'mystream2') (19, 'validation', 'mystream') (19, 'validation', 'mystream2') (19, 'validation2', 'mystream') (19, 'validation2', 'mystream2') (20, 'validation', 'mystream') (20, 'validation', 'mystream2') (20, 'validation2', 'mystream') (20, 'validation2', 'mystream2') (21, 'validation', 'mystream') (21, 'validation', 'mystream2') (21, 'validation2', 'mystream') (21, 'validation2', 'mystream2') (22, 'validation', 'mystream') (22, 'validation', 'mystream2') (22, 'validation2', 'mystream') (22, 'validation2', 'mystream2') (23, 'validation', 'mystream') (23, 'validation', 'mystream2') (23, 'validation2', 'mystream') (23, 'validation2', 'mystream2') (24, 'validation', 'mystream') (24, 'validation', 'mystream2') (24, 'validation2', 'mystream') (24, 'validation2', 'mystream2') (25, 'validation', 'mystream') (25, 'validation', 'mystream2') (25, 'validation2', 'mystream') (25, 'validation2', 'mystream2') (26, 'validation', 'mystream') (26, 'validation', 'mystream2') (26, 'validation2', 'mystream') (26, 'validation2', 'mystream2') (27, 'validation', 'mystream') (27, 'validation', 'mystream2') (27, 'validation2', 'mystream') (27, 'validation2', 'mystream2') (28, 'validation', 'mystream') (28, 'validation', 'mystream2') (28, 'validation2', 'mystream') (28, 'validation2', 'mystream2') (29, 'validation', 'mystream') (29, 'validation', 'mystream2') (29, 'validation2', 'mystream') (29, 'validation2', 'mystream2') (30, 'validation', 'mystream') (30, 'validation', 'mystream2') (30, 'validation2', 'mystream') (30, 'validation2', 'mystream2') (31, 'validation', 'mystream') (31, 'validation', 'mystream2') (31, 'validation2', 'mystream') (31, 'validation2', 'mystream2') (32, 'validation', 'mystream') (32, 'validation', 'mystream2') (32, 'validation2', 'mystream') (32, 'validation2', 'mystream2') (33, 'validation', 'mystream') (33, 'validation', 'mystream2') (33, 'validation2', 'mystream') (33, 'validation2', 'mystream2') (34, 'validation', 'mystream') (34, 'validation', 'mystream2') (34, 'validation2', 'mystream') (34, 'validation2', 'mystream2') (35, 'validation', 'mystream') (35, 'validation', 'mystream2') (35, 'validation2', 'mystream') (35, 'validation2', 'mystream2') (36, 'validation', 'mystream') (36, 'validation', 'mystream2') (36, 'validation2', 'mystream') (36, 'validation2', 'mystream2') (37, 'validation', 'mystream') (37, 'validation', 'mystream2') (37, 'validation2', 'mystream') (37, 'validation2', 'mystream2') (38, 'validation', 'mystream') (38, 'validation', 'mystream2') (38, 'validation2', 'mystream') (38, 'validation2', 'mystream2') (39, 'validation', 'mystream') (39, 'validation', 'mystream2') (39, 'validation2', 'mystream') (39, 'validation2', 'mystream2') (40, 'validation', 'mystream') (40, 'validation', 'mystream2') (40, 'validation2', 'mystream') (40, 'validation2', 'mystream2') (41, 'validation', 'mystream') (41, 'validation', 'mystream2') (41, 'validation2', 'mystream') (41, 'validation2', 'mystream2') (42, 'validation', 'mystream') (42, 'validation', 'mystream2') (42, 'validation2', 'mystream') (42, 'validation2', 'mystream2') (43, 'validation', 'mystream') (43, 'validation', 'mystream2') (43, 'validation2', 'mystream') (43, 'validation2', 'mystream2') (44, 'validation', 'mystream') (44, 'validation', 'mystream2') (44, 'validation2', 'mystream') (44, 'validation2', 'mystream2') (45, 'validation', 'mystream') (45, 'validation', 'mystream2') (45, 'validation2', 'mystream') (45, 'validation2', 'mystream2') (46, 'validation', 'mystream') (46, 'validation', 'mystream2') (46, 'validation2', 'mystream') (46, 'validation2', 'mystream2') (47, 'validation', 'mystream') (47, 'validation', 'mystream2') (47, 'validation2', 'mystream') (47, 'validation2', 'mystream2') (48, 'validation', 'mystream') (48, 'validation', 'mystream2') (48, 'validation2', 'mystream') (48, 'validation2', 'mystream2') (49, 'validation', 'mystream') (49, 'validation', 'mystream2') (49, 'validation2', 'mystream') (49, 'validation2', 'mystream2') (50, 'validation', 'mystream') (50, 'validation', 'mystream2') (50, 'validation2', 'mystream') (50, 'validation2', 'mystream2') (51, 'validation', 'mystream') (51, 'validation', 'mystream2') (51, 'validation2', 'mystream') (51, 'validation2', 'mystream2') (52, 'validation', 'mystream') (52, 'validation', 'mystream2') (52, 'validation2', 'mystream') (52, 'validation2', 'mystream2') (53, 'validation', 'mystream') (53, 'validation', 'mystream2') (53, 'validation2', 'mystream') (53, 'validation2', 'mystream2') (54, 'validation', 'mystream') (54, 'validation', 'mystream2') (54, 'validation2', 'mystream') (54, 'validation2', 'mystream2') (55, 'validation', 'mystream') (55, 'validation', 'mystream2') (55, 'validation2', 'mystream') (55, 'validation2', 'mystream2') (56, 'validation', 'mystream') (56, 'validation', 'mystream2') (56, 'validation2', 'mystream') (56, 'validation2', 'mystream2') (57, 'validation', 'mystream') (57, 'validation', 'mystream2') (57, 'validation2', 'mystream') (57, 'validation2', 'mystream2') (58, 'validation', 'mystream') (58, 'validation', 'mystream2') (58, 'validation2', 'mystream') (58, 'validation2', 'mystream2') (59, 'validation', 'mystream') (59, 'validation', 'mystream2') (59, 'validation2', 'mystream') (59, 'validation2', 'mystream2') (60, 'validation', 'mystream') (60, 'validation', 'mystream2') (60, 'validation2', 'mystream') (60, 'validation2', 'mystream2') (61, 'validation', 'mystream') (61, 'validation', 'mystream2') (61, 'validation2', 'mystream') (61, 'validation2', 'mystream2') (62, 'validation', 'mystream') (62, 'validation', 'mystream2') (62, 'validation2', 'mystream') (62, 'validation2', 'mystream2') (63, 'validation', 'mystream') (63, 'validation', 'mystream2') (63, 'validation2', 'mystream') (63, 'validation2', 'mystream2') (64, 'validation', 'mystream') (64, 'validation', 'mystream2') (64, 'validation2', 'mystream') (64, 'validation2', 'mystream2') (65, 'validation', 'mystream') (65, 'validation', 'mystream2') (65, 'validation2', 'mystream') (65, 'validation2', 'mystream2') (66, 'validation', 'mystream') (66, 'validation', 'mystream2') (66, 'validation2', 'mystream') (66, 'validation2', 'mystream2') (67, 'validation', 'mystream') (67, 'validation', 'mystream2') (67, 'validation2', 'mystream') (67, 'validation2', 'mystream2') (68, 'validation', 'mystream') (68, 'validation', 'mystream2') (68, 'validation2', 'mystream') (68, 'validation2', 'mystream2') (69, 'validation', 'mystream') (69, 'validation', 'mystream2') (69, 'validation2', 'mystream') (69, 'validation2', 'mystream2') (70, 'validation', 'mystream') (70, 'validation', 'mystream2') (70, 'validation2', 'mystream') (70, 'validation2', 'mystream2') (71, 'validation', 'mystream') (71, 'validation', 'mystream2') (71, 'validation2', 'mystream') (71, 'validation2', 'mystream2') (72, 'validation', 'mystream') (72, 'validation', 'mystream2') (72, 'validation2', 'mystream') (72, 'validation2', 'mystream2') (73, 'validation', 'mystream') (73, 'validation', 'mystream2') (73, 'validation2', 'mystream') (73, 'validation2', 'mystream2') (74, 'validation', 'mystream') (74, 'validation', 'mystream2') (74, 'validation2', 'mystream') (74, 'validation2', 'mystream2') (75, 'validation', 'mystream') (75, 'validation', 'mystream2') (75, 'validation2', 'mystream') (75, 'validation2', 'mystream2') (76, 'validation', 'mystream') (76, 'validation', 'mystream2') (76, 'validation2', 'mystream') (76, 'validation2', 'mystream2') (77, 'validation', 'mystream') (77, 'validation', 'mystream2') (77, 'validation2', 'mystream') (77, 'validation2', 'mystream2') (78, 'validation', 'mystream') (78, 'validation', 'mystream2') (78, 'validation2', 'mystream') (78, 'validation2', 'mystream2') (79, 'validation', 'mystream') (79, 'validation', 'mystream2') (79, 'validation2', 'mystream') (79, 'validation2', 'mystream2') (80, 'validation', 'mystream') (80, 'validation', 'mystream2') (80, 'validation2', 'mystream') (80, 'validation2', 'mystream2') (81, 'validation', 'mystream') (81, 'validation', 'mystream2') (81, 'validation2', 'mystream') (81, 'validation2', 'mystream2') (82, 'validation', 'mystream') (82, 'validation', 'mystream2') (82, 'validation2', 'mystream') (82, 'validation2', 'mystream2') (83, 'validation', 'mystream') (83, 'validation', 'mystream2') (83, 'validation2', 'mystream') (83, 'validation2', 'mystream2') (84, 'validation', 'mystream') (84, 'validation', 'mystream2') (84, 'validation2', 'mystream') (84, 'validation2', 'mystream2') (85, 'validation', 'mystream') (85, 'validation', 'mystream2') (85, 'validation2', 'mystream') (85, 'validation2', 'mystream2') (86, 'validation', 'mystream') (86, 'validation', 'mystream2') (86, 'validation2', 'mystream') (86, 'validation2', 'mystream2') (87, 'validation', 'mystream') (87, 'validation', 'mystream2') (87, 'validation2', 'mystream') (87, 'validation2', 'mystream2') (88, 'validation', 'mystream') (88, 'validation', 'mystream2') (88, 'validation2', 'mystream') (88, 'validation2', 'mystream2') (89, 'validation', 'mystream') (89, 'validation', 'mystream2') (89, 'validation2', 'mystream') (89, 'validation2', 'mystream2') (90, 'validation', 'mystream') (90, 'validation', 'mystream2') (90, 'validation2', 'mystream') (90, 'validation2', 'mystream2') (91, 'validation', 'mystream') (91, 'validation', 'mystream2') (91, 'validation2', 'mystream') (91, 'validation2', 'mystream2') (92, 'validation', 'mystream') (92, 'validation', 'mystream2') (92, 'validation2', 'mystream') (92, 'validation2', 'mystream2') (93, 'validation', 'mystream') (93, 'validation', 'mystream2') (93, 'validation2', 'mystream') (93, 'validation2', 'mystream2') (94, 'validation', 'mystream') (94, 'validation', 'mystream2') (94, 'validation2', 'mystream') (94, 'validation2', 'mystream2') (95, 'validation', 'mystream') (95, 'validation', 'mystream2') (95, 'validation2', 'mystream') (95, 'validation2', 'mystream2') (96, 'validation', 'mystream') (96, 'validation', 'mystream2') (96, 'validation2', 'mystream') (96, 'validation2', 'mystream2') (97, 'validation', 'mystream') (97, 'validation', 'mystream2') (97, 'validation2', 'mystream') (97, 'validation2', 'mystream2') (98, 'validation', 'mystream') (98, 'validation', 'mystream2') (98, 'validation2', 'mystream') (98, 'validation2', 'mystream2') (99, 'validation', 'mystream') (99, 'validation', 'mystream2') (99, 'validation2', 'mystream') (99, 'validation2', 'mystream2') (100, 'validation', 'mystream') (100, 'validation', 'mystream2') (100, 'validation2', 'mystream') (100, 'validation2', 'mystream2') (101, 'validation', 'mystream') (101, 'validation2', 'mystream') (102, 'validation', 'mystream') (102, 'validation2', 'mystream') (103, 'validation', 'mystream') (103, 'validation2', 'mystream') (104, 'validation', 'mystream') (104, 'validation2', 'mystream') (105, 'validation', 'mystream') (105, 'validation2', 'mystream') (106, 'validation', 'mystream') (106, 'validation2', 'mystream') (107, 'validation', 'mystream') (107, 'validation2', 'mystream') (108, 'validation', 'mystream') (108, 'validation2', 'mystream') (109, 'validation', 'mystream') (109, 'validation2', 'mystream') (110, 'validation', 'mystream') (110, 'validation2', 'mystream') (111, 'validation', 'mystream') (111, 'validation2', 'mystream') (112, 'validation', 'mystream') (112, 'validation2', 'mystream') (113, 'validation', 'mystream') (113, 'validation2', 'mystream') (114, 'validation', 'mystream') (114, 'validation2', 'mystream') (115, 'validation', 'mystream') (115, 'validation2', 'mystream') (116, 'validation', 'mystream') (116, 'validation2', 'mystream') (117, 'validation', 'mystream') (117, 'validation2', 'mystream') (118, 'validation', 'mystream') (118, 'validation2', 'mystream') (119, 'validation', 'mystream') (119, 'validation2', 'mystream') (120, 'validation', 'mystream') (120, 'validation2', 'mystream') (121, 'validation', 'mystream') (121, 'validation2', 'mystream') (122, 'validation', 'mystream') (122, 'validation2', 'mystream') (123, 'validation', 'mystream') (123, 'validation2', 'mystream') (124, 'validation', 'mystream') (124, 'validation2', 'mystream') (125, 'validation', 'mystream') (125, 'validation2', 'mystream') (126, 'validation', 'mystream') (126, 'validation2', 'mystream') (127, 'validation', 'mystream') (127, 'validation2', 'mystream') (128, 'validation', 'mystream') (128, 'validation2', 'mystream') (129, 'validation', 'mystream') (129, 'validation2', 'mystream') (130, 'validation', 'mystream') (130, 'validation2', 'mystream') (131, 'validation', 'mystream') (131, 'validation2', 'mystream') (132, 'validation', 'mystream') (132, 'validation2', 'mystream') (133, 'validation', 'mystream') (133, 'validation2', 'mystream') (134, 'validation', 'mystream') (134, 'validation2', 'mystream') (135, 'validation', 'mystream') (135, 'validation2', 'mystream') (136, 'validation', 'mystream') (136, 'validation2', 'mystream') (137, 'validation', 'mystream') (137, 'validation2', 'mystream') (138, 'validation', 'mystream') (138, 'validation2', 'mystream') (139, 'validation', 'mystream') (139, 'validation2', 'mystream') (140, 'validation', 'mystream') (140, 'validation2', 'mystream') (141, 'validation', 'mystream') (141, 'validation2', 'mystream') (142, 'validation', 'mystream') (142, 'validation2', 'mystream') (143, 'validation', 'mystream') (143, 'validation2', 'mystream') (144, 'validation', 'mystream') (144, 'validation2', 'mystream') (145, 'validation', 'mystream') (145, 'validation2', 'mystream') (146, 'validation', 'mystream') (146, 'validation2', 'mystream') (147, 'validation', 'mystream') (147, 'validation2', 'mystream') (148, 'validation', 'mystream') (148, 'validation2', 'mystream') (149, 'validation', 'mystream') (149, 'validation2', 'mystream') (150, 'validation', 'mystream') (150, 'validation2', 'mystream') (151, 'validation', 'mystream') (151, 'validation2', 'mystream') (152, 'validation', 'mystream') (152, 'validation2', 'mystream') (153, 'validation', 'mystream') (153, 'validation2', 'mystream') (154, 'validation', 'mystream') (154, 'validation2', 'mystream') (155, 'validation', 'mystream') (155, 'validation2', 'mystream') (156, 'validation', 'mystream') (156, 'validation2', 'mystream') (157, 'validation', 'mystream') (157, 'validation2', 'mystream') (158, 'validation', 'mystream') (158, 'validation2', 'mystream') (159, 'validation', 'mystream') (159, 'validation2', 'mystream') (160, 'validation', 'mystream') (160, 'validation2', 'mystream') (161, 'validation', 'mystream') (161, 'validation2', 'mystream') (162, 'validation', 'mystream') (162, 'validation2', 'mystream') (163, 'validation', 'mystream') (163, 'validation2', 'mystream') (164, 'validation', 'mystream') (164, 'validation2', 'mystream') (165, 'validation', 'mystream') (165, 'validation2', 'mystream') (166, 'validation', 'mystream') (166, 'validation2', 'mystream') (167, 'validation', 'mystream') (167, 'validation2', 'mystream') (168, 'validation', 'mystream') (168, 'validation2', 'mystream') (169, 'validation', 'mystream') (169, 'validation2', 'mystream') (170, 'validation', 'mystream') (170, 'validation2', 'mystream') (171, 'validation', 'mystream') (171, 'validation2', 'mystream') (172, 'validation', 'mystream') (172, 'validation2', 'mystream') (173, 'validation', 'mystream') (173, 'validation2', 'mystream') (174, 'validation', 'mystream') (174, 'validation2', 'mystream') (175, 'validation', 'mystream') (175, 'validation2', 'mystream') (176, 'validation', 'mystream') (176, 'validation2', 'mystream') (177, 'validation', 'mystream') (177, 'validation2', 'mystream') (178, 'validation', 'mystream') (178, 'validation2', 'mystream') (179, 'validation', 'mystream') (179, 'validation2', 'mystream') (180, 'validation', 'mystream') (180, 'validation2', 'mystream') (181, 'validation', 'mystream') (181, 'validation2', 'mystream') (182, 'validation', 'mystream') (182, 'validation2', 'mystream') (183, 'validation', 'mystream') (183, 'validation2', 'mystream') (184, 'validation', 'mystream') (184, 'validation2', 'mystream') (185, 'validation', 'mystream') (185, 'validation2', 'mystream') (186, 'validation', 'mystream') (186, 'validation2', 'mystream') (187, 'validation', 'mystream') (187, 'validation2', 'mystream') (188, 'validation', 'mystream') (188, 'validation2', 'mystream') (189, 'validation', 'mystream') (189, 'validation2', 'mystream') (190, 'validation', 'mystream') (190, 'validation2', 'mystream') (191, 'validation', 'mystream') (191, 'validation2', 'mystream') (192, 'validation', 'mystream') (192, 'validation2', 'mystream') (193, 'validation', 'mystream') (193, 'validation2', 'mystream') (194, 'validation', 'mystream') (194, 'validation2', 'mystream') (195, 'validation', 'mystream') (195, 'validation2', 'mystream') (196, 'validation', 'mystream') (196, 'validation2', 'mystream') (197, 'validation', 'mystream') (197, 'validation2', 'mystream') (198, 'validation', 'mystream') (198, 'validation2', 'mystream') (199, 'validation', 'mystream') (199, 'validation2', 'mystream') (200, 'validation', 'mystream') (200, 'validation2', 'mystream') PKMyNqVVtelstar/tests/test_consumer.pyimport os import random import sys from time import sleep import peewee import redis from playhouse.db_url import connect from telstar.com import Message from telstar.consumer import MultiConsumer link = redis.from_url(os.environ["REDIS"]) db = connect(os.environ["DATABASE"]) db.connect() class Test(peewee.Model): number = peewee.IntegerField() group_name = peewee.CharField() topic = peewee.CharField() class Meta: indexes = ( (('number', 'group_name', 'topic'), True), ) database = db if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "setup": print("Recreating table in order to start from scratch") db.drop_tables([Test]) db.create_tables([Test]) def simple(consumer, record: Message, done): with db.atomic(): Test.create(number=int(record.data["value"]), group_name=consumer.group_name, topic=record.stream) sleep(random.randrange(int(os.environ.get("SLEEPINESS"))) / 100) done() MultiConsumer(link=link, group_name=os.environ.get("GROUP_NAME"), consumer_name=os.environ.get("CONSUMER_NAME"), config={ os.environ["STREAM_NAME"]: simple, os.environ["STREAM_NAME_TWO"]: simple }).run() PKMyN,J,telstar/tests/test_producer.pyimport json import os import random import sys import uuid from time import sleep from typing import Callable, List, Tuple import peewee import redis from playhouse.db_url import connect from telstar.com import Message from telstar.producer import Producer class JSONField(peewee.TextField): def db_value(self, value): return json.dumps(value) def python_value(self, value): if value is not None: return json.loads(value) link = redis.from_url(os.environ["REDIS"]) db = connect(os.environ["DATABASE"]) db.connect() class Events(peewee.Model): msg_uid = peewee.UUIDField(default=uuid.uuid4) topic = peewee.CharField() data = JSONField() class Meta: database = db if __name__ == "__main__": if len(sys.argv) > 1: if sys.argv[1] == "setup": print("Recreating table in order to start from scratch") db.drop_tables([Events]) db.create_tables([Events]) if "create" in sys.argv[1:]: for i in range(int(os.environ["RANGE_FROM"]), int(os.environ["RANGE_TO"])): Events.create(topic=os.environ["STREAM_NAME"], data=dict(value=i)) def puller() -> Tuple[List[Message], Callable[[], None]]: qs = Events.select().order_by(Events.id).limit(5) msgs = [Message(e.topic, e.msg_uid, e.data) for e in qs] def done(): if not os.environ.get("KEEPEVENTS"): for r in qs: r.delete_instance() sleep(random.randrange(int(os.environ.get("SLEEPINESS"))) / 10) return msgs, done print("starting") Producer(link, puller, context_callable=db.atomic).run() PKMyNgJL  telstar/tests/test_telstar.sh#!/bin/bash # This script is used to verify/test a assumpations and guaranteesabout the system. # # Todos: # * kill redis in the process # * kill mysql in the process # * use more fuzzing - meaning kill and start processes more randomly to simulate failure # * improve the sleeping, maybe look at how we can wait until all pending messages are done and then make final diff. # Configuration export STREAM_NAME="${STREAM_NAME:-mystream}" export STREAM_NAME_TWO="${STREAM_NAME_TWO:-mystream2}" export GROUP_NAME="${GROUP_NAME:-validation}" export SLEEPINESS="${SLEEPINESS:-10}" export REDIS="${REDIS:-redis://localhost:6379/10}" export DATABASE="${DATABASE:-mysql://root:root@127.0.0.1:3306/test}" export PYTHONPATH="${PYTHONPATH}:${SCRIPTPATH}../}" readonly SCRIPTPATH="$( cd "$(dirname "$0")" pwd -P )" function kill_childs_and_exit() { echo "Attempting to kill all childs" echo "..." pkill -P $$ echo "ok, bye" } # TRAP CTRL-C and kill all childs trap kill_childs_and_exit INT if [ -x "$(command -v redis-cli)" ]; then # Clear redis only if available echo "flushing" redis-cli -u $REDIS FLUSHDB fi main() { # Start the first customer with `create` which drops and creates the needed tables in mysql CONSUMER_NAME=1 python $SCRIPTPATH/test_consumer.py setup & CONSUMER_1=$! # This saves the PID of the last command - which we can use to `kill` the process later # What for the creation to be done sleep 2 # Start more consumers # The `SLEEPINESS` will result in race conditions in the database, which is what we want for testing SLEEPINESS=20 CONSUMER_NAME=2 python $SCRIPTPATH/test_consumer.py & CONSUMER_2=$! CONSUMER_NAME=3 python $SCRIPTPATH/test_consumer.py & CONSUMER_3=$! # Create a producer that double sends messages because it does not delete them after sending. KEEPEVENTS=1 RANGE_FROM=1 RANGE_TO=101 python $SCRIPTPATH/test_producer.py setup create & PRODUCER_1=$! sleep 2 # Create another producer that keeps double sending KEEPEVENTS=1 python $SCRIPTPATH/test_producer.py & PRODUCER_2=$! # Create a producer that emits messages onto the second stream RANGE_FROM=1 RANGE_TO=101 STREAM_NAME=$STREAM_NAME_TWO python $SCRIPTPATH/test_producer.py create & PRODUCER_TWO=$! # Since CONSUMER_1 is in the background in already has consumed some messages from the stream kill -0 $CONSUMER_1 && kill -9 $CONSUMER_1 # Wait some more for CONSUMER_2 and CONSUMER_3 to process more data. sleep 2 # Let's kill the producer while it is sending kill -0 $PRODUCER_1 && kill -9 $PRODUCER_1 kill -0 $PRODUCER_2 && kill -9 $PRODUCER_2 # Now we restart CONSUMER_1 CONSUMER_NAME=1 python $SCRIPTPATH/test_consumer.py & CONSUMER_1=$! # Let all consumers process a bit more data sleep 5 # Kill more consumers kill -0 $CONSUMER_1 && kill -9 $CONSUMER_1 kill -0 $CONSUMER_3 && kill -9 $CONSUMER_3 # Oops all consumers are dead now kill $CONSUMER_2 # Create another producer that generates the rest of the data but now also delete what was send already RANGE_FROM=101 RANGE_TO=201 python $SCRIPTPATH/test_producer.py create & PRODUCER_3=$! # Start a new one CONSUMER_NAME=4 python $SCRIPTPATH/test_consumer.py & CONSUMER_4=$! # Wait for `CONSUMER_4` to process all data sleep 30 kill -0 $CONSUMER_4 && kill -9 $CONSUMER_4 kill -0 $PRODUCER_3 && kill -9 $PRODUCER_3 kill -0 $PRODUCER_TWO && kill -9 $PRODUCER_TWO # Restart `CONSUMER_4` and kill it later CONSUMER_NAME=4 python $SCRIPTPATH/test_consumer.py & CONSUMER_4=$! sleep 30 kill -0 $CONSUMER_4 && kill -9 $CONSUMER_4 # Create a new consumer group that reads everything from the beginning of time. SLEEPINESS=1 CONSUMER_NAME=4 GROUP_NAME=validation2 python $SCRIPTPATH/test_consumer.py & NEW_CONSUMER=$! sleep 10 kill -0 $NEW_CONSUMER && kill -9 $NEW_CONSUMER # Verify that the database hold the expected records python $SCRIPTPATH/_dbcontent.py | diff $SCRIPTPATH/expected.txt - || exit 1 # Makre sure we do not have pending message left PENDING_STREAM_1=$(redis-cli -u $REDIS --csv XPENDING telstar:stream:$STREAM_NAME validation) PENDING_STREAM_2=$(redis-cli -u $REDIS --csv XPENDING telstar:stream:$STREAM_NAME_TWO validation) PENDING_STREAM_3=$(redis-cli -u $REDIS --csv XPENDING telstar:stream:$STREAM_NAME_TWO validation2) test "$PENDING_STREAM_1" == "0,NIL,NIL,NIL" || { echo "$STREAM_NAME validation has pending records" && exit 1; } test "$PENDING_STREAM_2" == "0,NIL,NIL,NIL" || { echo "$STREAM_NAME validation2 has pending records" && exit 1; } test "$PENDING_STREAM_3" == "0,NIL,NIL,NIL" || { echo "$STREAM_NAME_TWO validation2 has pending records" && exit 1; } } main PKMyNe33telstar-0.1.0.dist-info/LICENSEThe MIT License (MIT) Copyright (c) 2019 Bitspark Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HPOtelstar-0.1.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!Hj telstar-0.1.0.dist-info/METADATA]N1 E -H,"(N-63Cտ'C,}ǺKRX=S;35rl~٩Y!`>8}`M-FgVʀ&26!B܋B&_ꢙ051P V5L^Y%a's Ŗ^f>6fWch* ;"^zRmn!'?j+@4J`^*ʶ/ Ռc l<ƌjjIS>o.g}>8tjD mݚ*?P#bW,Cy"?Z-]!nUU!2(\ų:D{E|V(kOjzfWڬ?.nՋ{../fֺP9+R}71T'112nqѲʷJp>*ZmMpɪ`^QlcYgAw_=L 2:VX`T:j7R[Y;'hHfh#\X"^Cp[Ŀ(NR_wOֲُMj !Z&3 h*;7]i;>8s |I,e"+/NfX90fKk%uTF\;mɰ0kK!*]_u6;&Ns=a`󭩢#3GڰEe/H!:sP]gN})8-(= N?PKMyN{==telstar/__init__.pyPKMyNWR$$ntelstar/consumer.pyPKMyNn,&telstar/producer.pyPKMyN -telstar/com/__init__.pyPKMyN,3telstar/tests/_dbcontent.pyPKMyNj`DKDK(4telstar/tests/expected.txtPKMyNqVVtelstar/tests/test_consumer.pyPKMyN,J,6telstar/tests/test_producer.pyPKMyNgJL  telstar/tests/test_telstar.shPKMyNe33Ntelstar-0.1.0.dist-info/LICENSEPK!HPOtelstar-0.1.0.dist-info/WHEELPK!Hj Itelstar-0.1.0.dist-info/METADATAPK!Hlwtelstar-0.1.0.dist-info/RECORDPK