Source code for mario.app
#!/usr/bin/env python
"""Command line pipes in python."""
from __future__ import generator_stop
import collections
import functools
import os
from typing import AsyncIterable
from typing import List
import async_exit_stack
import attr
import trio
from . import asynch
from . import config
from . import interfaces
from . import interpret
from . import plug
[docs]async def call_traversal(
context, # pylint: disable=unused-argument
traversal: interfaces.Traversal,
items: AsyncIterable,
exit_stack: async_exit_stack.AsyncExitStack,
):
runtime_parameters = {"items": items, "exit_stack": exit_stack}
calculated_params = traversal.plugin_object.calculate_more_params(traversal)
available_params = collections.ChainMap(
calculated_params,
runtime_parameters,
traversal.specific_invocation_params,
traversal.global_invocation_options.global_options,
)
args = {
param: available_params[param]
for param in traversal.plugin_object.required_parameters
}
return await traversal.plugin_object.traversal_function(**args)
[docs]async def program_runner(
traversals: List[interfaces.Traversal],
items: AsyncIterable,
context: interfaces.Context,
):
async with async_exit_stack.AsyncExitStack() as stack:
for traversal in traversals:
items = await call_traversal(context, traversal, items, stack)
return stack.pop_all(), items
[docs]async def async_main(basic_traversals, **kwargs):
# pylint: disable=protected-access
stream = trio.hazmat.FdStream(os.dup(0))
receiver = asynch.TerminatedFrameReceiver(stream, b"\n")
# pylint: disable=no-member
global_context = interfaces.Context(global_registry.global_options.copy())
global_context.global_options.update(config.DEFAULTS)
global_context.global_options.update(kwargs)
global_context.global_options[
"global_namespace"
] = interpret.build_global_namespace(
global_context.global_options["base_exec_before"]
)
global_context.global_options["global_namespace"].update(
interpret.build_global_namespace(global_context.global_options["exec_before"])
)
traversals = []
for bt in basic_traversals:
for d in bt:
# pylint: disable=fixme
# TODO Make classes or use pyrsistent.
traversal_namespace = {
**global_context.global_options["global_namespace"],
**d["parameters"].get("inject_values", {"HELLO": "WORLD"}),
}
traversal_context = attr.evolve(
global_context,
global_options=dict(
global_context.global_options, global_namespace=traversal_namespace
),
)
# pylint: disable=unsubscriptable-object
traversal = interfaces.Traversal(
global_invocation_options=traversal_context,
specific_invocation_params=d,
plugin_object=global_registry.traversals[d["name"]],
)
traversals.append(traversal)
items = (item.decode() async for item in receiver)
stack, items = await program_runner(traversals, items, global_context)
async with stack:
async for item in items:
print(item)
[docs]def main(pairs, **kwargs):
trio.run(functools.partial(async_main, pairs, **kwargs))
global_registry = plug.make_global_registry()