=== telstar/__init__.py ===

"""Telstar is a package to write producers and consumer groups against Redis streams."""
import inspect
import logging
from datetime import datetime
from functools import wraps
from typing import Callable, Dict, List, Union
from uuid import UUID

import redis
from marshmallow import Schema, ValidationError

from .admin import admin
from .com import Message, StagedMessage
from .consumer import MultiConsumer, ThreadedMultiConsumer

__version__ = "0.2.4"

logging.getLogger(__package__).addHandler(logging.NullHandler())
log = logging.getLogger(__package__)
admin = admin  # re-exported for convenience


def stage(topic: str, data: Dict[str, Union[int, str, datetime, UUID]]) -> UUID:
    e = StagedMessage.create(topic=topic, data=data)
    return e.msg_uid


def staged() -> List[Message]:
    return [e.to_msg() for e in StagedMessage.unsent()]


class app:
    def __init__(self, link: redis.Redis, consumer_name: str,
                 consumer_cls: MultiConsumer = ThreadedMultiConsumer, **kwargs) -> None:
        self.link: redis.Redis = link
        self.config: dict = {}
        self.consumer_name: str = consumer_name
        self.consumer_cls: MultiConsumer = consumer_cls
        self.kwargs = kwargs
        self.error_handlers = {}

    def _register_error_handler(self, exc_class, fn):
        self.error_handlers[exc_class] = fn

    def errorhandler(self, exc_class):
        def decorator(fn):
            self._register_error_handler(exc_class, fn)
            return fn
        return decorator

    def get_consumer(self) -> MultiConsumer:
        return self.consumer_cls(self.link, self.consumer_name, self.config,
                                 error_handlers=self.error_handlers, **self.kwargs)

    def start(self):
        self.get_consumer().run()

    def run_once(self) -> None:
        self.get_consumer().run_once()

    def requires_full_message(self, fn: Callable) -> bool:
        argsspec = inspect.getfullargspec(fn)
        arg = argsspec.args[0]
        return argsspec.annotations.get(arg) is Message

    def consumer(self, group: str, streams: list, schema: Schema,
                 strict=True, acknowledge_invalid=False) -> Callable:
        def decorator(fn):
            fullmessage = self.requires_full_message(fn)
            nonlocal streams
            if not isinstance(streams, list):
                streams = [streams]

            @wraps(fn)
            def actual_consumer(consumer: MultiConsumer, msg: Message, done: Callable):
                try:
                    msg.data = schema().load(msg.data)
                    fn(msg) if fullmessage else fn(msg.data)
                    done()
                except ValidationError as err:
                    log.error(f"Unable to validate message: {msg}", exc_info=True)
                    if acknowledge_invalid:
                        done()
                    if strict:
                        raise err

            for stream in streams:
                if group in self.config:
                    self.config[group][stream] = actual_consumer
                else:
                    self.config[group] = {stream: actual_consumer}
            return fn
        return decorator
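=== example: wiring a consumer with telstar.app (illustrative sketch, not shipped in the package) ===

A minimal sketch of the decorator API above, assuming a local Redis at redis://localhost:6379/0; the stream name "user_signed_up", the schema, and the handler names are illustrative, not part of telstar.

import redis
from marshmallow import Schema, fields

import telstar


class UserSchema(Schema):
    email = fields.Email(required=True)


tapp = telstar.app(redis.from_url("redis://localhost:6379/0"), consumer_name="worker-1")


@tapp.consumer(group="email-service", streams="user_signed_up", schema=UserSchema)
def send_welcome_mail(data: dict):
    # First argument is annotated `dict`, so only the validated payload is passed;
    # annotate it as `Message` to receive the full message instead.
    print("sending welcome mail to", data["email"])


@tapp.errorhandler(ConnectionError)
def on_connection_error(exc, ack):
    # Handlers receive the exception and a bare-acknowledge callable.
    ack()


# tapp.start()  # blocks and runs the ThreadedMultiConsumer loop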
=== telstar/admin.py ===

import json
import uuid
from typing import Dict, List, Optional, Tuple, Union

import redis

from .com import Message, decrement_msg_id


class admin:
    def __init__(self, link: redis.Redis) -> None:
        self.link: redis.Redis = link

    def get_streams(self, match: Optional[str] = None) -> List["Stream"]:
        match = match or ""
        streams = self.link.scan_iter(match=f"telstar:stream:{match}*")
        return [Stream(self, s) for s in streams]

    def get_consumers(self) -> List["Consumer"]:
        return sum([g.get_consumers() for s in self.get_streams() for g in s.get_groups()], [])


class Stream:
    def __init__(self, admin: admin, stream_name: str) -> None:
        self.name = stream_name
        self.admin = admin
        self.link = admin.link

    @property
    def display_name(self) -> bytes:
        return self.name.replace(b"telstar:stream:", b"")

    def get_groups(self) -> List["Group"]:
        return [Group(self, name=info["name"], **self.link.xpending(self.name, info["name"]))
                for info in self.link.xinfo_groups(self.name)]

    def get_pending_messages(self) -> List["AdminMessage"]:
        return sum([g.get_pending_messages() for g in self.get_groups()], [])

    def get_length(self) -> int:
        return self.link.xlen(self.name)


class Group:
    def __init__(self, stream: Stream, name: str, pending: int,
                 min: Optional[bytes], max: Optional[bytes],
                 consumers: List[Dict[str, Union[bytes, int]]]) -> None:
        self.stream = stream
        self.link = stream.link
        self.name = name
        self.pending, self.min, self.max, self.consumers = pending, min, max, consumers

    def get_pending_messages(self) -> List["AdminMessage"]:
        if self.pending == 0:
            return []
        return [AdminMessage(self, **info)
                for info in self.link.xpending_range(self.stream.name, self.name,
                                                     self.min, self.max, self.pending)]

    def get_consumers(self) -> List["Consumer"]:
        return [Consumer(self, **info)
                for info in self.link.xinfo_consumers(self.stream.name, self.name)]

    def get_seen_messages(self) -> int:
        stream_name = self.stream.name.replace(b"telstar:stream:", b"").decode("ascii")
        name = self.name.decode("ascii")
        return len(self.link.keys(f"telstar:seen:{stream_name}:{name}*"))

    def delete(self) -> bool:
        return self.link.xgroup_destroy(self.stream.name, self.name)


class Consumer:
    def __init__(self, group: Group, name: bytes, pending: int, idle: int) -> None:
        self.group = group
        self.name = name
        self.pending_messages = pending
        self.idle_time = idle

    def delete(self) -> int:
        return self.group.stream.admin.link.xgroup_delconsumer(self.group.stream.name,
                                                               self.group.name, self.name)


class AdminMessage:
    def __init__(self, group: Group, message_id: bytes, consumer: str,
                 time_since_delivered: int, times_delivered: int) -> None:
        self.group = group
        self.message_id = message_id
        self.consumer = consumer
        self.time_since_delivered = time_since_delivered
        self.times_delivered = times_delivered

    def read_raw(self) -> List[List[Union[bytes, List[Tuple[bytes, Dict[bytes, bytes]]]]]]:
        return self.group.stream.admin.link.xread({
            self.group.stream.name: decrement_msg_id(self.message_id)
        }, count=1)

    def read(self) -> Message:
        for stream_name, records in self.read_raw():
            for record in records:
                stream_msg_id, record = record
                return Message(stream_name,
                               uuid.UUID(record[Message.IDFieldName].decode("ascii")),
                               json.loads(record[Message.DataFieldName]))
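=== example: inspecting streams with the admin helpers (illustrative sketch, not shipped in the package) ===

A short inspection sketch using the admin helpers above against the same hypothetical local Redis; it lists every telstar stream with its length and any pending messages.

import redis

from telstar.admin import admin

link = redis.from_url("redis://localhost:6379/0")
for stream in admin(link).get_streams():
    print(stream.display_name.decode("ascii"), "length:", stream.get_length())
    for pending in stream.get_pending_messages():
        print("  pending:", pending.message_id,
              "delivered", pending.times_delivered, "time(s)")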
=== telstar/consumer.py ===

import json
import logging
import threading
import time
import uuid
from functools import partial
from typing import Callable, Dict, List, Optional

import redis

from .com import Message, MessageError, decrement_msg_id, increment_msg_id

# An important concept to understand here is the consumer group, which gives us
# the following consumer properties:
# msg -> consumer
#
# msg:1 -> cg-userSignUp.1
# msg:2 -> cg-userSignUp.2
# msg:3 -> cg-userSignUp.3
# msg:4 -> cg-userSignUp.1
# msg:5 -> cg-userSignUp.2
# msg:6 -> cg-userSignUp.3
# msg:7 -> cg-userSignUp.1
#
# Which basically means that inside a group a single consumer will only get
# messages the others have not yet seen. For a deep dive, read
# https://redis.io/topics/streams-intro
# This allows us to create N consumers w/o needing to figure out which messages
# have already been processed.

log = logging.getLogger(__name__)


class MultiConsumer(object):
    def __init__(self, link: redis.Redis, group_name: str, consumer_name: str, config: dict,
                 block: int = 2000, claim_the_dead_after: int = 20 * 1000, error_handlers=None) -> None:
        self.link = link
        self.block = block
        self.claim_the_dead_after = claim_the_dead_after
        self.consumer_name = consumer_name
        self.group_name = group_name
        self.error_handlers = error_handlers or {}
        self.processors = {f"telstar:stream:{stream_name}": fn for stream_name, fn in config.items()}
        self.streams = self.processors.keys()
        for stream_name in self.streams:
            self.create_consumer_group(stream_name)

    def get_consumer_name(self, stream: str) -> str:
        return f"cg:{self.group_name}:{self.consumer_name}"

    def _seen_key(self, msg: Message) -> str:
        return f"telstar:seen:{msg.stream}:{self.group_name}:{msg.msg_uuid}"

    def _checkpoint_key(self, stream: str) -> str:
        return f"telstar:checkpoint:{stream}:{self.get_consumer_name(stream)}"

    # Create a new consumer group for the given stream. If the stream does not
    # exist yet, create it as well (`mkstream`); we want all messages present (`id="0"`).
    def create_consumer_group(self, stream_name: str) -> None:
        try:
            self.link.xgroup_create(stream_name, self.group_name, mkstream=True, id="0")
        except redis.exceptions.ResponseError:
            log.debug(f"Group: {self.group_name} for Stream: {stream_name} already exists")

    # In consumer groups, consumers can disappear. When they do, they can leave
    # non-ack'ed messages behind, which we want to claim and have delivered to a
    # new consumer.
    def claim_message_from_the_dead(self, stream_name: str) -> Optional[List[bytes]]:
        # Get information about all consumers in the group and how many messages are pending
        pending_info = self.link.xpending(stream_name, self.group_name)
        # {'pending': 10,
        #  'min': b'1560032216285-0',
        #  'max': b'1560032942270-0',
        #  'consumers': [{'name': b'cg-userSignUp.1', 'pending': 10}]}

        # Nothing to do
        if pending_info["pending"] == 0:
            log.debug(f"Stream: {stream_name} in Group: {self.group_name} has no pending messages")
            return None

        # Get all message ids within that range and claim them - `xclaim` only hands
        # over messages that have been idle longer than `claim_the_dead_after`
        # (20 seconds by default).
        pending_messages = self.link.xpending_range(stream_name, self.group_name,
                                                    pending_info["min"], pending_info["max"],
                                                    pending_info["pending"])
        # [{'message_id': b'1560194528886-0',
        #   'consumer': b'cg-userSignUp.1',
        #   'time_since_delivered': 22020,
        #   'times_delivered': 1}
        #  ...]
        messages_to_claim = [p["message_id"] for p in pending_messages]
        if not messages_to_claim:
            # The pending messages are all our own, no need to claim anything.
            # This can happen when we simply restart a consumer with the same name.
            return None

        # It might be cheaper to claim *and* receive the messages so we can work on them
        # directly, w/o catching up through the history with the potential of a lot of
        # already seen keys.
        log.debug(f"Stream: {stream_name} in Group: {self.group_name} claiming: {len(messages_to_claim)} message(s)")
        claimed_messages = self.link.xclaim(stream_name, self.group_name, self.consumer_name,
                                            self.claim_the_dead_after, messages_to_claim, justid=True)
        log.debug(f"Stream: {stream_name} in Group: {self.group_name} claimed: {len(claimed_messages)} message(s)")
        return claimed_messages

    # We claim messages from other dead/non-responsive consumers. Newly claimed
    # messages are usually from the past, which means that in order to process
    # them we need to start processing our history.
    def transfer_and_process_stream_history(self, streams: list):
        last_seen = dict()
        for stream_name in streams:
            last_seen[stream_name] = self.get_last_seen_id(stream_name)
            stream_msg_ids = self.claim_message_from_the_dead(stream_name)
            if stream_msg_ids:
                # If there are messages that we have claimed, we need to determine where
                # to start processing, because we can't just wait for new messages to arrive.
                before_earliest = decrement_msg_id(min(stream_msg_ids))
                next_after_seen = increment_msg_id(last_seen[stream_name])
                last_seen[stream_name] = min([before_earliest, next_after_seen])
        # Read all messages from the past up until now.
        log.info(f"Stream: {', '.join(last_seen)} in Group: {self.group_name} as Consumer: {self.consumer_name} reading past messages")
        self.catchup(last_seen)

    # This is the main loop: we start from the history, claim messages and reprocess
    # our history. We loop transfer_and_process_stream_history as well, since other
    # consumers might have died while we waited.
    def run(self):
        log.info(f"Starting consumer loop for Group {self.group_name}")
        while True:
            self.run_once()

    def run_once(self) -> None:
        self.transfer_and_process_stream_history(self.streams)
        # With our history processed we can now start waiting for new messages to arrive (`>`)
        config = {k: ">" for k in self.streams}
        log.info(f"Stream: {', '.join(self.streams)} in Group: {self.group_name} as Consumer: {self.consumer_name} reading pending messages or waiting for new ones")
        self.read(config, block=self.block)

    def get_last_seen_id(self, stream_name: str) -> bytes:
        check_point_key = self._checkpoint_key(stream_name)
        return self.link.get(check_point_key) or b"0-0"

    # Multiple things are happening here:
    # 1. Save the stream_msg_id as a checkpoint, so that we know where to start
    #    should the consumer be restarted.
    # 2. Each message has a UUID, and in order to process each message only once
    #    we remember the UUID for 14 days.
    # 3. Acknowledge the message, meaning that we have processed it.
    def acknowledge(self, msg: Message, stream_msg_id: bytes) -> None:
        log.debug(f"Stream: telstar:stream:{msg.stream} in Group: {self.group_name} acknowledging Message: {msg.msg_uuid} - {stream_msg_id}")
        check_point_key = self._checkpoint_key(f"telstar:stream:{msg.stream}")
        seen_key = self._seen_key(msg)
        # Execute the following statements in a transaction, in Redis speak a `pipeline`.
        pipe = self.link.pipeline()
        # If this key changes before we execute the pipeline, the ack fails and thus
        # the processor reverts all the work - which is exactly what we want in this
        # case, as the work has already been completed by another consumer.
        pipe.watch(seen_key)
        pipe.multi()  # buffer the following commands so they execute as one transaction
        # Mark this as a seen key for 14 days, meaning if the message reappears after
        # 14 days we reprocess it.
        pipe.set(seen_key, 1, ex=14 * 24 * 60 * 60)  # 14 days
        # Set the checkpoint for this consumer so that it knows where to start again
        # once it restarts.
        pipe.set(check_point_key, stream_msg_id)
        # Acknowledge the actual message
        pipe.xack(f"telstar:stream:{msg.stream}", self.group_name, stream_msg_id)
        pipe.execute()

    def work(self, stream_name: bytes, stream_msg_id: bytes, record: Dict[bytes, bytes]) -> None:
        try:
            msg = Message(stream_name,
                          uuid.UUID(record[Message.IDFieldName].decode("ascii")),
                          json.loads(record[Message.DataFieldName]))
        except KeyError as exc:
            err = f"Malformed message, record: {record} does not have fields {Message.IDFieldName} and {Message.DataFieldName}"
            log.exception(err)
            raise MessageError(err) from exc
        done = partial(self.acknowledge, msg, stream_msg_id)
        key = self._seen_key(msg)
        if self.link.get(key):
            # This is a double send
            log.debug(f"Stream: telstar:stream:{msg.stream} in Group: {self.group_name} skipping already processed Message: {msg.msg_uuid} - {stream_msg_id}")
            return done()
        log.info(f"Stream: telstar:stream:{msg.stream} in Group: {self.group_name} processing Message: {msg.msg_uuid} - {stream_msg_id}")
        self.processors[stream_name.decode("ascii")](self, msg, done)

    # Process all messages from `start`
    def catchup(self, streams: Dict[str, bytes]) -> int:
        return self._xreadgroup(streams)

    # Wait for and process new messages
    def read(self, streams: Dict[str, str], block: int) -> int:
        return self._xreadgroup(streams, block=block)

    def _xreadgroup(self, streams: Dict[str, str], block: int = 0) -> int:
        result = list()
        for stream_name, records in self.link.xreadgroup(self.group_name, self.consumer_name, streams, block=block):
            for record in records:
                stream_msg_id, record = record
                result.append((stream_name, stream_msg_id, record))
        if not result:
            return 0
        # Sort the messages afterwards to restore the order they were sent in. This can
        # only be a best-effort approach and does not guarantee the correct order when
        # using `xreadgroup` with multiple streams.
        for processed, t in enumerate(sorted(result, key=lambda t: t[1]), start=1):
            stream_name, stream_msg_id, record = t
            try:
                self.work(stream_name, stream_msg_id, record)
            except Exception as exc:
                self._handle_exception(exc, stream_name, stream_msg_id, record)
        return processed

    def _find_error_handler(self, exc):
        for cls in type(exc).__mro__:
            handler = self.error_handlers.get(cls)
            if handler is not None:
                return handler

    def _handle_exception(self, exc, stream_name, stream_msg_id, record):
        handler = self._find_error_handler(exc)
        if handler is None:
            raise exc
        bare_ack = partial(self._bare_ack, stream_name, stream_msg_id)
        return handler(exc, bare_ack)

    def _bare_ack(self, stream_name, stream_msg_id):
        # Acknowledge without marking the message as seen, e.g. from an error handler.
        check_point_key = self._checkpoint_key(stream_name)
        pipe = self.link.pipeline()
        pipe.set(check_point_key, stream_msg_id)
        pipe.xack(stream_name, self.group_name, stream_msg_id)
        pipe.execute()


class Consumer(MultiConsumer):
    def __init__(self, link: redis.Redis, group_name: str, consumer_name: str,
                 stream_name: str, processor_fn: Callable) -> None:
        super().__init__(link, group_name, consumer_name, {stream_name: processor_fn})


class MultiConsumeOnce(MultiConsumer):
    # This lets you write code that is executed exactly once against an entire stream.
    # TODO: What would be really cool is to have this work sort of like migrations,
    # where `telstar` itself has CLI commands to add and run files containing
    # `MultiConsumeOnce` code.
    def __init__(self, link: redis.Redis, group_name: str, config: dict) -> None:
        super().__init__(link, group_name, "once-consumer", config, 2000, 20000)

    def _applied_key(self) -> str:
        return f"telstar:once:{self.group_name}"

    def is_applied(self) -> bool:
        key = self._applied_key()
        return bool(self.link.get(key))

    def mark_as_applied(self) -> bool:
        key = self._applied_key()
        return self.link.set(key, int(time.time()))

    def has_pending_message(self) -> bool:
        for stream_name in self.streams:
            if self.link.xpending(stream_name, self.group_name)["pending"] != 0:
                return True
        return False

    def run(self) -> int:
        num_processed = 0
        if self.is_applied():
            log.info(f"Group: {self.group_name} for Streams: {self.streams} will not run as it already ran")
            return num_processed
        # This is the first time we try to apply this.
        if not self.has_pending_message():
            # Reading a stream from ">" has a special meaning: it instructs Redis to send
            # all messages the group has not seen yet. That does two things: it puts them
            # all into the pending list of this consumer inside the group, and it also
            # delivers them to the client.
            streams = {s: ">" for s in self.streams}
        else:
            # The data has already been delivered to the group, so we can start reading
            # from the beginning.
            streams = {s: "0" for s in self.streams}
        num_processed = self.read(streams, 0)
        # Everything has been seen and processed, so we can mark this as applied.
        if not self.has_pending_message():
            self.mark_as_applied()
        return num_processed


class PropagatingThread(threading.Thread):
    # A thread that re-raises any exception from its target when joined.
    def run(self):
        self.exc = None
        try:
            self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret


class ThreadedMultiConsumer:
    # Runs one MultiConsumer per group, each in its own thread.
    def __init__(self, link: redis.Redis, consumer_name: str, group_configs: dict, **kw) -> None:
        self.consumers = list()
        for group_name, config in group_configs.items():
            self.consumers.append(MultiConsumer(link, group_name, consumer_name, config, **kw))

    def run(self):
        self._run_threaded("run")

    def run_once(self) -> None:
        self._run_threaded("run_once")

    def _run_threaded(self, target: str) -> None:
        threads = list()
        for c in self.consumers:
            t = PropagatingThread(target=getattr(c, target), daemon=True)
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
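=== example: one-shot replay with MultiConsumeOnce (illustrative sketch, not shipped in the package) ===

A sketch of the one-shot replay described in the MultiConsumeOnce comment above: the group name acts as a migration id, so the stream is replayed exactly once per group name. The Redis URL, group name, and stream name are illustrative.

import redis

from telstar.consumer import MultiConsumeOnce


def backfill(consumer, msg, done):
    print("replaying", msg.msg_uuid, msg.data)
    done()  # acknowledge, otherwise the message stays pending


link = redis.from_url("redis://localhost:6379/0")
MultiConsumeOnce(link, "2019-backfill-welcome-mails", {"user_signed_up": backfill}).run()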
=== telstar/producer.py ===

import json
import logging
from time import sleep
from typing import Callable, List, Optional, Tuple

from peewee import Database
from redis.client import Redis

from .com import Message, StagedMessage

log = logging.getLogger(__name__)


class Producer(object):
    def __init__(self, link: Redis,
                 get_records: Callable[[], Tuple[List[Message], Callable[[], None]]],
                 context_callable: Optional[Callable] = None) -> None:
        self.link = link
        self.get_records = get_records
        self.context_callable = context_callable

    def run_once(self) -> None:
        records, done = self.get_records()
        pipe = self.link.pipeline()
        for msg in records:
            # Why the sleep here? It helps with sorting the events on the receiving
            # side. But it also caps the send rate at roughly 1k messages per second,
            # which for now seems acceptable.
            sleep(.001)
            pipe.xadd(f"telstar:stream:{msg.stream}", {
                Message.IDFieldName: str(msg.msg_uuid),
                Message.DataFieldName: json.dumps(msg.data)})
        pipe.execute()
        done()

    def run(self):
        log.info("Starting main producer loop")
        while True:
            if callable(self.context_callable):
                with self.context_callable():
                    self.run_once()
            else:
                self.run_once()


class StagedProducer(Producer):
    def __init__(self, link: Redis, database: Database, batch_size: int = 5, wait: float = 0.5) -> None:
        self.batch_size = batch_size
        self.wait = wait
        StagedMessage.bind(database)
        super().__init__(link, self.create_puller(), StagedMessage._meta.database.atomic)

    def create_puller(self) -> Callable:
        def puller() -> Tuple[List[Message], Callable[[], None]]:
            qs = StagedMessage.unsent().limit(self.batch_size).order_by(StagedMessage.id)
            msgs = [e.to_msg() for e in qs]
            log.debug(f"Found {len(msgs)} messages to be sent")

            def done():
                ids = [e.id for e in qs]
                if ids:
                    log.debug(f"Attempting to mark {len(ids)} messages as sent")
                    result = StagedMessage.update(sent=True).where(StagedMessage.id << ids).execute()
                    log.debug(f"Result was: {result}")
                sleep(self.wait)
            return msgs, done
        return puller
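=== example: the outbox pattern with StagedProducer (illustrative sketch, not shipped in the package) ===

A sketch of the outbox pattern StagedProducer implements: stage a message inside the same database transaction as your business writes, then drain the outbox into Redis. The database URL and topic are illustrative, and the staged-message table is assumed to already exist.

import redis
from playhouse.db_url import connect

import telstar
from telstar.producer import StagedProducer

link = redis.from_url("redis://localhost:6379/0")
db = connect("mysql://root:root@127.0.0.1:3306/app")

# Constructing the producer binds StagedMessage to `db`; the table itself is
# assumed to exist (e.g. created once via db.create_tables([...])).
producer = StagedProducer(link, db, batch_size=10)

with db.atomic():
    # ... your business writes would happen here, in the same transaction ...
    telstar.stage("user_signed_up", {"email": "jane@example.com"})

producer.run_once()  # or .run() to poll forever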
=== telstar/com/__init__.py ===

import json
import uuid
from datetime import datetime
from typing import Dict, Union

import peewee
from peewee import ModelSelect


class MessageError(Exception):
    pass


class TelstarEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        if isinstance(o, uuid.UUID):
            return str(o)
        return json.JSONEncoder.default(self, o)


class JSONField(peewee.TextField):
    def db_value(self, value: Dict[str, Union[int, str]]) -> str:
        return json.dumps(value, cls=TelstarEncoder)

    def python_value(self, value: str) -> Dict[str, Union[int, str]]:
        if value is not None:
            return json.loads(value)


class Message(object):
    IDFieldName = b"message_id"
    DataFieldName = b"data"

    def __init__(self, stream: str, msg_uuid: uuid.UUID, data: dict) -> None:
        if not isinstance(msg_uuid, uuid.UUID):
            raise TypeError(f"msg_uuid needs to be uuid.UUID not {type(msg_uuid)}")
        if isinstance(stream, bytes):
            stream = stream.decode("ascii")
        self.stream = stream.replace("telstar:stream:", "")
        self.msg_uuid = msg_uuid
        self.data = data

    def __repr__(self):
        return f"<Message: {self.stream} {self.msg_uuid}>"


class StagedMessage(peewee.Model):
    msg_uid = peewee.UUIDField(default=uuid.uuid4, index=True)
    topic = peewee.CharField(index=True)
    data = JSONField()
    sent = peewee.BooleanField(default=False, index=True)
    created_at = peewee.TimestampField(resolution=10**3)

    @classmethod
    def unsent(cls) -> ModelSelect:
        return cls.select().where(cls.sent == False)  # noqa

    def to_msg(self) -> Message:
        return Message(self.topic, self.msg_uid, self.data)


def increment_msg_id(id: bytes) -> bytes:
    # IDs are of the form "1509473251518-0" and comprise a millisecond timestamp
    # plus a sequence number to differentiate entries within the same millisecond,
    # e.g. increment_msg_id(b"1509473251518-0") == b"1509473251518-1".
    time, _, sequence = id.decode("ascii").partition("-")
    if not sequence:
        raise ValueError(f"Argument error: {id!r} has the wrong format, expected '<time>-<sequence>'")
    next_sequence = int(sequence) + 1
    return bytes(f"{time}-{next_sequence}", "ascii")


def decrement_msg_id(id: bytes) -> bytes:
    # The inverse of increment_msg_id; borrows from the timestamp when the sequence
    # is already 0, e.g. decrement_msg_id(b"1509473251518-0") == b"1509473251517-0".
    time, _, sequence = id.decode("ascii").partition("-")
    if not sequence:
        raise ValueError(f"Argument error: {id!r} has the wrong format, expected '<time>-<sequence>'")
    sequence = int(sequence)
    time = int(time)
    if sequence == 0:
        time = time - 1
    else:
        sequence = sequence - 1
    return bytes(f"{time}-{sequence}", "ascii")

=== telstar/tests/_dbcontent.py ===

import os

from playhouse.db_url import connect

db = connect(os.environ["DATABASE"])
result = db.execute_sql("SELECT number, group_name, topic FROM test")
print("\n".join(map(str, list(result))))

=== telstar/tests/expected.txt ===

(1, 'validation', 'mystream') (1, 'validation', 'mystream2') (1, 'validation2', 'mystream') (1, 'validation2', 'mystream2') (2, 'validation', 'mystream') (2, 'validation', 'mystream2') (2, 'validation2', 'mystream') (2, 'validation2', 'mystream2') (3, 'validation', 'mystream') (3, 'validation', 'mystream2') (3, 'validation2', 'mystream') (3, 'validation2', 'mystream2') (4, 'validation', 'mystream') (4, 'validation', 'mystream2') (4, 'validation2', 'mystream') (4, 'validation2', 'mystream2') (5, 'validation', 'mystream') (5, 'validation', 'mystream2') (5, 'validation2', 'mystream') (5, 'validation2', 'mystream2') (6, 'validation', 'mystream') (6, 'validation', 'mystream2') (6, 'validation2', 'mystream') (6, 'validation2', 'mystream2') (7, 'validation', 'mystream') (7, 'validation', 'mystream2') (7, 'validation2', 'mystream') (7, 'validation2', 'mystream2') (8, 'validation', 'mystream') (8, 'validation', 'mystream2') (8, 'validation2', 'mystream') (8, 'validation2', 'mystream2') (9, 'validation', 'mystream') (9, 'validation', 'mystream2') (9, 'validation2', 'mystream') (9, 'validation2', 'mystream2') (10, 'validation', 'mystream') (10, 'validation', 'mystream2') (10, 'validation2', 'mystream') (10, 'validation2', 'mystream2') (11, 'validation', 'mystream') (11, 'validation', 'mystream2') (11, 'validation2', 'mystream') (11, 'validation2', 'mystream2') (12, 'validation', 'mystream') (12, 'validation', 'mystream2') (12, 'validation2', 'mystream') (12, 'validation2', 'mystream2') (13, 'validation', 'mystream') (13, 'validation', 'mystream2') (13, 'validation2', 'mystream') (13, 'validation2', 'mystream2') (14, 'validation', 'mystream') (14, 'validation', 'mystream2') (14, 'validation2', 'mystream') (14, 'validation2', 'mystream2') (15, 'validation', 'mystream') (15, 'validation', 'mystream2') (15, 'validation2', 'mystream') (15, 'validation2', 'mystream2') (16, 'validation', 'mystream') (16, 'validation', 'mystream2') (16, 'validation2', 'mystream') (16, 'validation2', 'mystream2') (17, 'validation', 'mystream') (17, 'validation', 'mystream2') (17, 'validation2', 'mystream') (17, 'validation2', 'mystream2') (18, 'validation', 'mystream') (18, 'validation', 'mystream2') (18, 'validation2', 'mystream') (18, 'validation2', 'mystream2') (19, 'validation', 'mystream') (19, 'validation', 'mystream2') (19, 'validation2', 'mystream') (19, 'validation2', 'mystream2') (20, 'validation', 'mystream') (20, 'validation', 'mystream2') (20, 'validation2', 'mystream') (20, 'validation2', 'mystream2') (21, 'validation', 'mystream') (21, 'validation', 'mystream2') (21, 'validation2', 'mystream') (21, 'validation2', 'mystream2') (22, 'validation', 'mystream') (22, 'validation', 'mystream2') (22, 'validation2', 'mystream') (22,
'validation2', 'mystream2') (23, 'validation', 'mystream') (23, 'validation', 'mystream2') (23, 'validation2', 'mystream') (23, 'validation2', 'mystream2') (24, 'validation', 'mystream') (24, 'validation', 'mystream2') (24, 'validation2', 'mystream') (24, 'validation2', 'mystream2') (25, 'validation', 'mystream') (25, 'validation', 'mystream2') (25, 'validation2', 'mystream') (25, 'validation2', 'mystream2') (26, 'validation', 'mystream') (26, 'validation', 'mystream2') (26, 'validation2', 'mystream') (26, 'validation2', 'mystream2') (27, 'validation', 'mystream') (27, 'validation', 'mystream2') (27, 'validation2', 'mystream') (27, 'validation2', 'mystream2') (28, 'validation', 'mystream') (28, 'validation', 'mystream2') (28, 'validation2', 'mystream') (28, 'validation2', 'mystream2') (29, 'validation', 'mystream') (29, 'validation', 'mystream2') (29, 'validation2', 'mystream') (29, 'validation2', 'mystream2') (30, 'validation', 'mystream') (30, 'validation', 'mystream2') (30, 'validation2', 'mystream') (30, 'validation2', 'mystream2') (31, 'validation', 'mystream') (31, 'validation', 'mystream2') (31, 'validation2', 'mystream') (31, 'validation2', 'mystream2') (32, 'validation', 'mystream') (32, 'validation', 'mystream2') (32, 'validation2', 'mystream') (32, 'validation2', 'mystream2') (33, 'validation', 'mystream') (33, 'validation', 'mystream2') (33, 'validation2', 'mystream') (33, 'validation2', 'mystream2') (34, 'validation', 'mystream') (34, 'validation', 'mystream2') (34, 'validation2', 'mystream') (34, 'validation2', 'mystream2') (35, 'validation', 'mystream') (35, 'validation', 'mystream2') (35, 'validation2', 'mystream') (35, 'validation2', 'mystream2') (36, 'validation', 'mystream') (36, 'validation', 'mystream2') (36, 'validation2', 'mystream') (36, 'validation2', 'mystream2') (37, 'validation', 'mystream') (37, 'validation', 'mystream2') (37, 'validation2', 'mystream') (37, 'validation2', 'mystream2') (38, 'validation', 'mystream') (38, 'validation', 'mystream2') (38, 'validation2', 'mystream') (38, 'validation2', 'mystream2') (39, 'validation', 'mystream') (39, 'validation', 'mystream2') (39, 'validation2', 'mystream') (39, 'validation2', 'mystream2') (40, 'validation', 'mystream') (40, 'validation', 'mystream2') (40, 'validation2', 'mystream') (40, 'validation2', 'mystream2') (41, 'validation', 'mystream') (41, 'validation', 'mystream2') (41, 'validation2', 'mystream') (41, 'validation2', 'mystream2') (42, 'validation', 'mystream') (42, 'validation', 'mystream2') (42, 'validation2', 'mystream') (42, 'validation2', 'mystream2') (43, 'validation', 'mystream') (43, 'validation', 'mystream2') (43, 'validation2', 'mystream') (43, 'validation2', 'mystream2') (44, 'validation', 'mystream') (44, 'validation', 'mystream2') (44, 'validation2', 'mystream') (44, 'validation2', 'mystream2') (45, 'validation', 'mystream') (45, 'validation', 'mystream2') (45, 'validation2', 'mystream') (45, 'validation2', 'mystream2') (46, 'validation', 'mystream') (46, 'validation', 'mystream2') (46, 'validation2', 'mystream') (46, 'validation2', 'mystream2') (47, 'validation', 'mystream') (47, 'validation', 'mystream2') (47, 'validation2', 'mystream') (47, 'validation2', 'mystream2') (48, 'validation', 'mystream') (48, 'validation', 'mystream2') (48, 'validation2', 'mystream') (48, 'validation2', 'mystream2') (49, 'validation', 'mystream') (49, 'validation', 'mystream2') (49, 'validation2', 'mystream') (49, 'validation2', 'mystream2') (50, 'validation', 'mystream') (50, 'validation', 'mystream2') (50, 
'validation2', 'mystream') (50, 'validation2', 'mystream2') (51, 'validation', 'mystream') (51, 'validation', 'mystream2') (51, 'validation2', 'mystream') (51, 'validation2', 'mystream2') (52, 'validation', 'mystream') (52, 'validation', 'mystream2') (52, 'validation2', 'mystream') (52, 'validation2', 'mystream2') (53, 'validation', 'mystream') (53, 'validation', 'mystream2') (53, 'validation2', 'mystream') (53, 'validation2', 'mystream2') (54, 'validation', 'mystream') (54, 'validation', 'mystream2') (54, 'validation2', 'mystream') (54, 'validation2', 'mystream2') (55, 'validation', 'mystream') (55, 'validation', 'mystream2') (55, 'validation2', 'mystream') (55, 'validation2', 'mystream2') (56, 'validation', 'mystream') (56, 'validation', 'mystream2') (56, 'validation2', 'mystream') (56, 'validation2', 'mystream2') (57, 'validation', 'mystream') (57, 'validation', 'mystream2') (57, 'validation2', 'mystream') (57, 'validation2', 'mystream2') (58, 'validation', 'mystream') (58, 'validation', 'mystream2') (58, 'validation2', 'mystream') (58, 'validation2', 'mystream2') (59, 'validation', 'mystream') (59, 'validation', 'mystream2') (59, 'validation2', 'mystream') (59, 'validation2', 'mystream2') (60, 'validation', 'mystream') (60, 'validation', 'mystream2') (60, 'validation2', 'mystream') (60, 'validation2', 'mystream2') (61, 'validation', 'mystream') (61, 'validation', 'mystream2') (61, 'validation2', 'mystream') (61, 'validation2', 'mystream2') (62, 'validation', 'mystream') (62, 'validation', 'mystream2') (62, 'validation2', 'mystream') (62, 'validation2', 'mystream2') (63, 'validation', 'mystream') (63, 'validation', 'mystream2') (63, 'validation2', 'mystream') (63, 'validation2', 'mystream2') (64, 'validation', 'mystream') (64, 'validation', 'mystream2') (64, 'validation2', 'mystream') (64, 'validation2', 'mystream2') (65, 'validation', 'mystream') (65, 'validation', 'mystream2') (65, 'validation2', 'mystream') (65, 'validation2', 'mystream2') (66, 'validation', 'mystream') (66, 'validation', 'mystream2') (66, 'validation2', 'mystream') (66, 'validation2', 'mystream2') (67, 'validation', 'mystream') (67, 'validation', 'mystream2') (67, 'validation2', 'mystream') (67, 'validation2', 'mystream2') (68, 'validation', 'mystream') (68, 'validation', 'mystream2') (68, 'validation2', 'mystream') (68, 'validation2', 'mystream2') (69, 'validation', 'mystream') (69, 'validation', 'mystream2') (69, 'validation2', 'mystream') (69, 'validation2', 'mystream2') (70, 'validation', 'mystream') (70, 'validation', 'mystream2') (70, 'validation2', 'mystream') (70, 'validation2', 'mystream2') (71, 'validation', 'mystream') (71, 'validation', 'mystream2') (71, 'validation2', 'mystream') (71, 'validation2', 'mystream2') (72, 'validation', 'mystream') (72, 'validation', 'mystream2') (72, 'validation2', 'mystream') (72, 'validation2', 'mystream2') (73, 'validation', 'mystream') (73, 'validation', 'mystream2') (73, 'validation2', 'mystream') (73, 'validation2', 'mystream2') (74, 'validation', 'mystream') (74, 'validation', 'mystream2') (74, 'validation2', 'mystream') (74, 'validation2', 'mystream2') (75, 'validation', 'mystream') (75, 'validation', 'mystream2') (75, 'validation2', 'mystream') (75, 'validation2', 'mystream2') (76, 'validation', 'mystream') (76, 'validation', 'mystream2') (76, 'validation2', 'mystream') (76, 'validation2', 'mystream2') (77, 'validation', 'mystream') (77, 'validation', 'mystream2') (77, 'validation2', 'mystream') (77, 'validation2', 'mystream2') (78, 'validation', 'mystream') (78, 
'validation', 'mystream2') (78, 'validation2', 'mystream') (78, 'validation2', 'mystream2') (79, 'validation', 'mystream') (79, 'validation', 'mystream2') (79, 'validation2', 'mystream') (79, 'validation2', 'mystream2') (80, 'validation', 'mystream') (80, 'validation', 'mystream2') (80, 'validation2', 'mystream') (80, 'validation2', 'mystream2') (81, 'validation', 'mystream') (81, 'validation', 'mystream2') (81, 'validation2', 'mystream') (81, 'validation2', 'mystream2') (82, 'validation', 'mystream') (82, 'validation', 'mystream2') (82, 'validation2', 'mystream') (82, 'validation2', 'mystream2') (83, 'validation', 'mystream') (83, 'validation', 'mystream2') (83, 'validation2', 'mystream') (83, 'validation2', 'mystream2') (84, 'validation', 'mystream') (84, 'validation', 'mystream2') (84, 'validation2', 'mystream') (84, 'validation2', 'mystream2') (85, 'validation', 'mystream') (85, 'validation', 'mystream2') (85, 'validation2', 'mystream') (85, 'validation2', 'mystream2') (86, 'validation', 'mystream') (86, 'validation', 'mystream2') (86, 'validation2', 'mystream') (86, 'validation2', 'mystream2') (87, 'validation', 'mystream') (87, 'validation', 'mystream2') (87, 'validation2', 'mystream') (87, 'validation2', 'mystream2') (88, 'validation', 'mystream') (88, 'validation', 'mystream2') (88, 'validation2', 'mystream') (88, 'validation2', 'mystream2') (89, 'validation', 'mystream') (89, 'validation', 'mystream2') (89, 'validation2', 'mystream') (89, 'validation2', 'mystream2') (90, 'validation', 'mystream') (90, 'validation', 'mystream2') (90, 'validation2', 'mystream') (90, 'validation2', 'mystream2') (91, 'validation', 'mystream') (91, 'validation', 'mystream2') (91, 'validation2', 'mystream') (91, 'validation2', 'mystream2') (92, 'validation', 'mystream') (92, 'validation', 'mystream2') (92, 'validation2', 'mystream') (92, 'validation2', 'mystream2') (93, 'validation', 'mystream') (93, 'validation', 'mystream2') (93, 'validation2', 'mystream') (93, 'validation2', 'mystream2') (94, 'validation', 'mystream') (94, 'validation', 'mystream2') (94, 'validation2', 'mystream') (94, 'validation2', 'mystream2') (95, 'validation', 'mystream') (95, 'validation', 'mystream2') (95, 'validation2', 'mystream') (95, 'validation2', 'mystream2') (96, 'validation', 'mystream') (96, 'validation', 'mystream2') (96, 'validation2', 'mystream') (96, 'validation2', 'mystream2') (97, 'validation', 'mystream') (97, 'validation', 'mystream2') (97, 'validation2', 'mystream') (97, 'validation2', 'mystream2') (98, 'validation', 'mystream') (98, 'validation', 'mystream2') (98, 'validation2', 'mystream') (98, 'validation2', 'mystream2') (99, 'validation', 'mystream') (99, 'validation', 'mystream2') (99, 'validation2', 'mystream') (99, 'validation2', 'mystream2') (100, 'validation', 'mystream') (100, 'validation', 'mystream2') (100, 'validation2', 'mystream') (100, 'validation2', 'mystream2') (101, 'validation', 'mystream') (101, 'validation2', 'mystream') (102, 'validation', 'mystream') (102, 'validation2', 'mystream') (103, 'validation', 'mystream') (103, 'validation2', 'mystream') (104, 'validation', 'mystream') (104, 'validation2', 'mystream') (105, 'validation', 'mystream') (105, 'validation2', 'mystream') (106, 'validation', 'mystream') (106, 'validation2', 'mystream') (107, 'validation', 'mystream') (107, 'validation2', 'mystream') (108, 'validation', 'mystream') (108, 'validation2', 'mystream') (109, 'validation', 'mystream') (109, 'validation2', 'mystream') (110, 'validation', 'mystream') (110, 'validation2', 
'mystream') (111, 'validation', 'mystream') (111, 'validation2', 'mystream') (112, 'validation', 'mystream') (112, 'validation2', 'mystream') (113, 'validation', 'mystream') (113, 'validation2', 'mystream') (114, 'validation', 'mystream') (114, 'validation2', 'mystream') (115, 'validation', 'mystream') (115, 'validation2', 'mystream') (116, 'validation', 'mystream') (116, 'validation2', 'mystream') (117, 'validation', 'mystream') (117, 'validation2', 'mystream') (118, 'validation', 'mystream') (118, 'validation2', 'mystream') (119, 'validation', 'mystream') (119, 'validation2', 'mystream') (120, 'validation', 'mystream') (120, 'validation2', 'mystream') (121, 'validation', 'mystream') (121, 'validation2', 'mystream') (122, 'validation', 'mystream') (122, 'validation2', 'mystream') (123, 'validation', 'mystream') (123, 'validation2', 'mystream') (124, 'validation', 'mystream') (124, 'validation2', 'mystream') (125, 'validation', 'mystream') (125, 'validation2', 'mystream') (126, 'validation', 'mystream') (126, 'validation2', 'mystream') (127, 'validation', 'mystream') (127, 'validation2', 'mystream') (128, 'validation', 'mystream') (128, 'validation2', 'mystream') (129, 'validation', 'mystream') (129, 'validation2', 'mystream') (130, 'validation', 'mystream') (130, 'validation2', 'mystream') (131, 'validation', 'mystream') (131, 'validation2', 'mystream') (132, 'validation', 'mystream') (132, 'validation2', 'mystream') (133, 'validation', 'mystream') (133, 'validation2', 'mystream') (134, 'validation', 'mystream') (134, 'validation2', 'mystream') (135, 'validation', 'mystream') (135, 'validation2', 'mystream') (136, 'validation', 'mystream') (136, 'validation2', 'mystream') (137, 'validation', 'mystream') (137, 'validation2', 'mystream') (138, 'validation', 'mystream') (138, 'validation2', 'mystream') (139, 'validation', 'mystream') (139, 'validation2', 'mystream') (140, 'validation', 'mystream') (140, 'validation2', 'mystream') (141, 'validation', 'mystream') (141, 'validation2', 'mystream') (142, 'validation', 'mystream') (142, 'validation2', 'mystream') (143, 'validation', 'mystream') (143, 'validation2', 'mystream') (144, 'validation', 'mystream') (144, 'validation2', 'mystream') (145, 'validation', 'mystream') (145, 'validation2', 'mystream') (146, 'validation', 'mystream') (146, 'validation2', 'mystream') (147, 'validation', 'mystream') (147, 'validation2', 'mystream') (148, 'validation', 'mystream') (148, 'validation2', 'mystream') (149, 'validation', 'mystream') (149, 'validation2', 'mystream') (150, 'validation', 'mystream') (150, 'validation2', 'mystream') (151, 'validation', 'mystream') (151, 'validation2', 'mystream') (152, 'validation', 'mystream') (152, 'validation2', 'mystream') (153, 'validation', 'mystream') (153, 'validation2', 'mystream') (154, 'validation', 'mystream') (154, 'validation2', 'mystream') (155, 'validation', 'mystream') (155, 'validation2', 'mystream') (156, 'validation', 'mystream') (156, 'validation2', 'mystream') (157, 'validation', 'mystream') (157, 'validation2', 'mystream') (158, 'validation', 'mystream') (158, 'validation2', 'mystream') (159, 'validation', 'mystream') (159, 'validation2', 'mystream') (160, 'validation', 'mystream') (160, 'validation2', 'mystream') (161, 'validation', 'mystream') (161, 'validation2', 'mystream') (162, 'validation', 'mystream') (162, 'validation2', 'mystream') (163, 'validation', 'mystream') (163, 'validation2', 'mystream') (164, 'validation', 'mystream') (164, 'validation2', 'mystream') (165, 'validation', 'mystream') 
(165, 'validation2', 'mystream') (166, 'validation', 'mystream') (166, 'validation2', 'mystream') (167, 'validation', 'mystream') (167, 'validation2', 'mystream') (168, 'validation', 'mystream') (168, 'validation2', 'mystream') (169, 'validation', 'mystream') (169, 'validation2', 'mystream') (170, 'validation', 'mystream') (170, 'validation2', 'mystream') (171, 'validation', 'mystream') (171, 'validation2', 'mystream') (172, 'validation', 'mystream') (172, 'validation2', 'mystream') (173, 'validation', 'mystream') (173, 'validation2', 'mystream') (174, 'validation', 'mystream') (174, 'validation2', 'mystream') (175, 'validation', 'mystream') (175, 'validation2', 'mystream') (176, 'validation', 'mystream') (176, 'validation2', 'mystream') (177, 'validation', 'mystream') (177, 'validation2', 'mystream') (178, 'validation', 'mystream') (178, 'validation2', 'mystream') (179, 'validation', 'mystream') (179, 'validation2', 'mystream') (180, 'validation', 'mystream') (180, 'validation2', 'mystream') (181, 'validation', 'mystream') (181, 'validation2', 'mystream') (182, 'validation', 'mystream') (182, 'validation2', 'mystream') (183, 'validation', 'mystream') (183, 'validation2', 'mystream') (184, 'validation', 'mystream') (184, 'validation2', 'mystream') (185, 'validation', 'mystream') (185, 'validation2', 'mystream') (186, 'validation', 'mystream') (186, 'validation2', 'mystream') (187, 'validation', 'mystream') (187, 'validation2', 'mystream') (188, 'validation', 'mystream') (188, 'validation2', 'mystream') (189, 'validation', 'mystream') (189, 'validation2', 'mystream') (190, 'validation', 'mystream') (190, 'validation2', 'mystream') (191, 'validation', 'mystream') (191, 'validation2', 'mystream') (192, 'validation', 'mystream') (192, 'validation2', 'mystream') (193, 'validation', 'mystream') (193, 'validation2', 'mystream') (194, 'validation', 'mystream') (194, 'validation2', 'mystream') (195, 'validation', 'mystream') (195, 'validation2', 'mystream') (196, 'validation', 'mystream') (196, 'validation2', 'mystream') (197, 'validation', 'mystream') (197, 'validation2', 'mystream') (198, 'validation', 'mystream') (198, 'validation2', 'mystream') (199, 'validation', 'mystream') (199, 'validation2', 'mystream') (200, 'validation', 'mystream') (200, 'validation2', 'mystream')

=== telstar/tests/test_consumer.py ===

import logging
import os
import random
import sys
from time import sleep

import peewee
import redis
from playhouse.db_url import connect

from telstar.com import Message
from telstar.consumer import MultiConsumer

link = redis.from_url(os.environ["REDIS"])
db = connect(os.environ["DATABASE"])
db.connect()

logger = logging.getLogger('telstar')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)


class Test(peewee.Model):
    number = peewee.IntegerField()
    group_name = peewee.CharField()
    topic = peewee.CharField()

    class Meta:
        indexes = (
            (('number', 'group_name', 'topic'), True),
        )
        database = db


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "setup":
        print("Recreating table in order to start from scratch")
        db.drop_tables([Test])
        db.create_tables([Test])

    def simple(consumer, record: Message, done):
        with db.atomic():
            Test.create(number=int(record.data["value"]),
                        group_name=consumer.group_name,
                        topic=record.stream)
        sleep(random.randrange(int(os.environ.get("SLEEPINESS"))) / 100)
        done()

    MultiConsumer(link=link,
                  group_name=os.environ.get("GROUP_NAME"),
                  consumer_name=os.environ.get("CONSUMER_NAME"),
                  config={
                      os.environ["STREAM_NAME"]: simple,
                      os.environ["STREAM_NAME_TWO"]: simple,
                  }).run()
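=== example: producer-to-consumer round trip (illustrative sketch, not part of the test suite) ===

An end-to-end sketch tying the two test programs together in a single process: one Producer round, then one MultiConsumer round, assuming a local Redis; the stream and group names are illustrative.

import uuid

import redis

from telstar.com import Message
from telstar.consumer import MultiConsumer
from telstar.producer import Producer

link = redis.from_url("redis://localhost:6379/0")


def pull_once():
    msgs = [Message("demo", uuid.uuid4(), {"value": 1})]
    return msgs, lambda: None  # nothing to mark as sent in this sketch


Producer(link, pull_once).run_once()


def handle(consumer, msg, done):
    print("got", msg.data)
    done()


MultiConsumer(link, "demo-group", "worker-1", {"demo": handle}).run_once()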
os.environ["STREAM_NAME_TWO"]: simple }).run() PKSN?Xtelstar/tests/test_producer.pyimport json import logging import os import random import sys import uuid from time import sleep from typing import Callable, List, Tuple import peewee import redis from playhouse.db_url import connect from telstar.com import Message from telstar.producer import Producer link = redis.from_url(os.environ["REDIS"]) db = connect(os.environ["DATABASE"]) db.connect() logger = logging.getLogger('telstar') logger.addHandler(logging.StreamHandler()) logger.setLevel(logging.DEBUG) class JSONField(peewee.TextField): def db_value(self, value): return json.dumps(value) def python_value(self, value): if value is not None: return json.loads(value) class Events(peewee.Model): msg_uid = peewee.UUIDField(default=uuid.uuid4) topic = peewee.CharField() data = JSONField() class Meta: database = db if __name__ == "__main__": if len(sys.argv) > 1: if sys.argv[1] == "setup": print("Recreating table in order to start from scratch") db.drop_tables([Events]) db.create_tables([Events]) if "create" in sys.argv[1:]: for i in range(int(os.environ["RANGE_FROM"]), int(os.environ["RANGE_TO"])): Events.create(topic=os.environ["STREAM_NAME"], data=dict(value=i)) def puller() -> Tuple[List[Message], Callable[[], None]]: qs = Events.select().order_by(Events.id).limit(5) msgs = [Message(e.topic, e.msg_uid, e.data) for e in qs] def done(): if not os.environ.get("KEEPEVENTS"): for r in qs: r.delete_instance() sleep(random.randrange(int(os.environ.get("SLEEPINESS"))) / 10) return msgs, done print("starting") Producer(link, puller, context_callable=db.atomic).run() PKSN33telstar/tests/test_telstar.sh#!/bin/bash # This script is used to verify/test a assumpations and guaranteesabout the system. # # Todos: # * kill redis in the process # * kill mysql in the process # * use more fuzzing - meaning kill and start processes more randomly to simulate failure # * improve the sleeping, maybe look at how we can wait until all pending messages are done and then make final diff. # Configuration export STREAM_NAME="${STREAM_NAME:-mystream}" export STREAM_NAME_TWO="${STREAM_NAME_TWO:-mystream2}" export GROUP_NAME="${GROUP_NAME:-validation}" export SLEEPINESS="${SLEEPINESS:-10}" export REDIS="${REDIS:-redis://localhost:6379/10}" export DATABASE="${DATABASE:-mysql://root:root@127.0.0.1:3306/test}" export PYTHONPATH="${PYTHONPATH}:${SCRIPTPATH}../}" export PYTHONUNBUFFERED=True readonly SCRIPTPATH="$( cd "$(dirname "$0")" pwd -P )" function kill_childs_and_exit() { echo "Attempting to kill all childs" echo "..." pkill -P $$ echo "ok, bye" exit 1 } # TRAP CTRL-C and kill all childs trap kill_childs_and_exit INT if [ -x "$(command -v redis-cli)" ]; then # Clear redis only if available echo "flushing" redis-cli -u $REDIS FLUSHDB fi main() { # Start the first customer with `create` which drops and creates the needed tables in mysql CONSUMER_NAME=1 python $SCRIPTPATH/test_consumer.py setup & CONSUMER_1=$! # This saves the PID of the last command - which we can use to `kill` the process later # What for the creation to be done sleep 2 # Start more consumers # The `SLEEPINESS` will result in race conditions in the database, which is what we want for testing SLEEPINESS=20 CONSUMER_NAME=2 python $SCRIPTPATH/test_consumer.py & CONSUMER_2=$! CONSUMER_NAME=3 python $SCRIPTPATH/test_consumer.py & CONSUMER_3=$! # Create a producer that double sends messages because it does not delete them after sending. 
=== telstar/tests/test_telstar.sh ===

#!/bin/bash
# This script is used to verify/test assumptions and guarantees about the system.
#
# Todos:
# * kill redis in the process
# * kill mysql in the process
# * use more fuzzing - meaning kill and start processes more randomly to simulate failure
# * improve the sleeping, maybe look at how we can wait until all pending messages are
#   done and then make the final diff.

readonly SCRIPTPATH="$(cd "$(dirname "$0")" && pwd -P)"

# Configuration
export STREAM_NAME="${STREAM_NAME:-mystream}"
export STREAM_NAME_TWO="${STREAM_NAME_TWO:-mystream2}"
export GROUP_NAME="${GROUP_NAME:-validation}"
export SLEEPINESS="${SLEEPINESS:-10}"
export REDIS="${REDIS:-redis://localhost:6379/10}"
export DATABASE="${DATABASE:-mysql://root:root@127.0.0.1:3306/test}"
export PYTHONPATH="${PYTHONPATH}:${SCRIPTPATH}/../"
export PYTHONUNBUFFERED=True

function kill_children_and_exit() {
    echo "Attempting to kill all children"
    echo "..."
    pkill -P $$
    echo "ok, bye"
    exit 1
}

# Trap CTRL-C and kill all children
trap kill_children_and_exit INT

if [ -x "$(command -v redis-cli)" ]; then
    # Clear redis, but only if redis-cli is available
    echo "flushing"
    redis-cli -u $REDIS FLUSHDB
fi

main() {
    # Start the first consumer with `setup`, which drops and creates the needed tables in mysql
    CONSUMER_NAME=1 python $SCRIPTPATH/test_consumer.py setup &
    CONSUMER_1=$! # This saves the PID of the last command - which we can use to `kill` the process later

    # Wait for the table creation to be done
    sleep 2

    # Start more consumers.
    # The `SLEEPINESS` will result in race conditions in the database, which is what we want for testing.
    SLEEPINESS=20 CONSUMER_NAME=2 python $SCRIPTPATH/test_consumer.py &
    CONSUMER_2=$!
    CONSUMER_NAME=3 python $SCRIPTPATH/test_consumer.py &
    CONSUMER_3=$!

    # Create a producer that double sends messages because it does not delete them after sending.
    KEEPEVENTS=1 RANGE_FROM=1 RANGE_TO=101 python $SCRIPTPATH/test_producer.py setup create &
    PRODUCER_1=$!
    sleep 2

    # Create another producer that keeps double sending
    KEEPEVENTS=1 python $SCRIPTPATH/test_producer.py &
    PRODUCER_2=$!

    # Create a producer that emits messages onto the second stream
    RANGE_FROM=1 RANGE_TO=101 STREAM_NAME=$STREAM_NAME_TWO python $SCRIPTPATH/test_producer.py create &
    PRODUCER_TWO=$!

    # Since CONSUMER_1 is in the background it already has consumed some messages from the stream
    kill -0 $CONSUMER_1 && kill -9 $CONSUMER_1

    # Wait some more for CONSUMER_2 and CONSUMER_3 to process more data.
    sleep 2

    # Let's kill the producers while they are sending
    kill -0 $PRODUCER_1 && kill -9 $PRODUCER_1
    kill -0 $PRODUCER_2 && kill -9 $PRODUCER_2

    # Now we restart CONSUMER_1
    CONSUMER_NAME=1 python $SCRIPTPATH/test_consumer.py &
    CONSUMER_1=$!

    # Let all consumers process a bit more data
    sleep 5

    # Kill more consumers
    kill -0 $CONSUMER_1 && kill -9 $CONSUMER_1
    kill -0 $CONSUMER_3 && kill -9 $CONSUMER_3

    # Oops, all consumers are dead now
    kill $CONSUMER_2

    # Create another producer that generates the rest of the data, but now also deletes what was sent already
    RANGE_FROM=101 RANGE_TO=201 python $SCRIPTPATH/test_producer.py create &
    PRODUCER_3=$!

    # Start a new consumer
    CONSUMER_NAME=4 python $SCRIPTPATH/test_consumer.py &
    CONSUMER_4=$!

    # Wait for `CONSUMER_4` to process all data
    sleep 30
    kill -0 $CONSUMER_4 && kill -9 $CONSUMER_4
    kill -0 $PRODUCER_3 && kill -9 $PRODUCER_3
    kill -0 $PRODUCER_TWO && kill -9 $PRODUCER_TWO

    # Restart `CONSUMER_4` and kill it later
    CONSUMER_NAME=4 python $SCRIPTPATH/test_consumer.py &
    CONSUMER_4=$!
    sleep 30
    kill -0 $CONSUMER_4 && kill -9 $CONSUMER_4

    # Create a new consumer group that reads everything from the beginning of time.
    SLEEPINESS=1 CONSUMER_NAME=4 GROUP_NAME=validation2 python $SCRIPTPATH/test_consumer.py &
    NEW_CONSUMER=$!
    sleep 10
    kill -0 $NEW_CONSUMER && kill -9 $NEW_CONSUMER

    # Verify that the database holds the expected records
    python $SCRIPTPATH/_dbcontent.py | diff $SCRIPTPATH/expected.txt - || exit 1

    # Make sure we do not have pending messages left
    PENDING_STREAM_1=$(redis-cli -u $REDIS --csv XPENDING telstar:stream:$STREAM_NAME validation)
    PENDING_STREAM_2=$(redis-cli -u $REDIS --csv XPENDING telstar:stream:$STREAM_NAME_TWO validation)
    PENDING_STREAM_3=$(redis-cli -u $REDIS --csv XPENDING telstar:stream:$STREAM_NAME_TWO validation2)
    test "$PENDING_STREAM_1" == "0,NIL,NIL,NIL" || { echo "$STREAM_NAME validation has pending records" && exit 1; }
    test "$PENDING_STREAM_2" == "0,NIL,NIL,NIL" || { echo "$STREAM_NAME_TWO validation has pending records" && exit 1; }
    test "$PENDING_STREAM_3" == "0,NIL,NIL,NIL" || { echo "$STREAM_NAME_TWO validation2 has pending records" && exit 1; }
}
main

=== telstar-0.2.4.dist-info/LICENSE ===

The MIT License (MIT)

Copyright (c) 2019 Bitspark

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.