# --- letov/__init__.py ---
# flake8: noqa
from .handler import ZstdChunkedHandler
from .encoder import LoggingEncoder
from .formatter import JsonFormatter

__version__ = '1.0.0'


# --- letov/cli.py ---
import signal
import subprocess
import sys

import click

from letov.stream import ZstdChunkedWrapper


@click.group()
@click.option('--name', required=True, help='Log stream name')
@click.option(
    '--hard-limit',
    default=255000,
    type=int,
    help='Size limit in bytes that must not be exceeded.',
)
@click.option(
    '--soft-limit',
    default=220000,
    type=int,
    help='Size limit in bytes that stream heuristics will work against.',
)
@click.option(
    '--flush-every',
    default=60,
    type=int,
    help=(
        'How long, in seconds, the stream should wait before flushing '
        'if not enough data was fed to reach the size limit. '
        'Zero or negative values disable this behavior.'
    ),
)
@click.pass_context
def cli(context, name, hard_limit, soft_limit, flush_every):
    context.ensure_object(dict)
    context.obj.update({
        'flush_every': flush_every,
        'hard_limit': hard_limit,
        'name': name,
        'soft_limit': soft_limit,
    })


@cli.command(context_settings={'ignore_unknown_options': True})
@click.argument('exec', nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def run(context, exec):
    def terminate_child(*args, **kwargs):
        # will break the readline loop and letov will exit
        proc.terminate()
        proc.wait()

    stream = ZstdChunkedWrapper(
        sys.stdout,
        group_name=context.obj['name'],
        soft_limit=context.obj['soft_limit'],
        hard_limit=context.obj['hard_limit'],
        flush_every=context.obj['flush_every'],
    )
    proc = subprocess.Popen(
        exec,
        universal_newlines=True,
        stdout=subprocess.PIPE,
    )
    signal.signal(signal.SIGINT, terminate_child)
    signal.signal(signal.SIGTERM, terminate_child)
    try:
        for data in iter(proc.stdout.readline, ''):
            stream.write(data)
        stream.flush()
    finally:
        terminate_child()


# --- letov/encoder.py ---
import json
from datetime import date, datetime
from decimal import Decimal
from enum import Enum
from types import MappingProxyType


class LoggingEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        elif isinstance(obj, set):
            return tuple(obj)
        elif isinstance(obj, bytes):
            return obj.decode('utf-8')
        elif isinstance(obj, Decimal):
            return float(obj)
        elif isinstance(obj, Enum):
            return str(obj)
        elif isinstance(obj, MappingProxyType):
            return dict(obj)
        elif hasattr(obj, 'to_json'):
            return obj.to_json()
        return {'_pyobject': repr(obj)}
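# --- illustrative usage (not part of the package) ---
# A minimal sketch of what LoggingEncoder does with values the stock json
# module rejects. The Color enum and the payload below are made up for
# demonstration.
import json
from datetime import datetime
from decimal import Decimal
from enum import Enum

from letov.encoder import LoggingEncoder


class Color(Enum):
    RED = 1


payload = {
    'when': datetime(2021, 1, 1, 12, 0),  # datetime -> ISO 8601 string
    'tags': {'a', 'b'},                   # set -> JSON array
    'price': Decimal('9.99'),             # Decimal -> float
    'color': Color.RED,                   # Enum -> "Color.RED"
    'blob': b'raw bytes',                 # bytes -> UTF-8 str
    'anything_else': object(),            # fallback -> {"_pyobject": repr(...)}
}
print(json.dumps(payload, cls=LoggingEncoder))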
# --- letov/formatter.py ---
import json
import logging
import os
import socket
from datetime import datetime

from .encoder import LoggingEncoder

logger = logging.getLogger(__name__)

LEVELNAME = os.environ.get('LEVELNAME', 'LOCAL')
INSTANCE_NAME = socket.gethostname()
ECS_TASK_NAME = os.environ.get('TASK_NAME')


class JsonFormatter(logging.Formatter):
    def __init__(
        self,
        appname,
        levelname=LEVELNAME,
        instance_name=INSTANCE_NAME,
        ecs_task_name=ECS_TASK_NAME,
        **kwargs
    ):
        super().__init__()
        self.appname = appname
        self.levelname = levelname
        self.instance_name = instance_name
        self.ecs_task_name = ecs_task_name
        self.kwargs = kwargs

    def usesTime(self):
        return True

    def format(self, record, etime=None):
        try:
            body = super().format(record)
        except Exception as e:
            logger.error(
                f'Log formatting error: {e}, logrecord_msg: {record.msg}, '
                f'logrecord_args: {record.args}'
            )
            return None

        if etime is None:
            etime = datetime.utcnow()

        info_keys = [
            x for x in record.__dict__
            if x not in {'args', 'context'}
        ]
        info = {
            **{key: getattr(record, key, '') for key in info_keys},
            **self.kwargs,
            'created': etime.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
            'appname': self.appname,
            'ecs_task_name': self.ecs_task_name,
            'instance_name': self.instance_name,
            'levelname': self.levelname,
            'loggername': record.name,
            'message': body,
        }
        if record.args and isinstance(record.args, dict):
            info.update(record.args)

        return json.dumps(
            {
                'context': getattr(record, 'context', {}),
                'info': info,
            },
            cls=LoggingEncoder,
        )


# --- letov/handler.py ---
import logging

from letov.stream import ZstdChunkedWrapper

logger = logging.getLogger(__name__)


class ZstdChunkedHandler(logging.StreamHandler):
    """
    Handler that compresses all input data with ZSTD and flushes it to the
    wrapped stream in chunks smaller than the specified size.

    Each chunk forms a JSON object with metadata and a valid base64-encoded
    ZSTD frame. Each input line is guaranteed to make it into a chunk in its
    entirety. Chunks are delimited with a newline.

    :param stream: Wrapped stream.
    :param group_name: Name that will be a part of the chunk's metadata.
    :param soft_limit: Size limit in bytes that stream heuristics will
        work against.
    :param hard_limit: Size limit in bytes that must not be exceeded.
    :param flush_every: How long, in seconds, the stream should wait before
        flushing if not enough data was fed to reach the size limit.
        Zero or negative values disable this behavior.
    :param compression_params: Params dict to be passed to ZstdCompressor.
    """

    def __init__(
        self, stream, group_name, flush_every,
        soft_limit, hard_limit, compression_params=None
    ):
        super().__init__(
            ZstdChunkedWrapper(
                stream, group_name, flush_every,
                soft_limit, hard_limit, compression_params
            )
        )

    def filter(self, record):
        return super().filter(record) and not record.name.startswith('letov')

    def handleError(self, record):
        logger.exception('Fatal logging error')

    def emit(self, record):
        # Basically StreamHandler.emit, but without a flush on every record
        try:
            msg = self.format(record)
            self.stream.write(msg + self.terminator)
        except Exception:
            self.handleError(record)
# --- letov/stream.py ---
import json
import time
from base64 import b64encode
from io import BytesIO
from math import inf
from typing import Iterator, List

import zstd


class _LineBuffer:
    def __init__(self):
        self.buffer = ''

    def write(self, data: str) -> List[str]:
        if not data:
            return []

        self.buffer += data
        lines = self.buffer.splitlines(keepends=True)
        if lines[-1].endswith('\n'):
            self.buffer = ''
        else:
            self.buffer = lines.pop(-1)
        return lines

    def flush(self) -> str:
        data, self.buffer = self.buffer, ''
        return data


class ZstdChunkedWrapper:
    """
    Stream wrapper that compresses all input data with ZSTD and flushes it
    to the wrapped stream in chunks smaller than the specified size.

    Each chunk forms a JSON object with metadata and a valid base64-encoded
    ZSTD frame. Each input line is guaranteed to make it into a chunk in its
    entirety. This class is not intended for use with small size limits
    (on the order of the size of a single compressed log message).
    Chunks are delimited with a newline.

    :param stream: Wrapped stream.
    :param group_name: Name that will be a part of the chunk's metadata.
    :param soft_limit: Size limit in bytes that stream heuristics will
        work against.
    :param hard_limit: Size limit in bytes that must not be exceeded.
    :param flush_every: How long, in seconds, the stream should wait before
        flushing if not enough data was fed to reach the size limit.
        Zero or negative values disable this behavior.
    :param compression_params: Params dict to be passed to ZstdCompressor.
    """

    def __init__(
        self, stream, group_name, flush_every,
        soft_limit, hard_limit, compression_params=None
    ):
        self.stream = stream
        self.group_name = group_name
        self.flush_every = flush_every if flush_every > 0 else inf
        self.soft_limit = soft_limit
        self.hard_limit = hard_limit
        self.last_flush = time.monotonic()
        self.compression_params = compression_params or {}

        self._output = BytesIO()  # buffer with compressed data
        self._line_buffer = _LineBuffer()

        overhead = self._get_formatting_overhead()
        # considering base64 (hence multiplying by 3/4);
        # 14 is the maximum size of a zstd frame header
        self._raw_hard_limit = hard_limit * 3 // 4 - overhead - 14
        self._raw_soft_limit = soft_limit * 3 // 4 - overhead - 14

        # compression
        self._compressor = zstd.ZstdCompressor(**self.compression_params)
        self._decompressor = zstd.ZstdDecompressor(**self.compression_params)
        self._zstd_stream = self._compressor.stream_writer(self._output)
        self._frame_progression = self._compressor.frame_progression()
        self._consumed = 0
        self._produced = 0
        self._chunk_first_write_ts = None

    @property
    def avg_compression_ratio(self):
        try:
            return self._consumed / self._produced
        except ZeroDivisionError:
            # first block ¯\_(ツ)_/¯
            return 2

    def write(self, data: str):
        for line in self._line_buffer.write(data):
            self._write(line)

    def flush(self):
        residue = self._line_buffer.flush()
        if residue:
            self._write(residue)
        self._flush()

    def close(self):
        self._zstd_stream.close()

    def _write(self, line: str):
        if not self._chunk_first_write_ts:
            self._chunk_first_write_ts = time.time()

        if self._write_will_overflow(line):
            self._flush()

        if self._zstd_stream.write(line.encode('utf-8')):
            # internal buffer was flushed
            self._update_stats()

        if time.monotonic() - self.last_flush >= self.flush_every:
            self._flush()

    def _flush(self):
        if self._chunk_first_write_ts:  # it's not empty
            self._zstd_stream.flush(zstd.FLUSH_FRAME)
            raw_chunk = self._output.getvalue()
            for chunk in self._split_chunk(raw_chunk):
                self.stream.write(self._format_chunk(chunk))
            self.stream.flush()
            self._output.seek(0)
            self._output.truncate()
            self._chunk_first_write_ts = None

        self.last_flush = time.monotonic()

    def _update_stats(self):
        _, prev_consumed, prev_produced = self._frame_progression
        self._frame_progression = _, consumed, produced = (
            self._compressor.frame_progression()
        )
        self._consumed += (consumed - prev_consumed)
        self._produced += (produced - prev_produced)

    def _format_chunk(self, chunk: bytes) -> str:
        return json.dumps({
            'data': b64encode(chunk).decode('ascii'),
            'end_ts': time.time(),
            'name': self.group_name,
            'start_ts': self._chunk_first_write_ts,
        }) + '\n'

    def _get_formatting_overhead(self):
        self._chunk_first_write_ts = time.time()
        overhead = len(self._format_chunk(b''))
        self._chunk_first_write_ts = None
        return overhead

    def _write_will_overflow(self, data: str) -> bool:
        ingested, consumed, produced = self._compressor.frame_progression()
        zstd_buffer_size = ingested - consumed
        estimated_compressed_size = (
            produced
            + (zstd_buffer_size + len(data)) / self.avg_compression_ratio
        )
        return self._raw_soft_limit <= estimated_compressed_size

    def _split_chunk(self, chunk: bytes) -> Iterator[bytes]:
        if len(chunk) <= self._raw_hard_limit:
            yield chunk
            return

        # recompress the biggest lines separately until the rest fits
        # the size limit
        data = self._decompressor.stream_reader(BytesIO(chunk)).readall()
        lines = data.splitlines(keepends=True)
        while len(chunk) >= self._raw_hard_limit:
            # index-based max() avoids a second pass to locate the line
            biggest_line_index = max(
                range(len(lines)),
                key=lambda index: len(lines[index]),
            )
            yield self._compressor.compress(lines.pop(biggest_line_index))
            chunk = self._compressor.compress(b''.join(lines))
        yield chunk
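# --- illustrative usage (not part of the package) ---
# A minimal round-trip sketch: write a couple of lines through
# ZstdChunkedWrapper into an in-memory stream, then decode the emitted chunk
# back into text. This mirrors what the CLI `run` command does with a child
# process's stdout. The decoding calls mirror the ones stream.py itself uses;
# `sink` and the limit values are made up for demonstration.
import io
import json
from base64 import b64decode

import zstd

from letov.stream import ZstdChunkedWrapper

sink = io.StringIO()
wrapper = ZstdChunkedWrapper(
    sink,
    group_name='demo',
    flush_every=0,       # disable time-based flushing
    soft_limit=220000,
    hard_limit=255000,
)
wrapper.write('first line\n')
wrapper.write('second line\n')
wrapper.flush()

for chunk_line in sink.getvalue().splitlines():
    chunk = json.loads(chunk_line)
    frame = b64decode(chunk['data'])
    text = zstd.ZstdDecompressor().stream_reader(io.BytesIO(frame)).readall()
    print(chunk['name'], chunk['start_ts'], chunk['end_ts'])
    print(text.decode('utf-8'), end='')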