
Add pass-through arguments for scheduler/worker --preload modules.#1634

Merged
mrocklin merged 10 commits into dask:master from asford:preload_passthrough
Feb 12, 2018

Conversation

@asford
Contributor

@asford asford commented Dec 14, 2017

Adds support for pass-through command-line arguments to worker and scheduler preload modules via an argv pattern similar to that used for compiler subcommand arguments. Intended to support "generic" preload modules with run-time configuration.

  • Extension of the dask_setup interface would likely break existing preload implementations, so this instead extends the preload interface with an optional dask_command module attribute.
  • Preload argument handling is ambiguous when multiple modules are given, so only a single preload module exposing the dask_command interface is supported. Applications requiring multiple preload modules may declare a single "top-level" module chaining the required subcommands.
  • Test coverage and test updates.
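
The argv split described above can be sketched with the standard library's argparse (the PR itself builds on click; all names below are illustrative, not the PR's actual code): the root command consumes the options it recognizes and hands the leftover argv to the preload module's own parser.

```python
import argparse

# Hypothetical sketch of the pass-through pattern. The root parser keeps
# its own options and collects everything it does not recognize verbatim.
root = argparse.ArgumentParser(prog="dask-scheduler")
root.add_argument("--preload")
root_args, preload_argv = root.parse_known_args(
    ["--preload", "my_module", "--foo", "bar"]
)

# The preload module would expose its own parser for the leftover argv.
preload = argparse.ArgumentParser(prog="preload")
preload.add_argument("--foo")
preload_args = preload.parse_args(preload_argv)

print(root_args.preload)  # my_module
print(preload_args.foo)   # bar
```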

@asford asford force-pushed the preload_passthrough branch from 3b99652 to 3fc9641 Compare December 14, 2017 17:56
@asford asford changed the title WIP Add pass-through arguments for scheduler/worker --preload modules. [WIP] Add pass-through arguments for scheduler/worker --preload modules. Dec 14, 2017
@asford asford changed the title [WIP] Add pass-through arguments for scheduler/worker --preload modules. [WIP] [skip ci] Add pass-through arguments for scheduler/worker --preload modules. Dec 14, 2017
@mrocklin
Member

mrocklin commented Dec 14, 2017 via email

@asford
Contributor Author

asford commented Dec 14, 2017

I've looked through the click documentation and think this would be possible. The "clang-like" interface seems harder to support via click, but I'm not tied to that concept. Would a model that passes any "unrecognized" arguments into the preloaded module be preferable to the --preload_args interface? This would result in:

dask-scheduler --preload scheduler_preload.py --flags-for-preloads=flag_value args-for-preloads

This interface seems better, but may complicate (a) unknown argument error handling and (b) the use of multiple preload modules.

My main concern with building the "preload" logic around click is that it ties the preload implementation to a specific command-line parser, but I can see an argument for standardization. I'll explore this more and prototype a click-based system.

@jakirkham
Member

jakirkham commented Dec 14, 2017

The AppVeyor build appeared to have hung but didn't terminate itself, so I took the liberty of canceling it.

Edit: I should add that the previous build for this PR did exactly the same thing. It's probably worth investigating whether something here is causing it.

@asford
Contributor Author

asford commented Dec 14, 2017

@jakirkham Sorry, my intention was to skip CI for this pull until we've landed on an implementation. Would you mind checking if the [skip ci] flag I've added is now working?

@asford
Contributor Author

asford commented Dec 14, 2017

@mrocklin It turned out that a click based implementation was easier than I expected. I've updated this interface to support delegation into a click.Command exposed in the preload module attribute dask_command.

This obviously needs testing and documentation updates, but unfortunately I may not be able to push this to a mergeable state until early next week. Would you mind commenting on the interface as implemented?

@mrocklin
Member

mrocklin commented Dec 14, 2017 via email

@asford asford force-pushed the preload_passthrough branch 2 times, most recently from 4b0d408 to f209527 Compare December 14, 2017 20:40
@jakirkham
Member

Sorry, my intention was to skip CI for this pull until we've landed on an implementation.

No worries. Just figured I'd mention it in case this was unexpected or you were trying to debug the CI.

Would you mind checking if the [skip ci] flag I've added is now working?

Sorry, I don't seem to be seeing it. If you add [ci skip] to the commit message, that usually works.

@mrocklin
Member

In reviewing this it would be useful to see an example of how it works. Perhaps add a test following the pattern in distributed/cli/tests/test_dask_scheduler.py::test_preload_file?

@mrocklin
Member

I'm finding reviewing this to be a bit of a challenge due to my own lack of familiarity with the click library.

CC'ing @danielfrg in case he has time and has experience with common conventions on how to pass through keyword arguments to downstream commands. @danielfrg, in brief: the dask-scheduler and dask-worker commands can specify user-defined scripts to run at a certain point in the setup process. This is often used to set up logging, add additional services, etc. We would like to pass keywords down from the main dask-scheduler call to these scripts. Here is an example.

dask-scheduler ... --preload my-script.py --key1 value --key2 value

Where --key1 and --key2 are intended for my-script.py.

@danielfrg
Member

I kinda like what wait-for-it does: you pass a --, and everything after it is the extra command and flags.

It will look like:

dask-scheduler --port 8080 -- python my-script.py --key1 value1 --key2 value2
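
The "--" convention described here can be sketched in a few lines (a dependency-free illustration, not anything from wait-for-it or the PR): split argv at the first sentinel and hand everything after it to the downstream command.

```python
# Split a command line at the first "--" sentinel: the left side belongs
# to the outer command, the right side to the downstream command.
def split_at_sentinel(argv):
    if "--" in argv:
        i = argv.index("--")
        return argv[:i], argv[i + 1:]
    return argv, []

own, extra = split_at_sentinel(
    ["--port", "8080", "--", "python", "my-script.py", "--key1", "value1"]
)
print(own)    # ['--port', '8080']
print(extra)  # ['python', 'my-script.py', '--key1', 'value1']
```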

@mrocklin
Member

I kinda like what wait-for-it does: you pass a --, and everything after it is the extra command and flags.

This would presumably go outside of standard click approaches though, yes? I'm not a good judge of how fancy this is or what conventions are these days, but I'm a little hesitant about this.

I think what I'm looking for is a way to pass keywords down to subcommands. I want the click equivalent of passing **kwargs:

def f(x=1, y=2, **kwargs):
    tmp = g(**kwargs)
    return x + y + tmp

def g(z=3, **kwargs):
    ...

@asford
Contributor Author

asford commented Dec 22, 2017 via email

@mrocklin
Member

mrocklin commented Dec 22, 2017 via email

@mrocklin
Member

OK, I gave this a shot with the following example:

# click_test.py
import click

@click.command(context_settings={'ignore_unknown_options': True})
@click.option('--foo', type=str)
def dask_setup(foo):
    print(foo)

if __name__ == '__main__':
    dask_setup()
$ dask-scheduler --preload click_test.py --foo 5
distributed.utils - INFO - Reload module click_test from .py file
Usage: dask-scheduler [OPTIONS] [PRELOAD_ARGV]...

Error: Additional preload arguments specified, but --preload target did not expose command.

It's not clear to me how to make this work properly yet. I appreciate the informative error messages; they may not be enough for new users though. Perhaps this needs documentation? Perhaps an example or two?

@asford asford force-pushed the preload_passthrough branch from 6dbabe5 to 8b345f5 Compare December 29, 2017 03:39
@asford
Contributor Author

asford commented Dec 29, 2017

Rebased, added initial test coverage and documented. This should now be ready for review.

@asford asford changed the title [WIP] [skip ci] Add pass-through arguments for scheduler/worker --preload modules. Add pass-through arguments for scheduler/worker --preload modules. Dec 29, 2017
@asford
Contributor Author

asford commented Dec 29, 2017

It looks like the CI error was py2.7 distributed/tests/test_worker.py::test_statistical_profiling_cycle. I haven't been able to repro locally in a conda-based python 2.7 development environment. Is it possible that this test is flaky?

@mrocklin
Member

mrocklin commented Dec 29, 2017 via email

@asford
Contributor Author

asford commented Jan 16, 2018

@mrocklin @danielfrg Friendly ping on this pull. Happy to revise the implementation, but would like to have a +1/-1 and plan for a merge.

Member

@mrocklin mrocklin left a comment

In principle the approach looks good to me. I've raised a couple of points below.

@click.command()
@click.option("--passthrough", type=str, default="default")
def dask_command(passthrough):
    _config["passthrough"] = passthrough
Member

I'm inclined to just put the command bits on dask_setup rather than add a new special function. Presumably these run at the same time anyway?

Contributor Author

Understood, fixed below.
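
The resulting dispatch can be sketched without depending on click (hypothetical names; not the PR's actual code): dask_setup may be either a plain callable or a click.Command, and the CLI decides which calling convention to use by inspecting it. Here we duck-type on attributes a click.Command carries rather than importing click.

```python
# Decide how to invoke a preload module's dask_setup: a click.Command
# exposes `params` and a callable `main`; a plain function does not.
def is_cli_command(obj):
    return hasattr(obj, "params") and callable(getattr(obj, "main", None))

def run_setup(dask_setup, scheduler, argv):
    if is_cli_command(dask_setup):
        # a click.Command would be invoked with the leftover argv
        return dask_setup.main(args=argv, standalone_mode=False)
    # a plain dask_setup receives only the scheduler/worker object
    return dask_setup(scheduler)
```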

Comment thread: distributed/preloading.py (outdated)
return value

if value and not ctx.params.get("preload", None):
    raise click.UsageError("Additional preload arguments specified without --preload target.")
Member

This might be valuable to include in a test. We want to ensure that invocations like dask-scheduler --bad-keyword genuinely raise informative errors. In these cases we probably don't want to talk about --preload but rather just mention that there are unknown keywords (and perhaps list what they are).

Contributor Author

Updated in the commit below to match click error messages for unknown options and extra arguments.

Alex Ford added 3 commits January 26, 2018 03:38
Adds support for optional command-line parsing to worker and scheduler
--preload modules. Modules *may* support an additional `dask_command`
module attribute, which must provide a click.Command to be used in
parsing command-line options from the root command-line interface.
`dask_command` will be invoked with any "additional" arguments specified
on the command line interface, and is expected to configure the preload
module. Following calls to `dask_setup` and `dask_teardown` provide the
target worker or scheduler objects.
Adds initial test coverage for click-based preload arguments. Fixup
errors in preload argument specification and preload main handling.
Alex Ford added 4 commits January 26, 2018 03:40
From review, convert to parsing options on dask_setup if provided
as a click command. Fixup error messages from unknown/extra options to
match click conventions if preload argument module isn't provided.
@asford asford force-pushed the preload_passthrough branch from cc35032 to 7fcdd79 Compare January 26, 2018 03:40
@mrocklin
Member

An experiment and results:

# foo.py

import click

@click.command()
@click.option("--cmd", type=str, default='default')
def dask_setup(scheduler, cmd):
    print("Hello")
    print(cmd)

print("running")
$ dask-scheduler --preload foo.py --cmd bar
distributed.utils - INFO - Reload module foo from .py file
running
running
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://192.168.50.100:8786
distributed.scheduler - INFO -       bokeh at:                     :8787
distributed.utils - INFO - Reload module foo from .py file
running
Hello
bar
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-dcacf5e_
distributed.scheduler - INFO - -----------------------------------------------

Why the reloads? Why did running appear three times?

@mrocklin
Member

Otherwise everything feels smooth on my end

@asford
Contributor Author

asford commented Jan 26, 2018

This occurs when a preload source .py file is specified rather than a preload module name, due to the explicit reload call after import_module in distributed.utils.import_file.

The preload system performs multiple imports of the given name because the target preload must be accessed (A) to check for a click-ed dask_setup function and (B) to extract the preload interface for the actual dask_setup call. This doesn't result in triple-execution of a preload module (as opposed to preload file), as the module is retrieved from the import cache in (B).

This forced reload prevents the import system from using the cached initial .py import, so in (A) you see (1) an initial import of the file and (2) a forced reload of the file then in (B) (3) a final forced reload before the dask_setup call.

If this behavior seems too spooky it would be possible to fully parse the --preload component during dask-(worker|scheduler) click argument parsing then pass the complete interface via the preload argument value, as opposed to passing the string (file|module) name as currently implemented. However, this is really a workaround for the somewhat-strange multiple-load behavior in distributed.utils.import_file.
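
The forced-reload behavior under discussion can be reproduced in isolation (stdlib only; the module name below is made up): importlib.reload() re-executes a module's top-level code even though the module is already cached in sys.modules.

```python
import importlib
import os
import sys
import tempfile

# Write a throwaway module whose body counts how many times it has run.
# On reload, the module's existing globals are retained, so the counter
# increments with each re-execution of the body.
tmpdir = tempfile.mkdtemp()
with open(os.path.join(tmpdir, "demo_preload.py"), "w") as f:
    f.write("COUNTER = globals().get('COUNTER', 0) + 1\n")

sys.path.insert(0, tmpdir)
mod = importlib.import_module("demo_preload")  # body runs once
mod = importlib.reload(mod)                    # body runs again
print(mod.COUNTER)  # 2
```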

@asford
Contributor Author

asford commented Jan 26, 2018

Example tracebacks below, lightly trimmed for clarity:

Preload Source

fordas@salish:~/workspace/distributed_dev$ cat test_plugin.py
import traceback

import click

@click.command()
@click.option("--foo", default="bar")
def dask_setup(scheduler, foo):
    print("dask_setup test_plugin")
    print(foo)

print("run test_plugin")
traceback.print_stack()

Preload via module name, single import

fordas@salish:~/workspace/distributed_dev$ PYTHONPATH=`pwd` dask-scheduler --preload test_plugin --port 88786
run test_plugin
[...]
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/preloading.py", line 32, in validate_preload_argv
    preload_modules = _import_modules(ctx.params.get("preload"))
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/preloading.py", line 97, in _import_modules
    import_module(name)
  File "/home/fordas/.conda/envs/distributed_dev/lib/python3.5/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 985, in _gcd_import
  File "<frozen importlib._bootstrap>", line 968, in _find_and_load
  File "<frozen importlib._bootstrap>", line 957, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 673, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 697, in exec_module
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File "/home/fordas/workspace/distributed_dev/test_plugin.py", line 12, in <module>
    traceback.print_stack()
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:    tcp://10.128.0.2:23250
distributed.scheduler - INFO -       bokeh at:                    :45101
dask_setup test_plugin
bar
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-qaix2em4
distributed.scheduler - INFO - -----------------------------------------------
^Cdistributed.scheduler - INFO - End scheduler at 'tcp://:88786'

Import via filename, repeated import

fordas@salish:~/workspace/distributed_dev$ PYTHONPATH=`pwd` dask-scheduler --preload test_plugin.py --port 88786
distributed.utils - INFO - Reload module test_plugin from .py file
run test_plugin
[...]
  File "/home/fordas/.conda/envs/distributed_dev/lib/python3.5/site-packages/click/core.py", line 78, in invoke_param_callback
    return callback(ctx, param, value)
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/preloading.py", line 32, in validate_preload_argv
    preload_modules = _import_modules(ctx.params.get("preload"))
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/preloading.py", line 92, in _import_modules
    module = import_file(name)[0]
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/utils.py", line 1022, in import_file
    loaded.append(reload(import_module(name)))
  File "/home/fordas/.conda/envs/distributed_dev/lib/python3.5/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 985, in _gcd_import
  File "<frozen importlib._bootstrap>", line 968, in _find_and_load
  File "<frozen importlib._bootstrap>", line 957, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 673, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 697, in exec_module
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File "/home/fordas/workspace/distributed_dev/test_plugin.py", line 12, in <module>
    traceback.print_stack()

run test_plugin
[...]
  File "/home/fordas/.conda/envs/distributed_dev/lib/python3.5/site-packages/click/core.py", line 78, in invoke_param_callback
    return callback(ctx, param, value)
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/preloading.py", line 32, in validate_preload_argv
    preload_modules = _import_modules(ctx.params.get("preload"))
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/preloading.py", line 92, in _import_modules
    module = import_file(name)[0]
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/utils.py", line 1022, in import_file
    loaded.append(reload(import_module(name)))
  File "/home/fordas/.conda/envs/distributed_dev/lib/python3.5/importlib/__init__.py", line 166, in reload
    _bootstrap._exec(spec, module)
  File "<frozen importlib._bootstrap>", line 626, in _exec
  File "<frozen importlib._bootstrap_external>", line 697, in exec_module
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File "/home/fordas/workspace/distributed_dev/test_plugin.py", line 12, in <module>
    traceback.print_stack()
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:    tcp://10.128.0.2:23250
distributed.scheduler - INFO -       bokeh at:                    :34427
distributed.utils - INFO - Reload module test_plugin from .py file
run test_plugin
  File "/home/fordas/.conda/envs/distributed_dev/bin/dask-scheduler", line 11, in <module>
    load_entry_point('distributed', 'console_scripts', 'dask-scheduler')()
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/cli/dask_scheduler.py", line 145, in go
    main()
  File "/home/fordas/.conda/envs/distributed_dev/lib/python3.5/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/home/fordas/.conda/envs/distributed_dev/lib/python3.5/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/home/fordas/.conda/envs/distributed_dev/lib/python3.5/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/fordas/.conda/envs/distributed_dev/lib/python3.5/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/cli/dask_scheduler.py", line 125, in main
    preload_modules(preload, parameter=scheduler, file_dir=local_directory, argv=preload_argv)
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/preloading.py", line 123, in preload_modules
    imported_modules = _import_modules(names, file_dir=file_dir)
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/preloading.py", line 90, in _import_modules
    module = import_file(copy_dst)[0]
  File "/home/fordas/workspace/distributed_dev/src/distributed/distributed/utils.py", line 1022, in import_file
    loaded.append(reload(import_module(name)))
  File "/home/fordas/.conda/envs/distributed_dev/lib/python3.5/importlib/__init__.py", line 166, in reload
    _bootstrap._exec(spec, module)
  File "<frozen importlib._bootstrap>", line 626, in _exec
  File "<frozen importlib._bootstrap_external>", line 697, in exec_module
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File "/tmp/scheduler-p65s4rto/test_plugin.py", line 12, in <module>
    traceback.print_stack()
dask_setup test_plugin
bar
distributed.scheduler - INFO - Local Directory:    /tmp/scheduler-p65s4rto
distributed.scheduler - INFO - -----------------------------------------------
^Cdistributed.scheduler - INFO - End scheduler at 'tcp://:88786'

@mrocklin
Member

I again apologize for the long delay here. I've merged in master and pushed. Merging on passed tests.

@mrocklin mrocklin merged commit cc908e7 into dask:master Feb 12, 2018
@mrocklin
Member

This is in. Thank you for your work and patience @asford
