Fix incorrect output from prints originating from different processes #604
In the PipeFunc documentation I got:
```json
{
"cell_type": "code",
"execution_count": 47,
"id": "92",
"metadata": {
"execution": {
"iopub.execute_input": "2024-05-31T05:42:39.297713Z",
"iopub.status.busy": "2024-05-31T05:42:39.297474Z",
"iopub.status.idle": "2024-05-31T05:42:40.477462Z",
"shell.execute_reply": "2024-05-31T05:42:40.475729Z"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-05-30 22:42:39.410279 - Running double_it for x=3"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-05-30 22:42:39.408318 - Running double_it for x=0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-05-30 22:42:39.410888 - Running double_it for x=1"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-05-30 22:42:39.416024 - Running double_it for x=2"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-05-30 22:42:39.431485 - Running half_it for x=0"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-05-30 22:42:39.434285 - Running half_it for x=1"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-05-30 22:42:39.433559 - Running half_it for x=2"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-05-30 22:42:39.439223 - Running half_it for x=3"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-05-30 22:42:40.459668 - Running take_sum"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"14\n"
]
}
],
"source": [
"from concurrent.futures import ProcessPoolExecutor\n",
"import datetime\n",
"import numpy as np\n",
"import time\n",
"from pipefunc import Pipeline, pipefunc\n",
"\n",
"\n",
"@pipefunc(output_name=\"double\", mapspec=\"x[i] -> double[i]\")\n",
"def double_it(x: int) -> int:\n",
" print(f\"{datetime.datetime.now()} - Running double_it for x={x}\")\n",
" time.sleep(1)\n",
" return 2 * x\n",
"\n",
"\n",
"@pipefunc(output_name=\"half\", mapspec=\"x[i] -> half[i]\")\n",
"def half_it(x: int) -> int:\n",
" print(f\"{datetime.datetime.now()} - Running half_it for x={x}\")\n",
" time.sleep(1)\n",
" return x // 2\n",
"\n",
"\n",
"@pipefunc(output_name=\"sum\")\n",
"def take_sum(half: np.ndarray, double: np.ndarray) -> int:\n",
" print(f\"{datetime.datetime.now()} - Running take_sum\")\n",
" return sum(half + double)\n",
"\n",
"\n",
"pipeline = Pipeline([double_it, half_it, take_sum])\n",
"inputs = {\"x\": [0, 1, 2, 3]}\n",
"run_folder = \"my_run_folder\"\n",
"executor = ProcessPoolExecutor(max_workers=8) # use 8 processes\n",
"results = pipeline.map(\n",
" inputs,\n",
" run_folder=run_folder,\n",
" parallel=True,\n",
" executor=executor,\n",
" storage=\"shared_memory_dict\",\n",
")\n",
"print(results[\"sum\"].output)"
]
},
```

Thanks for submitting your first pull request! You are awesome! 🤗

@agoose77 the failing CI check is unrelated to these changes.

@agoose77, friendly ping. Could you take a look at this?

I have a very similar problem in one of my repos, but this fix doesn't seem to work; I still get the fragmented output in the rendered HTML.

@bsipocz, did you set

Yep, I didn't have that, but I discovered the option by following the link to your pipefunc/pipefunc#125 PR. So thank you. My issue is now fixed even without this PR. While this PR does pass all tests and doesn't seem to break anything, I suppose it would be nice to add your failing case to the tests, too.

        streams[output["name"]]["text"] += f"{out}\n"
    else:
        new_outputs.append(output)
        output["text"] = output["text"].strip() + "\n"

Yes, perhaps it would be clearer to move it one line up. We're mutating the dict we just added to new_outputs.
edit: done
Oh, I was just nitpicking that you strip off whitespace and then add a newline; that is practically the same, so is this really needed?
Oh this is in case there are multiple newlines at the end which can happen after merging the cells.
See the example in my first post.
I am hard at work on trying to write a test. It turns out that on macOS the issue does not exist ... it took me a good while to realize that.
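
To make the merging step discussed above concrete, here is a minimal sketch of how fragmented stream outputs like the ones in the notebook JSON could be coalesced. It is not the code from this PR: `merge_stream_outputs` is a hypothetical helper, it works on plain dicts rather than nbformat nodes, and it assumes that fragments containing only whitespace can be dropped.

```python
def merge_stream_outputs(outputs):
    """Merge all stream outputs that share a name into one output.

    Hypothetical helper for illustration only; not the MyST-NB implementation.
    """
    merged = []
    streams = {}  # stream name ("stdout"/"stderr") -> merged output dict
    for output in outputs:
        if output.get("output_type") != "stream":
            merged.append(output)  # leave non-stream outputs untouched
            continue
        text = output["text"]
        if isinstance(text, list):  # notebook JSON may store text as a list
            text = "".join(text)
        text = text.rstrip()  # drop trailing newlines, keep leading indentation
        name = output["name"]
        if name not in streams:
            streams[name] = dict(output, text="")
            merged.append(streams[name])
        if text:  # assumption: whitespace-only fragments are skipped
            streams[name]["text"] += text + "\n"
    return merged


fragments = [
    {"name": "stdout", "output_type": "stream", "text": ["Running double_it for x=3"]},
    {"name": "stdout", "output_type": "stream", "text": ["Running double_it for x=0"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["\n"]},
    {"name": "stdout", "output_type": "stream", "text": ["14\n"]},
]
merged = merge_stream_outputs(fragments)
print(merged[0]["text"], end="")
```

With this input, the five fragments collapse into a single `stdout` output with exactly one newline after each printed line. Note that this sketch would also drop intentional blank lines, which may or may not be acceptable.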
@bsipocz, just added a test!
See my comment here #604 (comment)
@basnijholt @bsipocz just a hint, you should be using rstrip not strip, because now you have removed any possible indentation at the start of the streams 🤷
Thanks! I'll open a follow-up to fix that.
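
The difference between `strip` and `rstrip` pointed out above can be seen in a small illustration. The sample text is made up; it only assumes a merged stream whose first line is indented and which ends in several newlines.

```python
# Made-up merged stream text: indented first line, extra trailing newlines.
merged_text = "    indented line\nnext line\n\n\n"

with_strip = merged_text.strip() + "\n"    # also removes the leading indentation
with_rstrip = merged_text.rstrip() + "\n"  # only trims the trailing newlines

print(repr(with_strip))   # 'indented line\nnext line\n'
print(repr(with_rstrip))  # '    indented line\nnext line\n'
```

Both variants collapse the trailing newlines to a single one, but only `rstrip` preserves the indentation at the start of the stream.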
bsipocz left a comment:
This doesn't seem to break anything, so I would go ahead and merge it; however, it would be nicer to have a test covering it, too.

I added a test. Without the changes here the test will fail: The notebook is executed via

executablebooks/MyST-NB#604 was merged and made it into [v1.1.2](https://github.com/executablebooks/MyST-NB/releases/tag/v1.1.2).
In the PipeFunc documentation I have the following problem when executing code with a `ProcessPoolExecutor`: [screenshot]

With this fix it becomes: [screenshot]

The root of the issue is that `nbconvert --execute` produces the fragmented stream outputs shown in the notebook JSON above.