PKe^K;aiologstash/__init__.py"""asyncio-compatible logstash logging handler.""" __version__ = '0.0.3' import asyncio import logging from .tcp_handler import TCPLogstashHandler __all__ = ('create_tcp_handler',) async def create_tcp_handler(host, port, level=logging.NOTSET, close_timeout=5, reconnect_delay=1, reconnect_jitter=0.3, qsize=10000, loop=None, **kwargs): if loop is None: loop = asyncio.get_event_loop() handler = TCPLogstashHandler(host=host, port=port, level=level, close_timeout=close_timeout, qsize=qsize, loop=loop, reconnect_delay=reconnect_delay, reconnect_jitter=reconnect_jitter, **kwargs) try: await handler._connect() except OSError: handler.close() await handler.wait_closed() raise return handler PKe^KKubbaiologstash/base_handler.pyimport abc import asyncio import logging import random from async_timeout import timeout from logstash import LogstashFormatterVersion1 from .log import logger class BaseLogstashHandler(logging.Handler): def __init__(self, *, level, close_timeout, qsize, loop, reconnect_delay, reconnect_jitter): self._close_timeout = close_timeout self._reconnect_delay = reconnect_delay self._reconnect_jitter = reconnect_jitter self._random = random.Random() self._loop = loop self._queue = asyncio.Queue(maxsize=qsize, loop=self._loop) super().__init__(level=level) formatter = LogstashFormatterVersion1() self.setFormatter(formatter) self._closing = False self._worker = self._loop.create_task(self._work()) @abc.abstractmethod async def _connect(self): pass # pragma: no cover @abc.abstractmethod async def _send(self, data): pass # pragma: no cover @abc.abstractmethod async def _disconnect(self): pass # pragma: no cover def emit(self, record): if self._closing: msg = 'Log message skipped due shutdown "%(record)s"' context = {'record': record} logger.warning(msg, context) return if self._queue.full(): msg = 'Queue is full, drop oldest message: "%(record)s"' context = {'record': self._queue.get_nowait()} logger.warning(msg, context) self._queue.put_nowait(record) async def _work(self): reconnection = False while True: if not reconnection: record = await self._queue.get() if record is ...: self._queue.put_nowait(...) break reconnection = False try: data = self._serialize(record) try: await self._send(data) except OSError: reconnection = True await self._reconnect() except asyncio.CancelledError: raise except Exception as exc: msg = 'Unexpected exception while sending log' logger.warning(msg, exc_info=exc) async def _reconnect(self): logger.info('Transport disconnected') await self._disconnect() while True: try: await self._connect() logger.info('Transport reconnected') return except OSError: delay = self._random.gauss(self._reconnect_delay, self._reconnect_jitter) await asyncio.sleep(delay, loop=self._loop) def _serialize(self, record): return self.format(record) + b'\n' # dummy statement for default handler close() # non conditional close() usage actually def close(self): if self._closing: return self._closing = True if self._queue.full(): msg = ('Queue is full, drop oldest message before closing' ': "%(record)s"') context = {'record': self._queue.get_nowait()} logger.warning(msg, context) self._queue.put_nowait(...) super().close() @abc.abstractmethod async def wait_closed(self): if self._worker is None: return # already closed try: async with timeout(self._close_timeout, loop=self._loop): await self._worker except asyncio.TimeoutError: self._worker.cancel() try: await self._worker except: # noqa pass self._worker = None assert self._queue.qsize() == 1 assert self._queue.get_nowait() is ... await self._disconnect() PKe^KXX99aiologstash/log.pyimport logging logger = logging.getLogger(__package__) PKe^KRQaiologstash/tcp_handler.pyimport asyncio from .base_handler import BaseLogstashHandler class TCPLogstashHandler(BaseLogstashHandler): def __init__(self, *, host, port, level, close_timeout, qsize, loop, reconnect_delay, reconnect_jitter, **kwargs): super().__init__(level=level, close_timeout=close_timeout, qsize=qsize, reconnect_delay=reconnect_delay, reconnect_jitter=reconnect_jitter, loop=loop) self._reader = None self._writer = None self._host = host self._port = port self._kwargs = kwargs async def _connect(self): self._reader, self._writer = await asyncio.open_connection( self._host, self._port, loop=self._loop, **self._kwargs) async def _send(self, data): self._writer.write(data) await self._writer.drain() async def _disconnect(self): if self._writer is not None: self._writer.close() await asyncio.sleep(0, loop=self._loop) # wait for writer closing self._reader = None self._writer = None PKe^KLBB#aiologstash-0.0.3.dist-info/LICENSEThe MIT License Copyright (c) 2017 Ocean S. A. https://ocean.io/ Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H}0RR!aiologstash-0.0.3.dist-info/WHEEL1 0 RZtMDtPI{w<wUnbaKM*A1ѭ g\Ic c~PK!Hh)z$aiologstash-0.0.3.dist-info/METADATAT]o0 |ׯ[[`vٖ,nb36QK$9A(N =(2 ֡ Wl"$\)Mtۆ{'٬VJ5aZhԨJz;Ns^H`7 J]x_$sE=DWڡ#LA;"=M~N&lP؄OS@G(e[|1!(а_F .zXJԿ7TR: j: AJSQ{O@LgC>*kD QɭT*\H꼦K>/nNf|c< _I ] #[i܎du KyGk,ؗN_Oxdn:.ԞF.ƄB6r.JQo-TN΃e'^(bG}&,9xo<1nEg2!8tQKEmׁ]/Xqa6p !jڕ%Yh^􅼺ʒ^aqCܛRQY TN֨]Z}Z=4Mu<@_$!֬>+t.>0O/^we[cXQ(R, rLEP_$cf2v_7ocW!!;BBN*Ň<p$?#[_ޝK0L@vG̜KCBrT%mv/OfQ˵Uْ S̊+bfBd; @e~' ; 4| HaR֠<9͂˹;)~ o;njxBq*r=t-Qţ1q3I33ff&ۢ{7|O$q}#5N`u(Žv⯖k; 4 aTbؠQ&˻C\pdQ }3Qҵ')' `Vo{auJmyN|puzPKe^K;aiologstash/__init__.pyPKe^KKubbJaiologstash/base_handler.pyPKe^KXX99aiologstash/log.pyPKe^KRQNaiologstash/tcp_handler.pyPKe^KLBB#Jaiologstash-0.0.3.dist-info/LICENSEPK!H}0RR!aiologstash-0.0.3.dist-info/WHEELPK!Hh)z$^aiologstash-0.0.3.dist-info/METADATAPK!H:Ϭv"!aiologstash-0.0.3.dist-info/RECORDPKX"