# pype/__init__.py

from . import app


# pype/__main__.py

from pype.app import cli

cli()


# pype/_version.py

__version__ = "0.0.57"


# pype/app.py

#!/usr/bin/env python
"""Command line pipes in python."""

from __future__ import generator_stop

import collections
import importlib
import io
import os
import sys
import textwrap
import token
import tokenize
import itertools
import re
import contextlib
import typing
import types
import functools

from typing import Callable
from typing import Awaitable
from typing import AsyncIterable
from typing import AsyncIterator
from typing import Optional
from typing import List

import attr
import parso
import click
import click_default_group
import toolz
import trio
import async_generator
import async_exit_stack

try:
    import asks
except ImportError:
    pass
else:
    asks.init("trio")

from . import _version

T = typing.TypeVar("T")
U = typing.TypeVar("U")

_PYPE_VALUE = "_PYPE_VALUE"

BUFSIZE = 2 ** 14
counter = itertools.count()

_RECEIVE_SIZE = 4096  # pretty arbitrary

DEFAULTS = {"max_concurrent": 5}


class TerminatedFrameReceiver:
    """Parse frames out of a Trio stream, where each frame is terminated by a
    fixed byte sequence.

    For example, you can parse newline-terminated lines by setting the
    terminator to b"\n".

    This uses some tricks to protect against denial of service attacks:

    - It puts a limit on the maximum frame size, to avoid memory overflow;
      you might want to adjust the limit for your situation.

    - It uses some algorithmic trickiness to avoid "slow loris" attacks.
      All algorithms are amortized O(n) in the length of the input.
    """

    def __init__(
        self,
        stream: trio.abc.ReceiveStream,
        terminator: bytes,
        max_frame_length: int = 16384,
    ) -> None:
        self.stream = stream
        self.terminator = terminator
        self.max_frame_length = max_frame_length
        self._buf = bytearray()
        self._next_find_idx = 0

    async def receive(self) -> bytearray:
        while True:
            terminator_idx = self._buf.find(self.terminator, self._next_find_idx)
            if terminator_idx < 0:
                # no terminator found
                if len(self._buf) > self.max_frame_length:
                    raise ValueError("frame too long")
                # next time, start the search where this one left off
                self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1)
                # add some more data, then loop around
                more_data = await self.stream.receive_some(_RECEIVE_SIZE)
                if more_data == b"":
                    if self._buf:
                        raise ValueError("incomplete frame")
                    raise trio.EndOfChannel
                self._buf += more_data
            else:
                # terminator found in buf, so extract the frame
                frame = self._buf[:terminator_idx]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del self._buf[: terminator_idx + len(self.terminator)]
                # next time, start the search from the beginning
                self._next_find_idx = 0
                return frame

    def __aiter__(self) -> "TerminatedFrameReceiver":
        return self

    async def __anext__(self) -> bytearray:
        try:
            return await self.receive()
        except trio.EndOfChannel:
            raise StopAsyncIteration
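

# Illustrative sketch, not part of the original module: typical use of
# TerminatedFrameReceiver to read newline-terminated lines from a Trio stream
# and decode them. The helper name `_print_lines` is hypothetical.
async def _print_lines(stream: trio.abc.ReceiveStream) -> None:
    receiver = TerminatedFrameReceiver(stream, b"\n")
    async for frame in receiver:  # each frame excludes the trailing terminator
        print(frame.decode())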


class AsyncIterableWrapper:
    def __init__(self, iterable):
        self.iterable = iter(iterable)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return next(self.iterable)
        except StopIteration:
            raise StopAsyncIteration


def _get_named_module(name):
    builtins = sys.modules["builtins"]
    if hasattr(builtins, name):
        return builtins
    try:
        return __import__(name, {}, {})
    except ImportError as e:
        pass
    raise LookupError(f"Could not find {name}")


def _get_autoimport_module(fullname):
    name_parts = fullname.split(".")
    try_names = []
    for idx in range(len(name_parts)):
        try_names.insert(0, ".".join(name_parts[: idx + 1]))

    for name in try_names:
        try:
            module = _get_named_module(name)
        except LookupError:
            pass
        else:
            if module is sys.modules["builtins"]:
                return {}
            return {name: module}
    return {}


def find_maybe_module_names(text):
    # TODO: Use a real parser.
    return re.findall(r"\b[^\d\W]\w*(?:\.[^\d\W]\w*)+\b", text)


def split_pipestring(s, sep="!"):
    segments = []
    tree = parso.parse(s)
    current_nodes = []
    for c in tree.children:
        if isinstance(c, parso.python.tree.PythonErrorLeaf) and c.value == sep:
            segments.append(current_nodes)
            current_nodes = []
        else:
            current_nodes.append(c)
    segments.append(current_nodes)
    return ["".join(node.get_code() for node in seg) for seg in segments]


def test_split_pipestring():
    s = 'x ! y + f"{x!r}"'
    sep = "!"
    assert split_pipestring(s, sep) == ["x", ' y + f"{x!r}"']


def build_source(components):
    components = [c.strip() for c in components]
    indent = "    "
    lines = "".join([f"{indent}x = {c}\n" for c in components])
    source = textwrap.dedent(
        f"""\
async def _pype_runner(x):
{lines}
    return x
"""
    )
    return source


def build_name_to_module(command):
    name_to_module = {}
    components = split_pipestring(command)
    module_names = {name for c in components for name in find_maybe_module_names(c)}
    for name in module_names:
        name_to_module.update(_get_autoimport_module(name))
    return name_to_module


def build_function(command):
    name_to_module = build_name_to_module(command)
    source = build_source(split_pipestring(command))
    local_namespace = {}
    global_namespace = name_to_module.copy()
    exec(source, global_namespace, local_namespace)
    function = local_namespace["_pype_runner"]
    return function
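

# Illustrative sketch, not part of the original module: build_function compiles
# a "!"-separated pipeline string into a coroutine that threads each segment
# through x. The helper name `_demo_build_function` is hypothetical; run it
# with e.g. trio.run(_demo_build_function).
async def _demo_build_function() -> None:
    pipeline = build_function("x.upper() ! len(x)")
    # "abc" -> "ABC" -> 3; the generated source is roughly:
    #     async def _pype_runner(x):
    #         x = x.upper()
    #         x = len(x)
    #         return x
    assert await pipeline("abc") == 3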


@async_generator.asynccontextmanager
async def async_map(
    function: Callable[[T], Awaitable[U]], iterable: AsyncIterable[T], max_concurrent
) -> AsyncIterator[AsyncIterable[U]]:
    send_result, receive_result = trio.open_memory_channel[U](0)
    limiter = trio.CapacityLimiter(max_concurrent)

    async def wrapper(prev_done: trio.Event, self_done: trio.Event, item: T) -> None:
        maybe_coroutine_result = function(item)
        if isinstance(maybe_coroutine_result, types.CoroutineType):
            async with limiter:
                result = await maybe_coroutine_result
        else:
            result = maybe_coroutine_result
        await prev_done.wait()
        await send_result.send(result)
        self_done.set()

    async def consume_input(nursery) -> None:
        prev_done = trio.Event()
        prev_done.set()
        async for item in iterable:
            self_done = trio.Event()
            nursery.start_soon(wrapper, prev_done, self_done, item, name=function)
            prev_done = self_done
        await prev_done.wait()
        await send_result.aclose()

    async with trio.open_nursery() as nursery:
        nursery.start_soon(consume_input, nursery)
        yield receive_result
        nursery.cancel_scope.cancel()


@async_generator.asynccontextmanager
async def async_filter(
    function: Callable[[T], Awaitable[T]], iterable: AsyncIterable[T], max_concurrent
) -> AsyncIterator[AsyncIterable[T]]:
    send_result, receive_result = trio.open_memory_channel[T](0)
    limiter = trio.CapacityLimiter(max_concurrent)

    async def wrapper(prev_done: trio.Event, self_done: trio.Event, item: T) -> None:
        maybe_coroutine_result = function(item)
        if isinstance(maybe_coroutine_result, types.CoroutineType):
            async with limiter:
                result = await maybe_coroutine_result
        else:
            result = maybe_coroutine_result
        await prev_done.wait()
        if result:
            await send_result.send(item)
        self_done.set()

    async def consume_input(nursery) -> None:
        prev_done = trio.Event()
        prev_done.set()
        async for item in iterable:
            self_done = trio.Event()
            nursery.start_soon(wrapper, prev_done, self_done, item, name=function)
            prev_done = self_done
        await prev_done.wait()
        await send_result.aclose()

    async with trio.open_nursery() as nursery:
        nursery.start_soon(consume_input, nursery)
        yield receive_result
        nursery.cancel_scope.cancel()


async def program_runner(pairs, items, max_concurrent):
    async with async_exit_stack.AsyncExitStack() as stack:
        for how, function in pairs:
            if how == "map":
                items = await stack.enter_async_context(
                    async_map(function, items, max_concurrent)
                )
            elif how == "filter":
                items = await stack.enter_async_context(
                    async_filter(function, items, max_concurrent)
                )
            elif how == "apply":
                items = AsyncIterableWrapper([await function([x async for x in items])])
            elif how == "eval":
                items = AsyncIterableWrapper([await function(None)])
            elif how == "stack":
                items = AsyncIterableWrapper(
                    [await function("".join([x + "\n" async for x in items]))]
                )
            else:
                raise NotImplementedError(how)
        return stack.pop_all(), items


async def async_main(pairs, max_concurrent=DEFAULTS["max_concurrent"]):
    stream = trio._unix_pipes.PipeReceiveStream(os.dup(0))
    receiver = TerminatedFrameReceiver(stream, b"\n")
    result = (item.decode() async for item in receiver)
    pairs = [(how, build_function(what)) for how, what in pairs]
    stack, items = await program_runner(pairs, result, max_concurrent)
    async with stack:
        async for item in items:
            print(item)


def main(pairs, **kwargs):
    trio.run(functools.partial(async_main, pairs, **kwargs))


@click.group(chain=True)
@click.option("--max-concurrent", type=int, default=DEFAULTS["max_concurrent"])
@click.version_option(_version.__version__, prog_name="pype")
def cli(**kwargs):
    pass


def make_subcommand(name):
    @click.command(name)
    @click.argument("command")
    def _subcommand(command):
        return (name, command)

    return _subcommand


subcommand_names = ["map", "apply", "filter", "eval", "stack"]
subcommands = [make_subcommand(name) for name in subcommand_names]

for subcommand in subcommands:
    cli.add_command(subcommand)


@cli.resultcallback()
def collect(pairs, **kwargs):
    main(pairs, **kwargs)
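

# Illustrative usage sketch, not part of the original file. The CLI is a
# chained click group that reads newline-delimited values from stdin; the
# subcommands are map, apply, filter, eval, and stack, each taking a Python
# expression over the current value x, with "!"-separated expressions applied
# in sequence. Example shell invocation, assuming the package is installed:
#
#     printf 'a\nbb\nccc\n' | python -m pype map 'x.upper()' filter 'len(x) > 1'
#     # -> BB, CCC
#
# --max-concurrent bounds how many awaited coroutine results (e.g. asks.get
# calls) are in flight at once.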

# time python3.7 -m poetry run python -m pype map 'await asks.get(x) ! x.body ' map 'len(x)' filter 'x < 195' apply 'len(x)'