Source code for mario.asynch

#!/usr/bin/env python
"""Command line pipes in python."""

from __future__ import generator_stop

import itertools
import typing

import trio


# Generic type variables available for annotations in this module.
T = typing.TypeVar("T")
U = typing.TypeVar("U")

# NOTE(review): sentinel-like marker string; its consumers are not visible
# in this part of the file -- confirm intended use before changing it.
_PYPE_VALUE = "_PYPE_VALUE"

BUFSIZE = 1 << 14  # 16384
# Module-wide monotonically increasing integer source (0, 1, 2, ...).
counter = itertools.count()
# Maximum number of bytes requested from the stream per ``receive_some``
# call in ``TerminatedFrameReceiver.receive``; the value is pretty arbitrary.
_RECEIVE_SIZE = 4096


class TerminatedFrameReceiver:
    """Parse frames out of a Trio stream, where each frame is terminated by a
    fixed byte sequence.

    For example, you can parse newline-terminated lines by setting the
    terminator to ``b"\\n"``.

    This uses some tricks to protect against denial of service attacks:

    - It puts a limit on the maximum frame size, to avoid memory overflow; you
      might want to adjust the limit for your situation. The limit is checked
      only while no terminator has been found yet, so it bounds how much
      unterminated data is buffered rather than the exact length of every
      returned frame.

    - It uses some algorithmic trickiness to avoid "slow loris" attacks. All
      algorithms are amortized O(n) in the length of the input.

    Instances are async iterators: ``async for frame in receiver`` yields
    frames until the stream ends cleanly.
    """

    def __init__(
        self,
        stream: trio.abc.ReceiveStream,
        terminator: bytes,
        max_frame_length: int = 16384,
    ) -> None:
        """Set up an empty receive buffer on top of ``stream``.

        Args:
            stream: Object whose ``receive_some`` coroutine supplies raw bytes.
            terminator: Non-empty byte sequence that marks the end of a frame.
            max_frame_length: Largest amount of buffered data tolerated
                without a terminator before ``receive`` raises.

        Raises:
            ValueError: If ``terminator`` is empty.
        """
        # An empty terminator matches at index 0 of any buffer, which would
        # make ``receive`` return empty frames forever without ever awaiting.
        if not terminator:
            raise ValueError("terminator must not be empty")
        self.stream = stream
        self.terminator = terminator
        self.max_frame_length = max_frame_length
        self._buf = bytearray()
        # Index in ``_buf`` from which the next terminator search starts.
        self._next_find_idx = 0

    async def receive(self) -> bytearray:
        """Return the next frame, without its terminator.

        Raises:
            ValueError: ``"frame too long"`` if more than ``max_frame_length``
                bytes are buffered and no terminator has been found;
                ``"incomplete frame"`` if the stream ends while unterminated
                data is still buffered.
            trio.EndOfChannel: If the stream ends with nothing buffered.
        """
        while True:
            terminator_idx = self._buf.find(self.terminator, self._next_find_idx)
            if terminator_idx >= 0:
                # Terminator found in buf, so extract the frame.
                frame = self._buf[:terminator_idx]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del self._buf[: terminator_idx + len(self.terminator)]
                # Next time, start the search from the beginning.
                self._next_find_idx = 0
                return frame

            # No terminator found.
            if len(self._buf) > self.max_frame_length:
                raise ValueError("frame too long")
            # Next time, start the search where this one left off. Back up by
            # len(terminator) - 1 bytes so a terminator split across two
            # chunks is still detected.
            self._next_find_idx = max(0, len(self._buf) - len(self.terminator) + 1)
            # Add some more data, then loop around.
            more_data = await self.stream.receive_some(_RECEIVE_SIZE)
            if more_data == b"":
                # Empty read means the stream is exhausted.
                if self._buf:
                    raise ValueError("incomplete frame")
                raise trio.EndOfChannel
            self._buf += more_data

    def __aiter__(self) -> "TerminatedFrameReceiver":
        """Return ``self``; the receiver is its own async iterator."""
        return self

    async def __anext__(self) -> bytearray:
        """Return the next frame, ending iteration when the stream ends."""
        try:
            return await self.receive()
        except trio.EndOfChannel:
            # End of stream is the normal termination signal for iteration;
            # do not chain the internal EndOfChannel onto StopAsyncIteration.
            raise StopAsyncIteration from None