import subprocess
import sys
import tempfile
import textwrap
import click
from mario import cli_tools
from mario import doc
from mario import interpret
from mario import plug
from mario import traversals
# Registry that collects the traversals (via ``add_traversal``) and the CLI
# commands (via ``add_cli``) declared in this module.
registry = plug.Registry()
def calculate_function(traversal, howcall=None):
    """
    Build the user-code function for a traversal.

    :param traversal: The traversal being prepared. Its
        ``specific_invocation_params`` may contain ``"code"`` (the user's
        expression), ``"howcall"`` and ``"parameters"`` (CLI options such as
        ``exec_before``); its ``global_invocation_options.global_options``
        must contain ``"global_namespace"``.
    :param howcall: How the compiled code should be called. When ``None``, the
        traversal's own ``"howcall"`` parameter is used, falling back to
        ``interpret.HowCall.SINGLE``.
    :returns: ``{"function": fn}``, where ``fn`` is ``None`` when the traversal
        carries no ``"code"`` (for example ``chain``).
    """
    params = traversal.specific_invocation_params
    if howcall is None:
        howcall = params.get("howcall")
    if howcall is None:
        howcall = interpret.HowCall.SINGLE

    # Copy so that ``--exec-before`` definitions only affect this traversal and
    # never the shared global namespace.
    global_namespace = traversal.global_invocation_options.global_options[
        "global_namespace"
    ].copy()

    # click passes ``exec_before=None`` when the option is not given, so test
    # the value rather than the presence of the key. Specs without any options
    # may omit "parameters" altogether.
    exec_before = params.get("parameters", {}).get("exec_before")
    if exec_before is not None:
        global_namespace.update(interpret.build_global_namespace(exec_before))

    if "code" not in params:
        return {"function": None}
    return {
        "function": interpret.build_function(
            params["code"],
            global_namespace=global_namespace,
            howcall=howcall,
        )
    }
def calculate_reduce(traversal):
    """
    Build the two-argument function used by the ``reduce`` traversal.

    Delegates to :func:`calculate_function` with ``HowCall.VARARGS``. The
    ``--exec-before`` option accepted by the ``reduce`` command is therefore
    applied to the function's global namespace, and the shared global
    namespace is copied rather than handed to the function directly.

    :param traversal: The traversal being prepared; its
        ``specific_invocation_params`` carry the ``"code"`` built by the
        ``reduce`` CLI command.
    :returns: ``{"function": fn}``.
    """
    return calculate_function(traversal, howcall=interpret.HowCall.VARARGS)
@registry.add_traversal("map", calculate_more_params=calculate_function)
async def map(
    function, items, exit_stack, max_concurrent
):  # pylint: disable=redefined-builtin
    """
    Run code on each input item.

    Each item is handled in the order it was received, and also output in the
    same order. For less strict ordering and asynchronous execution, see
    ``async-map`` and ``async-map-unordered``.

    For example,

    .. code-block:: bash

        $ mario map 'x*2' <<EOF
        a
        b
        c
        EOF
        aa
        bb
        cc
    """
    # ``traversals.sync_map`` produces an async context manager. Entering it on
    # ``exit_stack`` (presumably a contextlib.AsyncExitStack) leaves its
    # cleanup to whoever unwinds that stack.
    ordered_mapper = traversals.sync_map(function, items, max_concurrent)
    return await exit_stack.enter_async_context(ordered_mapper)
@registry.add_traversal("async_map", calculate_more_params=calculate_function)
async def async_map(function, items, exit_stack, max_concurrent):
    """
    Run code on each input item asynchronously.

    The order of inputs is retained in the outputs. However, the order of inputs
    does not determine the order in which each input is handled, only the order
    in which its result is emitted. To keep the order in which each input is
    handled, use the synchronous version, ``map``.

    In this example, we make requests that have a server-side delay of specified
    length. The input order is retained in the output by holding each item until
    its precedents are ready.

    .. code-block:: bash

        $ mario async-map 'await asks.get ! x.json()["url"]' <<EOF
        http://httpbin.org/delay/5
        http://httpbin.org/delay/1
        http://httpbin.org/delay/2
        http://httpbin.org/delay/3
        http://httpbin.org/delay/4
        EOF
        https://httpbin.org/delay/5
        https://httpbin.org/delay/1
        https://httpbin.org/delay/2
        https://httpbin.org/delay/3
        https://httpbin.org/delay/4
    """
    # Hand the concurrent, order-preserving mapper to the caller's exit stack so
    # it is torn down together with the rest of the pipeline.
    concurrent_mapper = traversals.async_map(function, items, max_concurrent)
    return await exit_stack.enter_async_context(concurrent_mapper)
@registry.add_traversal("async_map_unordered", calculate_more_params=calculate_function)
async def async_map_unordered(function, items, exit_stack, max_concurrent):
    """
    Run code on each input item asynchronously, without retaining input order.

    Each result is emitted in the order it becomes ready, regardless of input
    order. Input order is also ignored when determining in which order to
    *start* handling each item. Results start emitting as soon as the first one
    is ready. It also saves memory because it doesn't require accumulating
    results while waiting for previous items to become ready. For stricter
    ordering, see ``map`` or ``async-map``.

    In this example, we make requests that have a server-side delay of specified
    length. The input order is lost but the results appear immediately as they
    are ready (the delay length determines the output order):

    .. code-block:: bash

        $ mario async-map-unordered 'await asks.get ! x.json()["url"]' <<EOF
        http://httpbin.org/delay/5
        http://httpbin.org/delay/1
        http://httpbin.org/delay/2
        http://httpbin.org/delay/3
        http://httpbin.org/delay/4
        EOF
        https://httpbin.org/delay/1
        https://httpbin.org/delay/2
        https://httpbin.org/delay/3
        https://httpbin.org/delay/4
        https://httpbin.org/delay/5
    """
    # The traversal is an async context manager; registering it on
    # ``exit_stack`` ties its cleanup to the caller's stack.
    return await exit_stack.enter_async_context(
        traversals.async_map_unordered(function, items, max_concurrent)
    )
@registry.add_traversal("filter", calculate_more_params=calculate_function)
async def filter(
    function, items, exit_stack, max_concurrent
):  # pylint: disable=redefined-builtin
    """
    Keep input items that satisfy a condition.

    Order of input items is retained in the output.

    For example,

    .. code-block:: bash

        $ mario filter 'x > "c"' <<EOF
        a
        b
        c
        d
        e
        f
        EOF
        d
        e
        f
    """
    # ``sync_filter`` yields an async context manager; entering it on the
    # caller's ``exit_stack`` defers its cleanup to whoever unwinds the stack.
    kept_items = traversals.sync_filter(function, items, max_concurrent)
    return await exit_stack.enter_async_context(kept_items)
@registry.add_traversal("async_filter", calculate_more_params=calculate_function)
async def async_filter(function, items, exit_stack, max_concurrent):
    """
    Keep input items that satisfy an asynchronous condition.

    For example,

    .. code-block:: bash

        $ mario async-filter '(await asks.get(x)).json()["url"].endswith(("1", "3"))' <<EOF
        http://httpbin.org/delay/5
        http://httpbin.org/delay/1
        http://httpbin.org/delay/2
        http://httpbin.org/delay/3
        http://httpbin.org/delay/4
        EOF
        http://httpbin.org/delay/1
        http://httpbin.org/delay/3
    """
    # Build the concurrent filter first, then let the exit stack own it.
    concurrent_filter = traversals.async_filter(function, items, max_concurrent)
    return await exit_stack.enter_async_context(concurrent_filter)
@registry.add_traversal("apply", calculate_more_params=calculate_function)
async def apply(function, items):
    """
    Apply code to the iterable of items.

    The code should take an iterable and it will be called with the input items.
    The items iterable will be converted to a list before the code is called, so
    it doesn't work well on very large streams.

    For example,

    .. code-block:: bash

        $ mario map int apply sum <<EOF
        10
        20
        30
        EOF
        60
    """
    # Drain the whole async stream into memory, since the user code expects a
    # regular sequence.
    collected = [item async for item in items]
    outcome = await function(collected)
    # Re-expose the single result as a one-element async iterable.
    return traversals.AsyncIterableWrapper([outcome])
# pylint: disable=redefined-builtin
@registry.add_traversal(
    "eval",
    calculate_more_params=lambda traversal: calculate_function(
        traversal, howcall=interpret.HowCall.NONE
    ),
)
async def eval(function):
    """
    Evaluate a Python expression.

    No input items are used.

    For example,

    .. code-block:: bash

        $ mario eval 1+1
        2
    """
    # There is no input item, so ``None`` is passed in its place.
    value = await function(None)
    return traversals.AsyncIterableWrapper([value])
@registry.add_traversal("reduce", calculate_more_params=calculate_reduce)
async def reduce(function, items, exit_stack, max_concurrent):
    """
    Reduce input items with code that takes two arguments, similar to ``functools.reduce``.

    For example,

    .. code-block:: bash

        $ mario map int reduce operator.mul <<EOF
        1
        2
        3
        4
        5
        EOF
        120
    """
    # ``async_reduce`` is an async context manager; the caller's exit stack owns
    # its cleanup.
    return await exit_stack.enter_async_context(
        traversals.async_reduce(function, items, max_concurrent)
    )
@registry.add_traversal(
    "chain",
    calculate_more_params=lambda traversal: calculate_function(
        traversal, howcall=interpret.HowCall.NONE
    ),
)
async def chain(items, exit_stack):
    """
    Concatenate a sequence of input iterables together into one long iterable.

    Converts an iterable of iterables of items into an iterable of items, like `itertools.chain.from_iterable <https://docs.python.org/3/library/itertools.html#itertools.chain.from_iterable>`_.

    For example,

    .. code-block:: bash

        $ mario eval '[[1,2]]'
        [[1, 2]]
        $ mario eval '[[1, 2]]' chain
        [1, 2]
    """
    # Flatten one level of nesting; the exit stack owns the flattener's cleanup.
    flattened = traversals.sync_chain(items)
    return await exit_stack.enter_async_context(flattened)
# Expression-taking commands. Each one gets ``--autocall``, a ``code``
# argument, ``--exec-before`` and a callback in the registration loop that
# follows. Short help texts are phrased consistently: async variants start
# with "Async".
subcommands = [
    cli_tools.DocumentedCommand(
        "map",
        help=map.__doc__,
        short_help="Call code on each line of input.",
        section="Traversals",
    ),
    cli_tools.DocumentedCommand(
        "async-map",
        help=async_map.__doc__,
        short_help="Async call code on each line of input.",
        section="Async traversals",
    ),
    cli_tools.DocumentedCommand(
        "apply",
        help=apply.__doc__,
        short_help="Call code on input as a sequence.",
        section="Traversals",
    ),
    cli_tools.DocumentedCommand(
        "filter",
        help=filter.__doc__,
        short_help="Call code on each line of input and exclude false values.",
        section="Traversals",
    ),
    cli_tools.DocumentedCommand(
        "async-filter",
        help=async_filter.__doc__,
        short_help="Async call code on each line of input and exclude false values.",
        section="Async traversals",
    ),
    cli_tools.DocumentedCommand(
        "async-map-unordered",
        help=async_map_unordered.__doc__,
        short_help="Async call code on each line of input, ignoring order of input items.",
        section="Async traversals",
    ),
    cli_tools.DocumentedCommand(
        "eval",
        help=eval.__doc__,
        short_help="Evaluate a Python expression.",
        section="Traversals",
    ),
]
def build_callback(sub_command):
    """
    Create the click callback for an expression-taking subcommand.

    The callback turns the parsed CLI arguments into a one-element list that
    holds the traversal spec. The spec contains the traversal name (the command
    name with ``-`` replaced by ``_``), how to call the code, the code itself,
    and any remaining options as ``parameters``.
    """

    def callback(code, autocall, **parameters):
        # --autocall lets ``map len`` stand for ``map len(x)``; with
        # --no-autocall the expression is used exactly as written.
        howcall = interpret.HowCall.SINGLE if autocall else interpret.HowCall.NONE
        spec = {
            "name": sub_command.name.replace("-", "_"),
            "howcall": howcall,
            "code": code,
            "parameters": parameters,
        }
        return [spec]

    return callback
# Shared ``--exec-before`` option decorator. It is applied to every
# expression subcommand in the loop below and to the ``reduce`` command.
option_exec_before = click.option(
    "--exec-before", help="Execute code in the function's global namespace."
)
# Give each expression-taking subcommand its parameters and callback, then
# register it as a CLI command.
for subcommand in subcommands:
    # Assign the base params before applying option_exec_before below. That
    # decorator presumably appends to ``params`` when given a Command, so
    # assigning afterwards would drop --exec-before. TODO confirm against click.
    subcommand.params = [
        click.Option(
            ["--autocall/--no-autocall"],
            is_flag=True,
            default=True,
            help='Automatically call the function if "x" does not appear in the expression. '
            "Allows ``map len`` instead of ``map len(x)``.",
        ),
        click.Argument(["code"]),
    ]
    subcommand.callback = build_callback(subcommand)
    subcommand = option_exec_before(subcommand)
    # pylint: disable=fixme
    # TODO: add_cli and add_traversal should be the non-decorator form
    registry.add_cli(name=subcommand.name)(subcommand)
@registry.add_cli(name="reduce")
@click.command(  # type: ignore
    "reduce",
    cls=cli_tools.DocumentedCommand,
    section="Traversals",
    short_help="Reduce a sequence with a function like ``operator.mul``.",
    help=reduce.__doc__,
)
@option_exec_before
@click.argument("function_name")
def _reduce(function_name, **parameters):
    """
    Translate ``mario reduce FUNCTION_NAME`` into a ``reduce`` traversal spec.

    The named function is wrapped in ``toolz.curry(...)`` as source text. It is
    presumably compiled later by ``calculate_reduce`` against a namespace that
    provides ``toolz``; confirm against the global namespace setup.
    """
    curried_source = f"toolz.curry({function_name})"
    spec = {
        "code": curried_source,
        "name": "reduce",
        "parameters": parameters,
    }
    return [spec]
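# NOTE: ``eval`` is now registered through the ``subcommands`` list above,
# which also gives it ``--autocall`` and ``--exec-before``. This older
# standalone definition is kept only for reference.
# @registry.add_cli(name="eval")
# @click.command("eval", short_help="Call <code> without any input.")
# @option_exec_before
# @click.argument("expression")
# def _eval(expression, **parameters):
#     return [{"code": expression, "name": "eval", "parameters": parameters}]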
# Commands that take no code argument, so they skip the shared
# ``--autocall``/``code`` setup applied to ``subcommands``.
more_commands = [
    cli_tools.DocumentedCommand(
        "chain",
        section="Traversals",
        short_help="Expand iterable of iterables of items into an iterable of items.",
        help=chain.__doc__,
        # The spec carries only the traversal name plus whatever keyword
        # parameters click passes to the callback.
        callback=lambda **parameters: [{"name": "chain", "parameters": parameters}],
    ),
]
for cmd in more_commands:
    registry.add_cli(name=cmd.name)(cmd)
# ``meta`` groups commands about mario itself (``pip``, ``test``).
# ``chain=True`` is click's chained-group mode, which allows several
# subcommands in one invocation.
meta = click.Group("meta", chain=True)
# Section metadata read by mario's documentation/help rendering; the exact
# meaning of doc.UNSECTIONED and sections=None is defined in mario.doc (confirm there).
meta.section = doc.UNSECTIONED  # type: ignore
meta.sections = None  # type: ignore
meta.help = "Commands about using mario."
@meta.command(
    context_settings={"ignore_unknown_options": True},
    cls=cli_tools.DocumentedCommand,
    section=doc.UNSECTIONED,
)
@click.argument("pip_args", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def pip(ctx, pip_args):
    """
    Run ``pip`` in the environment that mario is installed into.

    Arguments are forwarded to ``pip``.
    """
    # Running pip as a module of the current interpreter targets the same
    # environment mario runs in; its exit status becomes ours.
    command = [sys.executable, "-m", "pip", *pip_args]
    completed = subprocess.run(command, check=False)
    ctx.exit(completed.returncode)
@meta.command(
    "test",
    cls=cli_tools.DocumentedCommand,
    section=doc.UNSECTIONED,
    context_settings=dict(ignore_unknown_options=True),
)
@click.argument("pytest_args", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def run_tests(ctx, pytest_args):
    """
    Run all declarative command tests from plugins and config.

    Executes each test in the ``command.tests`` field with pytest.

    Default pytest args: ``-vvv --tb=short``
    """
    # User-supplied args replace the defaults entirely.
    pytest_args = list(pytest_args) or ["-vvv", "--tb=short"]
    # A pytest module generated on the fly: one parametrized case per declared
    # command test. Each case runs ``python -m mario`` in a subprocess and
    # compares its stdout with the expected output.
    source = textwrap.dedent(
        """\
        import subprocess
        import sys

        import pytest

        import mario.app


        COMMANDS = mario.app.global_registry.commands.values()  # pylint: disable=no-member
        TEST_SPECS = [test for command in COMMANDS for test in command.tests]


        @pytest.mark.parametrize("test_spec", TEST_SPECS, ids=lambda ts: str(list(ts.invocation)))
        def test_command(test_spec):
            output = subprocess.check_output(
                [sys.executable, "-m", "mario"] + list(test_spec.invocation),
                input=test_spec.input.encode(),
            ).decode()
            assert output == test_spec.output
        """
    )
    # Write the module into a private temporary directory. The directory, the
    # file and any __pycache__ pytest creates next to it are removed when the
    # ``with`` block ends.
    with tempfile.TemporaryDirectory() as tmpdir:
        # delete=False plus closing before pytest starts lets the subprocess
        # reopen the file on platforms that forbid a second open (Windows).
        with tempfile.NamedTemporaryFile(
            "wt", suffix=".py", dir=tmpdir, delete=False
        ) as test_file:
            test_file.write(source)
        args = [sys.executable, "-m", "pytest", *pytest_args, test_file.name]
        proc = subprocess.run(args, check=False)
    # Exit only after cleanup; ctx.exit raises click's Exit exception.
    ctx.exit(proc.returncode)
# Register the ``meta`` group (with its ``pip`` and ``test`` subcommands) as
# a top-level mario CLI command.
registry.add_cli(name="meta")(meta)