# Source code for mario.asynch
#!/usr/bin/env python
"""Command line pipes in python."""
from __future__ import generator_stop
import itertools
import typing
import trio
# Generic type variables available for annotations in this module.
T = typing.TypeVar("T")
U = typing.TypeVar("U")

# String marker constant; not referenced in the part of the module shown here.
_PYPE_VALUE = "_PYPE_VALUE"

# Buffer size constant: 16384 (16 KiB when counted in bytes).
BUFSIZE = 1 << 14

# Upper bound on the number of bytes requested per ``receive_some`` call.
# The exact value is fairly arbitrary.
_RECEIVE_SIZE = 4096

# Module-wide source of consecutive integers, starting at 0.
counter = itertools.count(0)
class TerminatedFrameReceiver:
    """Parse frames out of a Trio stream, where each frame is terminated by a
    fixed byte sequence.

    For example, you can parse newline-terminated lines by setting the
    terminator to b"\\n".

    This uses some tricks to protect against denial of service attacks:

    - It puts a limit on the maximum frame size, to avoid memory overflow; you
      might want to adjust the limit for your situation.
    - It uses some algorithmic trickiness to avoid "slow loris" attacks. All
      algorithms are amortized O(n) in the length of the input.

    Instances are async iterators: ``async for frame in receiver`` yields one
    frame per iteration and stops once the stream is exhausted.
    """

    def __init__(
        self,
        stream: trio.abc.ReceiveStream,
        terminator: bytes,
        max_frame_length: int = 16384,
    ) -> None:
        """Set up a receiver reading from *stream*.

        Args:
            stream: Source of raw bytes; read via ``receive_some``.
            terminator: Non-empty byte sequence that marks the end of a frame.
            max_frame_length: Buffered-byte limit past which an unterminated
                frame is rejected.

        Raises:
            ValueError: If *terminator* is empty.
        """
        if not terminator:
            # An empty pattern is "found" immediately by bytearray.find, so
            # receive() would return empty frames forever without ever
            # awaiting the stream.
            raise ValueError("terminator must not be empty")
        self.stream = stream
        self.terminator = terminator
        self.max_frame_length = max_frame_length
        # Bytes received from the stream but not yet returned as a frame.
        self._buf = bytearray()
        # Offset in _buf at which the next terminator search starts.
        self._next_find_idx = 0

    async def receive(self) -> bytearray:
        """Return the next frame, without its terminator.

        Raises:
            ValueError: "frame too long" if more than ``max_frame_length``
                bytes are buffered and no terminator has been seen, or
                "incomplete frame" if the stream ends while unterminated
                data is still buffered.
            trio.EndOfChannel: If the stream ends with nothing buffered.
        """
        while True:
            terminator_idx = self._buf.find(self.terminator, self._next_find_idx)
            if terminator_idx < 0:
                # No terminator found. The size limit is only enforced on
                # this path, so a returned frame can exceed the limit by up
                # to the data delivered by a single receive_some call.
                if len(self._buf) > self.max_frame_length:
                    raise ValueError("frame too long")
                # Next time, start the search where this one left off. Back
                # up by len(terminator) - 1 so a terminator split across two
                # chunks is still found.
                self._next_find_idx = max(
                    0, len(self._buf) - len(self.terminator) + 1
                )
                # Add some more data, then loop around.
                more_data = await self.stream.receive_some(_RECEIVE_SIZE)
                if more_data == b"":
                    # Empty read signals end of stream.
                    if self._buf:
                        raise ValueError("incomplete frame")
                    raise trio.EndOfChannel
                self._buf += more_data
            else:
                # Terminator found in buf, so extract the frame.
                frame = self._buf[:terminator_idx]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del self._buf[: terminator_idx + len(self.terminator)]
                # Next time, start the search from the beginning.
                self._next_find_idx = 0
                return frame

    def __aiter__(self) -> "TerminatedFrameReceiver":
        """Return self; the receiver is its own async iterator."""
        return self

    async def __anext__(self) -> bytearray:
        """Return the next frame, ending iteration at a clean end of stream."""
        try:
            return await self.receive()
        except trio.EndOfChannel:
            # End of stream is normal loop termination; suppress the internal
            # EndOfChannel so it is not attached as exception context.
            raise StopAsyncIteration from None