Skip to content

Fix incorrect output from prints originating from different processes#604

Merged
bsipocz merged 4 commits into executablebooks:master from
basnijholt:coalesce_streams
Sep 19, 2024
Merged

Fix incorrect output from prints originating from different processes#604
bsipocz merged 4 commits into executablebooks:master from
basnijholt:coalesce_streams

Conversation

@basnijholt
Copy link
Copy Markdown
Contributor

@basnijholt basnijholt commented May 31, 2024

In the PipeFunc documentation I have the following problem when executing code with a ProcessPoolExecutor:

image

With this fix it becomes:
image

The root of the issue is that nbconvert --execute produces this output:

  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "92",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-05-31T05:42:39.297713Z",
     "iopub.status.busy": "2024-05-31T05:42:39.297474Z",
     "iopub.status.idle": "2024-05-31T05:42:40.477462Z",
     "shell.execute_reply": "2024-05-31T05:42:40.475729Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410279 - Running double_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.408318 - Running double_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410888 - Running double_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.416024 - Running double_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.431485 - Running half_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.434285 - Running half_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.433559 - Running half_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.439223 - Running half_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:40.459668 - Running take_sum"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "14\n"
     ]
    }
   ],
   "source": [
    "from concurrent.futures import ProcessPoolExecutor\n",
    "import datetime\n",
    "import numpy as np\n",
    "import time\n",
    "from pipefunc import Pipeline, pipefunc\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"double\", mapspec=\"x[i] -> double[i]\")\n",
    "def double_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running double_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return 2 * x\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"half\", mapspec=\"x[i] -> half[i]\")\n",
    "def half_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running half_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return x // 2\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"sum\")\n",
    "def take_sum(half: np.ndarray, double: np.ndarray) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running take_sum\")\n",
    "    return sum(half + double)\n",
    "\n",
    "\n",
    "pipeline = Pipeline([double_it, half_it, take_sum])\n",
    "inputs = {\"x\": [0, 1, 2, 3]}\n",
    "run_folder = \"my_run_folder\"\n",
    "executor = ProcessPoolExecutor(max_workers=8)  # use 8 processes\n",
    "results = pipeline.map(\n",
    "    inputs,\n",
    "    run_folder=run_folder,\n",
    "    parallel=True,\n",
    "    executor=executor,\n",
    "    storage=\"shared_memory_dict\",\n",
    ")\n",
    "print(results[\"sum\"].output)"
   ]
  },

In the PipeFunc documentation I got:
```
  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "92",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-05-31T05:42:39.297713Z",
     "iopub.status.busy": "2024-05-31T05:42:39.297474Z",
     "iopub.status.idle": "2024-05-31T05:42:40.477462Z",
     "shell.execute_reply": "2024-05-31T05:42:40.475729Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410279 - Running double_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.408318 - Running double_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410888 - Running double_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.416024 - Running double_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.431485 - Running half_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.434285 - Running half_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.433559 - Running half_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.439223 - Running half_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:40.459668 - Running take_sum"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "14\n"
     ]
    }
   ],
   "source": [
    "from concurrent.futures import ProcessPoolExecutor\n",
    "import datetime\n",
    "import numpy as np\n",
    "import time\n",
    "from pipefunc import Pipeline, pipefunc\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"double\", mapspec=\"x[i] -> double[i]\")\n",
    "def double_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running double_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return 2 * x\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"half\", mapspec=\"x[i] -> half[i]\")\n",
    "def half_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running half_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return x // 2\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"sum\")\n",
    "def take_sum(half: np.ndarray, double: np.ndarray) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running take_sum\")\n",
    "    return sum(half + double)\n",
    "\n",
    "\n",
    "pipeline = Pipeline([double_it, half_it, take_sum])\n",
    "inputs = {\"x\": [0, 1, 2, 3]}\n",
    "run_folder = \"my_run_folder\"\n",
    "executor = ProcessPoolExecutor(max_workers=8)  # use 8 processes\n",
    "results = pipeline.map(\n",
    "    inputs,\n",
    "    run_folder=run_folder,\n",
    "    parallel=True,\n",
    "    executor=executor,\n",
    "    storage=\"shared_memory_dict\",\n",
    ")\n",
    "print(results[\"sum\"].output)"
   ]
  },
```
@welcome
Copy link
Copy Markdown

welcome bot commented May 31, 2024

Thanks for submitting your first pull request! You are awesome! 🤗

If you haven't done so already, check out EBP's Code of Conduct and our Contributing Guide, as this will greatly help the review process.

Welcome to the EBP community! 🎉

@basnijholt
Copy link
Copy Markdown
Contributor Author

