PK!Copype/__init__.pyfrom . import app PK!$pype/__main__.pyfrom . import cli cli.cli() PK!Zpype/_version.py__version__ = "0.0.63" PK!+@ pype/app.py#!/usr/bin/env python """Command line pipes in python.""" from __future__ import generator_stop import collections import importlib import io import os import sys import textwrap import token import tokenize import itertools import re import contextlib import typing import types import functools import pathlib from typing import Callable from typing import Awaitable from typing import AsyncIterable from typing import AsyncIterator from typing import Optional from typing import List import attr import parso import click import click_default_group import toolz import trio import async_generator import async_exit_stack try: import asks except ImportError: pass else: asks.init("trio") from . import _version from . import config from . import utils from . import asynch from . import interpret async def program_runner(pairs, items, max_concurrent): async with async_exit_stack.AsyncExitStack() as stack: for how, function in pairs: if how == "map": items = await stack.enter_async_context( asynch.async_map(function, items, max_concurrent) ) elif how == "filter": items = await stack.enter_async_context( asynch.async_filter(function, items, max_concurrent) ) elif how == "apply": items = asynch.AsyncIterableWrapper( [await function([x async for x in items])] ) elif how == "eval": items = asynch.AsyncIterableWrapper([await function(None)]) elif how == "stack": items = asynch.AsyncIterableWrapper( [await function("".join([x + "\n" async for x in items]))] ) else: raise NotImplementedError(how) return stack.pop_all(), items async def async_main( pairs, max_concurrent=config.DEFAULTS["max_concurrent"], exec_before=config.DEFAULTS["exec_before"], autocall=config.DEFAULTS["autocall"], ): stream = trio._unix_pipes.PipeReceiveStream(os.dup(0)) receiver = asynch.TerminatedFrameReceiver(stream, b"\n") result = (item.decode() async for item in receiver) global_namespace = interpret.build_global_namespace(exec_before) pairs = [ ( how, interpret.build_function( what, global_namespace, autocall=(False if how == "eval" else autocall) ), ) for how, what in pairs ] stack, items = await program_runner(pairs, result, max_concurrent) async with stack: async for item in items: print(item) def main(pairs, **kwargs): trio.run(functools.partial(async_main, pairs, **kwargs)) PK!~M pype/asynch.py#!/usr/bin/env python """Command line pipes in python.""" from __future__ import generator_stop import collections import importlib import io import os import sys import textwrap import token import tokenize import itertools import re import contextlib import typing import types import functools import pathlib from typing import Callable from typing import Awaitable from typing import AsyncIterable from typing import AsyncIterator from typing import Optional from typing import List import attr import parso import click import click_default_group import toolz import trio import async_generator import async_exit_stack try: import asks except ImportError: pass else: asks.init("trio") from . import _version from . import config from . import utils T = typing.TypeVar("T") U = typing.TypeVar("U") _PYPE_VALUE = "_PYPE_VALUE" BUFSIZE = 2 ** 14 counter = itertools.count() _RECEIVE_SIZE = 4096 # pretty arbitrary class TerminatedFrameReceiver: """Parse frames out of a Trio stream, where each frame is terminated by a fixed byte sequence. For example, you can parse newline-terminated lines by setting the terminator to b"\n". This uses some tricks to protect against denial of service attacks: - It puts a limit on the maximum frame size, to avoid memory overflow; you might want to adjust the limit for your situation. - It uses some algorithmic trickiness to avoid "slow loris" attacks. All algorithms are amortized O(n) in the length of the input. """ def __init__( self, stream: trio.abc.ReceiveStream, terminator: bytes, max_frame_length: int = 16384, ) -> None: self.stream = stream self.terminator = terminator self.max_frame_length = max_frame_length self._buf = bytearray() self._next_find_idx = 0 async def receive(self) -> bytearray: while True: terminator_idx = self._buf.find(self.terminator, self._next_find_idx) if terminator_idx < 0: # no terminator found if len(self._buf) > self.max_frame_length: raise ValueError("frame too long") # next time, start the search where this one left off self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1) # add some more data, then loop around more_data = await self.stream.receive_some(_RECEIVE_SIZE) if more_data == b"": if self._buf: raise ValueError("incomplete frame") raise trio.EndOfChannel self._buf += more_data else: # terminator found in buf, so extract the frame frame = self._buf[:terminator_idx] # Update the buffer in place, to take advantage of bytearray's # optimized delete-from-beginning feature. del self._buf[: terminator_idx + len(self.terminator)] # next time, start the search from the beginning self._next_find_idx = 0 return frame def __aiter__(self) -> "TerminatedFrameReceiver": return self async def __anext__(self) -> bytearray: try: return await self.receive() except trio.EndOfChannel: raise StopAsyncIteration class AsyncIterableWrapper: def __init__(self, iterable): self.iterable = iter(iterable) def __aiter__(self): return self async def __anext__(self): try: return next(self.iterable) except StopIteration: raise StopAsyncIteration @async_generator.asynccontextmanager async def async_map( function: Callable[[T], Awaitable[U]], iterable: AsyncIterable[T], max_concurrent ) -> AsyncIterator[AsyncIterable[U]]: send_result, receive_result = trio.open_memory_channel[U](0) limiter = trio.CapacityLimiter(max_concurrent) async def wrapper(prev_done: trio.Event, self_done: trio.Event, item: T) -> None: maybe_coroutine_result = function(item) if isinstance(maybe_coroutine_result, types.CoroutineType): async with limiter: result = await maybe_coroutine_result else: result = maybe_coroutine_result await prev_done.wait() await send_result.send(result) self_done.set() async def consume_input(nursery) -> None: prev_done = trio.Event() prev_done.set() async for item in iterable: self_done = trio.Event() nursery.start_soon(wrapper, prev_done, self_done, item, name=function) prev_done = self_done await prev_done.wait() await send_result.aclose() async with trio.open_nursery() as nursery: nursery.start_soon(consume_input, nursery) yield receive_result nursery.cancel_scope.cancel() @async_generator.asynccontextmanager async def async_filter( function: Callable[[T], Awaitable[T]], iterable: AsyncIterable[T], max_concurrent ) -> AsyncIterator[AsyncIterable[T]]: send_result, receive_result = trio.open_memory_channel[T](0) limiter = trio.CapacityLimiter(max_concurrent) async def wrapper(prev_done: trio.Event, self_done: trio.Event, item: T) -> None: maybe_coroutine_result = function(item) if isinstance(maybe_coroutine_result, types.CoroutineType): async with limiter: result = await maybe_coroutine_result else: result = maybe_coroutine_result await prev_done.wait() if result: await send_result.send(item) self_done.set() async def consume_input(nursery) -> None: prev_done = trio.Event() prev_done.set() async for item in iterable: self_done = trio.Event() nursery.start_soon(wrapper, prev_done, self_done, item, name=function) prev_done = self_done await prev_done.wait() await send_result.aclose() async with trio.open_nursery() as nursery: nursery.start_soon(consume_input, nursery) yield receive_result nursery.cancel_scope.cancel() PK!; 4), ("map", lambda x: x * 10), ], items=["a", "bb", "ccc", "dddd", "eeeee", "ffffff"], ) PK!w pype/utils.pyNAME = "pype" PK!Huxt#%-python_pype-0.0.63.dist-info/entry_points.txtN+I/N.,()*,HV9z@PK!HڽTU"python_pype-0.0.63.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!H^%python_pype-0.0.63.dist-info/METADATAXے}|Dz9슻bIk'-K*Yv!HB F$DߞӘ/f!.ݍӧYx>.:Qq8gFT,A\p4W塁IJZoڪَn%}P-߰ΣN0_}ti0"qUu~RCgNX$2gr,7~_锫b׃> LLYaUa[KjA vUV Gקqt5(K,R&3BXgB;(GiZَބRh1xyj_ lm1][AD`yMū U W%e\ 6S=۴w xv!}>%>/3#Kx (^R͸w CB$蝽e&Z D =oCz03dNq!`x5C"f5BOlaȴO.۸s0A֒6:8dM,D]KbtjM<SEɖBbR`Gcy WaA =E[!l|RAZT0$H wWXW]k|žrU0/p<ɼ KW*z.(T+%N5w \ -m2p9YR3ܝjf\CV" =}ӲO+u0'x떲,;6ĭCT #|kjѪշm-Y8~DY'QʲSI\cV)nn $q󲷊"O rʌ~#6C:ڪϦ9~ GӢT [Z/MCxkön3Z+5"]zt-\"f .0{wZ3ZQgqg1!xwTrټûMw'BTzZbjNѐo`L*P'\E8Tz$ьoQyts_~#,:ɀj G!{FTjL10,7Hnj zH@jЫLVR J=̒h nƒ+W4U+Ǹ gphx@ᙗ.mQvoE@KIǥg@WQ+u8yH͒ۗ7)v(|p9|Zܤ>촡rC2 勅 t_:qjeKtwuEp c=##g.Zkr VgZu1^wS7h\ƻ9 [t`fe AOnY _e/<*6,j}Z9fRRYdg0}ړaSvIp'ZzM;e-9pJLY X|;h9APK!Copype/__init__.pyPK!$@pype/__main__.pyPK!Zpype/_version.pyPK!+@ pype/app.pyPK!~M  pype/asynch.pyPK!;pype/utils.pyPK!Huxt#%-?python_pype-0.0.63.dist-info/entry_points.txtPK!HڽTU"}?python_pype-0.0.63.dist-info/WHEELPK!H^%@python_pype-0.0.63.dist-info/METADATAPK!Hߐ#LHpython_pype-0.0.63.dist-info/RECORDPKK