PK ! : : epimetheus/__init__.pyfrom .registry import Registry __all__ = ('Registry', ) PK ! epimetheus/exposers/__init__.pyPK ! _Cv; ; epimetheus/exposers/daemon.py""" NOTE: exposition of this webserver into public may be insecure """ from http.server import HTTPServer, SimpleHTTPRequestHandler class MetricRequestHandler(SimpleHTTPRequestHandler): pass def run(addr: str, port: int): httpd = HTTPServer((addr, port), MetricRequestHandler) httpd.serve_forever() PK ! so epimetheus/metrics.pyfrom collections import deque from math import ceil, floor from typing import Tuple import attr from .sample import SampleKey, SampleValue, clock # Metrics should be optimized for fast receiving of data # And relatively rare reporting __all__ = ('Counter', 'Gauge', 'Histogram', 'Summary') @attr.s class MetricWithTimestamp: _use_clock: bool = attr.ib(default=True) _reclock_if_changed: bool = attr.ib(default=False) _ts = attr.ib(init=False, default=None) def __attrs_post_init__(self): self._update_ts(1) def _update_ts(self, vdiff): if not self._use_clock: return if (not self._reclock_if_changed) or vdiff: self._ts = clock() @attr.s class Counter(MetricWithTimestamp): TYPE = 'counter' _count = attr.ib(init=False, default=0) def inc(self, delta: float = 1): assert delta >= 0 self._count += delta self._update_ts(delta) def sample_group(self, skey: SampleKey): yield skey def sample_values(self): yield SampleValue(self._count, self._ts) @attr.s class Gauge(MetricWithTimestamp): TYPE = 'gauge' _value = attr.ib(init=False, default=0) def inc(self, delta: float = 1): self._value += delta self._update_ts(delta) def dec(self, delta: float = 1): self._value -= delta self._update_ts(delta) def set(self, value: float): vdiff = value - self._value self._value = value self._update_ts(vdiff) def set_with_timestamp(self, value: float, ts: int): "For metrics like daily active users" self._value = value self._ts = ts # TODO: set_to_current_time def sample_group(self, skey: SampleKey): yield skey def sample_values(self): yield SampleValue(self._value, self._ts) def to_sorted_tuple(x): return tuple(sorted(x)) @attr.s class Histogram: TYPE = 'histogram' RESERVED_LABELS = frozenset(['le']) _buckets: Tuple[float] = attr.ib(converter=to_sorted_tuple) _bcounts = attr.ib(init=False) _inf_bcount = attr.ib(init=False, default=0) _sum = attr.ib(init=False, default=0) _count = attr.ib(init=False, default=0) def __attrs_post_init__(self): self._bcounts = [0 for _ in self._buckets] def observe(self, value: float): # TODO: optimize, use bisect for index, upper in enumerate(self._buckets): if value <= upper: self._bcounts[index] += 1 break else: self._inf_bcount += 1 self._sum += value self._count += 1 def sample_group(self, skey: SampleKey): bkey = skey.with_suffix('_bucket') for b in self._buckets: yield bkey.with_labels(le=b) yield bkey.with_labels(le='+Inf') yield skey.with_suffix('_sum') yield skey.with_suffix('_count') def sample_values(self): for v in self._bcounts: yield SampleValue(v) yield SampleValue(self._inf_bcount) yield SampleValue(self._sum) yield SampleValue(self._count) @attr.s class Summary: TYPE = 'summary' RESERVED_LABELS = frozenset(['quantile']) # TODO: validate range 0..1 _buckets: Tuple[float] = attr.ib(converter=to_sorted_tuple) _time_window: float = attr.ib(default=3600) _samples = attr.ib(init=False, factory=deque) @_buckets.validator def _validate_buckets(self, attribute, value): for x in value: if not (0 <= x <= 1): raise ValueError('Quantiles must be in range 0..1') def _clean_old_samples(self): before = clock() - int(self._time_window * 1000) while self._samples and self._samples[0].timestamp <= before: self._samples.popleft() def observe(self, value: float): self._samples.append(SampleValue.create(value, clock())) self._clean_old_samples() def sample_group(self, skey: SampleKey): for b in self._buckets: yield skey.with_labels(quantile=b) yield skey.with_suffix('_sum') yield skey.with_suffix('_count') def sample_values(self): self._clean_old_samples() if not self._samples: return quantiles = [] ss = list(sorted(self._samples, key=lambda s: s.value)) n = len(ss) ss.append(ss[-1]) for qp in self._buckets: k = (n - 1) * qp f, c = floor(k), ceil(k) if f == c: qval = ss[f].value else: qval = ( ss[f].value * (c - k) + ss[c].value * (k - f) ) quantiles.append(qval) for v in quantiles: yield SampleValue(v) yield SampleValue(sum(s.value for s in ss[:n])) yield SampleValue(n) @attr.s class Group: _key: SampleKey = attr.ib() _mcls: type = attr.ib() _args: tuple = attr.ib(default=()) _kwargs: dict = attr.ib(factory=dict) _help: str = attr.ib(default=None) _items: dict = attr.ib(init=False, factory=dict) _rendered_keys: dict = attr.ib(init=False, factory=dict) def with_labels(self, **labels): k = self._key.with_labels(**labels) if k in self._items: return self._items[k] # TODO: RESERVED_LABELS m = self._mcls(*self._args, **self._kwargs) self._items[k] = m self._rendered_keys[k] = tuple(rk.expose() for rk in m.sample_group(k)) return m def expose_header(self): if self._help is not None: yield f'# HELP {self._help}' yield f'# TYPE {self._key.name} {self._mcls.TYPE}' def expose(self): he = False for k, m in self._items.items(): for rk, v in zip(self._rendered_keys[k], m.sample_values()): # expose header only if we have samples if not he: yield from self.expose_header() he = True yield f'{rk} {v.expose()}' PK ! `e e epimetheus/registry.pyfrom typing import Dict import attr from . import metrics from .sample import SampleKey __all__ = ('Registry', ) def _create_builder(mcls): def builder( self, *, name: str, labels: Dict[str, str] = attr.NOTHING, help: str = None, **kwargs, ): key = SampleKey(name, labels) if self.get(key) is not None: raise KeyError('Attempt to create same metric group twice') # we do not check there equality of instances # TODO: may be we should? group = metrics.Group( key=key, mcls=mcls, kwargs=kwargs, help=help, ) self.register(key, group) return group return builder @attr.s class Registry: _groups = attr.ib(init=False, factory=dict) def get(self, key: SampleKey): return self._groups.get(key) def register(self, key, group): assert key not in self._groups self._groups[key] = group def unregister(self, key): del self._groups[key] def expose(self): for exp in self._groups.values(): yield from exp.expose() yield '' counter = _create_builder(metrics.Counter) gauge = _create_builder(metrics.Gauge) histogram = _create_builder(metrics.Histogram) summary = _create_builder(metrics.Summary) PK ! mo-'I I epimetheus/sample.pyimport math import re import time from typing import Dict import attr from sortedcontainers import SortedDict __all__ = ('SampleKey', 'SampleValue', 'clock') METRIC_NAME_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$') LABEL_NAME_RE = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$') def convert_labels(value): return {k: str(v) for k, v in value.items()} @attr.s(cmp=True, frozen=True) class SampleKey: _name: str = attr.ib(cmp=False, repr=False) _labels: Dict[str, str] = attr.ib( factory=dict, converter=convert_labels, cmp=False, repr=False) _line: str = attr.ib(init=False, cmp=False, repr=True) _cmp_line: str = attr.ib(init=False, cmp=True, repr=False) @_name.validator def _validate_name(self, attribute, value): if METRIC_NAME_RE.fullmatch(value) is None: raise ValueError('Invalid sample name') @_labels.validator def _validate_labels(self, attribute, value): for k, v in value.items(): if k.startswith('__'): raise ValueError('Label names starting with __ are reserved by Prometheus') if LABEL_NAME_RE.fullmatch(k) is None: raise ValueError('Invalid label name') def with_suffix(self, suffix: str) -> 'SampleKey': return type(self)( name=self._name + suffix, labels=self._labels, ) def with_labels(self, **new_labels: Dict[str, str]): return type(self)( name=self._name, labels={**self._labels, **new_labels}, ) @staticmethod def expose_label_value(v) -> str: return str(v).replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"') @classmethod def expose_label_set(cls, labels: Dict[str, str]) -> str: if not labels: return '' x = ','.join(f'{k}="{cls.expose_label_value(v)}"' for k, v in labels.items()) return '{' + x + '}' def __attrs_post_init__(self): line = f'{self._name}{self.expose_label_set(self._labels)}' object.__setattr__(self, '_line', line) line = f'{self._name}{self.expose_label_set(SortedDict(self._labels))}' object.__setattr__(self, '_cmp_line', line) @property def name(self): return self._name def expose(self): return self._line @attr.s class SampleValue: value: float = attr.ib(default=0) timestamp: int = attr.ib(default=None) @classmethod def create(cls, value: float, use_ts=False): if use_ts: return cls(value, clock()) return cls(value) @staticmethod def expose_value(value): if value == math.inf: return 'Inf' elif value == -math.inf: return '-Inf' elif math.isnan(value): return 'Nan' return str(value) @staticmethod def expose_timestamp(ts): return str(ts) def expose(self): if self.timestamp is not None: return f'{self.expose_value(self.value)} {self.expose_timestamp(self.timestamp)}' return f'{self.expose_value(self.value)}' def clock(): return int(time.time() * 1000) PK !HnHT U epimetheus-0.3.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK !HP>" # epimetheus-0.3.1.dist-info/METADATAN0EYXm*XiE*ڴ<&b;ԿiĆ=xD/+e֑F| P v7J<&(%NTҖΚ# eK=b#ncv+b\],l|c A])*IWM[tu^F{|dѥ]tqnsW\{!`ìQX?OV*Eu=hOy>!HףּdxGI>L)4whnO$:c=V^IPK !Hk ! epimetheus-0.3.1.dist-info/RECORD}Ϲ@ |`lPDbie%d_ӌ 7rɊyL[(uݫc&|DJܒ@_