@agoose77 the failing CI check is unrelated to these changes

@basnijholt
Copy link
Copy Markdown
Contributor Author

@agoose77, friendly ping. Could you take a look at this?

@bsipocz
Copy link
Copy Markdown
Member

bsipocz commented Sep 19, 2024

I have a very similar problem in one of my repos, but this fix doesn't seem to work, I still get the fragmented output in the rendered HTML.

@basnijholt
Copy link
Copy Markdown
Contributor Author

@bsipocz, did you set nb_merge_streams = True?

@bsipocz
Copy link
Copy Markdown
Member

bsipocz commented Sep 19, 2024

@bsipocz, did you set nb_merge_streams = True?

Yeap, I didn't have that, but discovered the option by following the link to your pipefunc/pipefunc#125 PR. So thank you.

So now my issue is fixed even without this PR.

While this PR does pass all tests, and doesn't seem to break anything, I suppose it would be nice to add your failing case to the tests, too.

streams[output["name"]]["text"] += f"{out}\n"
else:
new_outputs.append(output)
output["text"] = output["text"].strip() + "\n"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line necessary?

Copy link
Copy Markdown
Contributor Author

@basnijholt basnijholt Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, perhaps it would be clearer to move it one line up. We're mutating the dict we just added to new_outputs.

edit: done

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I was just nitpicking that you strip off whitespace and add a newline; practically the same, so is this really needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh this is in case there are multiple newlines at the end which can happen after merging the cells.

See the example in my first post.

I am hard at work on trying to write a test. Turns out that on MacOS the issue does not exist ... took me a good while to realize that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bsipocz, just added a test!

See my comment here #604 (comment)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@basnijholt @bsipocz just a hint, you should be using rstrip not strip, because now you have removed any possible indentation at the start of the streams 🤷

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'll open a follow-up to fix that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I just opened #640

bsipocz
bsipocz previously approved these changes Sep 19, 2024
Copy link
Copy Markdown
Member

@bsipocz bsipocz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem to break anything, and thus I would go ahead and merge it, however it would be nicer to have a test covering it, too.

@basnijholt
Copy link
Copy Markdown
Contributor Author

I added a test. Without the changes here the test will fail:

________________________________________________ test_merge_streams_parallel ________________________________________________

sphinx_run = <conftest.SphinxFixture object at 0x7e810491f9b0>
file_regression = <conftest.FileRegression object at 0x7e80b65fe3f0>

    @pytest.mark.sphinx_params(
        "merge_streams_parallel.ipynb",
        conf={"nb_execution_mode": "off", "nb_merge_streams": True},
    )
    def test_merge_streams_parallel(sphinx_run, file_regression):
        """Test configuring multiple concurrent stdout/stderr outputs to be merged."""
        sphinx_run.build()
        assert sphinx_run.warnings() == ""
        doctree = sphinx_run.get_resolved_doctree("merge_streams_parallel")
>       file_regression.check(doctree.pformat(), extension=".xml", encoding="utf-8")

/home/bas.nijholt/repos/MyST-NB/tests/test_render_outputs.py:116:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <conftest.FileRegression object at 0x7e80b65fe3f0>
data = '<document source="merge_streams_parallel" translation_progress="{\'total\': 0, \'translated\': 0}">\n    <container c...        \n                \n                \n                \n                \n                \n                \n'
kwargs = {'encoding': 'utf-8', 'extension': '.xml'}

    def check(self, data, **kwargs):
>       return self.file_regression.check(self._strip_ignores(data), **kwargs)
E       AssertionError: FILES DIFFER:
E       /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.xml
E       /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.obtained.xml
E       HTML DIFF: /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.obtained.diff.html
E       ---
E       +++
E       @@ -9,13 +9,13 @@
E                                pass
E                <container classes="cell_output" nb_element="cell_code_output">
E                    <literal_block classes="output stream" language="myst-ansi" linenos="False" xml:space="preserve">
E       +                000000000
E                        0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       +
E       +
E       +
E       +
E       +
E       +
E       +
E       +

The notebook is executed via jupyter nbconvert --execute merge_streams_parallel.ipynb --to ipynb and I committed the executed ipynb.

@bsipocz bsipocz added the bug Something isn't working label Sep 19, 2024
Copy link
Copy Markdown
Member

@bsipocz bsipocz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @basnijholt!

@bsipocz bsipocz merged commit 25fa81d into executablebooks:master Sep 19, 2024
basnijholt added a commit to pipefunc/pipefunc that referenced this pull request Sep 27, 2024
basnijholt added a commit to pipefunc/pipefunc that referenced this pull request Sep 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants