PK!S  pfun/__init__.py# flake8: noqa from . import maybe from . import reader from . import writer from . import state from . import result from . import io from .util import (identity, compose, pipeline, Unary, Predicate, always) from .dict import Dict from .list import List from .curry import curry from .immutable import Immutable __all__ = [ 'maybe', 'reader', 'writer', 'state', 'result', 'io', 'identity', 'compose', 'pipeline', 'always', 'Dict', 'List', 'curry', 'Immutable' ] PK!Ex pfun/cont.pyfrom typing import Generic, Callable, TypeVar, Iterable, cast, Generator from .immutable import Immutable from .monad import Monad, sequence_, map_m_, filter_m_ from .curry import curry from .trampoline import Trampoline, Done, Call from .util import identity from .with_effect import with_effect_ A = TypeVar('A') B = TypeVar('B') C = TypeVar('C') D = TypeVar('D') class Cont(Generic[A, B], Monad, Immutable): """ Type that represents a function in continuation passing style. """ f: Callable[[Callable[[A], B]], Trampoline[B]] def and_then(self, f: 'Callable[[B], Cont[C, D]]') -> 'Cont[C, D]': """ Chain together functions in continuation passing style :example: >>> value(1).and_then(lambda i: value(i + 1)).run(identity) 2 :param f: Function in continuation passing style to chain with the result of this function :return: """ return Cont( lambda c: Call( lambda: self.f( # type: ignore lambda b: f(b).f(c) # type: ignore ) ).and_then(identity) ) # yapf: disable def run(self, f: Callable[[A], B]) -> B: """ Run the wrapped function in continuation passing style by passing the result to ``f`` :example: >>> from pfun import identity >>> value(1).run(identity) 1 :param f: The function to pass the result of the wrapped function to :return: the result of passing the return value of the wrapped function to ``f`` """ return self.f(f).run() # type: ignore __call__ = run def map(self, f: Callable[[B], C]) -> 'Cont[B, C]': """ Map the ``f`` over this continuation :example: >>> from pfun import identity >>> value(1).map(lambda v: v + 1).run(identity) 2 :param f: The function to map over this continuation :return: Continuation mapped with ``f`` """ return Cont(lambda c: self.f(c).map(f)) # type: ignore @curry def map_m(f: Callable[[A], Cont[A, B]], iterable: Iterable[A]) -> Cont[Iterable[A], B]: """ Apply ``f`` to each element in ``iterable`` and collect the results :example: >>> from pfun import identity >>> map_m(value, range(3)).run(identity) (0, 1, 2) :param f: The function to map over ``iterable`` :param iterable: The iterable to map over :return: ``iterable`` mapped with ``f`` inside Cont """ return cast(Cont[Iterable[A], B], map_m_(value, f, iterable)) def sequence(iterable: Iterable[Cont[A, B]]) -> Cont[Iterable[A], B]: """ Gather an iterable of continuation results into one iterable :example: >>> from pfun import identity >>> sequence([value(v) for v in range(3)]).run(identity) (0, 1, 2) :param iterable: An iterable of continuation results :return: Continuation results """ return cast(Cont[Iterable[A], B], sequence_(value, iterable)) @curry def filter_m(f: Callable[[A], Cont[bool, B]], iterable: Iterable[A]) -> Cont[Iterable[A], B]: """ Filter elements by in ``iterable`` by ``f`` and combine results into an iterable as a continuation :example: >>> from pfun import identity >>> filter_m(lambda v: value(v % 2 == 0), range(3)).run(identity) (0, 2) :param f: Function to filter by :param iterable: Iterable to filter :return: Elements in ``iterable`` filtered by ``f`` as a continuation """ return cast(Cont[Iterable[A], B], filter_m_(value, f, iterable)) def value(a: A) -> Cont[A, B]: """ Wrap a constant value in a :class:`Cont` context :example: >>> from pfun import identity >>> value(1).run(identity) 1 :param a: Constant value to wrap :return: :class:`Cont` wrapping the value """ return Cont(lambda cont: Done(cont(a))) Conts = Generator[Cont[A, B], B, C] def with_effect(f: Callable[..., Conts[A, B, C]]) -> Callable[..., Cont[A, C]]: """ Decorator for functions that return a generator of maybes and a final result. Iterates over the yielded maybes and sends back the unwrapped values using "and_then" :example: >>> @with_effect ... def f() -> Conts[Any, int, int]: ... a = yield value(2) ... b = yield value(2) ... return a + b >>> from pfun import identity >>> f().run(identity) Just(4) :param f: generator function to decorate :return: `f` decorated such that generated :class:`Cont` \ will be chained together with `and_then` """ return with_effect_(value, f) # type: ignore __all__ = [ 'value', 'filter_m', 'sequence', 'map_m', 'Cont', 'with_effect', 'Conts' ] PK!gk pfun/curry.pyfrom typing import Callable import functools import inspect from .immutable import Immutable class Curry(Immutable): f: Callable def __repr__(self): return repr(self.f) def __call__(self, *args, **kwargs): signature = inspect.signature(self.f) bound = signature.bind_partial(*args, **kwargs) bound.apply_defaults() arg_names = {a for a in bound.arguments.keys()} parameters = {p for p in signature.parameters.keys()} if parameters - arg_names == set(): return self.f(*args, **kwargs) partial = functools.partial(self.f, *args, **kwargs) return Curry(partial) def curry(f: Callable) -> Callable: """ Get a version of ``f`` that can be partially applied :example: >>> f = lambda a, b: a + b >>> f_curried = curry(f) >>> f_curried(1) functools.partial( at 0x1051f0950>, a=1) >>> f_curried(1)(1) 2 :param f: The function to curry :return: Curried version of ``f`` """ @functools.wraps(f) def decorator(*args, **kwargs): return Curry(f)(*args, **kwargs) return decorator __all__ = ['curry'] PK!Zs   pfun/dict.pyfrom typing import ( TypeVar, Dict as Dict_, Union, Mapping, KeysView, ValuesView, ItemsView, Generic, Iterator ) from .maybe import Maybe, Nothing, Just from .immutable import Immutable K = TypeVar('K') V = TypeVar('V') class Dict(Immutable, Generic[K, V], init=False): """ Immutable dictionary class with functional helper methods """ _d: Dict_[K, V] def __init__(self, d: Union[Dict_[K, V], 'Dict[K, V]'] = dict()): if isinstance(d, Dict): d = d._d object.__setattr__(self, '_d', dict(d)) def __repr__(self): return f'Dict({repr(self._d)})' def __eq__(self, other): if isinstance(other, dict): return other == self._d if isinstance(other, Dict): return other._d == self._d return False def keys(self) -> KeysView[K]: """ Get the keys in this dictionary :example: >>> Dict({'key': 'value'}).keys() dict_keys(['key']) :return: Dictionary keys """ return self._d.keys() def values(self) -> ValuesView[V]: """ Get the values in this dictionary :example: >>> Dict({'key': 'value'}).values() dict_values(['value']) :return: Dictionary values """ return self._d.values() def copy(self) -> 'Dict[K, V]': """ Get a shallow copy of this dictionary. :example: >>> Dict({'key': 'value'}).copy() Dict({'key': 'value'}) :return: Copy of this dict """ return Dict(self._d.copy()) def items(self) -> ItemsView[K, V]: """ Get the keys and values of this dictionary :example: >>> Dict({'key': 'value'}).items() dict_items([('key', 'value')]) :return: Keys and values of this dictionary """ return self._d.items() def __contains__(self, key: K) -> bool: """ Test if ``key`` is a key in this dictionary :example: >>> 'key' in Dict({'key': 'value'}) True :param key: The key to test for membership :return: ``True`` if ``key`` is a key in this dictionary, ``False`` otherwise """ return key in self._d def __getitem__(self, key: K) -> Maybe[V]: """ get the value associated with a key :example: >>> Dict(key='value')['key'] Just('value') :param key: the key to retrieve :return: value associated with key """ return self.get(key) def __iter__(self) -> Iterator[K]: """ Get an iterator over the keys in this dictionary :example: >>> tuple(Dict({'key': 'value'})) ('key',) :return: Iterator of the keys in this dictionary """ return iter(self._d) def __len__(self) -> int: """ Get the number of key/value pairs in this dictionary :example: >>> len(Dict({'key': 'value'})) 1 :return: Number of key/value pairs in this dictionary """ return len(self._d) def set(self, key: K, value: V) -> 'Dict[K, V]': """ Combine keys and values from this dictionary with a new dictionary that includes key and value :example: >>> Dict().set('key', 'value') {'key': 'value'} :param key: key to add to the new dictionary :param value: value to associate with key :return: new dictionary with existing keys and values in addition to key and value """ copy = self._d.copy() copy[key] = value return Dict(copy) def without(self, key: K) -> 'Dict[K, V]': """ Get a copy of this dictionary without the mapping associated with ``key``. :example: >>> Dict({'key': 'value'}).without('key') Dict({}) :param key: The ``key`` to remove :return: Copy of this dictionary without ``key`` """ copy = self._d.copy() try: del copy[key] except KeyError: pass return Dict(copy) def get(self, key: K) -> Maybe[V]: """ get the value associated with a key :example: >>> Dict().get('key', 'default') Just('default') >>> Dict(key='value').get('key', 'default') Just('value') :param key: the key to retrieve :param default: value to return if the key is not found :return: :class:`Just` if key is found in dictionary or default is given, :class:`Nothing` otherwise """ v = self._d.get(key) if v is None: return Nothing() return Just(v) def update( self, other: Union[Mapping[K, V], 'Dict[K, V]'] ) -> 'Dict[K, V]': """ Get a copy of this dictionary updated with key/value pairs from ``other`` :example: >>> Dict({'key': 'value'}).update({'new_key': 'new_value'}) Dict({'key': 'value', 'new_key': 'new_value'}) :param other: :return: """ d: Dict_[K, V] = {} d.update(self._d) d.update(other) # type: ignore return Dict(d) __all__ = ['Dict'] PK!/("("pfun/either.pyfrom __future__ import annotations from typing import ( Generic, TypeVar, Callable, Any, Iterable, cast, Union, Generator ) from functools import wraps from abc import ABC, abstractmethod from .immutable import Immutable from .monad import Monad, sequence_, map_m_, filter_m_ from .curry import curry from .with_effect import with_effect_tail_rec A = TypeVar('A') B = TypeVar('B') C = TypeVar('C') class Either_(Immutable, Monad, ABC): """ Abstract class representing a computation with either ``A`` or ``B`` as its result. Should not be instantiated directly, use :class:`Left` or :class:`Right` instead """ @abstractmethod def and_then(self, f): """ Chain together functions of either computations, keeping track of whether or not any of them have failed :example: >>> f = lambda i: Right(1 / i) if i != 0 else Left('i was 0') >>> Right(1).and_then(f) Right(1.0) >>> Right(0).and_then(f) Left('i was 0') :param f: The function to call :return: :class:`Right` of type A if \ the computation was successful, :class:`Left` of type B otherwise. """ raise NotImplementedError() @abstractmethod def __bool__(self): """ Convert this result to a boolean value :example: >>> "Right" if Right(1) else "Left" "Right" >>> "Right" if Left("an error") else "Left" "Left" :return: True if this as an :class:`Right`, False if this is an :class:`Left` """ raise NotImplementedError() @abstractmethod def or_else(self, default): """ Try to get the result of this either computation, return default if this is a ``Left`` value :example: >>> Right(1).or_else(2) 1 >>> Left(1).or_else(2) 2 :param default: Value to return if this is a ``Left`` value :return: Result of computation if this is a ``Right`` value, \ default otherwise """ raise NotImplementedError() @abstractmethod def map(self, f): """ Map the result of this either computation :example: >>> f = lambda i: Right(1 / i) if i != 0 else Left('i was 0').map(str) >>> Right(1).and_then(f).map(str) Right('0.5') >>> Ok(0).and_then(f).map(str) Left('i was 0') :param f: Function to apply to the result :return: :class:`Right` wrapping result of type C \ if the computation was if this is a ``Right`` value, \ :class:`Left` of type B otherwise """ raise NotImplementedError() class Right(Either_, Generic[A]): """ Represents the ``Right`` case of ``Either`` """ get: A def or_else(self, default: A) -> A: return self.get def map(self, f: Callable[[A], C]) -> Either[B, C]: return Right(f(self.get)) def and_then(self, f: Callable[[A], Either[B, C]]) -> Either[B, C]: return f(self.get) def __eq__(self, other: Any) -> bool: """ Test if ``other`` is a :class:`Right` wrapping the same value as this instance :example: >>> Right('value') == Right('value') True >>> Right('another value') == Right('value') False :param other: object to compare with :return: True if other is a :class:`Right` instance and wraps the same \ value as this instance, False otherwise """ return isinstance(other, Right) and self.get == other.get def __bool__(self) -> bool: return True def __repr__(self): return f'Right({repr(self.get)})' class Left(Either_, Generic[B]): """ Represents the ``Left`` case of ``Either`` """ get: B def or_else(self, default: A) -> A: return default def map(self, f: Callable[[A], C]) -> Either[B, C]: return self def __eq__(self, other: object) -> bool: """ Test if ``other`` is an :class:`Left` wrapping the same value as this instance :example: >>> Left('error message') == Left('error message') True >>> Left('error message') == Left('another message') False :param other: object to compare with :return: True if other is an :class:`Left` instance and wraps the same value as this instance, False otherwise """ return isinstance(other, Left) and other.get == self.get def __bool__(self) -> bool: return False def and_then(self, f: Callable[[A], Either[B, C]]) -> Either[B, C]: return self def __repr__(self): return f'Left({repr(self.get)})' Either = Union[Left[B], Right[A]] def either(f: Callable[..., A]) -> Callable[..., Either[A, B]]: """ Turn ``f`` into a monadic function in the ``Either`` monad by wrapping in it a :class:`Right` :example: >>> either(lambda v: v)(1) Right(1) :param f: function to wrap :return: ``f`` wrapped with a ``Right`` """ @wraps(f) def decorator(*args, **kwargs): return Right(f(*args, **kwargs)) return decorator def sequence(iterable: Iterable[Either[A, B]]) -> Either[Iterable[A], B]: """ Evaluate each ``Either`` in `iterable` from left to right and collect the results :example: >>> sequence([Right(v) for v in range(3)]) Right((0, 1, 2)) :param iterable: The iterable to collect results from :returns: ``Either`` of collected results """ return cast(Either[Iterable[A], B], sequence_(Right, iterable)) @curry def map_m(f: Callable[[A], Either[B, C]], iterable: Iterable[A]) -> Either[Iterable[B], C]: """ Map each in element in ``iterable`` to an :class:`Either` by applying ``f``, combine the elements by ``and_then`` from left to right and collect the results :example: >>> map_m(Right, range(3)) Right((0, 1, 2)) :param f: Function to map over ``iterable`` :param iterable: Iterable to map ``f`` over :return: ``f`` mapped over ``iterable`` and combined from left to right. """ return cast(Either[Iterable[B], C], map_m_(Right, f, iterable)) @curry def filter_m(f: Callable[[A], Either[bool, B]], iterable: Iterable[A]) -> Either[Iterable[A], B]: """ Map each element in ``iterable`` by applying ``f``, filter the results by the value returned by ``f`` and combine from left to right. :example: >>> filter_m(lambda v: Right(v % 2 == 0), range(3)) Right((0, 2)) :param f: Function to map ``iterable`` by :param iterable: Iterable to map by ``f`` :return: `iterable` mapped and filtered by `f` """ return cast(Either[Iterable[A], B], filter_m_(Right, f, iterable)) def tail_rec(f: Callable[[A], Either[C, Either[A, B]]], a: A) -> Either[C, B]: """ Run a stack safe recursive monadic function `f` by calling `f` with :class:`Left` values until a :class:`Right` value is produced :example: >>> def f(i: str) -> Either[Either[int, str]]: ... if i == 0: ... return Right(Right('Done')) ... return Right(Left(i - 1)) >>> tail_rec(f, 5000) Right('Done') :param f: function to run "recursively" :param a: initial argument to `f` :return: result of `f` """ outer_either = f(a) if isinstance(outer_either, Left): return outer_either inner_either = outer_either.get while isinstance(inner_either, Left): outer_either = f(inner_either.get) if isinstance(outer_either, Left): return outer_either inner_either = outer_either.get return inner_either Eithers = Generator[Either[A, B], B, C] def with_effect(f: Callable[..., Eithers[A, B, C]] ) -> Callable[..., Either[A, C]]: """ Decorator for functions that return a generator of eithers and a final result. Iteraters over the yielded eithers and sends back the unwrapped values using "and_then" :example: >>> @with_effect ... def f() -> Eithers[int, int]: ... a = yield Right(2) ... b = yield Right(2) ... return a + b >>> f() Right(4) :param f: generator function to decorate :return: `f` decorated such that generated :class:`Either` \ will be chained together with `and_then` """ return with_effect_tail_rec(Right, f, tail_rec) # type: ignore __all__ = [ 'Either', 'Left', 'Right', 'either', 'map_m', 'sequence', 'filter_m', 'with_effect', 'Eithers' ] PK!K! pfun/free.pyfrom typing import TypeVar, Generic, Callable, Iterable, cast, Generator from pfun.immutable import Immutable from abc import ABC, abstractmethod from .functor import Functor from .monad import Monad, sequence_, map_m_, filter_m_ from .curry import curry from .state import State, get from .with_effect import with_effect_ A = TypeVar('A') B = TypeVar('B') C = TypeVar('C') D = TypeVar('D') class FreeInterpreter(Generic[C, D], ABC): """ An interpreter to map a ``Free`` structure into a `D` from a `C`. """ def interpret(self, root: 'FreeInterpreterElement[C, D]') -> State[C, D]: """ Run the interpreter on the root element recursively :param root: The root interpreter element :return: The result of interpreting ``root`` """ return root.accept(self) def interpret_more(self, more) -> State[C, D]: return more.k.accept(self) def interpret_done(self, done) -> State[C, D]: return get() # type: ignore class FreeInterpreterElement(Functor, Generic[C, D], ABC): """ An element in a ``Free`` structure that can be interepreted """ @abstractmethod def accept(self, interpreter: FreeInterpreter[C, D]) -> State[C, D]: """ Interpret this element :param interpreter: The interpreter to apply to this element :return: The result of using ``interpreter` to interpret this element """ pass F = TypeVar('F', bound=Functor) class Free( Generic[F, A, C, D], FreeInterpreterElement[C, D], Monad, Immutable ): """ The "Free" monad """ @abstractmethod def and_then( self, f: 'Callable[[A], Free[F, B, C, D]]' ) -> 'Free[F, B, C, D]': pass def map(self, f: Callable[[A], B]) -> 'Free[F, B, C, D]': return self.and_then(lambda v: Done(f(v))) class Done(Free[F, A, C, D]): """ Pure ``Free`` value """ a: A def and_then(self, f: Callable[[A], Free[F, B, C, D]]) -> Free[F, B, C, D]: """ Apply ``f`` to the value wrapped in this ``Done`` :param f: The function to apply to the value wrapped in this ``Done`` :return: The result of applying ``f`` to the value in this ``Done`` """ return f(self.a) def accept(self, interpreter: FreeInterpreter[C, D]) -> State[C, D]: """ Run an interpreter on this ``Done`` :param interpreter: The interpreter to run on on this ``Done`` instance :return: The result of interpreting this ``Done`` instance """ return interpreter.interpret_done(self) class More(Free[F, A, C, D]): """ A ``Free`` value wrapping a `Functor` value """ k: Functor def and_then(self, f: Callable[[A], Free[F, B, C, D]]) -> Free[F, B, C, D]: """ Apply ``f`` to the value wrapped in the functor of this ``More`` :param f: The function to apply to the functor value :return: The result of applying ``f`` to the functor of this ``More`` """ return More(self.k.map(lambda v: v.and_then(f))) def accept(self, interpreter: FreeInterpreter[C, D]) -> State[C, D]: """ Run an interpreter on this ``More`` :param interepreter: The intepreter to run on this ``More`` instance :return: The result of running ``interpreter`` on this ``More`` """ return interpreter.interpret_more(self) @curry def map_m(f: Callable[[A], Free[F, B, C, D]], iterable: Iterable[A]) -> Free[F, Iterable[B], C, D]: """ Map each in element in ``iterable`` to a :class:`Free` by applying ``f``, combine the elements by ``and_then`` from left to right and collect the results :param f: Function to map over ``iterable`` :param iterable: Iterable to map ``f`` over :return: ``f`` mapped over ``iterable`` and combined from left to right. """ return cast(Free[F, Iterable[B], C, D], map_m_(Done, f, iterable)) def sequence(iterable: Iterable[Free[F, A, C, D]] ) -> Free[F, Iterable[A], C, D]: """ Evaluate each ``Free`` in `iterable` from left to right and collect the results :param iterable: The iterable to collect results from :returns: ``Free`` of collected results """ return cast(Free[F, Iterable[A], C, D], sequence_(Done, iterable)) @curry def filter_m(f: Callable[[A], Free[F, bool, C, D]], iterable: Iterable[A]) -> Free[F, Iterable[A], C, D]: """ Map each element in ``iterable`` by applying ``f``, filter the results by the value returned by ``f`` and combine from left to right. :param f: Function to map ``iterable`` by :param iterable: Iterable to map by ``f`` :return: `iterable` mapped and filtered by `f` """ return cast(Free[F, Iterable[A], C, D], filter_m_(Done, f, iterable)) E = TypeVar('E') Frees = Generator[Free[F, A, C, D], A, E] def with_effect(f: Callable[..., Frees[F, A, C, D, E]] ) -> Callable[..., Free[F, E, C, D]]: """ Decorator for functions that return a generator of frees and a final result. Iteraters over the yielded frees and sends back the unwrapped values using "and_then" :example: >>> from typing import Any >>> @with_effect ... def f() -> Frees[int, int, Any, Any]: ... a = yield Done(2) ... b = yield Done(2) ... return a + b >>> f() Done(4) :param f: generator function to decorate :return: `f` decorated such that generated :class:`Free` \ will be chained together with `and_then` """ return with_effect_(Done, f) # type: ignore __all__ = [ 'FreeInterpreter', 'FreeInterpreterElement', 'Free', 'Done', 'More', 'map_m', 'sequence', 'filter_m' ] PK! 3pfun/functor.pyfrom abc import ABC, abstractmethod from typing import Callable, Any class Functor(ABC): """ Abstract base class for functors """ @abstractmethod def map(self, f: Callable[[Any], Any]) -> 'Functor': """ Map function ``f`` over the value wrapped by this functor :param f: The function to apply to the value wrapped by this Functor :return: The result of applying ``f`` to the wrapped value """ pass PK!_pfun/immutable.pyfrom dataclasses import dataclass from typing import TypeVar T = TypeVar('T') @dataclass(frozen=True) class Immutable: __immutable__ = True """ Super class that makes subclasses immutable using dataclasses >>> class A(Immutable): ... a: str >>> class B(A): ... b: str >>> b = B('a', 'b') >>> b.a = 'new value' AttributeError: <__main__.B object at 0x10f99a0f0> is immutable """ def __init_subclass__( cls, init=True, repr=True, eq=True, order=False, unsafe_hash=False ): super().__init_subclass__() return dataclass( frozen=True, init=init, repr=repr, eq=eq, order=order )(cls) def clone(self: T, **kwargs) -> T: """ Make a shallow copy of an instance, potentially overwriting fields given by ``kwargs`` :example: >>> class A(Immutable): ... a: str >>> a = A('a') >>> a2 = a.clone(a='new value') >>> a2.a "new value" :param kwargs: fields to overwrite :return: New instance of same type with copied and overwritten fields """ attrs = self.__dict__.copy() attrs.update(kwargs) return type(self)(**attrs) # type: ignore __all__ = ['Immutable'] PK![:: pfun/io.pyfrom __future__ import annotations from typing import Callable, TypeVar, Generic, Generator, Iterable, cast import sys from functools import wraps from typing_extensions import Literal from .immutable import Immutable from .trampoline import Trampoline, Done, Call from .monad import Monad, sequence_, map_m_, filter_m_ from .curry import curry from .with_effect import with_effect_ A = TypeVar('A') B = TypeVar('B') class IO(Monad, Immutable, Generic[A]): """ Represents world changing actions """ run_io: Callable[[], Trampoline[A]] def and_then(self, f: Callable[[A], IO[B]]) -> IO[B]: """ Chain together functions producting world changing actions. :param f: function to compose with this action :return: new :class:`IO` action that that composes \ this action with action produced by `f` :example: >>> read_str('file.txt').and_then( ... lambda content: value(content.upper()) ... ).run() "CONTENTS OF FILE.TXT" :param f: function to compose with this :class:`IO` action :return: new :class:`IO` action composed with f """ def run() -> Trampoline[B]: def thunk() -> Trampoline[B]: t = self.run_io() # type: ignore return t.and_then( lambda a: Call(lambda: f(a).run_io()) # type: ignore ) return Call(thunk) return IO(run) def map(self, f: Callable[[A], B]) -> IO[B]: """ Map `f` over the value wrapped by this action :example: >>> read_str('file.txt').map( ... lambda content: content.upper() ... ).run() "CONTENTS OF FILE.TXT" :param f: function to map over this :class:`IO` action :return: new :class:`IO` action with `f` applied to the \ value wrapped by this one. """ return IO(lambda: Call(lambda: self.run_io().map(f))) # type: ignore def run(self): return self.run_io().run() def value(a: A) -> IO[A]: """ Create an :class:`IO` action that simply produces `a` when run :param a: The value to wrap in `IO` """ return IO(lambda: Done(a)) def read_str(path: str) -> IO[str]: """ Read the contents of a file as a `str` :param path: the name of the file :return: :class:`IO` action with the contents of `filename` """ def run() -> Trampoline[str]: with open(path) as f: return Done(f.read()) return IO(run) # We pretend the next part doesn't exist... @curry def write_str(path: str, content: str, mode: Literal['w', 'a'] = 'w') -> IO[None]: """ Write a `str` to a file :param path: the file to write to :param content: the content to write to the file :return: :class:`IO` action that produces \ the content of `filename` when run """ def run() -> Trampoline[None]: with open(path, mode) as f: f.write(content) return Done(None) return IO(run) @curry def write_bytes(path: str, content: bytes, mode: Literal['w', 'a'] = 'w') -> IO[None]: """ Write `bytes` to a file :param filename: the file to write to :param content: the `bytes` to write to the file :return: :class:`IO` action that writes to the file when run """ def run() -> Trampoline[None]: with open(path, mode + 'b') as f: f.write(content) return Done(None) return IO(run) @curry def put_line(line: str = '', file=sys.stdout) -> IO[None]: """ Print a line to standard out :param string: The line to print :param file: The file to print to (`sys.stdout` by default) :return: :class:`IO` action that prints `string` to standard out """ def run() -> Trampoline[None]: print(line, file=file) return Done(None) return IO(run) def get_line(prompt: str = '') -> IO[str]: """ Create an :class:`IO` action that reads a line from standard input when run :param prompt: The message to display to the user :return: :class:`IO` action with the line read from standard in """ def run() -> Trampoline[str]: line = input(prompt) return Done(line) return IO(run) def read_bytes(path: str) -> IO[bytes]: """ Read the contents of a file as `bytes` :param path: the filename to read bytes from :return: :class:`IO` action that reads the `bytes` of the file when run """ def run() -> Trampoline[bytes]: with open(path, 'rb') as f: return Done(f.read()) return IO(run) def io(f: Callable[..., A]) -> Callable[..., IO[A]]: """ Decorator to turn any non-monadic function into a monadic one by wrapping its result in :class:`IO` :param f: The function to wrap :return: `f` wrapped by :class:`IO` """ @wraps(f) def decorator(*args, **kwargs): v = f(*args, **kwargs) return value(v) return decorator @curry def map_m(f: Callable[[A], IO[B]], iterable: Iterable[A]) -> IO[Iterable[B]]: """ Map each in element in ``iterable`` to an :class:`IO` by applying ``f``, combine the elements by ``and_then`` from left to right and collect the results :example: >>> map_m(IO, range(3)) IO(a=(0, 1, 2)) :param f: Function to map over ``iterable`` :param iterable: Iterable to map ``f`` over :return: ``f`` mapped over ``iterable`` and combined from left to right. """ return cast(IO[Iterable[B]], map_m_(value, f, iterable)) def sequence(iterable: Iterable[IO[A]]) -> IO[Iterable[A]]: """ Evaluate each :class:`IO` in `iterable` from left to right and collect the results :example: >>> sequence([IO(v) for v in range(3)]) Just(a=(0, 1, 2)) :param iterable: The iterable to collect results from :returns: ``Maybe`` of collected results """ return cast(IO[Iterable[A]], sequence_(value, iterable)) @curry def filter_m(f: Callable[[A], IO[bool]], iterable: Iterable[A]) -> IO[Iterable[A]]: """ Map each element in ``iterable`` by applying ``f``, filter the results by the value returned by ``f`` and combine from left to right. :example: >>> filter_m(lambda v: IO(v % 2 == 0), range(3)) IO(a=(0, 2)) :param f: Function to map ``iterable`` by :param iterable: Iterable to map by ``f`` :return: `iterable` mapped and filtered by `f` """ return cast(IO[Iterable[A]], filter_m_(value, f, iterable)) IOs = Generator[IO[A], A, B] def with_effect(f: Callable[..., IOs[A, B]]) -> Callable[..., IO[B]]: """ Decorator for functions generating IOs. Will chain together the generated IOs using `and_then` :example: >>> @with_effect ... def put_file(path: str) -> IOs[str, None]: ... content = yield read_str(path) yield put_line(content) >>> put_file('file.txt').run() Content of file.txt :param f: the function to decorate :return: new function that consumes `IO`s generated by `f`, \ chaining them together with `and_then` """ return with_effect_(value, f) # type: ignore __all__ = [ 'IO', 'get_line', 'put_line', 'read_str', 'write_bytes', 'write_str', 'read_bytes', 'io', 'map_m', 'sequence', 'filter_m', 'with_effect', 'IOs' ] PK!}TT pfun/list.pyfrom typing import ( TypeVar, Callable, Iterable, Tuple, Optional, Generic, cast, Generator ) from functools import reduce from .monoid import Monoid from .immutable import Immutable from .curry import curry from .monad import map_m_, sequence_, filter_m_, Monad from .with_effect import with_effect_eager A = TypeVar('A') B = TypeVar('B') class List(Monoid, Monad, Generic[A], Iterable[A], Immutable, init=False): _iterable: Tuple[A] def __init__(self, iterable: Iterable[A] = ()): object.__setattr__(self, '_iterable', tuple(iterable)) def __repr__(self): return f"List({repr(self._iterable)})" def empty(self) -> 'List[A]': return List() def reduce( self, f: Callable[[B, A], B], initializer: Optional[B] = None ) -> B: """ Aggregate elements by ``f`` :example: >>> List(range(3)).reduce(sum) 3 :param f: Function to perform aggregation :param initializer: Starting value for aggregation :return: Aggregated result """ return reduce(f, self._iterable, initializer) # type: ignore def append(self, a: Iterable[A]) -> 'List[A]': """ Add element to end of list :example: >>> List(range(3)).append(3) [1, 2, 3] :param a: Element to append :return: New :class:`List` with ``a`` appended """ return List(self._iterable + tuple(a)) def extend(self, iterable: Iterable[A]) -> 'List[A]': """ Add all elements from ``iterable`` to end of list :example: >>> List(range(3)).extend(range(3)) [0, 1, 2, 0, 1, 2] :param iterable: Iterable to extend by :return: New :class:`List` with extended by ``iterable`` """ return self + list(iterable) def __add__(self, other: Iterable[A]) -> 'List[A]': """ Concatenate with other ``list`` or :class:`List` :example: >>> List(range(2)) + List(range(2)) [0, 1, 0, 1] :param other: list to concatenate with :return: new :class:`List` concatenated with ``other`` """ return List(self._iterable + tuple(other)) def __radd__(self, other: Iterable[A]) -> 'List[A]': """ Concatenate with other ``list`` or :class:`List` :example: >>> List(range(2)) + List(range(2)) [0, 1, 0, 1] :param other: list to concatenate with :return: new :class:`List` concatenated with ``other`` """ return List(tuple(other) + self._iterable) def map(self, f: Callable[[A], B]) -> 'List[B]': """ Apply ``f`` to each element in the list :example: >>> List(range(2)).map(str) ['0', '1'] :param f: Function to apply :return: new :class:`List` mapped by ``f`` """ return List(map(f, self)) def filter(self, f: Callable[[A], bool]) -> 'List[A]': """ Filter elements by the predicate ``f`` :example: >>> List(range(4)).filter(lambda e: e % 2 == 0) [0, 2] :param f: Function to filter by :return: new :class:`List` filtered by ``f`` """ return List(filter(f, self._iterable)) def and_then(self, f: 'Callable[[A], List[B]]') -> 'List[B]': """ Chain together functions that produce more than one result :example: >>> List(range(4)).and_then(lambda v: List(range(v))) [0, 0, 1, 0, 1, 2] :param f: Function to apply to elements of this :class:`List` :return: Concatenated results from applying ``f`` to all elements """ return self.reduce(lambda l, v: l + f(v), List()) def zip(self, other: Iterable[B]) -> Iterable[Tuple[A, B]]: """ Zip together with another iterable :example: >>> List(List(range(2)).zip(range(2))) [(0, 0), (1, 1)] :param other: Iterable to zip with :return: Zip with ``other`` """ return zip(self._iterable, other) def reverse(self) -> 'List[A]': return List(reversed(self._iterable)) def __len__(self): return len(self._iterable) def __iter__(self): return iter(self._iterable) def value(a: A) -> List[A]: return List([a]) @curry def map_m(f: Callable[[A], List[B]], iterable: Iterable[A]) -> List[Iterable[B]]: """ Map each in element in ``iterable`` to an :class:`List` by applying ``f``, combine the elements by ``and_then`` from left to right and collect the results :example: >>> map_m(Just, range(3)) Just((0, 1, 2)) :param f: Function to map over ``iterable`` :param iterable: Iterable to map ``f`` over :return: ``f`` mapped over ``iterable`` and combined from left to right. """ return cast(List[Iterable[B]], map_m_(value, f, iterable)) def sequence(iterable: Iterable[List[A]]) -> List[Iterable[A]]: """ Evaluate each :class:`List` in `iterable` from left to right and collect the results :example: >>> sequence([Just(v) for v in range(3)]) Just((0, 1, 2)) :param iterable: The iterable to collect results from :returns: ``List`` of collected results """ return cast(List[Iterable[A]], sequence_(value, iterable)) @curry def filter_m(f: Callable[[A], List[bool]], iterable: Iterable[A]) -> List[Iterable[A]]: """ Map each element in ``iterable`` by applying ``f``, filter the results by the value returned by ``f`` and combine from left to right. :example: >>> filter_m(lambda v: Just(v % 2 == 0), range(3)) Just((0, 2)) :param f: Function to map ``iterable`` by :param iterable: Iterable to map by ``f`` :return: `iterable` mapped and filtered by `f` """ return cast(List[Iterable[A]], filter_m_(value, f, iterable)) Lists = Generator[List[A], A, B] def with_effect(f: Callable[..., Lists[A, B]]) -> Callable[..., List[B]]: """ Decorator for functions that return a generator of lists and a final result. Iteraters over the yielded lists and sends back the unwrapped values using "and_then" :example: >>> @with_effect ... def f() -> Lists[int, int]: ... a = yield List([2]) ... b = yield List([2]) ... return a + b >>> f() List((4,)) :param f: generator function to decorate :return: `f` decorated such that generated :class:`List` \ will be chained together with `and_then` """ return with_effect_eager(value, f) # type: ignore __all__ = [ 'List', 'value', 'map_m', 'sequence', 'filter_m', 'Lists', 'with_effect' ] PK!it[z"" pfun/maybe.pyfrom typing import ( Generic, TypeVar, Callable, Any, Sequence, Iterable, cast, Generator, Union, Optional ) from functools import wraps from abc import ABC, abstractmethod from .immutable import Immutable from .list import List from .curry import curry from .monad import Monad, map_m_, sequence_, filter_m_ from .with_effect import with_effect_tail_rec from .either import Either, Left A = TypeVar('A') B = TypeVar('B') class Maybe_(Immutable, Monad, ABC): """ Abstract super class for classes that represent computations that can fail. Should not be instantiated directly. Use :class:`Just` and :class:`Nothing` instead. """ @abstractmethod def and_then(self, f): """ Chain together functional calls, carrying along the state of the computation that may fail. :example: >>> f = lambda i: Just(1 / i) if i != 0 else Nothing() >>> Just(2).and_then(f) Just(0.5) >>> Just(0).and_then(f) Nothing() :param f: the function to call :return: :class:`Just` wrapping a value of type A if \ the computation was successful, :class:`Nothing` otherwise. """ raise NotImplementedError() @abstractmethod def map(self, f): """ Map the result of a possibly failed computation :example: >>> f = lambda i: Just(1 / i) if i != 0 else Nothing() >>> Just(2).and_then(f).map(str) Just('0.5') >>> Just(0).and_then(f).map(str) Nothing() :param f: Function to apply to the result :return: :class:`Just` wrapping result of type B if the computation was """ raise NotImplementedError() @abstractmethod def or_else(self, default): """ Try to get the result of the possibly failed computation if it was successful. :example: >>> Just(1).or_else(2) 1 >>> Nothing().or_else(2) 2 :param default: Value to return if computation has failed :return: Default value """ raise NotImplementedError() @abstractmethod def __bool__(self): """ Convert possibly failed computation to a bool :example: >>> "Just" if Just(1) else "Nothing" "Just" >>> "Just" if Nothing() else "Nothing" "Nothing" :return: True if this is a :class:`Just` value, False if this is a :class:`Nothing` """ raise NotImplementedError() def _invoke_optional_arg( f: Union[Callable[[A], B], Callable[[], B]], arg: Optional[A] ) -> B: try: return f(arg) # type: ignore except TypeError as e: if arg is None: try: return f() # type: ignore except TypeError: raise e raise class Just(Maybe_, Generic[A]): """ Subclass of :class:`Maybe` that represents a successful computation """ get: A def and_then(self, f: Callable[[A], 'Maybe[B]']) -> 'Maybe[B]': return _invoke_optional_arg(f, self.get) def map(self, f: Callable[[A], B]) -> 'Maybe[B]': return Just(f(self.get)) def or_else(self, default: A) -> A: return self.get def __eq__(self, other: Any) -> bool: """ Test if other is a ``Just`` :param other: Value to compare with :return: True if other is a ``Just`` and its wrapped value equals the \ wrapped value of this instance """ if not isinstance(other, Just): return False return other.get == self.get def __repr__(self): return f'Just({repr(self.get)})' def __bool__(self): return True class Nothing(Maybe_): """ Subclass of :class:`Maybe` that represents a failed computation """ def and_then(self, f: Callable[[A], 'Maybe[B]']) -> 'Maybe[B]': return self def __eq__(self, other: Any) -> bool: """ Test if other is a ``Nothing`` :param other: Value to compare with :return: True if other is a ``Nothing``, False otherwise """ return isinstance(other, Nothing) def __repr__(self): return 'Nothing()' def or_else(self, default: A) -> A: return default def map(self, f: Callable[[Any], B]) -> 'Maybe[B]': return self def __bool__(self): return False Maybe = Union[Nothing, Just[A]] def maybe(f: Callable[..., B]) -> Callable[..., Maybe[B]]: """ Wrap a function that may raise an exception with a :class:`Maybe`. Can also be used as a decorator. Useful for turning any function into a monadic function :example: >>> to_int = maybe(int) >>> to_int("1") Just(1) >>> to_int("Whoops") Nothing() :param f: Function to wrap :return: f wrapped with a :class:`Maybe` """ @wraps(f) def dec(*args, **kwargs): try: return Just(f(*args, **kwargs)) except: # noqa return Nothing() return dec def flatten(maybes: Sequence[Maybe[A]]) -> List[A]: """ Extract value from each :class:`Maybe`, ignoring elements that are :class:`Nothing` :param maybes: Seqence of :class:`Maybe` :return: :class:`List` of unwrapped values """ justs = [m for m in maybes if isinstance(m, Just)] return List(j.get for j in justs) @curry def map_m(f: Callable[[A], Maybe[B]], iterable: Iterable[A]) -> Maybe[Iterable[B]]: """ Map each in element in ``iterable`` to an :class:`Maybe` by applying ``f``, combine the elements by ``and_then`` from left to right and collect the results :example: >>> map_m(Just, range(3)) Just((0, 1, 2)) :param f: Function to map over ``iterable`` :param iterable: Iterable to map ``f`` over :return: ``f`` mapped over ``iterable`` and combined from left to right. """ return cast(Maybe[Iterable[B]], map_m_(Just, f, iterable)) def sequence(iterable: Iterable[Maybe[A]]) -> Maybe[Iterable[A]]: """ Evaluate each :class:`Maybe` in `iterable` from left to right and collect the results :example: >>> sequence([Just(v) for v in range(3)]) Just((0, 1, 2)) :param iterable: The iterable to collect results from :return: ``Maybe`` of collected results """ return cast(Maybe[Iterable[A]], sequence_(Just, iterable)) @curry def filter_m(f: Callable[[A], Maybe[bool]], iterable: Iterable[A]) -> Maybe[Iterable[A]]: """ Map each element in ``iterable`` by applying ``f``, filter the results by the value returned by ``f`` and combine from left to right. :example: >>> filter_m(lambda v: Just(v % 2 == 0), range(3)) Just((0, 2)) :param f: Function to map ``iterable`` by :param iterable: Iterable to map by ``f`` :return: `iterable` mapped and filtered by `f` """ return cast(Maybe[Iterable[A]], filter_m_(Just, f, iterable)) S = TypeVar('S') R = TypeVar('R') Maybes = Generator[Maybe[S], S, R] def with_effect(f: Callable[..., Maybes[Any, R]]) -> Callable[..., Maybe[R]]: """ Decorator for functions that return a generator of maybes and a final result. Iteraters over the yielded maybes and sends back the unwrapped values using "and_then" :example: >>> @with_effect ... def f() -> Maybes[int, int]: ... a = yield Just(2) ... b = yield Just(2) ... return a + b >>> f() Just(4) :param f: generator function to decorate :return: `f` decorated such that generated :class:`Maybe` \ will be chained together with `and_then` """ return with_effect_tail_rec(Just, f, tail_rec) def tail_rec(f: Callable[[A], Maybe[Either[A, B]]], a: A) -> Maybe[B]: """ Run a stack safe recursive monadic function `f` by calling `f` with :class:`Left` values until a :class:`Right` value is produced :example: >>> from pfun.either import Left, Right, Either >>> def f(i: str) -> Maybe[Either[int, str]]: ... if i == 0: ... return Just(Right('Done')) ... return Just(Left(i - 1)) >>> tail_rec(f, 5000) Just('Done') :param f: function to run "recursively" :param a: initial argument to `f` :return: result of `f` """ maybe = f(a) if isinstance(maybe, Nothing): return maybe either = maybe.get while isinstance(either, Left): maybe = f(either.get) if isinstance(maybe, Nothing): return maybe either = maybe.get return Just(either.get) __all__ = [ 'Maybe', 'Just', 'Nothing', 'maybe', 'flatten', 'map_m', 'sequence', 'filter_m', 'with_effect', 'Maybes' ] PK!P pfun/monad.pyfrom abc import ABC, abstractmethod from typing import Callable, Any, Iterable from functools import reduce from .functor import Functor from .curry import curry class Monad(Functor, ABC): """ Base class for all monadic types """ @abstractmethod def and_then(self, f: Callable[[Any], Any]) -> 'Monad': pass # Since PEP484 does not support higher-order type variables, # the functions below cannot be typed here :( # type versions for each monad is provided # in their respective namespaces instead (e.g maybe.sequence) @curry def sequence_( value: Callable[[Any], Monad], iterable: Iterable[Monad] ) -> Monad: def combine(ms: Monad, m: Monad) -> Monad: return ms.and_then( lambda xs: m.and_then( lambda x: value(xs + (x, )) ) ) # yapf: disable return reduce(combine, iterable, value(())) @curry def map_m_( value: Callable[[Any], Monad], f: Callable[[Any], Monad], iterable: Iterable ) -> Monad: m_iterable = (f(x) for x in iterable) return sequence_(value, m_iterable) @curry def filter_m_( value: Callable[[Any], Monad], f: Callable[[Any], Monad], iterable: Iterable ) -> Monad: def combine(ms, mbx): mb, x = mbx return ms.and_then( lambda xs: mb.and_then(lambda b: value(xs + (x, ) if b else xs)) ) mbs = (f(x) for x in iterable) mbxs = zip(mbs, iterable) return reduce(combine, mbxs, value(())) PK!^g0  pfun/monoid.pyfrom functools import singledispatch from typing import Union, List, Tuple, TypeVar from abc import ABC, abstractmethod class Monoid(ABC): """ Abstract class for implementing custom Monoids that can be used with the :class:`Writer` monad """ @abstractmethod def append(self, other): """ Append function for the Monoid type :param other: Other Monoid type to append to this one :return: Result of appending other to this Monoid """ raise NotImplementedError() @abstractmethod def empty(self) -> 'Monoid': """ empty value for the Monoid type :return: empty value """ raise NotImplementedError() M_ = Union[int, List, Tuple, str, None, Monoid] M = TypeVar('M', bound=M_) @singledispatch def append(a: M, b: M) -> M: raise NotImplementedError() @append.register def append_monoid(a: Monoid, b: Monoid) -> Monoid: return a.append(b) @append.register def append_int(a: int, b: int) -> int: return a + b @append.register def append_list(a: list, b: list) -> list: return a + b @append.register def append_str(a: str, b: str) -> str: return a + b @append.register def append_none(a: None, b: None) -> None: return None @append.register def append_tuple(a: tuple, b: tuple) -> tuple: return a + b @singledispatch def empty(t): raise NotImplementedError() @empty.register def empty_int(t: int) -> int: return 0 @empty.register def empty_list(t: list) -> list: return [] @empty.register def empty_tuple(t: tuple) -> tuple: return () @empty.register def empty_monoid(t: Monoid) -> Monoid: return t.empty() @empty.register def empty_str(t: str) -> str: return '' @empty.register def empty_none(t: None) -> None: return None PK!5pfun/mypy_plugin.py# type: ignore import typing as t from mypy.plugin import Plugin, FunctionContext, ClassDefContext from mypy.plugins.dataclasses import DataclassTransformer from mypy.types import ( Type, CallableType, Instance, TypeVarType, Overloaded, TypeVarId, TypeVarDef, AnyType ) from mypy.nodes import ClassDef, ARG_POS from mypy import checkmember, infer from mypy.checker import TypeChecker _CURRY = 'pfun.curry.curry' _COMPOSE = 'pfun.util.compose' _IMMUTABLE = 'pfun.immutable.Immutable' _MAYBE = 'pfun.maybe.maybe' _MAYBE_WITH_EFFECT = 'pfun.maybe.with_effect' _LIST_WITH_EFFECT = 'pfun.liste.with_effect' _EITHER_WITH_EFFECT = 'pfun.either.with_effect' _READER_WITH_EFFECT = 'pfun.reader.with_effect' _WRITER_WITH_EFFECT = 'pfun.writer.with_effect' _STATE_WITH_EFFECT = 'pfun.state.with_effect' _IO_WITH_EFFECT = 'pfun.io.with_effect' _TRAMPOLINE_WITH_EFFECT = 'pfun.trampoline.with_effect' _FREE_WITH_EFFECT = 'pfun.free.with_effect' _RESULT = 'pfun.result.result' _IO = 'pfun.io.io' _READER = 'pfun.reader.reader' _EITHER = 'pfun.either.either' _READER_AND_THEN = 'pfun.reader.Reader.and_then' def _get_callable_type(type_: Type, context: FunctionContext) -> t.Optional[CallableType]: if isinstance(type_, CallableType): return type_ # called with an object elif isinstance(type_, Instance) and type_.has_readable_member('__call__'): chk: TypeChecker = t.cast(TypeChecker, context.api) return t.cast( CallableType, checkmember.analyze_member_access( '__call__', type_, context.context, False, False, False, context.api.msg, original_type=type_, chk=chk ) ) return None def _curry_hook(context: FunctionContext) -> Type: arg_type = context.arg_types[0][0] function = _get_callable_type(arg_type, context) if function is None: return context.default_return_type if not function.arg_types: return function return_type = function.ret_type last_function = CallableType( arg_types=[function.arg_types[-1]], arg_kinds=[function.arg_kinds[-1]], arg_names=[function.arg_names[-1]], ret_type=return_type, fallback=function.fallback ) args = list( zip( function.arg_types[:-1], function.arg_kinds[:-1], function.arg_names[:-1] ) ) for arg_type, kind, name in reversed(args): last_function = CallableType( arg_types=[arg_type], arg_kinds=[kind], arg_names=[name], ret_type=last_function, fallback=function.fallback, variables=function.variables, implicit=True ) return Overloaded([last_function, function]) def _variadic_decorator_hook(context: FunctionContext) -> Type: arg_type = context.arg_types[0][0] function = _get_callable_type(arg_type, context) if function is None: return context.default_return_type ret_type = context.default_return_type.ret_type variables = list( set(function.variables + context.default_return_type.variables) ) return CallableType( arg_types=function.arg_types, arg_kinds=function.arg_kinds, arg_names=function.arg_names, ret_type=ret_type, fallback=function.fallback, variables=variables, implicit=True ) def _type_var_def( name: str, module: str, upper_bound, values=(), meta_level=0 ) -> TypeVarDef: id_ = TypeVarId.new(meta_level) id_.raw_id = -id_.raw_id fullname = f'{module}.{name}' return TypeVarDef(name, fullname, id_, list(values), upper_bound) def _get_compose_type(context: FunctionContext) -> t.Optional[CallableType]: # TODO, why are the arguments lists of lists, # and do I need to worry about it? n_args = len([at for ats in context.arg_types for at in ats]) arg_types = [] arg_kinds = [] arg_names = [] ret_type_def = _type_var_def( 'R1', 'pfun.compose', context.api.named_type('builtins.object') ) ret_type = TypeVarType(ret_type_def) variables = [ret_type_def] for n in range(n_args): current_arg_type_def = _type_var_def( f'R{n + 2}', 'pfun.compose', context.api.named_type('builtins.object') ) current_arg_type = TypeVarType(current_arg_type_def) arg_type = CallableType( arg_types=[current_arg_type], ret_type=ret_type, arg_kinds=[ARG_POS], arg_names=[None], variables=[current_arg_type_def, ret_type_def], fallback=context.api.named_type('builtins.function') ) arg_types.append(arg_type) arg_kinds.append(ARG_POS) arg_names.append(None) variables.append(current_arg_type_def) ret_type_def = current_arg_type_def ret_type = current_arg_type first_arg_type, *_, last_arg_type = arg_types ret_type = CallableType( arg_types=last_arg_type.arg_types, arg_names=last_arg_type.arg_names, arg_kinds=last_arg_type.arg_kinds, ret_type=first_arg_type.ret_type, variables=[first_arg_type.variables[-1], last_arg_type.variables[0]], fallback=context.api.named_type('builtins.function') ) return CallableType( arg_types=arg_types, arg_kinds=arg_kinds, arg_names=arg_names, ret_type=ret_type, variables=variables, fallback=context.api.named_type('builtins.function'), name='compose' ) def _compose_hook(context: FunctionContext) -> Type: compose = _get_compose_type(context) inferred = infer.infer_function_type_arguments( compose, [arg for args in context.arg_types for arg in args], [kind for kinds in context.arg_kinds for kind in kinds], [ [i, i] for i in range(len([arg for args in context.arg_types for arg in args])) ] ) ret_type = context.api.expr_checker.apply_inferred_arguments( compose, inferred, context.context ).ret_type ret_type.variables = [] return ret_type def _immutable_hook(context: ClassDefContext): cls: ClassDef = context.cls if not cls.info.has_base(_IMMUTABLE): return transformer = DataclassTransformer(context) transformer.transform() attributes = transformer.collect_attributes() transformer._freeze(attributes) def _do_hook(context: FunctionContext) -> Type: return AnyType(6) class PFun(Plugin): def get_function_hook(self, fullname: str ) -> t.Optional[t.Callable[[FunctionContext], Type]]: if fullname == _CURRY: return _curry_hook if fullname == _COMPOSE: return _compose_hook if fullname in ( _MAYBE, _RESULT, _EITHER, _IO, _READER, _MAYBE_WITH_EFFECT, _EITHER_WITH_EFFECT, _LIST_WITH_EFFECT, _READER_WITH_EFFECT, _WRITER_WITH_EFFECT, _STATE_WITH_EFFECT, _IO_WITH_EFFECT, _TRAMPOLINE_WITH_EFFECT, _FREE_WITH_EFFECT ): return _variadic_decorator_hook return None def get_base_class_hook(self, fullname: str): return _immutable_hook def plugin(_): return PFun PK!ÿ&pfun/reader.pyfrom functools import wraps from typing import Generic, TypeVar, Callable, Iterable, cast, Generator, Any from .immutable import Immutable from .curry import curry from .trampoline import Trampoline, Done, Call from .monad import Monad, map_m_, sequence_, filter_m_ from .with_effect import with_effect_ Context = TypeVar('Context') Result_ = TypeVar('Result_') Next = TypeVar('Next') A = TypeVar('A') B = TypeVar('B') class Reader(Immutable, Generic[Context, Result_], Monad): """ Represents a computation that is not yet completed, but will complete once given an object of type ``Context`` """ f: Callable[[Context], Trampoline[Result_]] def and_then( self: 'Reader[Context, Result_]', f: 'Callable[[Result_], Reader[Context, Next]]' ) -> 'Reader[Context, Next]': """ Compose ``f`` with the function wrapped by this :class:`Reader` instance :example: >>> ask().and_then( ... lambda context: value(f'context: {context}') ... ).run([]) 'context: []' :param f: Function to compose with this this :class:`Reader` :return: Composed :class:`Reader` """ return Reader( lambda a: Call( lambda: self.f(a).and_then( # type: ignore lambda v: Call(lambda: f(v).f(a)) # type: ignore ) ) ) def map(self, f: Callable[[Result_], B]) -> 'Reader[Context, B]': """ Apply ``f`` to the result of this :class:`Reader` :example: >>> value(1).map(str).run(...) '1' :param f: Function to apply :return: :class:`Reader` that returns the result of applying ``f`` to its result """ return Reader( lambda a: self.f(a).and_then(lambda r: Done(f(r))) # type: ignore ) def run(self, c: Context) -> Result_: """ Apply this :class:`Reader` to the context ``c`` :example: >>> value(1).run(...) 1 :param c: The context to passed to the function wrapped by this :class:`Reader` :return: The result of this :class:`Reader` """ return self.f(c).run() # type: ignore __call__ = run def value(v: Result_) -> Reader[Context, Result_]: """ Make a ``Reader`` that will produce ``v`` no matter the context :example: >>> value(1).run(None) 1 :param v: the value to put in a :class:`Reader` instance :return: :class:`Reader` that returns ``v`` when given any context """ return Reader(lambda _: Done(v)) def ask() -> Reader[Context, Context]: """ Make a :class:`Reader` that just returns the context. :return: :class:`Reader` that will return the context when run """ return Reader(lambda c: Done(c)) def reader(f: Callable[..., B]) -> Callable[..., Reader[Context, B]]: """ Wrap any function in a :class:`Reader` context. Useful for making non-monadic functions monadic. Can also be used as a decorator :example: >>> to_int = reader(int) >>> to_int('1').and_then(lambda i: i + 1).run(...) 2 :param f: Function to wrap :return: Wrapped function """ @wraps(f) def dec(*args, **kwargs): result = f(*args, **kwargs) return value(result) return dec @curry def map_m(f: Callable[[A], Reader[Context, B]], iterable: Iterable[A]) -> Reader[Context, Iterable[B]]: """ Map each in element in ``iterable`` to a :class:`Reader` by applying ``f``, combine the elements by ``and_then`` from left to right and collect the results :example: >>> map_m(value, range(3)).run(...) (0, 1, 2) :param f: Function to map over ``iterable`` :param iterable: Iterable to map ``f`` over :return: ``f`` mapped over ``iterable`` and combined from left to right. """ return cast(Reader[Context, Iterable[B]], map_m_(value, f, iterable)) def sequence(iterable: Iterable[Reader[Context, B]] ) -> Reader[Context, Iterable[B]]: """ Evaluate each :class:`Reader` in `iterable` from left to right and collect the results :example: >>> sequence([value(v) for v in range(3)]).run(...) (0, 1, 2) :param iterable: The iterable to collect results from :returns: :class:`Reader` of collected results """ return cast(Reader[Context, Iterable[B]], sequence_(value, iterable)) @curry def filter_m(f: Callable[[A], Reader[Context, bool]], iterable: Iterable[A]) -> Reader[Context, Iterable[A]]: """ Map each element in ``iterable`` by applying ``f``, filter the results by the value returned by ``f`` and combine from left to right. :example: >>> filter_m(lambda v: value(v % 2 == 0), range(3)).run(...) (0, 2) :param f: Function to map ``iterable`` by :param iterable: Iterable to map by ``f`` :return: `iterable` mapped and filtered by `f` """ return cast(Reader[Context, Iterable[A]], filter_m_(value, f, iterable)) Readers = Generator[Reader[Context, Result_], Result_, B] def with_effect(f: Callable[..., Readers[Context, Any, B]] ) -> Callable[..., Reader[Context, B]]: """ Decorator for functions that return a generator of readers and a final result. Iteraters over the yielded readers and sends back the unwrapped values using "and_then" :example: >>> @with_effect ... def f() -> Readers[Any, int, int]: ... a = yield value(2) ... b = yield value(2) ... return a + b >>> f().run() 4 :param f: generator function to decorate :return: `f` decorated such that generated :class:`Maybe` \ will be chained together with `and_then` """ return with_effect_(value, f) __all__ = [ 'Reader', 'reader', 'value', 'map_m', 'sequence', 'filter_m', 'ask', 'with_effect', 'Readers' ] PK!##pfun/result.pyfrom typing import TypeVar, Callable, Union from functools import wraps from .either import Left, Right, with_effect, Eithers A = TypeVar('A') B = TypeVar('B') class Ok(Right[A]): pass class Error(Left[Exception]): get: Exception Result = Union[Error, Ok[A]] Results = Eithers[Exception, A, B] with_effect = with_effect def result(f: Callable[..., B]) -> Callable[..., Result[B]]: """ Wrap a function that may raise an exception with a :class:`Result`. Can also be used as a decorator. Useful for turning any function into a monadic function :example: >>> to_int = result(int) >>> to_int("1") Ok(1) >>> to_int("Whoops") Error(ValueError("invalid literal for int() with base 10: 'Whoops'")) :param f: Function to wrap :return: f wrapped with a :class:`Result` """ @wraps(f) def dec(*args, **kwargs): try: return Ok(f(*args, **kwargs)) except Exception as e: return Error(e) return dec __all__ = ['Result', 'Ok', 'Error', 'result'] PK! pfun/state.pyfrom typing import Generic, TypeVar, Callable, Tuple, Iterable, cast, Generator from .immutable import Immutable from .monad import sequence_, map_m_, filter_m_, Monad from .curry import curry from .trampoline import Trampoline, Done, Call from .with_effect import with_effect_ A = TypeVar('A') B = TypeVar('B') C = TypeVar('C') class State(Generic[B, A], Immutable, Monad): """ Represents a computation that is not yet complete, but will complete when given a state of type A """ f: Callable[[A], Trampoline[Tuple[B, A]]] def and_then(self, f: 'Callable[[B], State[C, A]]') -> 'State[C, A]': """ Chain together stateful computations :example: >>> get().and_then( ... lambda s: put(s + ['final state']) ... ).run(['initial state']) (), ['initial state', 'final state'] :param f: Function to pass the result of this :class:`State` instance \ once it can be computed :return: new :class:`State` which wraps the result of \ passing the result of this :class:`State` instance to ``f`` """ return State( lambda a: Call( lambda: self.f(a). # type: ignore and_then( lambda tu: Call(lambda: f(tu[0]).f(tu[1])) # type: ignore ) ) ) def run(self, a: A) -> Tuple[B, A]: """ Get the result of this :class:`State` by passing the state ``a`` to the wrapped function :example: >>> append_to_state = lambda state: value(state + ['final state']) >>> get().and_then(append_to_state).run(['initial state']) ['initial state', 'final state'], ['initial state', 'final state'] :param a: State to run this :class:`State` instance on :return: Result of running :class:`State` instance with ``a`` as state """ return self.f(a).run() # type: ignore def map(self, f: Callable[[B], C]) -> 'State[C, A]': return State( lambda a: self.f(a). # type: ignore and_then(lambda tu: Done((f(tu[0]), tu[1]))) ) def put(a: A) -> State[None, A]: """ Update the current state :example: >>> put('state').run('') (), 'state' :param a: The new state :return: :class:`State` with ``a`` as the new state """ return State(lambda state: Done((None, a))) def get() -> State[A, A]: """ Get the current state :example: >>> get().run('state') 'state', 'state' :return: :class:`State` with the current state as its result """ return State(lambda b: Done((b, b))) def value(b: B) -> State[B, A]: """ Put a value in the :class:`State` context :example: >>> value(1).run('state') 1, 'state' :param b: the value to put in a :class:`State` context :return: :class:`State` that will produce ``b`` no matter the state """ return State(lambda a: Done((b, a))) @curry def map_m(f: Callable[[A], State[A, B]], iterable: Iterable[A]) -> State[Iterable[A], B]: """ Map each in element in ``iterable`` to an :class:`Maybe` by applying ``f``, combine the elements by ``and_then`` from left to right and collect the results :example: >>> map_m(value, range(3)).run(None) ((0, 1, 2), None) :param f: Function to map over ``iterable`` :param iterable: Iterable to map ``f`` over :return: ``f`` mapped over ``iterable`` and combined from left to right. """ return cast(State[Iterable[A], B], map_m_(value, f, iterable)) def sequence(iterable: Iterable[State[A, B]]) -> State[Iterable[A], B]: """ Evaluate each :class:`State` in `iterable` from left to right and collect the results :example: >>> sequence([value(v) for v in range(3)]).run(None) ((0, 1, 2), None) :param iterable: The iterable to collect results from :returns: ``Maybe`` of collected results """ return cast(State[Iterable[A], B], sequence_(value, iterable)) @curry def filter_m(f: Callable[[A], State[bool, B]], iterable: Iterable[A]) -> State[Iterable[A], B]: """ Map each element in ``iterable`` by applying ``f``, filter the results by the value returned by ``f`` and combine from left to right. :example: >>> filter_m(lambda v: value(v % 2 == 0), range(3)).Run(None) ((0, 2), None) :param f: Function to map ``iterable`` by :param iterable: Iterable to map by ``f`` :return: `iterable` mapped and filtered by `f` """ return cast(State[Iterable[A], B], filter_m_(value, f, iterable)) States = Generator[State[A, B], A, C] def with_effect(f: Callable[..., States[A, B, C]] ) -> Callable[..., State[C, B]]: """ Decorator for functions that return a generator of states and a final result. Iteraters over the yielded states and sends back the unwrapped values using "and_then" :example: >>> @with_effect ... def f() -> States[int, Any, int]: ... a = yield value(2) ... b = yield value(2) ... return a + b >>> f().run(None) (4, None) :param f: generator function to decorate :return: `f` decorated such that generated :class:`State` \ will be chained together with `and_then` """ return with_effect_(value, f) # type: ignore __all__ = [ 'State', 'put', 'get', 'value', 'map_m', 'sequence', 'filter_m', 'with_effect', 'States' ] PK! }pfun/trampoline.pyfrom typing import Generic, TypeVar, Callable, cast, Iterable, Generator from abc import ABC, abstractmethod from .immutable import Immutable from .monad import Monad, sequence_, map_m_, filter_m_ from .curry import curry from .with_effect import with_effect_ A = TypeVar('A') B = TypeVar('B') C = TypeVar('C') class Trampoline(Immutable, Generic[A], Monad, ABC): """ Base class for Trampolines. Useful for writing stack safe-safe recursive functions. """ @abstractmethod def _resume(self) -> 'Trampoline[A]': pass @abstractmethod def _handle_cont( self, cont: Callable[[A], 'Trampoline[B]'] ) -> 'Trampoline[B]': pass @property def _is_done(self) -> bool: return isinstance(self, Done) def and_then(self, f: Callable[[A], 'Trampoline[B]']) -> 'Trampoline[B]': """ Apply ``f`` to the value wrapped by this trampoline. :param f: function to apply the value in this trampoline :return: Result of applying ``f`` to the value wrapped by \ this trampoline """ return AndThen(self, f) def map(self, f: Callable[[A], B]) -> 'Trampoline[B]': """ Map ``f`` over the value wrapped by this trampoline. :param f: function to wrap over this trampoline :return: new trampoline wrapping the result of ``f`` """ return self.and_then(lambda a: Done(f(a))) def run(self) -> A: """ Interpret a structure of trampolines to produce a result :return: result of intepreting this structure of \ trampolines """ trampoline = self while not trampoline._is_done: trampoline = trampoline._resume() return cast(Done[A], trampoline).a class Done(Trampoline[A]): """ Represents the result of a recursive computation. """ a: A def _resume(self) -> Trampoline[A]: return self def _handle_cont(self, cont: Callable[[A], Trampoline[B]]) -> Trampoline[B]: return cont(self.a) class Call(Trampoline[A]): """ Represents a recursive call. """ thunk: Callable[[], Trampoline[A]] def _handle_cont(self, cont: Callable[[A], Trampoline[B]]) -> Trampoline[B]: return self.thunk().and_then(cont) # type: ignore def _resume(self) -> Trampoline[A]: return self.thunk() # type: ignore class AndThen(Generic[A, B], Trampoline[B]): """ Represents monadic bind for trampolines as a class to avoid deep recursive calls to ``Trampoline.run`` during interpretation. """ sub: Trampoline[A] cont: Callable[[A], Trampoline[B]] def _handle_cont(self, cont: Callable[[B], Trampoline[C]]) -> Trampoline[C]: return self.sub.and_then(self.cont).and_then(cont) # type: ignore def _resume(self) -> Trampoline[B]: return self.sub._handle_cont(self.cont) # type: ignore def and_then( # type: ignore self, f: Callable[[A], Trampoline[B]] ) -> Trampoline[B]: return AndThen( self.sub, lambda x: Call(lambda: self.cont(x).and_then(f)) # type: ignore ) @curry def map_m(f: Callable[[A], Trampoline[B]], iterable: Iterable[A]) -> Trampoline[Iterable[B]]: """ Map each in element in ``iterable`` to an :class:`Trampoline` by applying ``f``, combine the elements by ``and_then`` from left to right and collect the results :example: >>> map_m(Just, range(3)) Just((0, 1, 2)) :param f: Function to map over ``iterable`` :param iterable: Iterable to map ``f`` over :return: ``f`` mapped over ``iterable`` and combined from left to right. """ return cast(Trampoline[Iterable[B]], map_m_(Done, f, iterable)) def sequence(iterable: Iterable[Trampoline[A]]) -> Trampoline[Iterable[A]]: """ Evaluate each :class:`Trampoline` in `iterable` from left to right and collect the results :example: >>> sequence([Just(v) for v in range(3)]) Just((0, 1, 2)) :param iterable: The iterable to collect results from :returns: ``Trampoline`` of collected results """ return cast(Trampoline[Iterable[A]], sequence_(Done, iterable)) @curry def filter_m(f: Callable[[A], Trampoline[bool]], iterable: Iterable[A]) -> Trampoline[Iterable[A]]: """ Map each element in ``iterable`` by applying ``f``, filter the results by the value returned by ``f`` and combine from left to right. :example: >>> filter_m(lambda v: Just(v % 2 == 0), range(3)) Just((0, 2)) :param f: Function to map ``iterable`` by :param iterable: Iterable to map by ``f`` :return: `iterable` mapped and filtered by `f` """ return cast(Trampoline[Iterable[A]], filter_m_(Done, f, iterable)) Trampolines = Generator[Trampoline[A], A, B] def with_effect(f: Callable[..., Trampolines[A, B]] ) -> Callable[..., Trampoline[B]]: """ Decorator for functions that return a generator of trampolines and a final result. Iteraters over the yielded trampolines and sends back the unwrapped values using "and_then" :example: >>> @with_effect ... def f() -> Trampolines[int, int]: ... a = yield Done(2) ... b = yield Done(2) ... return a + b >>> f() Done(4) :param f: generator function to decorate :return: `f` decorated such that generated :class:`Trampoline` \ will be chained together with `and_then` """ return with_effect_(Done, f) # type: ignore __all__ = [ 'Trampoline', 'Done', 'Call', 'AndThen', 'map_m', 'sequence', 'filter_m', 'Trampolines', 'with_effect' ] PK!|0 0 pfun/util.pyfrom typing import TypeVar, Callable, Generic, Tuple, Any from .immutable import Immutable A = TypeVar('A') B = TypeVar('B') C = TypeVar('C') def identity(v: A) -> A: """ The identity function. Just gives back its argument :example: >>> identity('value') 'value' :param v: The value to get back :return: v """ return v Unary = Callable[[A], B] Predicate = Callable[[A], bool] class always(Generic[A], Immutable): """ A Callable that always returns the same value regardless of the arguments :example: >>> f = always(1) >>> f(None) 1 >>> f('') 1 >>> "... and so on..." """ value: A def __call__(self, *args, **kwargs) -> A: return self.value class Composition(Immutable): functions: Tuple[Callable, ...] def __call__(self, *args, **kwargs): fs = reversed(self.functions) first, *rest = fs last_result = first(*args, **kwargs) for f in rest: last_result = f(last_result) return last_result def compose( f: Callable[[Any], Any], g: Callable[[Any], Any], *functions: Callable[[Any], Any] ) -> Callable[[Any], Any]: """ Compose functions from left to right :example: >>> f = lambda v: v * 2 >>> g = compose(str, f) >>> g(3) "6" :param f: the outermost function in the composition :param g: the function to be composed with f :param functions: functions to be composed with `f` \ and `g` from left to right :return: `f` composed with `g` composed with `functions` from left to right """ return Composition((f, g) + functions) def pipeline( first: Callable[[Any], Any], second: Callable[[Any], Any], *rest: Callable[[Any], Any] ): """ Compose functions from right to left :example: >>> f = lambda v: v * 2 >>> g = pipeline(f, str) >>> g(3) "6" :param first: the innermost function in the composition :param g: the function to compose with f :param functions: functions to compose with `first` and \ `second` from right to left :return: `rest` composed from right to left, composed with \ `second` composed with `first` """ return compose(*reversed(rest), second, first) __all__ = ['always', 'compose', 'pipeline', 'identity'] PK!_-rpfun/with_effect.pyfrom functools import wraps from pfun.monad import Monad from pfun.curry import curry from typing import Generator, TypeVar, Callable, Any M = TypeVar('M', bound=Monad) @curry def with_effect_( value: Callable[[Any], M], f: Callable[..., Generator[M, Any, Any]] ) -> Callable[..., M]: @wraps(f) def decorator(*args, **kwargs): g = f(*args, **kwargs) def cont(v) -> M: try: return g.send(v).and_then(cont) except StopIteration as e: return value(e.value) try: m = next(g) return m.and_then(cont) except StopIteration as e: return value(e.value) return decorator @curry def with_effect_eager( value: Callable[[Any], M], f: Callable[..., Generator[M, Any, Any]] ) -> Callable[..., M]: """ DANGER! This approach only works for monads that bind functions eagerly such as Maybe, Either, Writer and List """ @wraps(f) def decorator(*args, **kwargs): g = f(*args, **kwargs) m = next(g) try: while True: m = m.and_then(lambda v: g.send(v)) except StopIteration as e: return value(e.value) return decorator @curry def with_effect_tail_rec( value: Callable[[Any], M], f: Callable[..., Generator[M, Any, Any]], tail_rec: Callable[[Callable[[Any], Any], Any], Any] = None ) -> Callable[..., M]: from .either import Left, Right @wraps(f) def decorator(*args, **kwargs): g = f(*args, **kwargs) def cont(v) -> M: try: return g.send(v).map(Left) except StopIteration as e: return value(Right(e.value)) try: m = next(g) return m.and_then(lambda v: tail_rec(cont, v)) except StopIteration as e: return value(e.value) return decorator PK!wUUpfun/writer.pyfrom typing import Generic, Callable, TypeVar, Iterable, cast, Generator from pfun.monoid import M, append, empty from .immutable import Immutable from .curry import curry from .monad import map_m_, Monad, sequence_, filter_m_ from .either import Either, Left from .with_effect import with_effect_tail_rec A = TypeVar('A') B = TypeVar('B') class Writer(Generic[A, M], Immutable, Monad): """ Represents a value along with a monoid value that is accumulated as an effect """ a: A m: M def and_then(self, f: 'Callable[[A], Writer[B, M]]') -> 'Writer[B, M]': """ Pass the value in this value/monoid pair to ``f``, and then combine the resulting monoid with the monoid in this pair :example: >>> Writer(1, ['first element']).and_then( ... lambda i: Writer(i + 1, ['second element']) ... ) Writer(2, ['first element', 'second element']) :param f: Function to pass the value to :return: :class:`Writer` with result of passing the value in this :class:`Writer` to ``f``, and appending the monoid in this instance with the result of ``f`` """ # this is kind of a hack: # I'm using ... as a special symbol that represents a monoid value # the type of which is yet to be determined. w = f(self.a) if w.m is ... and self.m is ...: m = ... elif w.m is ...: m = append(self.m, empty(self.m)) # type: ignore elif self.m is ...: m = append(empty(w.m), w.m) # type: ignore else: m = append(self.m, w.m) # type: ignore return Writer(w.a, m) # type: ignore def __eq__(self, other): if not isinstance(other, Writer): return False return other.a == self.a and other.m == self.m def map(self, f: 'Callable[[A], B]') -> 'Writer[B, M]': """ Map the value/monoid pair in this :class:`Writer` :example: >>> Writer('value', []).map(lambda v, m: ('new value', ['new monoid'])) Writer('new value', ['new monoid']) :param f: the function to map the value and monoid in this :class:`Writer` :return: :class:`Writer` with value and monoid mapped by ``f`` """ return Writer(f(self.a), self.m) def __repr__(self): a_repr = repr(self.a) m_repr = repr(self.m) if self.m is not ... else "..." return f'Writer({a_repr}, {m_repr})' def __iter__(self): return iter((self.a, self.m)) def value(a: A, m: M = ...) -> Writer[A, M]: # type: ignore """ Put a value in a :class:`Writer` context :example: >>> value(1) Writer(1, ...) >>> value(1, ['some monoid']) Writer(1, ['some monoid']) :param a: The value to put in the :class:`Writer` context :param m: Optional monoid to associate with ``a`` :return: :class:`Writer` with ``a`` and optionally ``m`` """ return Writer(a, m) def tell(m: M) -> Writer[None, M]: """ Create a Writer with a monoid ``m`` and unit value :example: >>> tell( ... ['monoid value'] ... ).and_then( ... lambda _: tell(['another monoid value']) ... ) Writer(None, ['monoid value', 'another monoid value']) :param m: the monoid value :return: Writer with unit value and monoid value ``m`` """ return Writer(None, m) def sequence(iterable: Iterable[Writer[A, M]]) -> Writer[Iterable[A], M]: """ Evaluate each :class:`Writer` in `iterable` from left to right and collect the results :example: >>> sequence([value(v) for v in range(3)]) Writer((0, 1, 2), ...) :param iterable: The iterable to collect results from :returns: ``Writer`` of collected results """ return cast(Writer[Iterable[A], M], sequence_(value, iterable)) @curry def filter_m(f: Callable[[A], Writer[bool, M]], iterable: Iterable[A]) -> Writer[Iterable[A], M]: """ Map each element in ``iterable`` by applying ``f``, filter the results by the value returned by ``f`` and combine from left to right. :example: >>> filter_m(lambda v: value(v % 2 == 0), range(3)) Writer((0, 2), ...) :param f: Function to map ``iterable`` by :param iterable: Iterable to map by ``f`` :return: `iterable` mapped and filtered by `f` """ return cast(Writer[Iterable[A], M], filter_m_(value, f, iterable)) @curry def map_m(f: Callable[[A], Writer[B, M]], iterable: Iterable[A]) -> Writer[Iterable[B], M]: """ Map each in element in ``iterable`` to an :class:`Writer` by applying ``f``, combine the elements by ``and_then`` from left to right and collect the results :example: >>> map_m(value, range(3)) Writer((0, 1, 2), ...) :param f: Function to map over ``iterable`` :param iterable: Iterable to map ``f`` over :return: ``f`` mapped over ``iterable`` and combined from left to right. """ return cast(Writer[Iterable[B], M], map_m_(value, f, iterable)) def tail_rec(f: Callable[[A], Writer[Either[A, B], M]], a: A) -> Writer[B, M]: """ Run a stack safe recursive monadic function `f` by calling `f` with :class:`Left` values until a :class:`Right` value is produced :example: >>> from pfun.either import Left, Right, Either >>> def f(i: str) -> Writer[Either[int, str]]: ... if i == 0: ... return value(Right('Done')) ... return value(Left(i - 1)) >>> tail_rec(f, 5000) Writer('Done', ...) :param f: function to run "recursively" :param a: initial argument to `f` :return: result of `f` """ writer = f(a) either = writer.a while isinstance(either, Left): writer = writer.and_then(lambda _: f(either.get)) # type: ignore either = writer.a return writer.and_then(lambda _: value(either.get)) # type: ignore Writers = Generator[Writer[A, M], A, B] def with_effect(f: Callable[..., Writers[A, M, B]] ) -> Callable[..., Writer[B, M]]: """ Decorator for functions that return a generator of writers and a final result. Iteraters over the yielded writers and sends back the unwrapped values using "and_then" :example: >>> @with_effect ... def f() -> Writers[int, List[str], int]: ... a = yield value(2, ['Got two']) ... b = yield value(2, ['Got two again']) ... return a + b >>> f() Writer(4, ['Got two', 'Got two again']) :param f: generator function to decorate :return: `f` decorated such that generated :class:`Writer` \ will be chained together with `and_then` """ return with_effect_tail_rec(value, f, tail_rec) # type: ignore __all__ = [ 'Writer', 'value', 'tell', 'sequence', 'map_m', 'filter_m', 'Writers', 'with_effect', 'tail_rec' ] PK!HڽTUpfun-0.5.1.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H)2pfun-0.5.1.dist-info/METADATARO0*DŒ1Q6 MMDbM;'@ôo=gT^%+1Q-->Wm[R|4-$VU :9\|s,@F$ڸ\N994ZM.I+jT(\1g҃o]q("EvTN-J+]V!sw_|z>F+wbHn ph=WS/+{dl]iZ-ym%A<˜Z:+zzZ6#v̢܂<17CuLJǟ0.@_50wR5`U>dGBD^MZOrbԁb:M:k02?,Uk5h#kp DMzމwWiGP -IWM#̢Gκ4XkB;Z@vmw 7ַ4FqkkՆy]BDGQ_. ^WzƓlJYnX$[^&e&r(#\+ǞWLGwU0PK!Hd @pfun-0.5.1.dist-info/RECORDmɒJyX.A@QDaC0OD?+Mu<>I~|#n,6Wȧ?ϋI6q¦F+80e|CTXU:ęC:R Y{#C+%Agedfs*.$+uh5j)QK2Ezh{U"QwN59KgRRX s݊a9#t`5XУC&B02NJ-$볪`k,#8۫_yEOM:iB83zwķNI kW`㾘 |uˢx&G.p˯y(^ K_/]s&HN2jvgӍ9.\>\H٩rmT2Wĸ[1"1 /PK!S  pfun/__init__.pyPK!Ex 7pfun/cont.pyPK!gk 4pfun/curry.pyPK!Zs   pfun/dict.pyPK!/("("1/pfun/either.pyPK!K! Qpfun/free.pyPK! 3ohpfun/functor.pyPK!_qjpfun/immutable.pyPK![:: opfun/io.pyPK!}TT pfun/list.pyPK!it[z"" pfun/maybe.pyPK!P Npfun/monad.pyPK!^g0  Mpfun/monoid.pyPK!5pfun/mypy_plugin.pyPK!ÿ&}pfun/reader.pyPK!##/ pfun/result.pyPK! ~pfun/state.pyPK! }T'pfun/trampoline.pyPK!|0 0 .>pfun/util.pyPK!_-rGpfun/with_effect.pyPK!wUU@Opfun/writer.pyPK!HڽTUjpfun-0.5.1.dist-info/WHEELPK!H)2Mkpfun-0.5.1.dist-info/METADATAPK!Hd @mpfun-0.5.1.dist-info/RECORDPK3r