PK!ڳommcarriage/__init__.py from .array import Array from .lambda_ import X, Xcall from .map import Map from .optional import Nothing, NothingError, Optional, Some from .row import Row from .stream import Stream from .streamtable import StreamTable __all__ = ['Row', 'Map', 'Stream', 'Array', 'Xcall', 'X', 'Optional', 'Some', 'Nothing', 'NothingError', 'StreamTable'] PK!*֬MMcarriage/array.py import builtins import collections import itertools as itt from collections import Counter, defaultdict, deque from copy import copy from .monad import Monad from .optional import Nothing, Some from .repr import short_repr from .row import CurrNext, CurrPrev, KeyValues, ValueIndex class Array(Monad): __slots__ = '_items' @classmethod def range(cls, start, end=None, step=1): '''Create a Array from range. >>> Array.range(2, 10, 2).to_list() [2, 4, 6, 8] >>> Array.range(3).to_list() [0, 1, 2] ''' if end is None: start, end = 0, start return cls(range(start, end, step)) @property def _base_type(self): return Array def __init__(self, items=None): '''Create a Array object >>> Array() Array([]) >>> Array([1, 2, 3]) Array([1, 2, 3]) >>> Array(range(2, 8, 2)) Array([2, 4, 6]) ''' if isinstance(items, list): pass elif isinstance(items, collections.Iterable): items = list(items) elif items is None: items = [] else: raise TypeError("items should be iterable") self._items = items @classmethod def unit(cls, value): return Array([value]) def to_list(self, copy=False): '''Convert to a list. >>> Array.range(3).to_list() [0, 1, 2] ''' if copy: return self._items else: return self._items[:] def to_series(self): '''Convert to a pandas Series >>> Array.range(5, 10, 2).to_series() 0 5 1 7 2 9 dtype: int64 Returns ------- pandas.Series ''' import pandas as pd return pd.Series(self) def to_set(self): '''Convert to a set >>> Array([3, 2, 3, 6, 2]).to_set() {2, 3, 6} Returns ------- set ''' return set(self) def to_dict(self): '''Convert to a dict >>> Array.range(5, 10, 2).zip_index().to_dict() {5: 0, 7: 1, 9: 2} Returns ------- dict ''' return dict(self) def to_map(self): '''Convert to a Map >>> Array.range(5, 10, 2).zip_index().to_map() Map({5: 0, 7: 1, 9: 2}) Returns ------- Map ''' from .map import Map return Map(self) def to_stream(self): '''Convert to a Stream >>> strm = Array.range(5, 8, 2).zip_index().to_stream() >>> type(strm) >>> strm.to_array() Array([Row(value=5, index=0), Row(value=7, index=1)]) Returns ------- Stream ''' from .stream import Stream return Stream(self) def map(self, action): '''Create a new Array by applying function to each element >>> Array.range(5, 8).map(lambda x: x * 2) Array([10, 12, 14]) Returns ------- Array ''' return Array(action(item) for item in self) def starmap(self, func): '''Create a new Array by evaluating function using argument tulpe from each element. i.e. ``func(*elem)``. It's convenient that if all elements in Array are iterable and you want to treat each element in elemnts as separate argument while calling the function. >>> Array([(1, 2), (3, 4)]).starmap(lambda a, b: a+b) Array([3, 7]) The map way. Not easy to read and write >>> Array([(1, 2), (3, 4)]).map(lambda a_b: a_b[0]+a_b[1]) Array([3, 7]) ''' return Array(itt.starmap(func, self)) def flatten(self): '''flatten each element >>> Array([(1, 2), (3, 4)]).flatten() Array([1, 2, 3, 4]) Returns ------- Array ''' return Array(itt.chain.from_iterable(self._items)) def flat_map(self, to_iterable_action): '''Apply function to each element, then flatten the result. >>> Array([1, 2, 3]).flat_map(range) Array([0, 0, 1, 0, 1, 2]) Returns ------- Array ''' return Array( itt.chain.from_iterable( to_iterable_action(item) for item in self._items)) def tap(self, tag='', n=5, msg_format='{tag}:{index}: {elem}'): '''A debugging tool. This method create a new Array with the same elements. While creating it, it print first n elements. >>> (Array.range(3).tap('orig') ... .map(lambda x: x * 2).tap('x2') ... .accumulate(lambda a, b: a + b) ... .tap_with(func=lambda i, e: f'{i} -> {e}') ... ) orig:0: 0 orig:1: 1 orig:2: 2 x2:0: 0 x2:1: 2 x2:2: 4 0 -> 0 1 -> 2 2 -> 6 Array([0, 2, 6]) ''' def tap_tr(self_): for index, elem in enumerate(self_): if index < n: print(msg_format.format(tag=tag, index=index, elem=elem)) yield elem return Array(tap_tr(self)) def tap_with(self, func, n=5): '''A debugging tool. This method create a new Array with the same elements. While creating Array, it call the function using index and element then prints the return value for first n elements. >>> (Array.range(3).tap('orig') ... .map(lambda x: x * 2).tap('x2') ... .accumulate(lambda a, b: a + b) ... .tap_with(func=lambda i, e: f'{i} -> {e}') ... ) orig:0: 0 orig:1: 1 orig:2: 2 x2:0: 0 x2:1: 2 x2:2: 4 0 -> 0 1 -> 2 2 -> 6 Array([0, 2, 6]) Parameters ----------- func : ``func(index, elem) -> Any`` Function for building the printing object. n : int First n element will be print. ''' def tap_with_tr(self_): for index, elem in enumerate(self_): if index < n: print(func(index, elem)) yield elem return Array(tap_with_tr(self)) def then(self, alist): # TODO if len(self._items) > 0: return alist else: return self def ap(self, avalue): # TODO pass def __len__(self): return len(self._items) def __iter__(self): return iter(self._items) def __repr__(self): return f'{type(self).__name__}({short_repr.repr(self._items)})' @property def _value_for_cmp(self): return self._items def len(self): '''Get the length''' return len(self) def __getitem__(self, index): '''Get the item in the nth position if index is integer. Get a Array of a slice of items if index is a slice object. Parameters ---------- index : int, slice index of target item or a slice object ''' if isinstance(index, slice): return Array(self._items[index]) return self._items[index] def get(self, index, default=None): '''Get item of the index. Return default value if not exists. ''' if index < len(self): return self._items[index] return default def get_opt(self, index): '''Optionally get item of the index. Return Some(value) if exists. Otherwise return Nothing. ''' if (index if index >= 0 else abs(index) - 1) < len(self): return Some(self._items[index]) return Nothing def slice(self, start, stop, step=None): '''Create a Array from the slice of items. Returns ------- Array ''' return self[slice(start, stop, step)] def first(self): '''Get first element ''' return self._items[0] def second(self): '''Get second element ''' return self._items[1] def last(self): '''Get last element ''' return self._items[-1] def first_opt(self): '''Get first element as Some(element), or Nothing if not exists ''' return self.get_opt(0) def second_opt(self): '''Get second element as Some(element), or Nothing if not exists ''' return self.get_opt(1) def last_opt(self): '''Get last element as Some(element), or Nothing if not exists ''' return self.get_opt(-1) def find(self, pred): '''Get first element satifying predicate ''' for item in self._items: if pred(item): return item def find_opt(self, pred): '''Optionally get first element satifying predicate. Return Some(element) if exist Otherwise return Nothing ''' for item in self._items: if pred(item): return Some(item) else: return Nothing def take(self, n): '''Create a new Array of only first n element ''' return Array(self._items[:n]) def drop(self, n): '''Create a new Array of first n element dropped ''' return Array(self._items[n:]) def tail(self): '''Create a new Array first element dropped ''' return Array(self._items[1:]) def butlast(self): '''Create a new Array that last element dropped ''' return self[:-1] def take_right(self, n): '''Create a new Array with last n elements ''' return Array(self._items[-n:]) takeright = take_right def drop_right(self, n): '''Create a new Array that last n elements dropped ''' return Array(self._items[:-n]) dropright = drop_right def take_while(self, pred): '''Create a new Array with successive elements as long as predicate evaluates to true. ''' return Array(itt.takewhile(pred, self._items)) takewhile = take_while def drop_while(self, pred): '''Create a new Array without elements as long as predicate evaluates to true. ''' return Array(itt.dropwhile(pred, self._items)) dropwhile = drop_while def split_before(self, pred): '''Create a new Array of Arrays by splitting before each element passing predicate. ''' def _split_before(pred, items): segment = [] for item in items: if pred(item) and len(segment) > 0: yield segment segment = [] segment.append(item) yield segment return Array(_split_before(pred, self._items)) def split_after(self, pred): '''Create a new Array of Arrays by splitting after each element passing predicate. ''' def _split_after(pred, items): segment = [] for item in items: if pred(item): yield segment segment = [] segment.append(item) if len(segment) > 0: yield segment return Array(_split_after(pred, self._items)) def pluck(self, key): '''Create a new Array of values by evaluating ``elem[key]`` for each element.''' return self.map(lambda d: d[key]) def pluck_opt(self, key): '''Create a new Array of Optional values by evaluating ``elem[key]`` for each element. Get ``Some(value)`` if the key exists for that element, otherwise get Nothing singleton. ''' return self.map(lambda d: Some(d[key]) if key in d else Nothing) def pluck_attr(self, attr): '''Create a new Array of Optional values by evaluating ``elem.attr`` of each element. Get ``Some(value)`` if attr exists for that element, otherwise get Nothing singleton. ''' return self.map(lambda obj: getattr(obj, attr)) def without(self, *items): '''Create a new Array without specified elements.''' items = set(items) return self.filter(lambda item: item not in items) def filter(self, pred): '''Create a new Array contains only elements passing predicate''' return Array(builtins.filter(pred, self._items)) def filter_false(self, pred): '''Create a new Array contains only elements not passing predicate''' return Array(itt.filterfalse(pred, self._items)) filterfalse = filter_false def where(self, **conds): '''Create a new Array contains only mapping pass all conditions. ''' return self.filter(lambda d: all(key in d and d[key] == value for key, value in conds.items())) def interpose(self, sep): '''Create a new Array by interposing separater between elemens. ''' # TODO for item in self._items: yield item yield def zip(self, *iterable): '''Create a new Array by zipping elements with other iterables. ''' return Array(zip(self._items, *iterable)) def zip_longest(self, *iterables, fillvalue=None): '''Create a new Array by zipping elements with other iterables as long as possible. ''' return Array(itt.zip_longest( self._items, *iterables, fillvalue=fillvalue)) def zip_longest_opt(self, *iterables): iterables = [map(Some, it) for it in iterables] return Array(itt.zip_longest(map(Some, self._items), *iterables, fillvalue=Nothing)) def zip_next(self, fillvalue=None): '''Create a new Array by zipping elements with next one. ''' items_itr = iter(self._items) next(items_itr) nexts = itt.chain(items_itr, [fillvalue]) return Array(itt.starmap(CurrNext, zip(self._items, nexts))) def zip_prev(self, fillvalue=None): '''Create a new Array by zipping elements with previous one. ''' prevs = itt.chain([fillvalue], self._items) return Array(itt.starmap(CurrPrev, zip(self._items, prevs))) def zip_index(self, start=0): '''Create a new Array by zipping elements with index. ''' return Array(itt.starmap(ValueIndex, zip(self._items, itt.count(start)))) def reverse(self): '''In place reverse this Array. ''' self._items.reverse() return self def reversed(self): '''Create a new reversed Array. ''' return Array(reversed(self._items)) def sort(self, key=None, reverse=False): '''In place sort this Array. ''' self._items.sort(key=key, reverse=reverse) return self def sorted(self, key=None, reverse=False): '''Create a new sorted Array. ''' return Array(sorted(self._items, key=key, reverse=reverse)) def sum(self): '''Get sum of elements''' return sum(self) def reduce(self): # TODO pass def group_by(self, key=None): '''Create a new Array using the builtin itertools.groupby, which sequentially groups elements as long as the key function evaluates to the same value. Comparing to ``group_by_as_map``, there're some pros and cons. Cons: - Elements should be sorted by the key function first, or elements with the same key may be broken into different groups. Pros: - Key function doesn't have to be evaluated to a hashable value. It can be any type which supports ``__eq__``. ''' def group_by_tr(self_): for k, vs in itt.groupby(self_, key=key): yield KeyValues(key=k, values=Array(vs)) return Array(group_by_tr(self)) def group_by_as_map(self, key=None): '''Group values in to a Map by the value of key function evaluation result. Comparing to ``group_by``, there're some pros and cons. Pros: * Elements don't need to be sorted by the key function first. You can call ``map_group_by`` anytime and correct grouping result. Cons: * Key function has to be evaluated to a hashable value. ''' from .map import Map key_to_grp = defaultdict(Array) for elem in self: key_to_grp[key(elem)].append(elem) return Map(key_to_grp) def accumulate(self, func=None): '''Create a new Array of calling ``itertools.accumulate``''' return Array(itt.accumulate(self._items, func)) def mean(self): '''Get the average of elements.''' return sum(self) / len(self) def sliding_window(self, n): '''Create a new Array instance that all elements are sliding windows of source elements.''' def sliding_window_tr(self_): self_itr = iter(self) dq = deque(itt.islice(self_itr, n - 1), maxlen=n) for item in self_itr: dq.append(item) yield tuple(dq) return Array(sliding_window_tr(self)) def value_counts(self): '''Get a Counter instance of elements counts''' return Counter(self) def extend(self, iterable): '''Extend the Array from iterable''' self._items.extend(iterable) return self def extended(self, iterable): '''Create a new Array that extends source Array with another iterable''' alist = self.copy() alist.extend(iterable) return alist def append(self, item): '''Append element to the Array''' self._items.append(item) return self def appended(self, item): '''Create a new Array that extends source Array with another element. ''' alist = self.copy() alist.append(item) return alist def distincted(self): '''Create a new Array with non-repeating elements. And elements are with the same order of first occurence in the source Array. ''' def _distincted(items): item_set = set() for item in items: if item not in item_set: item_set.add(item) yield item return Array(_distincted(self._items)) def product(self, *iterables, repeat=1): return Array(itt.product(self._items, *iterables, repeat=repeat)) def permutations(self, r=None): return Array(itt.permutations(self._items, r=r)) def combinations(self, r): return Array(itt.combinations(self._items, r=r)) def combinations_with_replacement(self, r): return Array(itt.combinations_with_replacement(self._items, r=r)) def copy(self): return Array(copy(self._items)) def make_string(self, elem_format='{elem!r}', start='[', elem_sep=', ', end=']'): '''Make string from elements >>> Array.range(5, 8).make_string() '[5, 6, 7]' >>> print(Array.range(5, 8).make_string(elem_sep='\\n', start='', end='', elem_format='{index}: {elem}')) 0: 5 1: 6 2: 7 ''' elems_str = elem_sep.join(elem_format.format(index=idx, elem=elem) for idx, elem in enumerate(self)) return start + elems_str + end PK!$$carriage/lambda_.py import functools as fnt import operator as op from .pipeline import Pipeline, Transformer from .repr import repr_args def lambda_then(f): @fnt.wraps(f) def wraped(self, *args, **kwargs): trfmr = f(self, *args, **kwargs) return self._then(trfmr) def other_lambda(f): @fnt.wraps(f) def wraped_(self, *args, **kwargs): other = args[0] if isinstance(other, Lambda): trfmr = f(self, other) return X._then(trfmr) else: return wraped(self, *args, **kwargs) return wraped_ wraped.other_lambda = other_lambda return wraped class Xcall: '''Elegant partial function builder >>> def func(a, b, c=0, d=0): ... return a, b, c, d >>> partialfunc = Xcall(func)(X, b=3) >>> partialfunc(2) (2, 3, 0, 0) >>> partialfunc(5) (5, 3, 0, 0) >>> partialfunc = Xcall(func)(1, 2, c=X.x, d=X.y) >>> from carriage import Row >>> partialfunc(Row(x=3, y=4)) (1, 2, 3, 4) ''' def __init__(self, f): self.f = f def __call__(self, *args, **kwargs): args_str = repr_args(*args, **kwargs) def func(elem): actual_args = tuple(arg(elem) if isinstance(arg, Lambda) else arg for arg in args) actual_kwargs = {k: arg(elem) if isinstance(arg, Lambda) else arg for k, arg in kwargs.items()} return self.f(*actual_args, **actual_kwargs) return X._then(Transformer(f'{self.f.__name__}({args_str})', func)) class Lambda: __slots__ = '_pipeline', def __init__(self, *, pipeline=None): if pipeline is None: pipeline = Pipeline() self._pipeline = pipeline def _then(self, trfmr): return type(self)(pipeline=self._pipeline.then(trfmr)) @lambda_then def call(self, *args, **kwargs): args_str = repr_args(*args, **kwargs) return Transformer(f'X({args_str})', lambda func: func(*args, **kwargs)) def __call__(self, elem): return self._pipeline.transform(elem) def __repr__(self): return f'<{type(self).__name__} {self._pipeline!r}>' def __str__(self): return str(self._pipeline) @lambda_then def __getattr__(self, name): if name.startswith('__') and name.endswith('__'): raise AttributeError return Transformer(f'X.{name}', op.attrgetter(name)) @lambda_then def __getitem__(self, key): return Transformer(f'X[{key!r}]', op.itemgetter(key)) def __bool__(self): return False @property @lambda_then def not_(self): return Transformer(f'not X', op.not_) @lambda_then def in_(self, other): return Transformer(f'X in {other!r}', lambda elem: elem in other) @in_.other_lambda def in_(self, other): return Transformer(f'X in {other!r}', lambda elem: self(elem) in other(elem)) @lambda_then def has(self, other): return Transformer(f'{other!r} in X', lambda elem: other in elem) @has.other_lambda def has(self, other): return Transformer(f'{other!r} in X', lambda elem: other(elem) in self(elem)) @lambda_then def __pos__(self): return Transformer(f'+X', op.pos) @lambda_then def __neg__(self): return Transformer(f'-X', op.neg) @lambda_then def __abs__(self): return Transformer(f'abs(X)', op.abs) @lambda_then def __add__(self, other): return Transformer(f'X + {other!r}', lambda elem: elem + other) @__add__.other_lambda def __add__(self, other): return Transformer(f'X + {other!r}', lambda elem: self(elem) + other(elem)) @lambda_then def __sub__(self, other): return Transformer(f'X - {other!r}', lambda elem: elem - other) @__sub__.other_lambda def __sub__(self, other): return Transformer(f'X - {other!r}', lambda elem: self(elem) - other(elem)) @lambda_then def __mul__(self, other): return Transformer(f'X * {other!r}', lambda elem: elem * other) @__mul__.other_lambda def __mul__(self, other): return Transformer(f'X * {other!r}', lambda elem: self(elem) * other(elem)) @lambda_then def __truediv__(self, other): return Transformer(f'X / {other!r}', lambda elem: elem / other) @__truediv__.other_lambda def __truediv__(self, other): return Transformer(f'X / {other!r}', lambda elem: self(elem) / other(elem)) @lambda_then def __floordiv__(self, other): return Transformer(f'X // {other!r}', lambda elem: elem // other) @__floordiv__.other_lambda def __floordiv__(self, other): return Transformer(f'X // {other!r}', lambda elem: self(elem) // other(elem)) @lambda_then def __mod__(self, other): return Transformer(f'X % {other!r}', lambda elem: elem % other) @__mod__.other_lambda def __mod__(self, other): return Transformer(f'X % {other!r}', lambda elem: self(elem) % other(elem)) @lambda_then def __divmod__(self, other): return Transformer(f'divmod(X, {other!r})', lambda elem: divmod(elem, other)) @__divmod__.other_lambda def __divmod__(self, other): return Transformer(f'divmod(X % {other!r})', lambda elem: divmod(self(elem), other(elem))) @lambda_then def __pow__(self, other): return Transformer(f'pow(X % {other!r})', lambda elem: pow(elem, other)) @__pow__.other_lambda def __pow__(self, other): return Transformer(f'pow(X % {other!r})', lambda elem: pow(self(elem), other(elem))) @lambda_then def __radd__(self, other): return Transformer(f'{other!r} + X', lambda elem: other + elem) @__radd__.other_lambda def __radd__(self, other): return Transformer(f'{other!r} + X', lambda elem: other(elem) + self(elem)) @lambda_then def __rsub__(self, other): return Transformer(f'{other!r} - X', lambda elem: other - elem) @__rsub__.other_lambda def __rsub__(self, other): return Transformer(f'{other!r} - X', lambda elem: other(elem) - self(elem)) @lambda_then def __rmul__(self, other): return Transformer(f'{other!r} * X', lambda elem: other * elem) @__rmul__.other_lambda def __rmul__(self, other): return Transformer(f'{other!r} * X', lambda elem: other(elem) * self(elem)) @lambda_then def __rtruediv__(self, other): return Transformer(f'{other!r} / X', lambda elem: other / elem) @__rtruediv__.other_lambda def __rtruediv__(self, other): return Transformer(f'{other!r} / X', lambda elem: other(elem) / self(elem)) @lambda_then def __rfloordiv__(self, other): return Transformer(f'{other!r} // X', lambda elem: other // elem) @__rfloordiv__.other_lambda def __rfloordiv__(self, other): return Transformer(f'{other!r} // X', lambda elem: other(elem) // self(elem)) @lambda_then def __rmod__(self, other): return Transformer(f'{other!r} % X', lambda elem: other % elem) @__rmod__.other_lambda def __rmod__(self, other): return Transformer(f'{other!r} % X', lambda elem: other(elem) % self(elem)) @lambda_then def __rdivmod__(self, other): return Transformer(f'divmod({other!r}, X)', lambda elem: divmod(other, elem)) @__rdivmod__.other_lambda def __rdivmod__(self, other): return Transformer(f'divmod({other!r}, X)', lambda elem: divmod(other(elem), self(elem))) @lambda_then def __rpow__(self, other): return Transformer(f'pow({other!r}, X)', lambda elem: pow(other, elem)) @__rpow__.other_lambda def __rpow__(self, other): return Transformer(f'pow({other!r}, X)', lambda elem: pow(other(elem), self(elem))) @lambda_then def __eq__(self, other): return Transformer(f' == {other!r}', lambda elem: elem == other) @__eq__.other_lambda def __eq__(self, other): return Transformer(f' == {other!r}', lambda elem: self(elem) == other(elem)) @lambda_then def __ne__(self, other): return Transformer(f' != {other!r}', lambda elem: elem != other) @__ne__.other_lambda def __ne__(self, other): return Transformer(f' != {other!r}', lambda elem: self(elem) != other(elem)) @lambda_then def __gt__(self, other): return Transformer(f' > {other!r}', lambda elem: elem > other) @__gt__.other_lambda def __gt__(self, other): return Transformer(f' > {other!r}', lambda elem: self(elem) > other(elem)) @lambda_then def __ge__(self, other): return Transformer(f' >= {other!r}', lambda elem: elem >= other) @__ge__.other_lambda def __ge__(self, other): return Transformer(f' >= {other!r}', lambda elem: self(elem) >= other(elem)) @lambda_then def __lt__(self, other): return Transformer(f' < {other!r}', lambda elem: elem < other) @__lt__.other_lambda def __lt__(self, other): return Transformer(f' < {other!r}', lambda elem: self(elem) < other(elem)) @lambda_then def __le__(self, other): return Transformer(f' <= {other!r}', lambda elem: elem <= other) @__le__.other_lambda def __le__(self, other): return Transformer(f' <= {other!r}', lambda elem: self(elem) <= other(elem)) X = Lambda() PK!6+KKcarriage/map.pyimport heapq import itertools as itt import operator as op from collections import OrderedDict, UserDict, defaultdict from .array import Array from .optional import Nothing, Some from .repr import short_repr from .row import KeyValue, Row from .stream import Stream def identity(_): return _ class Map(OrderedDict): '''A mutable dictionary enhanced with a bulk of useful methods. ''' def items(self): return Stream(super().items()).starmap(KeyValue) def values(self): return Stream(super().values()) def keys(self): return Stream(super().keys()) def update(self, *args, **kwds): '''Update Map from dict/iterable and ``return self`` >>> m = Map(a=3, b=4) >>> m2 = m.update(a=5, c=3).update({'d': 2}) >>> m is m2 True >>> m Map({'a': 5, 'b': 4, 'c': 3, 'd': 2}) ''' super().update(*args, **kwds) return self def updated(self, *args, **kwds): '''Create a new Map instance that is updated from dict/iterable. This method is the same as ``m.copy().update(...)`` >>> m = Map(a=3, b=4) >>> m2 = m.updated(a=5, c=3).update({'d': 2}) >>> m2 Map({'a': 5, 'b': 4, 'c': 3, 'd': 2}) >>> m Map({'a': 3, 'b': 4}) ''' m = self.copy() return m.update(*args, **kwds) def join(self, *others, fillvalue=None, agg=None): """Create a new Map instance with keys merged and values joined. >>> m1 = Map(a=1, b=2) >>> m2 = m1.join(dict(a=3, b=4, c=5)) >>> m2 is m1 False >>> m2 Map({'a': Row(f0=1, f1=3), 'b': Row(f0=2, f1=4), 'c': Row(f0=None, f1=5)}) >>> m1 = Map(a=1, b=2) >>> m2 = m1.join(dict(a=3, b=4, c=5), agg=sum, fillvalue=0) >>> m2 Map({'a': 4, 'b': 6, 'c': 5}) """ return Map(self.iter_joined(*others, fillvalue=fillvalue, agg=agg)) def iter_joined(self, *others, fillvalue=None, agg=None): """Create a ``Row(key, Row(v0, v1, ...))`` iterator with keys from all Maps and value joined. >>> m = Map(a=1, b=2) >>> l = list(m.iter_joined( ... Map(a=3, b=4, c=5), ... Map(a=6, c=7), ... fillvalue=0)) >>> l[0] Row(key='a', values=Row(f0=1, f1=3, f2=6)) >>> l[1] Row(key='b', values=Row(f0=2, f1=4, f2=0)) >>> l[2] Row(key='c', values=Row(f0=0, f1=5, f2=7)) """ if agg is None: agg = identity keys = list(self.keys()) keys_set = set(keys) for other in others: for key in other.keys(): if key not in keys_set: keys_set.add(key) keys.append(key) dicts = (self,) + others for key in keys: yield Row(key=key, values=agg(Row.from_values( d.get(key, fillvalue) for d in dicts))) def __repr__(self): return f'Map({self.make_string()})' def map(self, func): '''Create a new Map instance that each key, value pair is derived by applying function to original key, value. >>> Map(a=3, b=4).map(lambda k, v: (v, k)) Map({3: 'a', 4: 'b'}) Parameters ---------- func : ``pred(key, value) -> (key, value)`` function for computing new key/value pair ''' return Map(func(key, value) for key, value in self.items()) def map_keys(self, func): '''Create a new Map instance that all values remains the same, while each corresponding key is updated by applying function to original key, value. >>> Map(a=3, b=4).map_keys(lambda k, v: k + '_1') Map({'a_1': 3, 'b_1': 4}) Parameters ---------- func : ``pred(key, value) -> key`` function for computing new keys ''' return Map((func(key, value), value) for key, value in self.items()) def map_values(self, func): '''Create a new Map instance that all keys remains the same, while each corresponding value is updated by applying function to original key, value. >>> Map(a=3, b=4).map_values(lambda k, v: v * 2) Map({'a': 6, 'b': 8}) Parameters ---------- func : ``pred(key, value) -> value`` function for computing new values ''' return Map((key, func(key, value)) for key, value in self.items()) def revamp_values(self, func): '''Update values of current Map and return self. Each value is derived by computing the function using both key and value. >>> m = Map(a=3, b=4) >>> m.revamp_values(lambda k, v: v * 2) Map({'a': 6, 'b': 8}) >>> m Map({'a': 6, 'b': 8}) Parameters ---------- func : ``pred(key, value) -> value`` function for computing new values Returns ------- self ''' for key, value in self.items(): self[key] = func(key, value) return self def keep(self, *keys): '''Delete keys not specified and return self >>> m = Map(a=3, b=4, c=5) >>> m.keep('a', 'c') Map({'a': 3, 'c': 5}) >>> m Map({'a': 3, 'c': 5}) Returns ------- self ''' keys = set(keys) current_keys = set(self.keys()) keys_to_delete = current_keys - keys for key, in keys_to_delete: del self[key] return self def project(self, *keys): '''Create a new Map instance contains only specified keys. >>> m = Map(a=3, b=4, c=5) >>> m.project('a', 'c') Map({'a': 3, 'c': 5}) >>> m Map({'a': 3, 'b': 4, 'c': 5}) Returns ------- Map[key, value] ''' return Map((k, self[k]) for k in keys) def get_opt(self, key): '''Get the value of specified key as Optional type. Return Some(value) if key exists, otherwise return Nothing. >>> m = Map(a=3, b=4) >>> m.get_opt('a') Some(3) >>> m.get_opt('c') Nothing >>> m.get_opt('a').map(lambda v: v * 2) Some(6) >>> m.get_opt('c').map(lambda v: v * 2) Nothing Returns ------- Optional[value] ''' if key in self: return Some(self[key]) return Nothing def remove(self, *keys): '''Delete keys and return self >>> m = Map(a=3, b=4, c=5) >>> m.remove('a', 'c') Map({'b': 4}) >>> m Map({'b': 4}) Returns ------- self ''' for key in keys: del self[key] return self def without(self, *keys): '''Create a new Map instance with those keys >>> m = Map(a=3, b=4, c=6) >>> m.without('a', 'c') Map({'b': 4}) >>> m Map({'a': 3, 'b': 4, 'c': 6}) Returns ------- Map[key, value] ''' return Map((key, value) for key, value in self.items() if key not in keys) def retain(self, pred): '''Delete key/value pairs not satisfying the predicate and return self >>> m = Map(a=3, b=4, c=5) >>> m.retain(lambda k, v: k == 'b' or v == 5) Map({'b': 4, 'c': 5}) >>> m Map({'b': 4, 'c': 5}) Parameters ---------- pred : ``(k, v) -> bool`` Returns ------- self ''' keys_to_delete = [] for key, value in self.items(): if not pred(key, value): keys_to_delete.append(key) return self.remove(*keys_to_delete) def retain_false(self, pred): '''Delete key/value pairs satisfying the predicate and return self >>> m = Map(a=3, b=4, c=5) >>> m.retain_false(lambda k, v: k == 'b' or v == 5) Map({'a': 3}) >>> m Map({'a': 3}) Parameters ---------- pred : ``(k, v) -> bool`` Returns ------- self ''' keys_to_delete = [] for key, value in self.items(): if pred(key, value): keys_to_delete.append(key) return self.remove(*keys_to_delete) def retain_by_key(self, pred): '''Delete key/value pairs not satisfying the predicate and return self >>> m = Map(a=3, b=4, c=5) >>> m.retain_by_key(lambda k: k == 'b') Map({'b': 4}) >>> m Map({'b': 4}) Parameters ---------- pred : ``(k) -> bool`` Returns ------- self ''' keys_to_delete = [] for key, value in self.items(): if not pred(key): keys_to_delete.append(key) return self.remove(*keys_to_delete) def retain_by_value(self, pred): '''Delete key/value pairs not satisfying the predicate and return self >>> m = Map(a=3, b=4, c=5) >>> m.retain_by_value(lambda v: v == 4) Map({'b': 4}) >>> m Map({'b': 4}) Parameters ---------- pred : ``(k) -> bool`` Returns ------- self ''' keys_to_delete = [] for key, value in self.items(): if not pred(value): keys_to_delete.append(key) return self.remove(*keys_to_delete) def filter(self, pred): '''Create a new Map with key/value pairs satisfying the predicate >>> m = Map({1: 2, 2: 4, 3: 6}) >>> m2 = m.filter(lambda k, v: (v-k) % 3 == 0) >>> m2 Map({3: 6}) Parameters ---------- pred : ``(k, v) -> bool`` predicate Returns ------- Map[key, value] ''' return Map((k, v) for k, v in self.items() if pred(k, v)) def filter_false(self, pred): '''Create a new Map with key/value pairs not satisfying the predicate >>> m = Map({1: 2, 2: 4, 3: 6}) >>> m2 = m.filter_false(lambda k, v: (v-k) % 3 == 0) >>> m2 Map({1: 2, 2: 4}) Parameters ---------- pred : ``(k, v) -> bool`` predicate Returns ------- Map[key, value] ''' return Map((k, v) for k, v in self.items() if not pred(k, v)) def filter_by_key(self, pred): '''Create a new Map with keys satisfying the predicate >>> m = Map({1: 2, 2: 4, 3: 6}) >>> m2 = m.filter_by_key(lambda k: k % 3 == 0) >>> m2 Map({3: 6}) Parameters ---------- pred : ``(k, v) -> bool`` predicate Returns ------- Map[key, value] ''' return Map((k, v) for k, v in self.items() if pred(k)) def filter_by_value(self, pred): '''Create a new Map with values satisfying the predicate >>> m = Map({1: 2, 2: 4, 3: 6}) >>> m2 = m.filter_by_value(lambda v: v % 3 == 0) >>> m2 Map({3: 6}) Parameters ---------- pred : ``(k, v) -> bool`` predicate Returns ------- Map[key, value] ''' return Map((k, v) for k, v in self.items() if pred(v)) def group_by(self, key_func): '''Group key/value pairs into nested Maps. >>> Map(a=3, b=4, c=5).group_by(lambda k, v: v % 2) Map({1: Map({'a': 3, 'c': 5}), 0: Map({'b': 4})}) Parameters ---------- key_func : ``(key, value) -> group_key`` predicate Returns ------- Map[key_func(key), Map[key, value]] ''' grouped_d = defaultdict(Map) for key, value in self.items(): grouped_d[key_func(key, value)][key] = value return Map(grouped_d) def reduce(self, key): pass def make_string(self, key_value_format='{key!r}: {value!r}', start='{', item_sep=', ', end='}'): '''Construct a string from key/values. >>> m = Map(a=3, b=4, c=5) >>> m.make_string() "{'a': 3, 'b': 4, 'c': 5}" >>> m.make_string(start='(', key_value_format='{key}={value!r}', ... item_sep=', ', end=')') '(a=3, b=4, c=5)' Parameters ---------- key_value_format : str string template using builtin ``str.format()`` for formatting key/value pairs. Default to ``'{key!r}: {value!r}'``. Available named placeholders: ``{key}``, ``{value}`` start : str Default to ``'{'``. item_sep : str Default to ``', '`` end : str Default to ``}`` Returns ------- str ''' items_str = item_sep.join( key_value_format.format(key=key, value=value) for key, value in self.items()) return start + items_str + end def take(self, n): '''create a Stream instance of first ``n`` ``Row(key, value)`` elements. >>> m = Map(a=4, b=5, c=6, d=7) >>> m.take(2).to_list() [Row(key='a', value=4), Row(key='b', value=5)] Returns ------- Stream[Row[key, value]] ''' return self.to_stream().take(n) def first(self): '''Get the first item in ``Row(key, value)`` type >>> m = Map(a=4, b=5, c=6, d=7) >>> m.first() Row(key='a', value=4) >>> m.first().key 'a' >>> m.first().value 4 >>> m = Map() >>> m.first() Traceback (most recent call last): ... IndexError: index out of range. Returns ------- Row[key, value] ''' return self.nth(0) def first_opt(self): '''Optionally get the first item. Return Some(Row(key, value)) if first item exists, otherwise return Nothing >>> m = Map(a=4, b=5, c=6, d=7) >>> m.first_opt().map(lambda kv: kv.transform(value=lambda v: v * 2)) Some(Row(key='a', value=8)) >>> m.first_opt().map(lambda kv: kv.value) Some(4) >>> m = Map() >>> m.first_opt() Nothing Returns ------- Optional[Row[key, value]] ''' return self.nth_opt(0) def nth(self, index): '''Get the nth item in ``Row(key, value)`` type. >>> m = Map(a=4, b=5, c=6, d=7) >>> m.nth(2) Row(key='c', value=6) >>> m = Map(a=4, b=5) >>> m.nth(2) Traceback (most recent call last): ... IndexError: index out of range. Returns ------- Row[key, value] ''' try: key, value = next(itt.islice(self.items(), index, None)) return KeyValue(key, value) except StopIteration: raise IndexError('index out of range.') def nth_opt(self, index): '''Optionally get the nth item. Return ``Some(Row(key, value))`` if first item exists, otherwise return Nothing. >>> m = Map(a=4, b=5, c=6, d=7) >>> m.first_opt().map(lambda kv: kv.transform(value=lambda v: v * 2)) Some(Row(key='a', value=8)) >>> m = Map() >>> m.first_opt() Nothing Returns ------- Optional[Row[key, value]] ''' try: return Some(self.nth(index)) except IndexError: return Nothing def len(self): '''Get the length of this Map >>> m = Map(a=4, b=5, c=6, d=7) >>> m.len() 4 Returns ------- int ''' return len(self) def to_stream(self, key_field='key', value_field='value'): '''Convert to a Stream instance of ``Row(key, value)`` iterable. >>> m = Map(a=4, b=5, c=6, d=7) >>> m.to_stream().take(2).to_list() [Row(key='a', value=4), Row(key='b', value=5)] Returns ------- Stream[Row[key, value]] ''' return (Stream(super().items()) .starmap(lambda key, value: Row(**{key_field: key, value_field: value}))) def to_array(self): '''Convert to an Array instance of ``Row(key, value)`` iterable. >>> m = Map(a=4, b=5, c=6, d=7) >>> m.to_array().take(2) Array([Row(key='a', value=4), Row(key='b', value=5)]) Returns ------- Array[Row[key, value]] ''' return self.to_stream().to_array() def to_list(self): '''Convert to an list instance of ``Row(key, value)`` iterable. >>> m = Map(a=4, b=5) >>> m.to_list() [Row(key='a', value=4), Row(key='b', value=5)] Returns ------- Array[Row[key, value]] ''' return self.to_stream().to_list() def to_dict(self): '''Convert to dict''' return dict(self) def flip(self): '''Create a new Map which key/value pairs are fliped >>> m = Map(a=4, b=5, c=6) >>> m.flip() Map({4: 'a', 5: 'b', 6: 'c'}) ''' return Map((value, key) for key, value in self.items()) def for_each(self, func): '''Call func for each key/value pair >>> m = Map(a=[], b=[], c=[]) >>> m.for_each(lambda k, v: v.append(k)) >>> m Map({'a': ['a'], 'b': ['b'], 'c': ['c']}) ''' for k, v in self.items(): func(k, v) def for_each_key(self, func): '''Call func for each key >>> m = Map(a=[], b=[], c=[]) >>> keys = [] >>> m.for_each_key(lambda k: keys.append(k)) >>> keys ['a', 'b', 'c'] ''' for k in self.keys(): func(k) def for_each_value(self, func): '''Call func for each value >>> m = Map(a=[], b=[], c=[]) >>> m.for_each_value(lambda v: v.append(3)) >>> m Map({'a': [3], 'b': [3], 'c': [3]}) ''' for v in self.values(): func(v) def nlargest_value_items(self, n=None): '''Get top n largest values >>> m = Map(a=6, b=2, c=10, d=9) >>> m.nlargest_value_items(n=2) Array([Row(key='c', value=10), Row(key='d', value=9)]) Returns ------- Array[Row[key, value]] ''' if n is None: vs = sorted(self.items(), key=op.itemgetter(1), reverse=True) vs = heapq.nlargest(n, self.items(), key=op.itemgetter(1)) return Array(vs) def nsmallest_value_items(self, n=None): '''Get top n smallest values >>> m = Map(a=6, b=2, c=10, d=9) >>> m.nsmallest_value_items(n=2) Array([Row(key='b', value=2), Row(key='a', value=6)]) Returns ------- Array[Row[key, value]] ''' if n is None: vs = sorted(self.items(), key=op.itemgetter(1), reverse=False) vs = heapq.nsmallest(n, self.items(), key=op.itemgetter(1)) return Array(vs) PK!̍M carriage/monad.pyfrom abc import ABC, abstractclassmethod, abstractmethod, abstractproperty class Monad(ABC): @abstractclassmethod def unit(cls, value): raise NotImplementedError() @abstractmethod def flat_map(self, to_monad_action): raise NotImplementedError() def bind(self, to_monad_action): return self.flat_map(to_monad_action) @abstractmethod def map(self, action): raise NotImplementedError() def fmap(self, action): return self.map(action) @abstractmethod def flatten(self): raise NotImplementedError() def join(self): return self.flatten() @abstractmethod def then(self, monad_value): raise NotImplementedError() @abstractmethod def ap(self, monad_value): raise NotImplementedError() @abstractmethod def pluck(self, key): raise NotImplementedError() @abstractmethod def pluck_opt(self, key): raise NotImplementedError() @abstractmethod def pluck_attr(self, attr): raise NotImplementedError() # @abstractmethod # def __len__(self): # raise NotImplementedError() @abstractmethod def __iter__(self): raise NotImplementedError() @abstractmethod def __repr__(self): raise NotImplementedError() @abstractproperty def _base_type(self): raise NotImplementedError() @abstractproperty def _value_for_cmp(self): raise NotImplementedError() def __eq__(self, other): if isinstance(other, self._base_type): return self._value_for_cmp == other._value_for_cmp raise TypeError( "'==' not supported between instances of " f"'{self._base_type.__name__}' and {type(other).__name__!r}") def __ne__(self, other): if isinstance(other, self._base_type): return self._value_for_cmp != other._value_for_cmp raise TypeError( "'!=' not supported between instances of " f"'{self._base_type.__name__}' and {type(other).__name__!r}") def __gt__(self, other): if isinstance(other, self._base_type): return self._value_for_cmp > other._value_for_cmp raise TypeError( "'>' not supported between instances of " f"'{self._base_type.__name__}' and {type(other).__name__!r}") def __lt__(self, other): if isinstance(other, self._base_type): return self._value_for_cmp < other._value_for_cmp raise TypeError( "'<' not supported between instances of " f"'{self._base_type.__name__}' and {type(other).__name__!r}") def __ge__(self, other): if isinstance(other, self._base_type): return self._value_for_cmp >= other._value_for_cmp raise TypeError( "'>=' not supported between instances of " f"'{self._base_type.__name__}' and {type(other).__name__!r}") def __le__(self, other): if isinstance(other, self._base_type): return self._value_for_cmp <= other._value_for_cmp raise TypeError( "'<=' not supported between instances of " f"'{self._base_type.__name__}' and {type(other).__name__!r}") PK!F8jDjDcarriage/optional.pyimport inspect from abc import abstractmethod, abstractproperty from functools import wraps from .monad import Monad class NothingError(AttributeError): pass class Optional(Monad): '''An type for handling special value or exception. Create Optional from a function call that may raise exception. >>> def divide(a, b): ... return a / b >>> Optional.call_exceptable(divide, 2, 4) Some(0.5) >>> Optional.call_exceptable(divide, 2, 0) Nothing Create Optional from a value that may be None. >>> adict = {'a': 1, 'b': 2, 'c': 3} >>> Optional.value_noneable(adict.get('c')) Some(3) >>> Optional.value_noneable(adict.get('d')) Nothing Escape from if hell of handling None value or exception. >>> class TreeNode: ... def __init__(self, value, left=None, right=None): ... self.value = value ... self.left = left ... self.right = right >>> n = TreeNode(30, TreeNode(5, TreeNode(2))) It seem convenient to traversal in tree. >>> n.left.left.value 2 But both ``left`` and ``right`` might be None. You should take care of that. >>> result = 0 >>> if n.left is not None: ... if n.left.left is not None: ... if n.left.left.right is not None: ... if n.left.left.right.left is not None: ... result = n.left.left.right.left.value * 2 >>> result 0 Now let's replace Noneable value with Optional. >>> class TreeNodeOpt: ... def __init__(self, value, left=None, right=None): ... self.value = value ... self.left = Some(left) if left is not None else Nothing ... self.right = Some(left) if left is not None else Nothing >>> nopt = TreeNodeOpt(30, TreeNodeOpt(5, TreeNodeOpt(2))) >>> (nopt.left ... .bind(lambda n: n.left) ... .map(lambda n: n.value) ... .get_or()) 2 It seems getting harder to get a value, but actually you are now free from having the risk of getting attribute from None. And you don't have to build a big, nested if statement for handling it. >>> (nopt.left ... .bind(lambda n: n.left) ... .bind(lambda n: n.right) ... .bind(lambda n: n.left) ... .map(lambda n: n.value * 2) ... .get_or(0)) 0 >>> (nopt.left ... .pluck_attr('left').join() ... .pluck_attr('right').join() ... .pluck_attr('left').join() ... .pluck_attr('value') ... .map(lambda v: v * 2) ... .get_or(0)) 0 And you can use Optional to handle the original TreeNode. >>> n = TreeNode(30, TreeNode(5, TreeNode(2))) >>> (Optional.value_noneable(n.left) ... .map(lambda n: n.left).join_noneable() ... .map(lambda n: n.right).join_noneable() ... .map(lambda n: n.left).join_noneable() ... .map(lambda n: n.value * 2) ... .get_or(0)) 0 It's more elegant to have functions return Optional rather than None or other special value. >>> def get_right_opt(tn): ... if tn.right is None: ... return Nothing ... return Some(tn.right) >>> def get_left_opt(tn): ... if tn.left is None: ... return Nothing ... return Some(tn.left) >>> (Optional.value_noneable(n) ... .bind(get_left_opt) ... .bind(get_left_opt) ... .bind(get_right_opt) ... .bind(get_left_opt) ... .map(lambda n: n.value * 2) ... .get_or(0)) 0 You can create an Optional function with these decorator. >>> @Optional.noneable ... def get_first(l): ... if len(l) > 0: ... return l[0] ... return None >>> get_first([3,4,5]) Some(3) >>> get_first([]) Nothing >>> @Optional.exceptable(IndexError) ... def get_first(l): ... return l[0] >>> get_first([3,4,5]) Some(3) >>> get_first([]) Nothing ''' @property def _base_type(self): return Optional @classmethod def call_exceptable(cls, getter, *args, errors=Exception, **kwargs): try: value = getter(*args, **kwargs) except errors: return Nothing else: return Some(value) ecall = call_exceptable expcall = call_exceptable @classmethod def value_noneable(cls, value): if value is None: return Nothing return Some(value) noneval = value_noneable @classmethod def noneable(cls, f): if not callable(f): raise TypeError("'f' should be a callable: {f!r}") @wraps(f) def wrapper(*args, **kwargs): res = f(*args, **kwargs) if res is None: return Nothing return Some(res) return wrapper @classmethod def exceptable(cls, f_or_error, *more_errors): if inspect.isclass(f_or_error) and issubclass(f_or_error, Exception): errors = (f_or_error,) + tuple(more_errors) def deco(f): return cls._wrap_exceptable(f, errors) return deco if callable(f_or_error): f = f_or_error errors = Exception return cls._wrap_exceptable(f, errors) @classmethod def _wrap_exceptable(cls, f, errors): @wraps(f) def wrapper(*args, **kwargs): try: res = f(*args, **kwargs) except errors: return Nothing else: return Some(res) return wrapper @classmethod def unit(cls, value): return Some(value) @abstractmethod def join_noneable(self): raise NotImplementedError() @abstractproperty def value(self): raise NotImplementedError() @abstractproperty def _value_for_cmp(self): raise NotImplementedError() @abstractmethod def get_or(self, default): raise NotImplementedError() @abstractmethod def or_opt_call(self, func): raise NotImplementedError() @abstractmethod def get_or_none(self): raise NotImplementedError() @abstractmethod def is_some(self): raise NotImplementedError() @abstractmethod def is_nothing(self): raise NotImplementedError() def pluck(self, key): return self.map(lambda d: d[key]) def pluck_opt(self, key): return self.flat_map(lambda d: Some(d[key]) if key in d else Nothing) def pluck_attr(self, attr): return self.map(lambda obj: getattr(obj, attr)) class NothingCls(Optional): __slots__ = () __instance = None def __new__(cls): if cls.__instance is None: cls.__instance = super().__new__(cls) return cls.__instance def flat_map(self, maybe_action): return Nothing def map(self, action): return Nothing def then(self, maybe_value): return Nothing def flatten(self): return Nothing def join_noneable(self): return Nothing def ap(self, maybe_value): return Nothing @property def value(self): raise NothingError('Nothing here') _value_for_cmp = () def get_or(self, else_value=None): return else_value def get_or_none(self): return None def or_opt_call(self, func): return func() def is_some(self): return False def is_nothing(self): return True def __len__(self): return 0 def __iter__(self): return def __repr__(self): return 'Nothing' def __str__(self): return 'Nothing' NothingCls.__name__ = 'Nothing' Nothing = NothingCls() def identity(_): return _ class Some(Optional): __slots__ = '_some_value' def __init__(self, value): self._some_value = value def flat_map(self, maybe_action): value = maybe_action(self._some_value) if isinstance(value, Optional): return value else: raise TypeError('function should return a Optional') def map(self, action): return Some(action(self._some_value)) def then(self, maybe_value): return maybe_value def flatten(self): if isinstance(self._some_value, Optional): return self._some_value else: raise TypeError('value should be a Optional') def join_noneable(self): if self._some_value is None: return Nothing return self def ap(self, maybe_value): if maybe_value.is_some(): return self._some_value(maybe_value.get()) elif maybe_value.is_nothing(): return Nothing @property def value(self): return self._some_value @property def _value_for_cmp(self): return (self._some_value,) def get_or(self, else_value=None): return self._some_value def or_opt_call(self, func): return self def get_or_none(self): return self._some_value def is_some(self): return True def is_nothing(self): return False def __len__(self): raise 1 def __iter__(self): yield self._some_value def __repr__(self): return f'Some({self._some_value!r})' def __str__(self): return f'Some({self._some_value!s})' def do(self, func): '''Call function using this item as parameter. ''' func(self._some_value) class SomeOp(Some): def __call__(self, *args, **kwargs): return Some(self._some_value(*args, **kwargs)) def __getattr__(self, name): if hasattr(self._some_value, name): return Some(getattr(self._some_value, name)) else: return Nothing def __setattr__(self, name, v): if name == f"_some_value": return super().__setattr__(name, v) return setattr(self._some_value, name, v) def __getitem__(self, key): return Some(self._some_value[key]) def __setitem__(self, key, value): self._some_value[key] = value def __delitem__(self, key): del self._some_value[key] def __len__(self): return len(self._some_value) def __iter__(self): return iter(self._some_value) def __reversed__(self): return Some(reversed(self._some_value)) # def __missing__(self, key): # klass = self._some_value.__class__ # if hasattr(klass, '__missing__') and \ # callable(getattr(klass, '__missing__')): # return Some(self._some_value.__missing__(key)) # return Nothing def __repr__(self): return f"{type(self).__name__}({self._some_value!r})" def __str__(self): return f"{type(self).__name__}({self._some_value!s})" def __int__(self): return int(self._some_value) def __float__(self): return float(self._some_value) def __complex__(self): return complex(self._some_value) def __oct__(self): return oct(self._some_value) def __hex__(self): return hex(self._some_value) def __index__(self): return self._some_value.__index__() def __trunc__(self): return self._some_value.__trunc__() def __dir__(self): return dir(self._some_value) def __add__(self, other): return Some(self._some_value + other) def __sub__(self, other): return Some(self._some_value - other) def __mul__(self, other): return Some(self._some_value * other) def __floordiv__(self, other): return Some(self._some_value // other) def __div__(self, other): return Some(self._some_value / other) def __mod__(self, other): return Some(self._some_value % other) def __divmod__(self, other): return Some(divmod(self._some_value, other)) def __pow__(self, other): return Some(self._some_value ** other) def __lshift__(self, other): return Some(self._some_value << other) def __rshift__(self, other): return Some(self._some_value >> other) def __and__(self, other): return Some(self._some_value & other) def __or__(self, other): return Some(self._some_value | other) def __xor__(self, other): return Some(self._some_value ^ other) def __radd__(self, other): return Some(other + self._some_value) def __rsub__(self, other): return Some(other - self._some_value) def __rmul__(self, other): return Some(other * self._some_value) def __rfloordiv__(self, other): return Some(other // self._some_value) def __rdiv__(self, other): return Some(other / self._some_value) def __rmod__(self, other): return Some(other % self._some_value) def __rdivmod__(self, other): return Some(divmod(other, self._some_value)) def __rpow__(self, other): return Some(other ** self._some_value) def __rlshift__(self, other): return Some(other << self._some_value) def __rrshift__(self, other): return Some(other >> self._some_value) def __rand__(self, other): return Some(other & self._some_value) def __ror__(self, other): return Some(other | self._some_value) def __rxor__(self, other): return Some(other ^ self._some_value) def __iadd__(self, other): self._some_value += other return self def __isub__(self, other): self._some_value -= other return self def __imul__(self, other): self._some_value *= other return self def __ifloordiv__(self, other): self._some_value //= other return self def __idiv__(self, other): self._some_value /= other return self def __imod__(self, other): self._some_value %= other return self def __ipow__(self, other): self._some_value **= other return self def __ilshift__(self, other): self._some_value <<= other return self def __irshift__(self, other): self._some_value >>= other return self def __iand__(self, other): self._some_value &= other return self def __ior__(self, other): self._some_value |= other return self def __ixor__(self, other): self._some_value ^= other return self class SomeOptionalNone(Some): def __init__(self, value): self._some_value = value def __call__(self, *args, **kwargs): return Optional(self._some_value(*args, **kwargs)) def __getattr__(self, name): attr = getattr(self._some_value, name) if callable(attr): return SomeOptionalNone(attr) return Optional(attr) def __getitem__(self, key): return Optional(self._some_value[key]) def __reversed__(self): return Optional(reversed(self._some_value)) def __missing__(self, key): # TODO: review klass = self._some_value.__class__ if (hasattr(klass, '__missing__') and callable(getattr(klass, '__missing__'))): return Optional(self._some_value.__missing__(key)) return Nothing def __add__(self, other): return Optional(self._some_value + other) def __sub__(self, other): return Optional(self._some_value - other) def __mul__(self, other): return Optional(self._some_value * other) def __floordiv__(self, other): return Optional(self._some_value // other) def __div__(self, other): return Optional(self._some_value / other) def __mod__(self, other): return Optional(self._some_value % other) def __divmod__(self, other): return Optional(divmod(self._some_value, other)) def __pow__(self, other): return Optional(self._some_value ** other) def __lshift__(self, other): return Optional(self._some_value << other) def __rshift__(self, other): return Optional(self._some_value >> other) def __and__(self, other): return Optional(self._some_value & other) def __or__(self, other): return Optional(self._some_value | other) def __xor__(self, other): return Optional(self._some_value ^ other) def __radd__(self, other): return Optional(other + self._some_value) def __rsub__(self, other): return Optional(other - self._some_value) def __rmul__(self, other): return Optional(other * self._some_value) def __rfloordiv__(self, other): return Optional(other // self._some_value) def __rdiv__(self, other): return Optional(other / self._some_value) def __rmod__(self, other): return Optional(other % self._some_value) def __rdivmod__(self, other): return Optional(divmod(other, self._some_value)) def __rpow__(self, other): return Optional(other ** self._some_value) def __rlshift__(self, other): return Optional(other << self._some_value) def __rrshift__(self, other): return Optional(other >> self._some_value) def __rand__(self, other): return Optional(other & self._some_value) def __ror__(self, other): return Optional(other | self._some_value) def __rxor__(self, other): return Optional(other ^ self._some_value) PK! ;TTcarriage/pipeline.py class Transformer: __slots__ = '_name', '_func' def __init__(self, name, func): self._name = name self._func = func def transform(self, data): return self._func(data) @property def name(self): return self._name def __repr__(self): return f'<{type(self).__name__} {self._name}>' class Pipeline: __slots__ = '_transformers', def __init__(self, transformers=None): if transformers is None: transformers = [] self._transformers = transformers def transform(self, data): for transformer in self._transformers: data = transformer.transform(data) return data @property def transformers(self): return self._transformers def then(self, transformer): return type(self)(self._transformers + [transformer]) def extended(self, other): return type(self)(self._transformers + other._transformers) def __repr__(self): return f'<{type(self).__name__} {self._transformers!r}>' def is_empty(self): return len(self._transformers) == 0 def __len__(self): return len(self._transformers) def __str__(self): return ( # f'{type(self).__name__}\n -> ' + '\n'.join(f' -> {trfmr.name}' for trfmr in self._transformers) ) PK!:ttcarriage/repr.pyimport reprlib short_repr = reprlib.Repr() short_repr.maxlist = 30 short_repr.maxset = 30 short_repr.maxdict = 30 short_repr.maxtuple = 30 short_repr.maxset = 30 short_repr.maxfrozenset = 30 short_repr.maxdeque = 30 short_repr.maxarray = 30 short_repr.maxlong = 100 short_repr.maxstring = 200 short_repr.maxother = 200 def repr_args(*args, **kwargs): args_str = ', '.join(repr(arg) for arg in args) kwargs_str = ', '.join(f'{k}={v!r}' for k, v in kwargs.items()) if args_str and kwargs_str: return f'{args_str}, {kwargs_str}' elif args_str: return args_str else: return kwargs_str PK!<,ocarriage/row.pyimport itertools as itt from .repr import short_repr class Row(tuple): '''A named tuple like type without the need of declaring field names in advance. A Row object can be created anytime when you need it. >>> Row(name='Joe', age=30, height=170) Row(name='Joe', age=30, height=170) >>> Row.from_values([1, 2, 3], fields=['x', 'y', 'z']) Row(x=1, y=2, z=3) If you are too lazy to name the fields. >>> Row.from_values([1, 'a', 9]) Row(f0=1, f1='a', f2=9) You can access field using index or field name in ``O(1)``. >>> row = Row(name='Joe', age=30, height=170) >>> row.name 'Joe' >>> row[2] 170 And it provides some useful method for transforming, converting. Because Row is immutable type, all these method create a new Row object. >>> row.evolve(height=180) # I hope so Row(name='Joe', age=30, height=180) >>> row.evolve(age=row.age + 1) Row(name='Joe', age=31, height=170) >>> row.to_dict() {'name': 'Joe', 'age': 30, 'height': 170} >>> row.to_map() Map({'name': 'Joe', 'age': 30, 'height': 170}) Row is iterable. You can unpack it. >>> name, age, height = row >>> name 'Joe' >>> age 30 ''' @classmethod def from_values(cls, values, fields=None): '''Create Row from values >>> Row.from_values([1, 2, 3]) Row(f0=1, f1=2, f2=3) >>> Row.from_values([1, 2, 3], fields=['x', 'y', 'z']) Row(x=1, y=2, z=3) ''' if fields is None: return cls(**{f'f{i}': v for i, v in enumerate(values)}) else: return cls(**{f: v for f, v in zip(fields, values)}) @classmethod def from_dict(cls, adict, fields=None): '''Create Row from a iterable >>> Row.from_dict({'name': 'Joe', 'age': 30}) Row(name='Joe', age=30) ''' if fields is None: return cls(**adict) else: return cls(**{f: adict[f] for f in fields}) def __new__(self, **kwargs): '''Create Row by field names and values >>> Row(name='Joe', age=30, height=170) Row(name='Joe', age=30, height=170) ''' row = tuple.__new__(self, kwargs.values()) row._dict = kwargs return row def __getnewargs_ex__(self): return ((), self._dict) def __getattribute__(self, name): if name in tuple.__getattribute__(self, '_dict'): # Accessing Row fields has higher priority # than accessing tuple methods return tuple.__getattribute__(self, '_dict')[name] else: return tuple.__getattribute__(self, name) # def __getattr__(self, name): # if name in self._dict: # return self._dict[name] # else: # raise AttributeError(f'{self!r} has no attribute {name!r}') def get_opt(self, field): '''Get field in Optional type >>> from carriage.optional import Some, Nothing >>> Row(x=3, y=4).get_opt('x') Some(3) >>> Row(x=3, y=4).get_opt('z') Nothing Parameters ---------- field : str field name Returns ------- Just(value) if field exist Nothing if field doesn't exist ''' from .optional import Some, Nothing if field in self._dict: return Some(getattr(self, field)) return Nothing def get(self, field, fillvalue=None): '''Get field >>> Row(x=3, y=4).get('x') 3 >>> Row(x=3, y=4).get('z', 0) 0 ''' if field in self._dict: return getattr(self, field) return fillvalue def has_field(self, field): '''Has field >>> Row(x=3, y=4).has_field('x') True ''' return field in self._dict def __setattr__(self, name, value): if name != '_dict': raise TypeError("'Row' object does not support item assignment") else: super().__setattr__(name, value) def fields(self): return self._dict.keys() def evolve(self, **kwargs): '''Create a new Row by replacing or adding other fields >>> row = Row(x=23, y=9) >>> row.evolve(y=12) Row(x=23, y=12) >>> row.evolve(z=3) Row(x=23, y=9, z=3) ''' d = self._dict.copy() d.update(kwargs) return Row(**d) def project(self, *fields): '''Create a new Row by keeping only specified fields >>> row = Row(x=2, y=3, z=4) >>> row.project('x', 'y') Row(x=2, y=3) ''' fields = set(fields) return Row(**{field: value for field, value in self._dict.items() if field in fields}) def without(self, *fields): '''Create a new Row by removing only specified fields >>> row = Row(x=2, y=3, z=4) >>> row.without('z') Row(x=2, y=3) ''' fields = set(fields) return Row(**{field: value for field, value in self._dict.items() if field not in fields}) def merge(self, *rows): '''Create a new merged Row. If there's duplicated field name, keep the last value. >>> row = Row(x=2, y=3) >>> row.merge(Row(y=4, z=5), Row(z=6, u=7)) Row(x=2, y=4, z=6, u=7) ''' field_value_pairs = itt.chain.from_iterable(row._dict.items() for row in (self,) + rows) return Row(**{k: v for k, v in field_value_pairs}) def rename_fields(self, **kwargs): '''Create a new Row that field names renamed. >>> row = Row(a=2, b=3, c=4) >>> row.rename_fields(a='x', b='y') Row(x=2, y=3, c=4) ''' return Row(**{kwargs.get(k, k): v for k, v in self._dict.items()}) def transform(self, **kwargs): d = self._dict.copy() d.update({k: f(getattr(self, k))for k, f in kwargs.items()}) return Row(**d) def to_dict(self): '''Convert to dict''' return self._dict.copy() def to_map(self): '''Convert to Map''' from .map import Map return Map(**self.to_dict()) def to_tuple(self): '''Convert to tuple''' return tuple(self) def to_fields(self): '''Convert to rows >>> Row(x=3, y=4).to_fields() [Row(field='x', value=3), Row(field='y', value=4)] ''' return [Row(field=k, value=v) for k, v in self._dict.items()] def iter_fields(self): '''Convert to rows >>> list(Row(x=3, y=4).iter_fields()) [Row(field='x', value=3), Row(field='y', value=4)] ''' return (Row(field=k, value=v) for k, v in self._dict.items()) def to_list(self): '''Convert to list''' return list(self) def __repr__(self): kwargs_str = ', '.join( f'{k}={short_repr.repr(v)}' for k, v in self._dict.items()) return f'Row({kwargs_str})' class namedrow: def __init__(self, *fields): self._fields = fields def __call__(self, *args, **kwargs): if args and kwargs: raise ValueError( 'Cannot use both args and kwargs to create {type(self)}') if args: return Row(**{field: value for field, value in zip(self._fields, args)}) if kwargs: return Row(**{field: kwargs[field] for field in self._fields}) CurrPrev = namedrow('curr', 'prev') CurrNext = namedrow('curr', 'prev') ValueIndex = namedrow('value', 'index') KeyValues = namedrow('key', 'values') KeyValue = namedrow('key', 'value') PK!E\]]carriage/stream.py import builtins import functools as fnt import heapq import io import itertools as itt import reprlib from collections import Counter, defaultdict, deque from pathlib import Path from tabulate import tabulate, tabulate_formats from .array import Array from .monad import Monad from .optional import Nothing, Some from .pipeline import Pipeline, Transformer from .repr import repr_args, short_repr from .row import CurrNext, CurrPrev, KeyValues, Row, ValueIndex def as_stream(f): @fnt.wraps(f) def wraped(self, *args, **kwargs): args_str = repr_args(*args, **kwargs) trfmr = Transformer(name=f'{f.__name__}({args_str})', func=f(self, *args, **kwargs)) return type(self)( iterable=self._iterable, pipeline=self._pipeline.then(trfmr)) wraped.call = f return wraped class Stream(Monad): '''An iterable wrapper for building a lazy-evaluating sequence transformation pipeline. Stream is initiated by providing any iterable object like list, tuple, iterator and even an infinite one. >>> strm = Stream(range(10)) >>> strm = Stream([1, 2, 3]) Some classmethods are provided for creating common Stream instances. >>> strm = Stream.range(0, 10, 2) >>> strm = Stream.count(0, 5) Stream instance is immutable. Calling a transforamtion function would create a new Stream instance everytime. But don't worry, because of it's lazy-evaluating characteristic, no duplicated data are generated. >>> strm1 = Stream.range(5, 10) >>> strm2 = strm1.map(lambda n: n * 2) >>> strm3 = strm1.map(lambda n: n * 3) >>> strm1 is strm2 or strm1 is strm3 or strm2 is strm3 False To evaluate a Stream instance, call an action function. >>> strm = Stream.range(5, 10).map(lambda n: n * 2).take(3) >>> strm.sum() 36 >>> strm.to_list() [10, 12, 14] ''' __slots__ = '_iterable', '_pipeline' def __init__(self, iterable, *, pipeline=None): '''Create a Stream from any iterable object. >>> strm = Stream([1,2,3]) >>> strm = Stream(range(10, 15)) >>> adict = {'a': 1, 'b': 2} >>> strm = Stream(adict.items()) Parameters ---------- iterable : any iterable. list, tuple, iterator Input iterable object. transformer : function of type ``iterable -> iterable`` This is for internal use only. ''' self._iterable = iterable if pipeline is None: pipeline = Pipeline() self._pipeline = pipeline def show_pipeline(self, n=2): '''Show pipeline and some examples for debugging >>> def mul_2(x): ... return x*2 >>> (Stream ... .range(10) ... .map(mul_2) ... .nlargest(3) ... .show_pipeline(2)) # doctest: +SKIP range(0, 10) [0] 0 [1] 1 -> map() [0] 0 [1] 2 -> nlargest(3) [0] 2 [1] 0 ''' print(short_repr.repr(self._iterable)) elems = list(itt.islice(self._iterable, 0, n)) for index, elem in enumerate(elems): print(f' [{index}] {elem!r}') for trfmr in self._pipeline.transformers: print(f' -> {trfmr.name}') elems = list(trfmr.transform(elems)) for index, elem in enumerate(elems): print(f' [{index}] {elem!r}') @classmethod def range(cls, start, end=None, step=1): '''Create a Stream from range. >>> Stream.range(2, 10, 2).to_list() [2, 4, 6, 8] >>> Stream.range(3).to_list() [0, 1, 2] ''' if end is None: start, end = 0, start return cls(range(start, end, step)) @classmethod def count(cls, start, step=1): '''Create a infinite consecutive Stream >>> Stream.count(0, 3).take(3).to_list() [0, 3, 6] ''' return cls(itt.count(start, step)) @classmethod def repeat(cls, elems, times=None): '''Create a Stream repeating elems >>> Stream.repeat(1, 3).to_list() [1, 1, 1] >>> Stream.repeat([1, 2, 3], 2).to_list() [[1, 2, 3], [1, 2, 3]] ''' if times is None: return cls(itt.repeat(elems)) return cls(itt.repeat(elems, times=times)) @classmethod def cycle(cls, iterable): '''Create a Stream cycling a iterable >>> Stream.cycle([1,2]).take(5).to_list() [1, 2, 1, 2, 1] ''' return cls(itt.cycle(iterable)) @classmethod def repeatedly(cls, func, times=None): '''Create a Stream repeatedly calling a zero parameter function >>> def counter(): ... counter.num += 1 ... return counter.num >>> counter.num = -1 >>> Stream.repeatedly(counter, 5).to_list() [0, 1, 2, 3, 4] ''' def repeatedly_gen(times): while True: if times is None: yield func() elif times > 0: yield func() times -= 1 else: return return cls(repeatedly_gen(times)) @classmethod def iterate(cls, func, x): '''Create a Stream recursively applying a function to last return value. >>> def multiply2(x): return x * 2 >>> Stream.iterate(multiply2, 3).take(4).to_list() [3, 6, 12, 24] ''' def iterate_gen(x): while True: yield x x = func(x) return cls(iterate_gen(x)) @classmethod def read_txt(cls, path): '''Create from a text file. Treat lines as elements and remove newline character. >>> Stream.read_txt(path) # doctest: +SKIP Parameters ---------- path : str or path or file object path to the input file ''' if isinstance(path, io.TextIOBase): f = path else: f = Path(path).open('rt') return Stream(f).map(lambda line: line.strip('\n')) def write_txt(self, path, sep='\n'): '''Write into a text file. All elements will be applied ``str()`` before write to the file. >>> Stream.range(10).write_txt('nums.txt') Parameters ---------- path : str or path or file object path to the input file sep : str element separator. defaults to '\n' ''' if isinstance(path, io.TextIOBase): f = path self._write_txt_file(f, sep) else: with Path(path).open('wt') as f: self._write_txt_file(f, sep) def _write_txt_file(self, f, sep='\n'): self.for_each(lambda line: f.write(str(line) + sep)) @property def _base_type(self): return Stream @classmethod def unit(cls, value): return Stream([value]) def to_list(self): '''Convert to a list. >>> Stream.range(5, 10, 2).to_list() [5, 7, 9] Returns ------- list ''' return list(self) def to_series(self): '''Convert to a pandas Series >>> Stream.range(5, 10, 2).to_series() 0 5 1 7 2 9 dtype: int64 Returns ------- pandas.Series ''' import pandas as pd return pd.Series(list(self)) def to_streamtable(self): '''Convert to StreamTable All elements should be in Row type Returns ------- StreamTable ''' from .streamtable import StreamTable return StreamTable(self) def to_set(self): '''Convert to a set >>> Stream.cycle([1, 2, 3]).take(5).to_set() {1, 2, 3} Returns ------- set ''' return set(self) def to_dict(self): '''Convert to a dict >>> Stream.range(5, 10, 2).zip_index().to_dict() {5: 0, 7: 1, 9: 2} Returns ------- dict ''' return dict(self) def to_map(self): '''Convert to a Map >>> Stream.range(5, 10, 2).zip_index().to_map() Map({5: 0, 7: 1, 9: 2}) Returns ------- Map ''' from .map import Map return Map(self) def to_array(self): '''Convert to a Map >>> Stream.range(5, 8, 2).zip_index().to_array() Array([Row(value=5, index=0), Row(value=7, index=1)]) Returns ------- Array ''' return Array(self) @as_stream def tuple_as_row(self, fields): '''Create a new Stream with elements as Row objects >>> Stream([(1, 2), (3, 4)]).tuple_as_row(['x', 'y']).to_list() [Row(x=1, y=2), Row(x=3, y=4)] ''' return fnt.partial(map, lambda tpl: Row.from_values(tpl, fields=fields)) @as_stream def dict_as_row(self, fields=None): '''Create a new Stream with elements as Row objects >>> stm = Stream([{'name': 'John', 'age': 35}, ... {'name': 'Frank', 'age': 28}]) >>> stm.dict_as_row().to_list() [Row(name='John', age=35), Row(name='Frank', age=28)] >>> stm.dict_as_row(['age', 'name']).to_list() [Row(age=35, name='John'), Row(age=28, name='Frank')] ''' return fnt.partial(map, lambda d: Row.from_dict(d, fields=fields)) @as_stream def map(self, func): '''Create a new Stream by applying function to each element >>> Stream.range(5, 8).map(lambda x: x * 2).to_list() [10, 12, 14] Returns ------- Stream ''' return fnt.partial(map, func) @as_stream def starmap(self, func): '''Create a new Stream by evaluating function using argument tulpe from each element. i.e. ``func(*elem)``. It's convenient that if all elements in Stream are iterable and you want to treat each element in elemnts as separate argument while calling the function. >>> Stream([(1, 2), (3, 4)]).starmap(lambda a, b: a+b).to_list() [3, 7] >>> Stream([(1, 2), (3, 4)]).map(lambda a_b: a_b[0]+a_b[1]).to_list() [3, 7] ''' return fnt.partial(itt.starmap, func) @as_stream def flatten(self): '''flatten each element >>> Stream([(1, 2), (3, 4)]).flatten().to_list() [1, 2, 3, 4] Returns ------- Stream ''' return itt.chain.from_iterable @as_stream def flat_map(self, to_iterable_func): '''Apply function to each element, then flatten the result. >>> Stream([1, 2, 3]).flat_map(range).to_list() [0, 0, 1, 0, 1, 2] Returns ------- Stream ''' def flat_map_tr(iterable): return itt.chain.from_iterable(map(to_iterable_func, iterable)) return flat_map_tr @as_stream def tap(self, tag='', n=5, msg_format='{tag}:{index}: {elem}'): '''A debugging tool. This method create a new Stream with the same elements. While evaluating Stream, it print first n elements. >>> (Stream.range(3).tap('orig') ... .map(lambda x: x * 2).tap_with(lambda i, e: f'{i} -> {e}') ... .accumulate(lambda a, b: a + b).tap('acc') ... .tap(msg_format='end\\n') ... .to_list()) orig:0: 0 0 -> 0 acc:0: 0 end orig:1: 1 1 -> 2 acc:1: 2 end orig:2: 2 2 -> 4 acc:2: 6 end [0, 2, 6] ''' def tap_tr(self_): for index, elem in enumerate(self_): if index < n: print(msg_format.format(tag=tag, index=index, elem=elem)) yield elem return tap_tr @as_stream def tap_with(self, func, n=5): '''A debugging tool. This method create a new Stream with the same elements. While evaluating Stream, it call the function using index and element then prints the return value for first n elements. >>> (Stream.range(3).tap('orig') ... .map(lambda x: x * 2).tap('x2') ... .accumulate(lambda a, b: a + b).tap('acc') ... .to_list()) orig:0: 0 x2:0: 0 acc:0: 0 orig:1: 1 x2:1: 2 acc:1: 2 orig:2: 2 x2:2: 4 acc:2: 6 [0, 2, 6] Parameters ----------- func : ``func(index, elem) -> Any`` Function for building the printing object. n : int First n element will be print. ''' def tap_with_tr(self_): for index, elem in enumerate(self_): if index < n: print(func(index, elem)) yield elem return tap_with_tr def then(self, alist): # TODO if len(self._items) > 0: return alist else: return self def ap(self, avalue): # TODO pass def __iter__(self): return iter(self._pipeline.transform(self._iterable)) @reprlib.recursive_repr() def __repr__(self): # TODO: let user control use or not to use short_repr if self._pipeline.is_empty(): return (f'{type(self).__name__}' f'({short_repr.repr(self._iterable)})') else: return (f'{type(self).__name__}' f'({short_repr.repr(self._iterable)}, {self._pipeline!r})') @property def _value_for_cmp(self): return list(self) def len(self): '''Get the length of the Stream Returns ------- int ''' return sum(1 for item in self) @classmethod def _check_index_range(cls, index): if index is not None and index < 0: raise ValueError( 'Stream index should be greater than 0. ' 'Be aware of that indexing an iterator would ' 'consume items from it.') def __getitem__(self, index): '''Get the item in the nth position if index is integer. Get a Stream of a slice of items if index is a slice object. Note if the source iterable is an iterator, you might get unexpected element if you call ``__getitem__()`` multiple times. >>> s = Stream(range(5, 12)) >>> s[2] 7 >>> s[2] 7 >>> s = Stream(iter(range(5, 12))) >>> s[2] 7 >>> s[2] 10 >>> s[2] Traceback (most recent call last): ... IndexError: Stream index out of range. Be aware of that indexing an iterator would consume items from it. >>> s = Stream(range(5, 12)) >>> type(s[:3]) >>> s[:3].to_list() [5, 6, 7] Parameters ---------- index : int, slice index of target item or a slice object ''' # noqa # TODO: support negative index if isinstance(index, slice): return self.slice(index.start, index.stop, index.step) else: try: return next(itt.islice(self, index, None)) except StopIteration: raise IndexError( 'Stream index out of range. ' 'Be aware of that indexing an iterator would ' 'consume items from it.') def get(self, index, default=None): '''Get item of the index. Return default value if not exists. >>> s = Stream.range(5, 12) >>> s.get(3) 8 >>> s.get(10) is None True >>> s.get(10, 0) 0 Returns ------- element ''' # TODO: support negative index # dd = deque(aa, maxlen=1) self._check_index_range(index) return next(itt.islice(self, index, None), default) def get_opt(self, index): '''Optionally get item of the index. Return Some(value) if exists. Otherwise return Nothing. >>> s = Stream.range(5, 12) >>> s.get_opt(3) Some(8) >>> s.get_opt(10) Nothing >>> s.get_opt(10).get_or(0) 0 >>> s.get_opt(3).map(lambda n: n * 2).get_or(0) 16 >>> s.get_opt(10).map(lambda n: n * 2).get_or(0) 0 Returns ------- Optional[element] ''' try: return Some(self[index]) except IndexError: return Nothing @as_stream def slice(self, start, stop, step=None): '''Create a Stream from the slice of items. >>> Stream(list(range(10))).slice(5, 8).to_list() [5, 6, 7] Returns ------- Stream[element] ''' self._check_index_range(start) self._check_index_range(stop) return lambda iterable: itt.islice(iterable, start, stop, step) def first(self): '''Get first element >>> Stream(dict(a=3, b=4, c=5).items()).first() ('a', 3) Returns ------- element ''' return self[0] def second(self): '''Get second element >>> Stream(dict(a=3, b=4, c=5).items()).second() ('b', 4) Returns ------- element ''' return self[1] def last(self): '''Get last element Returns ------- element ''' return deque(self, 1)[-1] def first_opt(self): '''Get first element as Some(element), or Nothing if not exists Returns ------- Optional[element] ''' return self.get_opt(0) def second_opt(self): '''Get second element as Some(element), or Nothing if not exists Returns ------- Optional[element] ''' return self.get_opt(1) def last_opt(self): '''Get last element as Some(element), or Nothing if not exists Returns ------- Optional[element] ''' dq = deque(self, 1) if len(dq) > 0: return Some(dq[-1]) return Nothing def find(self, pred): '''Get first element satifying predicate >>> Stream.range(5, 100).find(lambda n: n % 7 == 0) 7 Returns ------- element ''' for item in self: if pred(item): return item def find_opt(self, pred): '''Optionally get first element satifying predicate. Return Some(element) if exist Otherwise return Nothing >>> Stream.range(5, 100).find_opt(lambda n: n * 3 + 5 == 40) Nothing >>> Stream.range(5, 100).find_opt(lambda n: n % 7 == 0) Some(7) Returns ------- Optional[element] ''' for item in self: if pred(item): return Some(item) else: return Nothing def take(self, n): '''Create a new Stream contains only first n element >>> Stream(dict(a=3, b=4, c=5).items()).take(2).to_list() [('a', 3), ('b', 4)] ''' return self[:n] def drop(self, n): '''Create a new Stream with first n element dropped >>> Stream(dict(a=3, b=4, c=5).items()).drop(2).to_list() [('c', 5)] ''' return self[n:] def tail(self): '''Create a new Stream with first element dropped >>> Stream(dict(a=3, b=4, c=5).items()).tail().to_list() [('b', 4), ('c', 5)] ''' return self[1:] # def butlast(self): # return self[:-1] # def takeright(self, n): # return Array(self._items[-n:]) # def dropright(self, n): # return Array(self._items[:-n]) @as_stream def take_while(self, pred): '''Create a new Stream with successive elements as long as predicate evaluates to true. >>> Stream.range(10).take_while(lambda n: n % 5 < 3).to_list() [0, 1, 2] ''' return fnt.partial(itt.takewhile, pred) takewhile = take_while @as_stream def drop_while(self, pred): '''Create a new Stream without elements as long as predicate evaluates to true. ''' return fnt.partial(itt.dropwhile, pred) dropwhile = drop_while @as_stream def split_before(self, pred): '''Create a new Stream of Arrays by splitting before each element passing predicate. >>> Stream.range(10).split_before(lambda n: n % 3 == 2).to_list() [Array([0, 1]), Array([2, 3, 4]), Array([5, 6, 7]), Array([8, 9])] ''' def split_before_tr(iterable): segment = [] for item in iterable: if pred(item) and len(segment) > 0: yield Array(segment) segment = [] segment.append(item) yield Array(segment) return split_before_tr @as_stream def split_after(self, pred): '''Create a new Stream of Arrays by splitting after each element passing predicate. >>> Stream.range(10).split_after(lambda n: n % 3 == 2).to_list() [Array([0, 1, 2]), Array([3, 4, 5]), Array([6, 7, 8]), Array([9])] ''' def split_after_tr(iterable): segment = [] for item in iterable: segment.append(item) if pred(item): yield Array(segment) segment = [] if len(segment) > 0: yield Array(segment) return split_after_tr def pluck(self, key): '''Create a new Stream of values by evaluating ``elem[key]`` for each element. >>> s = Stream([dict(x=3, y=4), dict(x=4, y=5), dict(x=8, y=9)]) >>> s.pluck('x').to_list() [3, 4, 8] Returns ------- Stream[``element[key]``] ''' return self.map(lambda d: d[key]) def pluck_opt(self, key): '''Create a new Stream of Optional values by evaluating ``elem[key]`` for each element. Get ``Some(value)`` if the key exists for that element, otherwise get Nothing singleton. >>> s = Stream([dict(x=3, y=4), dict(y=5), dict(x=8, y=9)]) >>> s.pluck_opt('x').to_list() [Some(3), Nothing, Some(8)] >>> s.pluck_opt('x').map(lambda n_opt: n_opt.get_or(1)).to_list() [3, 1, 8] Returns ------- Stream[Optional(type of ``element[key]``)] ''' return self.map(lambda d: Some(d[key]) if key in d else Nothing) def pluck_attr(self, attr): '''Create a new Stream of Optional values by evaluating ``elem.attr`` of each element. Get ``Some(value)`` if attr exists for that element, otherwise get Nothing singleton. >>> from carriage import Row >>> s = Stream([Row(x=3, y=4), Row(x=4, y=5), Row(x=8, y=9)]) >>> s.pluck_attr('x').to_list() [3, 4, 8] Returns -------- Stream[type of ``element.attr``] ''' return self.map(lambda obj: getattr(obj, attr)) def without(self, *elems): '''Create a new Stream without specified elements. >>> Stream.range(10).without(3, 6, 9).to_list() [0, 1, 2, 4, 5, 7, 8] Returns -------- Stream[element] ''' try: elems = set(elems) except TypeError: # TODO: warn bad performance pass return self.filter(lambda elem: elem not in elems) @as_stream def filter(self, pred): '''Create a new Stream contains only elements passing predicate >>> Stream.range(10).filter(lambda n: n % 2 == 0).to_list() [0, 2, 4, 6, 8] ''' return fnt.partial(filter, pred) @as_stream def filter_false(self, pred): '''Create a new Stream contains only elements not passing predicate >>> Stream.range(10).filter_false(lambda n: n % 2 == 0).to_list() [1, 3, 5, 7, 9] ''' return fnt.partial(itt.filterfalse, pred) @as_stream def unique(self, key_func=None): '''Create a new Stream of unique elements >>> Stream.range(10).unique(lambda x: x // 3).to_list() [0, 3, 6, 9] ''' if key_func is None: def key_func(x): return x def unique_tr(iterable): visited_keys = set() for item in iterable: key = key_func(item) if key not in visited_keys: visited_keys.add(key) yield item return unique_tr @as_stream def interpose(self, sep): '''Create a new Stream by interposing separater between elemens. >>> Stream.range(5, 10).interpose(0).to_list() [5, 0, 6, 0, 7, 0, 8, 0, 9] ''' def interpose_tr(iterable): iterator = iter(iterable) yield next(iterator) for item in iterator: yield sep yield item return interpose_tr @as_stream def zip(self, *iterables): '''Create a new Stream by zipping elements with other iterables. >>> Stream.range(5, 8).zip([1,2,3]).to_list() [Row(f0=5, f1=1), Row(f0=6, f1=2), Row(f0=7, f1=3)] >>> Stream.range(5, 8).zip([1,2,3], [9, 10, 11]).to_list() [Row(f0=5, f1=1, f2=9), Row(f0=6, f1=2, f2=10), Row(f0=7, f1=3, f2=11)] >>> Stream.range(5, 8).zip([1,2]).to_list() [Row(f0=5, f1=1), Row(f0=6, f1=2)] >>> import itertools as itt >>> Stream.range(5, 8).zip(itt.count(10)).to_list() [Row(f0=5, f1=10), Row(f0=6, f1=11), Row(f0=7, f1=12)] ''' from carriage import Row def zip_tr(items): return map(Row.from_values, builtins.zip(items, *iterables)) return zip_tr @as_stream def zip_longest(self, *iterables, fillvalue=None): '''Create a new Stream by zipping elements with other iterables as long as possible. >>> Stream.range(5, 8).zip_longest([1,2]).to_list() [Row(f0=5, f1=1), Row(f0=6, f1=2), Row(f0=7, f1=None)] >>> Stream.range(5, 8).zip_longest([1,2], fillvalue=0).to_list() [Row(f0=5, f1=1), Row(f0=6, f1=2), Row(f0=7, f1=0)] ''' from carriage import Row def zip_longest_tr(items): return map(Row.from_values, itt.zip_longest(items, *iterables, fillvalue=fillvalue)) return zip_longest_tr @as_stream def zip_prev(self, fillvalue=None): '''Create a new Stream by zipping elements with previous one. >>> Stream.range(5, 8).zip_prev().to_list() [Row(curr=5, prev=None), Row(curr=6, prev=5), Row(curr=7, prev=6)] >>> Stream.range(5, 8).zip_prev(fillvalue=0).to_list() [Row(curr=5, prev=0), Row(curr=6, prev=5), Row(curr=7, prev=6)] ''' def zip_prev_tr(items): items, prevs = itt.tee(items) prevs = itt.chain([fillvalue], prevs) return itt.starmap(CurrPrev, builtins.zip(items, prevs)) return zip_prev_tr @as_stream def zip_next(self, fillvalue=None): '''Create a new Stream by zipping elements with next one. >>> Stream.range(5, 8).zip_next().to_list() [Row(curr=5, prev=6), Row(curr=6, prev=7), Row(curr=7, prev=None)] >>> Stream.range(5, 8).zip_next(fillvalue=1).to_list() [Row(curr=5, prev=6), Row(curr=6, prev=7), Row(curr=7, prev=1)] ''' def zip_next_tr(items): items, nexts = itt.tee(items) next(nexts, None) return itt.starmap( CurrNext, itt.zip_longest(items, nexts, fillvalue=fillvalue)) return zip_next_tr def zip_index(self, start=0): '''Create a new Stream by zipping elements with index. >>> Stream(['a', 'b', 'c']).zip_index().to_list() [Row(value='a', index=0), Row(value='b', index=1), Row(value='c', index=2)] >>> Stream(['a', 'b', 'c']).zip_index(1).to_list() [Row(value='a', index=1), Row(value='b', index=2), Row(value='c', index=3)] ''' # noqa return self.zip(itt.count(start)).starmap(ValueIndex) @as_stream def reversed(self): '''Create a new reversed Stream. >>> Stream(['a', 'b', 'c']).reversed().to_list() ['c', 'b', 'a'] ''' def reversed_tr(items): try: return reversed(items) except TypeError: return reversed(list(items)) return reversed_tr @as_stream def sorted(self, key=None, reverse=False): '''Create a new sorted Stream. ''' return fnt.partial(sorted, key=key, reverse=reverse) def sum(self): '''Get sum of elements''' return sum(self) def reduce(self, func): '''Apply a function of two arguments cumulatively to the elements in Stream from left to right. ''' return fnt.reduce(func, self) def fold_left(self, func, initial): '''Apply a function of two arguments cumulatively to the elements in Stream from left to right. ''' return fnt.reduce(func, self, initial) # def key_by(self, func): # return self.map(lambda elem: Row(key=func(elem), value=elem)) @as_stream def group_by_as_stream(self, key=None): '''Create a new Stream using the builtin itertools.groupby, which sequentially groups elements as long as the key function evaluates to the same value. Comparing to ``group_by_as_map``, there're some pros and cons. Cons: - Elements should be sorted by the key function first, or elements with the same key may be broken into different groups. Pros: - Key function doesn't have to be evaluated to a hashable value. It can be any type which supports ``__eq__``. - Lazy-evaluating. Consume less memory while grouping. Yield a group as soon as possible. ''' def group_by_tr(self_): for k, vs in itt.groupby(self_, key=key): yield KeyValues(key=k, values=Stream(vs)) return group_by_tr def group_by_as_map(self, key_func=None): '''Group values in to a Map by the value of key function evaluation result. Comparing to ``group_by_as_stream``, there're some pros and cons. Pros: * Elements don't need to be sorted by the key function first. You can call ``map_group_by`` anytime and correct grouping result. Cons: * Key function has to be evaluated to a hashable value. * Not Lazy-evaluating. Consume more memory while grouping. Yield a group as soon as possible. >>> Stream.range(10).group_by_as_map(key_func=lambda n: n % 3) Map({0: Array([0, 3, 6, 9]), 1: Array([1, 4, 7]), 2: Array([2, 5, 8])}) ''' from .map import Map key_to_grp = defaultdict(Array) for elem in self: key_to_grp[key_func(elem)].append(elem) return Map(key_to_grp) def multi_group_by_as_map(self, key=None): from .map import Map key_to_grp = defaultdict(list) for elem in self: for k in key(elem): key_to_grp[k].append(elem) return Map(key_to_grp) @as_stream def sliding_window(self, n, step=1): '''Create a new Stream instance that all elements are sliding windows of source elements. >>> (Stream('they have the same meaning'.split()) ... .sliding_window(3) ... .to_list()) [('they', 'have', 'the'), ('have', 'the', 'same'), ('the', 'same', 'meaning')] >>> (Stream('they have the same meaning'.split()) ... .sliding_window(3, step=2) ... .to_list()) [('they', 'have', 'the'), ('the', 'same', 'meaning')] ''' # noqa def sliding_window_tr(self_): self_itr = iter(self) dq = deque(itt.islice(self_itr, n - 1), maxlen=n) for item, cyc_idx in zip(self_itr, itt.cycle(range(step))): dq.append(item) if cyc_idx == 0: yield tuple(dq) return sliding_window_tr def mean(self): '''Get the average of elements. >>> Array.range(10).mean() 4.5 ''' length, summation = deque(enumerate(itt.accumulate(self), 1), 1).pop() return summation / length @as_stream def accumulate(self, func=None): '''Create a new Stream of calling ``itertools.accumulate``''' return fnt.partial(itt.accumulate, func=func) def value_counts(self): '''Get a Counter instance of elements counts Returns ------- Map[E, int] ''' from carriage import Map return Map(Counter(self)) @as_stream def extended(self, iterable): '''Create a new Stream that extends source Stream with another iterable''' def extended_tr(self_): return itt.chain(self_, iterable) return extended_tr def appended(self, elem): '''Create a new Stream that extends source Stream with another element. ''' return self.extended((elem,)) @as_stream def distincted(self, key_func=None): '''Create a new Stream with non-repeating elements. And elements are with the same order of first occurence in the source Stream. >>> Stream.range(10).distincted(lambda n: n//3).to_list() [0, 3, 6, 9] ''' if key_func is None: def key_func(x): return x def distincted_tr(items): key_set = set() for item in items: key_value = key_func(item) if key_value not in key_set: key_set.add(key_value) yield item return distincted_tr @as_stream def product(self, *iterables, repeat=1): def product_tr(self_): return itt.product(self_, *iterables, repeat=repeat) return product_tr @as_stream def permutations(self, r=None): return fnt.partial(itt.permutations, r=r) @as_stream def combinations(self, r): return fnt.partial(itt.combinations, r=r) @as_stream def combinations_with_replacement(self, r): return fnt.partial(itt.combinations_with_replacement, r=r) @as_stream def nsmallest(self, n, key=None): '''Get the n smallest elements. >>> Stream([1, 5, 2, 3, 6]).nsmallest(2).to_list() [1, 2] ''' def nsmallest_tr(self_): return heapq.nsmallest(n, iter(self_), key=key) return nsmallest_tr @as_stream def nlargest(self, n, key=None): '''Get the n largest elements. >>> Stream([1, 5, 2, 3, 6]).nlargest(2).to_list() [6, 5] ''' def nlargest_tr(self_): return heapq.nlargest(n, iter(self_), key=key) return nlargest_tr def tee(self, n=2): '''Copy the Stream into multiple Stream with the same elements. >>> itr = iter(range(3, 6)) >>> s1 = Stream(itr).map(lambda x: x * 2) >>> s2, s3 = s1.tee(2) >>> s2.map(lambda x: x * 2).to_list() [12, 16, 20] >>> s3.map(lambda x: x * 3).to_list() [18, 24, 30] ''' itrs = itt.tee(self, 2) return tuple(map(type(self), itrs)) # def copy(self): # return Array(copy(self._items)) def cache(self): '''Cache result ''' return type(self)(self.to_list()) @as_stream def chunk(self, n, strict=False): '''divide elements into chunks of n elements >>> s = Stream.range(5) >>> s.chunk(2).to_list() [Row(f0=0, f1=1), Row(f0=2, f1=3), Row(f0=4)] >>> s.chunk(2, strict=True).to_list() [Row(f0=0, f1=1), Row(f0=2, f1=3)] ''' from .row import Row def chunk_tr(self_): self_ = iter(self_) while True: row = Row.from_values(itt.islice(self_, n)) if len(row) == 0 or strict and len(row) != n: break yield row return self_ return chunk_tr def for_each(self, func): '''Call function for each element >>> s = Stream.range(3) >>> s.for_each(print) 0 1 2 ''' for elem in self: func(elem) def star_for_each(self, func): '''Call function for each element as agument tuple >>> s = Stream(['a', 'b', 'c']).zip_index(1) >>> s.star_for_each(lambda c, i: print(f'{i}:{c}')) 1:a 2:b 3:c ''' for elem in self: func(*elem) def make_string(self, elem_format='{elem!r}', start='[', elem_sep=', ', end=']'): '''Make string from elements >>> Stream.range(5, 8).make_string() '[5, 6, 7]' >>> print(Stream.range(5, 8).make_string(elem_sep='\\n', start='', end='', elem_format='{index}: {elem}')) 0: 5 1: 6 2: 7 ''' elems_str = elem_sep.join(elem_format.format(index=idx, elem=elem) for idx, elem in enumerate(self)) return start + elems_str + end PK! FZP-P-carriage/streamtable.pyimport functools as fnt import io import itertools as itt import json from pathlib import Path from tabulate import tabulate, tabulate_formats from .row import Row from .stream import Stream, as_stream class StreamTable(Stream): '''StreamTable is similar to Stream but designed to work on Rows only. ''' def __init__(self, iterable, *, pipeline=None): '''Create a StreamTable from an iterable object of Rows >>> stb = StreamTable([Row(x=1, y=3), Row(x=2, y=4)]) >>> stb.show() | x | y | |-----+-----| | 1 | 3 | | 2 | 4 | ''' Stream.__init__(self, iterable, pipeline=pipeline) @classmethod def range(cls, start, end=None, step=1): '''Create a StreamTable from range >>> StreamTable.range(1, 10, 3).show() | range | |---------| | 1 | | 4 | | 7 | ''' strm = Stream.range(start, end, step).map(lambda elem: Row(range=elem)) return cls(strm) @classmethod def count(cls, start, step=1): '''Create a inifinite consecutive StreamTable >>> StreamTable.count(3, 5).take(3).show() | count | |---------| | 3 | | 8 | | 13 | ''' strm = Stream.count(start, step).map(lambda elem: Row(count=elem)) return cls(strm) @classmethod def repeat(cls, elems, times=None): '''Create a StreamTable repeating elems >>> StreamTable.repeat(1, 3).show() | repeat | |----------| | 1 | | 1 | | 1 | ''' strm = Stream.repeat(elems, times).map(lambda elem: Row(repeat=elem)) return cls(strm) @classmethod def cycle(cls, iterable): '''Create a StreamTable cycling a iterable >>> StreamTable.cycle([1,2]).take(5).show() | cycle | |---------| | 1 | | 2 | | 1 | | 2 | | 1 | ''' strm = Stream.cycle(iterable).map(lambda elem: Row(cycle=elem)) return cls(strm) @classmethod def repeatedly(cls, func, times=None): '''Create a StreamTable repeatedly calling a zero parameter function >>> def counter(): ... counter.num += 1 ... return counter.num >>> counter.num = -1 >>> StreamTable.repeatedly(counter, 5).show() | repeatedly | |--------------| | 0 | | 1 | | 2 | | 3 | | 4 | ''' strm = Stream.repeatedly(func, times).map( lambda elem: Row(repeatedly=elem)) return cls(strm) @classmethod def iterate(cls, func, x): '''Create a StreamTable recursively applying a function to last return value. >>> def multiply2(x): return x * 2 >>> StreamTable.iterate(multiply2, 3).take(4).show() | iterate | |-----------| | 3 | | 6 | | 12 | | 24 | ''' strm = Stream.iterate(func, x).map(lambda elem: Row(iterate=elem)) return cls(strm) @classmethod def from_dataframe(cls, df, with_index=False): '''Create from Pandas DataFrame >>> import pandas as pd >>> df = pd.DataFrame([(0, 1), (2, 3)], columns=['a', 'b']) >>> StreamTable.from_dataframe(df).show() | a | b | |-----+-----| | 0 | 1 | | 2 | 3 | Parameters ---------- df : pandas.DataFrame source DataFrame with_index : bool include index value or not Returns ------- StreamTable ''' rows = Stream(df.itertuples()) rows = rows.map(lambda t: Row.from_values(t, fields=t._fields)) if not with_index: rows = rows.map(lambda row: row.without('Index')) return cls(rows.to_list()) @classmethod def from_tuples(cls, tuples, fields=None): '''Create from iterable of tuple >>> StreamTable.from_tuples([(1, 2), (3, 4)], fields=('x', 'y')).show() | x | y | |-----+-----| | 1 | 2 | | 3 | 4 | Parameters ---------- tuples : Iterable[tuple] data fields : Tuple[str] field names ''' stm = Stream(tuples).tuple_as_row(fields=fields) return cls(stm) @classmethod def read_jsonl(cls, path): '''Create from a jsonlines file >>> StreamTable.read_jsonl('person.jsonl') # doctest: +SKIP | name | age | |--------+-------| | john | 18 | | jane | 26 | Parameters ---------- path : str or path or file object path to the input file ''' from carriage import Row if isinstance(path, io.TextIOBase): f = path else: f = Path(path).open('rt') stm = (Stream(f) .map(json.loads) # dicts .map(Row.from_dict) ) return cls(stm) def write_jsonl(self, path): '''Write into file in the format of jsonlines >>> stb.write_jsonl('person.jsonl') # doctest: +SKIP Parameters ---------- path : str or path or file object path to the input file ''' if isinstance(path, io.TextIOBase): f = path self._write_jsonl_file(f) else: with Path(path).open('wt') as f: self._write_jsonl_file(f) def _write_jsonl_file(self, f): ( self .map(Row.to_dict) .map(json.dumps) .for_each(lambda line: f.write(line + '\n')) ) def to_dataframe(self): '''Convert to Pandas DataFrame Returns ------- pandas.DataFrame ''' import pandas as pd rows = self.to_list() fields = self._scan_fields(rows[:10]) return pd.DataFrame(rows, columns=fields) def to_stream(self): '''Convert to Stream Returns ------- Stream ''' return Stream(self) def to_dicts(self): return self.map(lambda row: row.to_dict()).to_list() def show(self, n=10): '''print rows Parameters ---------- n : int number of rows to show ''' try: from IPython.display import display display_func = display except ImportError: display_func = print showing_obj = _StreamTableShowing(self, n) display_func(showing_obj) def tabulate(self, n=10, tablefmt='orgtbl'): '''return tabulate formatted string Parameters ---------- n : int number of rows to show tablefmt : str output table format. all possible format strings are in `StreamTable.tabulate.tablefmts`` ''' rows = list(itt.islice(self, 0, n)) header_fields = self._scan_fields(rows) return tabulate( rows, headers=header_fields, tablefmt=tablefmt) @as_stream def map_fields(self, **field_funcs): '''Add or replace fields by applying each row to function >>> from carriage import Row, X >>> st = StreamTable([Row(x=3, y=4), Row(x=-1, y=2)]) >>> st.map_fields(z=X.x + X.y).to_list() [Row(x=3, y=4, z=7), Row(x=-1, y=2, z=1)] Parameters ---------- **field_funcs : Map[field_name, Function] Each function will be evaluated with the current row as the only argument, and the return value will be the new value of the field. Returns ------- StreamTable ''' return fnt.partial( map, lambda row: row.evolve(**{field: func(row) for field, func in field_funcs.items()})) @as_stream def select(self, *fields, **field_funcs): '''Keep only specified fields, and add/replace fields. >>> from carriage import Row, X >>> st = StreamTable([Row(x=3, y=4), Row(x=-1, y=2)]) >>> st.select('x', z=X.x + X.y, pi=3.14).to_list() [Row(x=3, z=7, pi=3.14), Row(x=-1, z=1, pi=3.14)] Parameters ---------- *fields : List[str] fields to keep **field_funcs : Map[str, Function or scalar] If value is a function, this function will be evaluated with the current row as the only argument. If value is not callable, use the value directly. Returns ------- StreamTable ''' return fnt.partial( map, lambda row: row.evolve(**{field: func(row) if callable(func) else func for field, func in field_funcs.items()}) .project(*fields, *field_funcs.keys())) @as_stream def explode(self, field): '''Expand each row into multiple rows for each element in the field >>> stb = StreamTable([Row(name='a', nums=[1,3,4]), Row(name='b', nums=[2, 1])]) >>> stb.explode('nums').show() | name | nums | |--------+--------| | a | 1 | | a | 3 | | a | 4 | | b | 2 | | b | 1 | ''' def _explode_row(row): for field_elem in getattr(row, field): yield row.evolve(**{field: field_elem}) def _flatmap_explodes(rows_iter): for row in rows_iter: yield from _explode_row(row) return _flatmap_explodes @as_stream def where(self, *conds, **kwconds): '''Create a new Stream contains only Rows pass all conditions. >>> from carriage import Row, X >>> st = StreamTable([Row(x=3, y=4), Row(x=3, y=5), Row(x=4, y=5)]) >>> st.where(x=3).to_list() [Row(x=3, y=4), Row(x=3, y=5)] >>> st.where(X.y > 4).to_list() [Row(x=3, y=5), Row(x=4, y=5)] Returns ------- StreamTable ''' return fnt.partial( filter, lambda row: all(cond(row) for cond in conds) and all(getattr(row, field) == value for field, value in kwconds.items())) @classmethod def _scan_fields(cls, rows): all_fields = [] all_fields_set = set() for row in rows: missing_fields = set(row.fields()) - all_fields_set for field in row.fields(): if field in missing_fields: all_fields.append(field) all_fields_set.update(missing_fields) return all_fields def _repr_html_(self): return self.tabulate(tablefmt='html') def __str__(self): return self.tabulate(tablefmt='orgtbl') class _StreamTableShowing(): def __init__(self, streamtable, n): self.streamtable = streamtable self.n = n def _repr_html_(self): return self.streamtable.tabulate(n=self.n, tablefmt='html') def __repr__(self): return self.streamtable.tabulate(n=self.n, tablefmt='orgtbl') StreamTable.tabulate.tablefmts = tabulate_formats PK!]{],],!carriage-0.4.13.dist-info/LICENSE Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. PK!H_zTTcarriage-0.4.13.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]n0H*J>mlcAPK!Hɩ 8"carriage-0.4.13.dist-info/METADATAMN0> +M*~,ZQh+*`X $1OPNp6 -pԖ`ek{;ASuhN!CPi (|beLj6tS)1XZYSXPJFeC¨ w9= GCr,2. d%F ɠ~<==?'**Yzh',3m-,h6$RNvL"ͽ(t49 !]tbr/GcEڙaK/bp %HyEW%)K׶e2ẖqn%ZPK!H˵l carriage-0.4.13.dist-info/RECORD}˲cPy?fc!!~AH<}gr:G{տWC (r E:!hw2+T}ToϮS`A$ _ai:>qp2@d{8W#Cćj6Jm**q0xpiW/ʻn,$ڰ8 s].>q#mj(vV?Uw(PօɆzY0GGP)-2D-c Huj1{b~[?o`ژfF5E1M -O(۔՗}ڔ]ъHo:cCB'&}4W'8f61p i?lPk!l1?Y{h+9UKΆ7dNV{M[ 7m0),_ke,\*%pB~ _! 9̕\RG F=(~JZnɒa`xSFͶ+]-WrQjx9dDMY ڡ_rve~j /'~>Ѫ{7O!l8 4~4 З(j2Ku\W^DClU2~] ~tw:;L `~_ Ӳ;ݭHre[䏶 ?~PK!ڳommcarriage/__init__.pyPK!*֬MMcarriage/array.pyPK!$$zOcarriage/lambda_.pyPK!6+KKtcarriage/map.pyPK!̍M carriage/monad.pyPK!F8jDjDcarriage/optional.pyPK! ;TT<carriage/pipeline.pyPK!:ttcarriage/repr.pyPK!<,odcarriage/row.pyPK!E\]] 9carriage/stream.pyPK! FZP-P-carriage/streamtable.pyPK!]{],],!2carriage-0.4.13.dist-info/LICENSEPK!H_zTT(carriage-0.4.13.dist-info/WHEELPK!Hɩ 8"_)carriage-0.4.13.dist-info/METADATAPK!H˵l *carriage-0.4.13.dist-info/RECORDPK-