Skip to content

feat(mcp-server): Add ClpConnector for query job submission and result retrieval.#1388

Merged
All-less merged 5 commits into
y-scope:mainfrom
All-less:clp-mcp-connector
Oct 9, 2025
Merged

feat(mcp-server): Add ClpConnector for query job submission and result retrieval.#1388
All-less merged 5 commits into
y-scope:mainfrom
All-less:clp-mcp-connector

Conversation

@All-less

@All-less All-less commented Oct 6, 2025

Copy link
Copy Markdown
Contributor

Description

This PR adds the connector between MCP server and CLP package. It enables the MCP to submit queries and read results.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

cd clp
task package

cd components/clp-mcp-server
uv run pytest

Summary by CodeRabbit

  • New Features

    • Added a connector to submit time‑ranged searches, track job status, wait for completion with optional timeouts, and retrieve cached results (max 1,000).
    • Introduced environment-driven defaults for DB credentials and results‑cache configuration.
    • Standardized job statuses/types and a 1s polling interval.
  • Tests

    • Added async tests for validation, status transitions, failure handling, and result retrieval.
  • Chores

    • Updated runtime/dev dependencies and lint/test settings.

@All-less All-less requested a review from a team as a code owner October 6, 2025 19:07
@coderabbitai

coderabbitai Bot commented Oct 6, 2025

Copy link
Copy Markdown
Contributor

Walkthrough

Adds a new ClpConnector class integrating MariaDB and MongoDB results cache, introduces constants/enums for job types/statuses, environment-driven DB credentials, dependency updates in pyproject, and async unit tests covering validation, polling, error handling, and results retrieval.

Changes

Cohort / File(s) Summary of Changes
Connector implementation
components/clp-mcp-server/clp_mcp_server/clp_connector.py
New ClpConnector class with __init__, submit_query, read_job_status, wait_query_completion, and read_results. Implements timestamp validation, msgpack job payloads, MariaDB job insertion and status polling, MongoDB results-collection creation and reads, and error handling.
Constants and enums
components/clp-mcp-server/clp_mcp_server/constants.py
New constants SEARCH_MAX_NUM_RESULTS = 1000, POLLING_INTERVAL_SECONDS = 1, and enums QueryJobType(IntEnum) and QueryJobStatus(IntEnum) aligning with scheduler statuses/types.
Environment-driven settings
components/clp-mcp-server/clp_mcp_server/settings.py
New settings module exposing CLP_DB_USER (default "clp-user") and CLP_DB_PASS (default "<no_password_set>") read from environment variables.
Project configuration
components/clp-mcp-server/pyproject.toml
Added runtime deps: pymongo>=4.15.1, aiomysql>=0.2.0, msgpack>=1.1.1; dev dep: pytest-asyncio>=1.2.0; added Hatch metadata and Ruff per-file ignores.
Tests for connector
components/clp-mcp-server/tests/test_clp_connector.py
New async tests using mocks/patches and helper async iterator: invalid timestamp checks, status progression and polling (including patched sleep), failure case handling, results retrieval; some tests skipped when real DB required.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Client
  participant CC as ClpConnector
  participant MDB as MariaDB
  participant MGO as MongoDB_Results

  rect rgb(235,245,255)
    note over CC: Initialization
    Client->>CC: __init__(clp_config)
    CC->>MGO: connect to results DB
    CC->>MDB: configure MariaDB client
  end

  rect rgb(240,255,240)
    note over Client,CC: Submit query
    Client->>CC: submit_query(query, begin_ts, end_ts)
    CC->>CC: validate timestamps & pack job (msgpack)
    CC->>MDB: INSERT INTO query_jobs -> returns query_id
    CC->>MGO: create collection "query_id" and insert metadata
    CC-->>Client: return query_id
  end

  rect rgb(255,250,235)
    note over Client,CC: Wait for completion (poll)
    Client->>CC: wait_query_completion(query_id, timeout?)
    loop every POLLING_INTERVAL_SECONDS
      CC->>MDB: SELECT status WHERE id = query_id
      MDB-->>CC: status
      alt status == SUCCEEDED
        CC-->>Client: return (success)
      else status in [PENDING,RUNNING,CANCELLING]
        CC->>CC: continue polling
      else status in [FAILED,CANCELLED,KILLED,unknown]
        CC-->>Client: raise RuntimeError
      end
    end
  end

  rect rgb(245,240,255)
    note over Client,CC: Read results
    Client->>CC: read_results(query_id)
    CC->>MGO: find documents (limit SEARCH_MAX_NUM_RESULTS)
    MGO-->>CC: docs[]
    CC-->>Client: docs[]
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title clearly summarises the main change by stating that a new ClpConnector feature has been added to the MCP server for query job submission and result retrieval. It uses the Conventional Commits style with an imperative verb and component scope, making it concise and informative.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Ruff (0.13.3)
components/clp-mcp-server/clp_mcp_server/clp_connector.py

�[1;31mruff failed�[0m
�[1mCause:�[0m Failed to load configuration /ruff.toml
�[1mCause:�[0m Failed to parse /ruff.toml
�[1mCause:�[0m TOML parse error at line 26, column 3
|
26 | "RSE100", # Use of assert detected
| ^^^^^^^^
Unknown rule selector: RSE100

components/clp-mcp-server/clp_mcp_server/settings.py

�[1;31mruff failed�[0m
�[1mCause:�[0m Failed to load configuration /ruff.toml
�[1mCause:�[0m Failed to parse /ruff.toml
�[1mCause:�[0m TOML parse error at line 26, column 3
|
26 | "RSE100", # Use of assert detected
| ^^^^^^^^
Unknown rule selector: RSE100

components/clp-mcp-server/tests/test_clp_connector.py

�[1;31mruff failed�[0m
�[1mCause:�[0m Failed to load configuration /ruff.toml
�[1mCause:�[0m Failed to parse /ruff.toml
�[1mCause:�[0m TOML parse error at line 26, column 3
|
26 | "RSE100", # Use of assert detected
| ^^^^^^^^
Unknown rule selector: RSE100


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ba34c27 and 8d5209ec5b85775989b52605aee17666fbd89b6a.

📒 Files selected for processing (5)
  • components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
  • components/clp-mcp-server/clp_mcp_server/constants.py (1 hunks)
  • components/clp-mcp-server/clp_mcp_server/settings.py (1 hunks)
  • components/clp-mcp-server/pyproject.toml (3 hunks)
  • components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
  • QueryJobStatus (18-27)
  • QueryJobType (10-15)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
  • CLPConnector (22-176)
  • submit_query (40-97)
  • read_job_status (99-120)
  • wait_query_completion (122-157)
  • read_results (159-176)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
  • QueryJobStatus (18-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: rust-checks (ubuntu-24.04)
  • GitHub Check: rust-checks (ubuntu-22.04)
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (2)
components/clp-mcp-server/clp_mcp_server/constants.py (1)

5-27: LGTM: centralised constants and enums align with job-orchestration.

Values and ordering look consistent; starting at 0 matches DB usage.

components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)

74-83: aiomysql.connect usage: confirm async context manager support.

aiomysql.connect returns a coroutine; typical usage is conn = await aiomysql.connect(...); then async with conn.cursor().... Confirm that async with aiomysql.connect(...) is supported in your version; otherwise refactor.

Apply this safer pattern if needed:

-        async with aiomysql.connect(**self.db_conf) as conn, conn.cursor() as cur:
-            await cur.execute(
+        conn = await aiomysql.connect(**self.db_conf)
+        try:
+            async with conn.cursor() as cur:
+                await cur.execute(
                 "INSERT INTO query_jobs (type, job_config) VALUES (%s, %s);",
                 (int(QueryJobType.SEARCH_OR_AGGREGATION), job_config),
-            )
-            await conn.commit()
-            await cur.execute("SELECT LAST_INSERT_ID();")
-            result = await cur.fetchone()
-            query_id = result[0] if result else None
+                )
+                await conn.commit()
+                await cur.execute("SELECT LAST_INSERT_ID();")
+                result = await cur.fetchone()
+                query_id = result[0] if result else None
+        finally:
+            conn.close()

Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment on lines +99 to +121
async def read_job_status(self, query_id: str) -> int:
"""
Reads the job status of a query.

Raises:
aiomysql.Error: if there is an error connecting to or querying MariaDB
ValueError: when the query is not found

Args:
query_id: the ID of the query

"""
async with aiomysql.connect(**self.db_conf) as conn, conn.cursor() as cur:
await cur.execute("SELECT status FROM query_jobs WHERE id = %s;", (query_id,))
result = await cur.fetchone()
status = result[0] if result else None

if status is None:
err_msg = f"Query job with ID {query_id} not found."
raise ValueError(err_msg)

return status

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider returning a typed status and documenting it.

read_job_status returns int, later compared to IntEnum. For clarity, consider converting to QueryJobStatus with a fallback for unknown values.

Example:

status_val = result[0] if result else None
if status_val is None:
    raise ValueError(...)
try:
    return QueryJobStatus(status_val)
except ValueError:
    return status_val  # preserve unknown for caller to handle
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 99 to
121, read_job_status currently returns a raw int which callers compare to an
IntEnum; convert the retrieved int to the QueryJobStatus enum before returning
and document the return type, but preserve unknown/invalid numeric values by
catching the enum conversion error and returning the original int (or another
sentinel) so callers can handle unknown statuses; ensure QueryJobStatus is
imported/available, validate None as before, and update the docstring to state
the function returns either QueryJobStatus or int for unknown values.

Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py
Comment thread components/clp-mcp-server/pyproject.toml
Comment thread components/clp-mcp-server/tests/test_clp_connector.py Outdated
Comment thread components/clp-mcp-server/tests/test_clp_connector.py Outdated
Comment thread components/clp-mcp-server/tests/test_clp_connector.py Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (1)
components/clp-mcp-server/pyproject.toml (1)

73-76: Remove I001 ignore now that package-relative imports are used.

Since clp_connector.py now uses package-relative imports (.constants, .settings), the I001 (unsorted imports) ignore is no longer necessary.

Apply this diff:

 [tool.ruff.lint.per-file-ignores]
     "tests/test_clp_connector.py" = ["INP001", "E402"]
     "tests/*.py" = ["S101"]
-    "clp_mcp_server/clp_connector.py" = ["I001"]
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8d5209ec5b85775989b52605aee17666fbd89b6a and 8549a3e6eb3c528d855c41443aaec30ac9a87382.

📒 Files selected for processing (5)
  • components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
  • components/clp-mcp-server/clp_mcp_server/constants.py (1 hunks)
  • components/clp-mcp-server/clp_mcp_server/settings.py (1 hunks)
  • components/clp-mcp-server/pyproject.toml (3 hunks)
  • components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-20T05:38:30.720Z
Learnt from: junhaoliao
PR: y-scope/clp#1242
File: taskfile.yaml:545-547
Timestamp: 2025-08-20T05:38:30.720Z
Learning: The python component directories in the y-scope/clp repository are kept very clean and only contain essential files: source code directories, pyproject.toml, poetry.lock, README.md, and the dist/ directory. No cache directories, virtual environments, or other transient files exist in the component directories, so additional exclude patterns beyond "dist/**/*" are unnecessary.

Applied to files:

  • components/clp-mcp-server/pyproject.toml
🧬 Code graph analysis (2)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
  • CLPConnector (27-188)
  • submit_query (45-104)
  • read_job_status (106-130)
  • wait_query_completion (132-169)
  • read_results (171-188)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
  • QueryJobStatus (18-27)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
  • QueryJobStatus (18-27)
  • QueryJobType (10-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: package-image
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: rust-checks (ubuntu-22.04)
  • GitHub Check: rust-checks (ubuntu-24.04)
  • GitHub Check: build (ubuntu-24.04)
🔇 Additional comments (9)
components/clp-mcp-server/pyproject.toml (1)

11-13: LGTM! Dependencies support async connector.

The pymongo, aiomysql, and msgpack dependencies align with the CLPConnector implementation for async query submission and result retrieval.

components/clp-mcp-server/tests/test_clp_connector.py (3)

10-39: LGTM! Skipped tests appropriately marked.

The skipped integration tests serve as useful placeholders and are correctly marked as requiring actual database connections.


101-103: LGTM! Correct async generator type annotation.

The async_gen function is properly annotated as AsyncIterator[dict], correctly reflecting its return type.


93-109: LGTM! Effective mock structure for read_results.

The test correctly mocks the async generator pattern and validates that read_results retrieves expected documents.

components/clp-mcp-server/clp_mcp_server/constants.py (1)

1-27: LGTM! Well-defined constants and enums.

The constants and IntEnum definitions are clear, well-documented, and appropriately scoped for the connector's query submission and status polling behaviour.

components/clp-mcp-server/clp_mcp_server/clp_connector.py (4)

1-28: LGTM! Correct import patterns used.

The imports follow best practices with the public AsyncMongoClient import from pymongo and package-relative imports for constants and settings.


30-43: LGTM! Constructor initializes DB clients correctly.

The initialization correctly sets up MongoDB and MariaDB configurations. Note: For improved testability, consider allowing optional client/config injection (see test file feedback).


45-104: LGTM! Robust query submission with proper validation.

The method correctly validates timestamps, handles the case where LAST_INSERT_ID() might fail, creates the MongoDB collection, and returns the query ID.


171-188: LGTM! Results properly limited to prevent OOM.

The method correctly applies SEARCH_MAX_NUM_RESULTS limit when reading from MongoDB, preventing potential out-of-memory issues.


"""
waiting_states = {QueryJobStatus.PENDING, QueryJobStatus.RUNNING, QueryJobStatus.CANCELLING}
start_time = asyncio.get_event_loop().time()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Use asyncio.get_running_loop() instead of asyncio.get_event_loop().

asyncio.get_event_loop() is deprecated in Python 3.10+. Use asyncio.get_running_loop() for accessing the current running loop.

Apply this diff:

-        start_time = asyncio.get_event_loop().time()
+        start_time = asyncio.get_running_loop().time()

And at line 153:

-                if timeout and (asyncio.get_event_loop().time() - start_time) > timeout:
+                if timeout and (asyncio.get_running_loop().time() - start_time) > timeout:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
start_time = asyncio.get_event_loop().time()
async def wait_query_completion(
self,
query_id: str,
timeout: float | None = None,
) -> None:
"""
Waits for the query to complete, optionally timing out.
"""
waiting_states = {
QueryJobStatus.PENDING,
QueryJobStatus.RUNNING,
QueryJobStatus.CANCELLING,
}
start_time = asyncio.get_running_loop().time()
while True:
status = await self.read_job_status(query_id)
if status in waiting_states:
await asyncio.sleep(POLLING_INTERVAL_SECONDS)
if timeout and (asyncio.get_running_loop().time() - start_time) > timeout:
raise TimeoutError(f"Timed out waiting for query job {query_id} to complete.")
elif status == QueryJobStatus.SUCCEEDED:
break
elif status in {
QueryJobStatus.FAILED,
QueryJobStatus.CANCELLED,
QueryJobStatus.KILLED,
}:
raise RuntimeError(f"Query job {query_id} did not succeed, status={status}.")
else:
raise RuntimeError(f"Query job with ID {query_id} has unknown status {status}.")
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 147
and 153, replace asyncio.get_event_loop().time() with
asyncio.get_running_loop().time() to avoid using the deprecated
get_event_loop(); update both occurrences so the current running loop is used
(no additional logic required unless called outside a running loop, in which
case ensure the call occurs within async context).

Comment thread components/clp-mcp-server/tests/test_clp_connector.py
Comment thread components/clp-mcp-server/clp_mcp_server/settings.py Outdated
Comment thread components/clp-mcp-server/tests/test_clp_connector.py
Comment thread components/clp-mcp-server/pyproject.toml
Comment thread components/clp-mcp-server/pyproject.toml
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/constants.py

@junhaoliao junhaoliao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for putting up the PR.

i posted a partial review - most comments are style / code organization related.

i'll take a deeper look at the exact logic once the posted comments are addressed. feel free to let me know if there's anything i can help with

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

♻️ Duplicate comments (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (2)

107-130: Consider returning QueryJobStatus for improved type safety.

While returning int is functional, converting to QueryJobStatus would provide better type safety and make the API more self-documenting. Handle unknown status values by preserving the raw int as a fallback.

Example:

-    async def read_job_status(self, query_id: str) -> int:
+    async def read_job_status(self, query_id: str) -> QueryJobStatus | int:
         """
         Reads the job status of a query.
         
         ...
-        :return: The status of the query.
-        :rtype: int
+        :return: The status of the query as QueryJobStatus, or raw int for unknown statuses.
+        :rtype: QueryJobStatus | int
         
         """
         async with aiomysql.connect(**self.db_conf) as conn, conn.cursor() as cur:
             await cur.execute("SELECT status FROM query_jobs WHERE id = %s;", (query_id,))
             result = await cur.fetchone()
             status = result[0] if result else None
         
         if status is None:
             err_msg = f"Query job with ID {query_id} not found."
             raise ValueError(err_msg)
         
-        return status
+        try:
+            return QueryJobStatus(status)
+        except ValueError:
+            return status  # Return raw int for unknown statuses

147-147: Replace deprecated asyncio.get_event_loop() with asyncio.get_running_loop().

asyncio.get_event_loop() is deprecated in Python 3.10+ when called from a running async context. Use asyncio.get_running_loop() instead.

Apply this diff:

-        start_time = asyncio.get_event_loop().time()
+        start_time = asyncio.get_running_loop().time()

And at line 153:

-                if timeout and (asyncio.get_event_loop().time() - start_time) > timeout:
+                if timeout and (asyncio.get_running_loop().time() - start_time) > timeout:

Based on past review comments.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8549a3e6eb3c528d855c41443aaec30ac9a87382 and 46a03f0.

📒 Files selected for processing (5)
  • components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
  • components/clp-mcp-server/clp_mcp_server/constants.py (1 hunks)
  • components/clp-mcp-server/clp_mcp_server/settings.py (1 hunks)
  • components/clp-mcp-server/pyproject.toml (3 hunks)
  • components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-20T05:38:30.720Z
Learnt from: junhaoliao
PR: y-scope/clp#1242
File: taskfile.yaml:545-547
Timestamp: 2025-08-20T05:38:30.720Z
Learning: The python component directories in the y-scope/clp repository are kept very clean and only contain essential files: source code directories, pyproject.toml, poetry.lock, README.md, and the dist/ directory. No cache directories, virtual environments, or other transient files exist in the component directories, so additional exclude patterns beyond "dist/**/*" are unnecessary.

Applied to files:

  • components/clp-mcp-server/pyproject.toml
🧬 Code graph analysis (2)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
  • CLPConnector (27-188)
  • submit_query (45-105)
  • read_job_status (107-130)
  • wait_query_completion (132-169)
  • read_results (171-188)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
  • QueryJobStatus (18-27)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
  • QueryJobStatus (18-27)
  • QueryJobType (10-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: package-image
  • GitHub Check: rust-checks (ubuntu-22.04)
  • GitHub Check: rust-checks (ubuntu-24.04)
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: rust-checks (macos-15)
🔇 Additional comments (5)
components/clp-mcp-server/tests/test_clp_connector.py (4)

42-47: LGTM! Validation logic is correctly tested.

The test properly verifies that submit_query raises ValueError when end_ts < begin_ts. The match pattern ensures the error message is meaningful.

Note: Constructor side-effects (creating real DB clients) are addressed in the previous comment about dependency injection.


50-57: LGTM! Error propagation is correctly tested.

The test properly verifies that read_job_status raises ValueError for non-existent queries with the expected error message.


60-72: LGTM! Status progression is well-tested.

The test effectively simulates the query lifecycle (PENDING → RUNNING → SUCCEEDED) and verifies that wait_query_completion correctly polls until completion. Patching asyncio.sleep avoids unnecessary delays in tests.


93-109: LGTM! Results retrieval is properly tested.

The test effectively mocks the MongoDB collection and async iteration, verifying that read_results correctly retrieves and returns documents. The async generator pattern is well-implemented.

components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)

171-188: LGTM! Results retrieval is correctly implemented.

The method properly limits results to SEARCH_MAX_NUM_RESULTS and returns them as a list. The implementation correctly addresses past concerns about potential OOM issues by applying a limit.

Comment on lines +30 to +43
def __init__(self) -> None:
"""Initializes the CLPConnector with MongoDB and MariaDB configurations."""
mongo_url = f"mongodb://{CLP_RESULTS_CACHE_SERVICE_NAME}:{CLP_RESULTS_CACHE_PORT}/"
self.mongo_client = AsyncMongoClient(mongo_url)
self.results_cache = self.mongo_client[CLP_RESULTS_CACHE_CLP_DB_NAME]

# Configuration to be used in `aiomysql.connect` to MariaDB.
self.db_conf = {
"host": CLP_DB_SERVICE_NAME,
"port": CLP_DB_PORT,
"user": CLP_DB_USER,
"password": CLP_DB_PASS,
"db": CLP_DB_NAME,
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Add explicit resource cleanup for MongoDB client.

The AsyncMongoClient is created in __init__ but never explicitly closed, potentially causing resource leaks in long-running processes. Consider:

  1. Add an async cleanup method:
async def close(self) -> None:
    """Closes the MongoDB client connection."""
    self.mongo_client.close()
  1. Document the lifecycle: Add a note in the class docstring that users should call close() when done, or use the connector as an async context manager:
async def __aenter__(self) -> "CLPConnector":
    """Async context manager entry."""
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Async context manager exit."""
    await self.close()

Then usage becomes:

async with CLPConnector() as connector:
    await connector.submit_query(...)
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 30 to
43, the AsyncMongoClient is created in __init__ but never closed; add an
explicit cleanup API and context-manager support: implement a close(self) method
that calls the Mongo client close(), add async __aenter__ and async __aexit__ so
the connector can be used with "async with" and ensure __aexit__ calls close(),
and update the class docstring to document that callers should call close() or
use the async context manager to release MongoDB resources.

err_msg = "Failed to retrieve the ID of the submitted query."
raise RuntimeError(err_msg)

await self.results_cache.create_collection(query_id)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Ensure idempotent collection creation.

create_collection(query_id) may raise an exception if the collection already exists (e.g., due to a retry after partial failure). Consider wrapping this in a try/except to ignore CollectionInvalid errors, or use list_collection_names() to check existence first.

Example:

-        await self.results_cache.create_collection(query_id)
+        try:
+            await self.results_cache.create_collection(query_id)
+        except pymongo.errors.CollectionInvalid:
+            # Collection already exists, ignore
+            pass
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await self.results_cache.create_collection(query_id)
# Create a collection in MongoDB where the name is the last_insert_id
try:
await self.results_cache.create_collection(query_id)
except pymongo.errors.CollectionInvalid:
# Collection already exists, ignore
pass
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around line 94, the
call to await self.results_cache.create_collection(query_id) can raise an
exception if the collection already exists; make creation idempotent by either
checking existence first (e.g., use results_cache.list_collection_names() or
equivalent and skip create if present) or wrapping the create_collection call in
a try/except that catches the specific "collection already exists" error (e.g.,
CollectionInvalid) and ignores it while re-raising other exceptions; ensure any
caught exception is handled silently or logged at debug level so retries are
safe.

Comment on lines +1 to +27
"""Constants for CLP MCP server."""

from enum import auto, IntEnum

SEARCH_MAX_NUM_RESULTS = 1000

POLLING_INTERVAL_SECONDS = 1


class QueryJobType(IntEnum):
"""Matching the `QueryJobType` class in `job_orchestration.query_scheduler.constants`."""

SEARCH_OR_AGGREGATION = 0
EXTRACT_IR = auto()
EXTRACT_JSON = auto()


class QueryJobStatus(IntEnum):
"""Matching the `QueryJobStatus` class in `job_orchestration.query_scheduler.constants`."""

PENDING = 0
RUNNING = auto()
SUCCEEDED = auto()
FAILED = auto()
CANCELLING = auto()
CANCELLED = auto()
KILLED = auto()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

🧩 Analysis chain

Verify alignment with job_orchestration constants and plan for consolidation.

The constants and enums duplicate definitions from job_orchestration.query_scheduler.constants, creating a maintenance risk if the canonical definitions evolve independently. Per past discussions, build system incompatibilities currently prevent direct imports.

Short-term: Verify that the enum values precisely match their counterparts.

Long-term: Consider migrating to a shared constants package or resolving the build system conflict to enable direct imports, eliminating duplication.

Based on past review comments.


🏁 Script executed:

#!/bin/bash
# Verify that constants match job_orchestration definitions
echo "=== Checking QueryJobType in job_orchestration ==="
ast-grep --pattern $'class QueryJobType($$$):
  $$$
'

echo ""
echo "=== Checking QueryJobStatus in job_orchestration ==="
ast-grep --pattern $'class QueryJobStatus($$$):
  $$$
'

echo ""
echo "=== Searching for SEARCH_MAX_NUM_RESULTS usage ==="
rg -n "SEARCH_MAX_NUM_RESULTS\s*=" --type py

echo ""
echo "=== Searching for POLLING_INTERVAL usage ==="
rg -n "POLLING_INTERVAL\w*\s*=" --type py

Length of output: 3345


Enums match canonical definitions; consider future consolidation

  • QueryJobType and QueryJobStatus in clp_mcp_server/constants.py exactly match those in job_orchestration.query_scheduler.constants.
  • Given current build-system constraints, duplication is acceptable; plan a future refactor to resolve the poetry vs. hatchling conflict and import these enums from a shared package.
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/constants.py lines 1-27, the
QueryJobType and QueryJobStatus enums duplicate canonical definitions in
job_orchestration.query_scheduler.constants; leave the enums as-is for now but
add a concise TODO comment above the enums stating they should be imported from
the shared package once the poetry vs. hatchling build-system conflict is
resolved, include a link or issue number for the follow-up refactor, and add a
one-line comment to remind maintainers to keep these enums in sync until the
refactor is done.

Comment on lines +5 to +13
CLP_DB_SERVICE_NAME = os.environ.get("CLP_DB_SERVICE_NAME", "db")
CLP_DB_PORT = int(os.environ.get("CLP_DB_PORT", "3306"))
CLP_DB_NAME = os.environ.get("CLP_DB_NAME", "clp-db")
CLP_DB_USER = os.environ.get("CLP_DB_USER", "clp-user")
CLP_DB_PASS = os.environ.get("CLP_DB_PASS", "<no_password_set>")

CLP_RESULTS_CACHE_SERVICE_NAME = os.environ.get("CLP_RESULTS_CACHE_SERVICE_NAME", "results-cache")
CLP_RESULTS_CACHE_PORT = int(os.environ.get("CLP_RESULTS_CACHE_PORT", "27017"))
CLP_RESULTS_CACHE_CLP_DB_NAME = os.environ.get("CLP_RESULTS_CACHE_CLP_DB_NAME", "clp-query-results")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Consider explicit validation for required credentials.

The sentinel default "<no_password_set>" for CLP_DB_PASS may cause confusing runtime connection errors when a password is actually required. Per past review discussions, consider:

Option 1: Fail fast if the password is unset:

CLP_DB_PASS = os.environ.get("CLP_DB_PASS")
if CLP_DB_PASS is None:
    raise ValueError("CLP_DB_PASS environment variable must be set")

Option 2: Use typed configuration (dataclass/pydantic) with required fields:

from dataclasses import dataclass

@dataclass
class CLPSettings:
    db_service_name: str = "db"
    db_port: int = 3306
    db_name: str = "clp-db"
    db_user: str = "clp-user"
    db_pass: str  # Required, no default
    
    @classmethod
    def from_env(cls) -> "CLPSettings":
        return cls(
            db_service_name=os.environ.get("CLP_DB_SERVICE_NAME", "db"),
            db_port=int(os.environ.get("CLP_DB_PORT", "3306")),
            db_name=os.environ.get("CLP_DB_NAME", "clp-db"),
            db_user=os.environ.get("CLP_DB_USER", "clp-user"),
            db_pass=os.environ["CLP_DB_PASS"],  # Raises KeyError if missing
        )

Based on past review comments.

🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/settings.py around lines 5 to 13,
the CLP_DB_PASS default of "<no_password_set>" can mask missing credentials and
cause confusing runtime errors; change to require an explicit password by
reading CLP_DB_PASS without a sentinel and fail-fast if absent (raise a
ValueError or let KeyError surface), or replace module-level values with a
typed/config object (dataclass or pydantic) where db_pass is a required field
populated from the environment so the application fails immediately with a clear
message when CLP_DB_PASS is not set.

Comment thread components/clp-mcp-server/pyproject.toml Outdated
Comment thread components/clp-mcp-server/pyproject.toml Outdated
Comment thread components/clp-mcp-server/tests/test_clp_connector.py Outdated
@20001020ycx 20001020ycx self-requested a review October 7, 2025 15:35
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/pyproject.toml
Comment thread components/clp-mcp-server/pyproject.toml
Comment thread components/clp-mcp-server/clp_mcp_server/constants.py
Comment thread components/clp-mcp-server/pyproject.toml Outdated
@All-less All-less marked this pull request as draft October 7, 2025 20:25
@All-less All-less marked this pull request as ready for review October 8, 2025 00:30
@All-less All-less requested a review from junhaoliao October 8, 2025 00:34
@All-less All-less requested a review from 20001020ycx October 8, 2025 00:39
Comment thread components/clp-mcp-server/pyproject.toml Outdated
Comment thread components/clp-mcp-server/pyproject.toml Outdated
Comment thread components/clp-mcp-server/tests/test_clp_connector.py
Comment thread components/clp-mcp-server/tests/test_clp_connector.py Outdated
Comment thread components/clp-mcp-server/tests/test_clp_connector.py Outdated
Comment thread components/clp-mcp-server/tests/test_clp_connector.py Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)

136-136: Fix timeout check to honour zero-second deadlines.

The condition if timeout and treats timeout=0 as falsy, preventing immediate timeout. Use if timeout is not None and to allow zero-second deadlines while still supporting timeout=None for no timeout.

Apply this diff:

-                if timeout and (event_loop.time() - start_time) > timeout:
+                if timeout is not None and (event_loop.time() - start_time) > timeout:
components/clp-mcp-server/tests/test_clp_connector.py (1)

94-98: Fix type annotation for exception class parameter.

The exc_type parameter should be typed as type[Exception] (a class type), not Exception (an instance type), since it's used with pytest.raises which expects an exception class.

Apply this diff:

 async def test_wait_query_completion_failure_cases(
         fail_status: QueryJobStatus,
-        exc_type: Exception,
+        exc_type: type[Exception],
         mock_clp_config: Any
     ) -> None:
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e9e1053994043a90e08c1bdfc5b1efd4bb3dfb82 and 22020e01214f15395a9d5aaf6e39801c7a783f34.

⛔ Files ignored due to path filters (1)
  • components/clp-mcp-server/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
  • components/clp-mcp-server/pyproject.toml (3 hunks)
  • components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
  • ClpConnector (22-162)
  • submit_query (40-94)
  • read_job_status (96-115)
  • wait_query_completion (117-146)
  • read_results (148-162)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
  • QueryJobStatus (18-27)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
  • QueryJobStatus (18-27)
  • QueryJobType (10-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: rust-checks (ubuntu-22.04)
  • GitHub Check: rust-checks (ubuntu-24.04)
  • GitHub Check: rust-checks (macos-15)
🔇 Additional comments (9)
components/clp-mcp-server/pyproject.toml (2)

13-13: Verify msgpack version matches latest requirement.

Based on past review comments, the suggested version was msgpack>=1.1.2, but this line shows msgpack>=1.0.7. Please confirm whether >=1.1.2 is required or if >=1.0.7 is intentional.


77-81: LGTM on per-file ignores for tests.

The per-file ignores are appropriately scoped: INP001 allows implicit namespace packages for tests, and S101 permits assert statements in test cases.

components/clp-mcp-server/tests/test_clp_connector.py (4)

3-6: LGTM on imports and organization.

The imports are properly organized with standard library, third-party, and local imports clearly separated. Using from types import SimpleNamespace and proper typing annotations shows good practice.


13-19: LGTM on fixture design.

The mock_clp_config fixture using SimpleNamespace provides a clean, reusable mock configuration for tests without requiring actual database connections.


106-116: LGTM on async generator helper.

The _aiter helper cleanly converts an iterable to an async generator for testing. The TypeVar usage and type annotations are correct.


118-129: LGTM on read_results test.

The test properly mocks the MongoDB collection and uses patch.object to inject the mock into the connector's _results_cache, verifying the results are retrieved correctly.

components/clp-mcp-server/clp_mcp_server/clp_connector.py (3)

1-19: LGTM on imports and module structure.

The imports are correctly organized with standard library, third-party packages, and package-relative imports. Using from pymongo import AsyncMongoClient follows the recommended public API.


96-115: LGTM on read_job_status implementation.

The method correctly queries MariaDB for the job status and raises a clear ValueError when the query is not found. The error message is descriptive.


148-162: LGTM on read_results implementation.

The method correctly retrieves documents from the MongoDB collection with the SEARCH_MAX_NUM_RESULTS limit applied, preventing unbounded memory usage.

Comment on lines +22 to +38
class ClpConnector:
"""A connector class to interact with the CLP database and results cache."""

def __init__(self, clp_config: Any) -> None:
"""Initializes the ClpConnector with MongoDB and MariaDB configurations."""
mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/"
mongo_client = AsyncMongoClient(mongo_url)
self._results_cache = mongo_client[clp_config.results_cache.db_name]

# Configuration to be used in `aiomysql.connect` to MariaDB.
self._db_conf = {
"host": clp_config.database.host,
"port": clp_config.database.port,
"user": CLP_DB_USER,
"password": CLP_DB_PASS,
"db": clp_config.database.name,
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider adding resource cleanup for MongoDB client.

The AsyncMongoClient created in __init__ is never explicitly closed, which can cause resource leaks in long-running processes. Consider adding an async context manager protocol (__aenter__ and __aexit__) or a close() method to properly clean up the MongoDB connection.

Example implementation:

async def close(self) -> None:
    """Closes the MongoDB client connection."""
    self._mongo_client.close()

async def __aenter__(self) -> "ClpConnector":
    """Async context manager entry."""
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Async context manager exit."""
    await self.close()

Note: Store mongo_client as self._mongo_client to enable cleanup.

🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 22 to
38, the locally created AsyncMongoClient is not stored on the instance nor
closed, risking resource leaks; store the client as self._mongo_client, keep
self._results_cache as before, add a close(self) method that calls
self._mongo_client.close(), and implement async context manager methods async
def __aenter__(self) -> "ClpConnector": return self and async def
__aexit__(self, exc_type, exc, tb) -> None: self.close(); update callers to use
"async with ClpConnector(...)" or explicitly call await
connector.close()/connector.close() as appropriate.

Comment on lines +40 to +94
async def submit_query(self, query: str, begin_ts: int, end_ts: int) -> str:
"""
Submits a query to the CLP database and returns the ID of the query.

:param query: The query string.
:param begin_ts: The beginning timestamp of the query range.
:param end_ts: The end timestamp of the query range.
:raise ValueError: If ``end_ts`` is smaller than ``begin_ts``.
:raise aiomysql.Error: If there is an error connecting to or querying MariaDB.
:raise pymongo.errors.PyMongoError: If there is an error interacting with MongoDB.
:raise Exception: For any other unexpected errors.
:return: The ID assigned to the query.

"""
if end_ts < begin_ts:
err_msg = f"end_ts {end_ts} is smaller than begin_ts {begin_ts}."
raise ValueError(err_msg)

job_config = msgpack.packb(
{
"begin_timestamp": begin_ts,
"dataset": None,
"end_timestamp": end_ts,
"ignore_case": True,
"max_num_results": SEARCH_MAX_NUM_RESULTS,
"query_string": query,
}
)

async with aiomysql.connect(**self._db_conf) as conn, conn.cursor() as cur:
await cur.execute(
"INSERT INTO query_jobs (type, job_config) VALUES (%s, %s);",
(int(QueryJobType.SEARCH_OR_AGGREGATION), job_config),
)
await conn.commit()
await cur.execute("SELECT LAST_INSERT_ID();")
result = await cur.fetchone()
query_id = str(result[0]) if result else None

if query_id is None:
err_msg = "Failed to retrieve the ID of the submitted query."
raise RuntimeError(err_msg)

await self._results_cache.create_collection(query_id)

results_metadata_doc = {
"_id": str(query_id),
"errorMsg": None,
"errorName": None,
"lastSignal": "resp-querying",
"queryEngine": "clp",
}
await self._results_cache["results-metadata"].insert_one(results_metadata_doc)

return query_id

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

LGTM on submit_query implementation with a minor idempotency suggestion.

The method correctly validates timestamps, packs the job config, inserts into MariaDB, retrieves the query ID, and creates the MongoDB collection. The error handling is appropriate.

Minor suggestion: Consider making collection creation idempotent by catching pymongo.errors.CollectionInvalid (raised when the collection already exists) to support retry scenarios:

try:
    await self._results_cache.create_collection(query_id)
except pymongo.errors.CollectionInvalid:
    pass  # Collection already exists
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 40-94,
make the MongoDB collection creation idempotent by wrapping the await
self._results_cache.create_collection(query_id) call in a try/except that
catches pymongo.errors.CollectionInvalid and silently continues (or logs a debug
message) so retries won’t fail if the collection already exists; ensure
pymongo.errors is imported or referenced correctly.

Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/pyproject.toml Outdated
@All-less All-less force-pushed the clp-mcp-connector branch 2 times, most recently from 765bbd6 to d6676bb Compare October 9, 2025 00:02

@junhaoliao junhaoliao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the very last batch of comments (hopefully)

Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py
Comment thread components/clp-mcp-server/tests/test_clp_connector.py Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (5)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (4)

129-130: Replace deprecated get_event_loop() with get_running_loop().

asyncio.get_event_loop() is deprecated in Python 3.10+ and the project requires Python >=3.10. This issue was flagged in past reviews but remains unaddressed.

Apply this diff:

-        event_loop = asyncio.get_event_loop()
+        event_loop = asyncio.get_running_loop()
         start_time = event_loop.time()

Also update line 136:

-                if timeout and (event_loop.time() - start_time) > timeout:
+                if timeout and (event_loop.time() - start_time) > timeout:

83-83: Make collection creation idempotent for retry scenarios.

The create_collection call will raise an exception if the collection already exists, which can happen during retries. Based on past review comments, consider making this idempotent.

-        await self._results_cache.create_collection(query_id)
+        try:
+            await self._results_cache.create_collection(query_id)
+        except pymongo.errors.CollectionInvalid:
+            pass  # Collection already exists

Remember to import pymongo.errors if not already imported.


139-146: Simplify control flow by replacing elif with if.

Since the if block at line 134 contains continue and the block at line 140 contains break, control flow is already interrupted. The elif statements can be simplified to if statements for better readability.

Apply this diff:

-            elif status == QueryJobStatus.SUCCEEDED:
+            if status == QueryJobStatus.SUCCEEDED:
                 break
-            elif status in error_states:
+            if status in error_states:
                 err_msg = f"Query job with ID {query_id} ended in status {status.name}."
                 raise RuntimeError(err_msg)
-            else:
-                err_msg = f"Query job with ID {query_id} has unknown status {status}."
-                raise RuntimeError(err_msg)
+            
+            err_msg = f"Query job with ID {query_id} has unknown status {status}."
+            raise RuntimeError(err_msg)

136-136: Fix timeout check to honour zero-second deadlines.

The condition if timeout and ... treats timeout=0 as falsy, so a zero-second deadline will never time out. This issue was flagged in past reviews.

Apply this diff:

-                if timeout and (event_loop.time() - start_time) > timeout:
+                if timeout is not None and (event_loop.time() - start_time) > timeout:
                     err_msg = f"Timeout waiting for query job with ID {query_id} to complete."
                     raise TimeoutError(err_msg)
components/clp-mcp-server/tests/test_clp_connector.py (1)

88-98: Fix type annotation to accept both enum and int.

The fail_status parameter is annotated as QueryJobStatus, but the parametrize decorator includes 999 (an int) to test unknown status codes. This creates a type mismatch.

Apply this diff to fix the annotation:

 async def test_wait_query_completion_failure_cases(
-    fail_status: QueryJobStatus,
+    fail_status: int,
     exc_type: type[Exception],
     mock_clp_config: Any
 ) -> None:
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 22020e01214f15395a9d5aaf6e39801c7a783f34 and d6676bbc74b83d6bb5150036cb483eed96e66ede.

⛔ Files ignored due to path filters (1)
  • components/clp-mcp-server/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
  • components/clp-mcp-server/pyproject.toml (3 hunks)
  • components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
  • QueryJobStatus (18-27)
  • QueryJobType (10-15)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
  • ClpConnector (22-162)
  • submit_query (40-94)
  • read_job_status (96-115)
  • wait_query_completion (117-146)
  • read_results (148-162)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
  • QueryJobStatus (18-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: rust-checks (ubuntu-24.04)
  • GitHub Check: rust-checks (ubuntu-22.04)
  • GitHub Check: rust-checks (macos-15)
🔇 Additional comments (1)
components/clp-mcp-server/tests/test_clp_connector.py (1)

118-129: LGTM on test implementation.

The test correctly mocks the MongoDB collection and uses the async generator helper to simulate document retrieval. The approach of patching _results_cache is appropriate for unit testing without requiring actual database connections.

Comment on lines +25 to +38
def __init__(self, clp_config: Any) -> None:
"""Initializes the ClpConnector with MongoDB and MariaDB configurations."""
mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/"
mongo_client = AsyncMongoClient(mongo_url)
self._results_cache = mongo_client[clp_config.results_cache.db_name]

# Configuration to be used in `aiomysql.connect` to MariaDB.
self._db_conf = {
"host": clp_config.database.host,
"port": clp_config.database.port,
"user": CLP_DB_USER,
"password": CLP_DB_PASS,
"db": clp_config.database.name,
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider adding resource cleanup for MongoDB client.

The AsyncMongoClient is created but not stored as an instance variable and never explicitly closed, which can cause resource leaks in long-running processes. Based on past review comments, consider adding a cleanup method or async context manager support.

Example implementation:

def __init__(self, clp_config: Any) -> None:
    """Initializes the ClpConnector with MongoDB and MariaDB configurations."""
    mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/"
    self._mongo_client = AsyncMongoClient(mongo_url)
    self._results_cache = self._mongo_client[clp_config.results_cache.db_name]
    # ... rest of init

async def close(self) -> None:
    """Closes the MongoDB client connection."""
    self._mongo_client.close()

async def __aenter__(self) -> "ClpConnector":
    """Async context manager entry."""
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Async context manager exit."""
    await self.close()
🤖 Prompt for AI Agents
components/clp-mcp-server/clp_mcp_server/clp_connector.py lines 25-38: The
AsyncMongoClient is instantiated but not stored or closed, risking resource
leaks; store the client on the instance (e.g., self._mongo_client =
AsyncMongoClient(...)) and use it to set self._results_cache, add a close method
that closes the client (call self._mongo_client.close()), and implement async
context manager support by adding async __aenter__ (return self) and async
__aexit__ (call/await close) so callers can use "async with" or explicitly call
close to release resources.

junhaoliao
junhaoliao previously approved these changes Oct 9, 2025

@junhaoliao junhaoliao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's one more suggestion (sorry for not raising it earlier), which is optional to be applied

for the title, how about:

feat(mcp-server): Add ClpConnector for query job submission and result retrieval.

Comment thread components/clp-mcp-server/clp_mcp_server/clp_connector.py Outdated
@All-less All-less changed the title feat(mcp-server): Add connector between MCP server and CLP package feat(mcp-server): Add ClpConnector for query job submission and result retrieval. Oct 9, 2025

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (4)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (3)

82-82: Consider making collection creation idempotent.

If submit_query is retried (e.g., after a partial failure), create_collection will raise pymongo.errors.CollectionInvalid. Consider wrapping the call to handle existing collections gracefully.

Example:

+        try:
             await self._results_cache.create_collection(query_id)
+        except pymongo.errors.CollectionInvalid:
+            pass  # Collection already exists, safe to continue

141-141: Fix timeout check to handle zero-second deadlines.

The condition if timeout and (event_loop.time() - start_time) > timeout: treats timeout=0 as falsy, so a zero-second deadline will never fire. Change the guard to explicitly check for None.

Apply this diff:

-            if timeout and (event_loop.time() - start_time) > timeout:
+            if timeout is not None and (event_loop.time() - start_time) > timeout:
                 err_msg = f"Timeout waiting for query job with ID {query_id} to complete."
                 raise TimeoutError(err_msg)

127-128: Replace deprecated get_event_loop() with get_running_loop().

asyncio.get_event_loop() is deprecated in Python 3.10+ and the project requires Python >=3.10. Use asyncio.get_running_loop() to access the currently running event loop.

Apply this diff:

-        event_loop = asyncio.get_event_loop()
+        event_loop = asyncio.get_running_loop()
         start_time = event_loop.time()
components/clp-mcp-server/tests/test_clp_connector.py (1)

94-98: Fix type annotation to match parametrized test values.

The parametrized test includes 999 (an int) which is not a member of the QueryJobStatus enum, but fail_status is annotated as QueryJobStatus. This causes a type mismatch.

Apply this diff:

 async def test_wait_query_completion_failure_cases(
-    fail_status: QueryJobStatus,
+    fail_status: int,
     exc_type: type[Exception],
     mock_clp_config: Any
 ) -> None:
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d57200a1d7c22ed323cd4d657eeb93dd5db3d314 and 342a802.

⛔ Files ignored due to path filters (1)
  • components/clp-mcp-server/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
  • components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
  • components/clp-mcp-server/pyproject.toml (3 hunks)
  • components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
  • ClpConnector (22-160)
  • submit_query (40-93)
  • read_job_status (95-113)
  • wait_query_completion (115-145)
  • read_results (147-160)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
  • QueryJobStatus (18-27)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
  • QueryJobStatus (18-27)
  • QueryJobType (10-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: package-image
  • GitHub Check: rust-checks (ubuntu-24.04)
  • GitHub Check: rust-checks (ubuntu-22.04)
  • GitHub Check: rust-checks (macos-15)
  • GitHub Check: lint-check (macos-15)

Comment on lines +25 to +29
def __init__(self, clp_config: Any) -> None:
"""Initializes the ClpConnector with MongoDB and MariaDB configurations."""
mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/"
mongo_client = AsyncMongoClient(mongo_url)
self._results_cache = mongo_client[clp_config.results_cache.db_name]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Store mongo_client for proper resource cleanup.

The AsyncMongoClient is created but not stored on the instance, preventing explicit cleanup. Past reviews suggested adding a close() method and async context manager support.

Apply this diff to store the client and enable cleanup:

     def __init__(self, clp_config: Any) -> None:
         """Initializes the ClpConnector with MongoDB and MariaDB configurations."""
         mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/"
-        mongo_client = AsyncMongoClient(mongo_url)
-        self._results_cache = mongo_client[clp_config.results_cache.db_name]
+        self._mongo_client = AsyncMongoClient(mongo_url)
+        self._results_cache = self._mongo_client[clp_config.results_cache.db_name]

Then add cleanup methods:

async def close(self) -> None:
    """Closes the MongoDB client connection."""
    self._mongo_client.close()

async def __aenter__(self) -> "ClpConnector":
    """Async context manager entry."""
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Async context manager exit."""
    await self.close()
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 25 to
29, the AsyncMongoClient is created but not saved to the instance so it cannot
be closed; update the __init__ to assign the client to self._mongo_client before
deriving self._results_cache, and add async cleanup/context support by
implementing async def close(self) -> None that calls
self._mongo_client.close(), plus async def __aenter__(self) -> "ClpConnector"
returning self and async def __aexit__(self, exc_type, exc_val, exc_tb) -> None
that awaits self.close(); ensure attribute names match and no blocking calls are
introduced.

"ruff>=0.11.12",
"pytest>=8.4.1",
"pytest-env>=1.1.5",
"pytest-asyncio>=1.2.0",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix impossible pytest-asyncio constraint

pytest-asyncio hasn't shipped any 1.x releases—the latest published wheels on PyPI are still 0.23.x. Requiring >=1.2.0 makes the dev environment unsatisfiable, so installs and CI will fail. Drop the floor to the highest tested 0.x release (e.g., >=0.23.6) or pin exactly if you prefer repeatability.

-    "pytest-asyncio>=1.2.0",
+    "pytest-asyncio>=0.23.6",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"pytest-asyncio>=1.2.0",
"pytest-asyncio>=0.23.6",
🤖 Prompt for AI Agents
In components/clp-mcp-server/pyproject.toml around line 29, the pytest-asyncio
version constraint ">=1.2.0" is impossible because pytest-asyncio has no 1.x
releases; update the constraint to a valid released version such as ">=0.23.6"
(or pin to an exact 0.23.x like "==0.23.6" for repeatability) so the dev
environment and CI can install successfully.

@All-less All-less requested a review from junhaoliao October 9, 2025 16:21

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (4)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (3)

25-29: Store mongo_client for proper resource cleanup.

The AsyncMongoClient is created but not stored on the instance, preventing explicit cleanup. Past reviews flagged this and suggested adding a close() method and async context manager support.

Apply this diff to store the client and enable cleanup:

     def __init__(self, clp_config: Any) -> None:
         """Initializes the ClpConnector with MongoDB and MariaDB configurations."""
         mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/"
-        mongo_client = AsyncMongoClient(mongo_url)
-        self._results_cache = mongo_client[clp_config.results_cache.db_name]
+        self._mongo_client = AsyncMongoClient(mongo_url)
+        self._results_cache = self._mongo_client[clp_config.results_cache.db_name]

Then add cleanup methods:

async def close(self) -> None:
    """Closes the MongoDB client connection."""
    self._mongo_client.close()

async def __aenter__(self) -> "ClpConnector":
    """Async context manager entry."""
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Async context manager exit."""
    await self.close()

82-82: Consider making collection creation idempotent.

create_collection(query_id) may raise an exception if the collection already exists (e.g., due to a retry after partial failure). Wrapping this in a try-except to ignore CollectionInvalid errors improves retry safety.

Example:

-        await self._results_cache.create_collection(query_id)
+        try:
+            await self._results_cache.create_collection(query_id)
+        except pymongo.errors.CollectionInvalid:
+            pass  # Collection already exists

141-141: Fix timeout guard to honour zero-second deadlines.

Line 141 still has the bug flagged in past reviews: timeout and treats timeout=0 as falsy, so the loop never times out when timeout is 0. Update the guard to differentiate 0 from None.

Apply this diff:

-            if timeout and (event_loop.time() - start_time) > timeout:
+            if timeout is not None and (event_loop.time() - start_time) > timeout:
components/clp-mcp-server/clp_mcp_server/settings.py (1)

6-6: Consider explicit validation for required credentials.

The sentinel default "<no_password_set>" for CLP_DB_PASS may cause confusing runtime connection errors when a password is actually required. Past reviews suggested failing fast if the password is unset.

Option 1 - Fail fast if password is unset:

CLP_DB_PASS = os.environ.get("CLP_DB_PASS")
if CLP_DB_PASS is None:
    raise ValueError("CLP_DB_PASS environment variable must be set")

Option 2 - Use no default and let KeyError surface:

CLP_DB_PASS = os.environ["CLP_DB_PASS"]  # Raises KeyError if missing
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 342a802 and 3b47288.

📒 Files selected for processing (3)
  • components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
  • components/clp-mcp-server/clp_mcp_server/settings.py (1 hunks)
  • components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
  • QueryJobStatus (18-27)
  • QueryJobType (10-15)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
  • ClpConnector (22-160)
  • submit_query (40-93)
  • read_job_status (95-113)
  • wait_query_completion (115-145)
  • read_results (147-160)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
  • QueryJobStatus (18-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: rust-checks (ubuntu-24.04)
  • GitHub Check: rust-checks (ubuntu-22.04)
🔇 Additional comments (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)

147-160: LGTM on read_results implementation.

The method correctly applies the SEARCH_MAX_NUM_RESULTS limit and returns a bounded list, addressing past concerns about potential OOM issues.

components/clp-mcp-server/tests/test_clp_connector.py (1)

58-133: LGTM on test implementation.

The tests provide good coverage for validation, error cases, and success scenarios. The use of mocks, parametrization, and the _aiter helper is appropriate. Only minor type annotation issue flagged separately.

(999, RuntimeError), # unknown status
])
async def test_wait_query_completion_failure_cases(
fail_status: QueryJobStatus,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Type annotation mismatch in parametrised test.

Line 99 annotates fail_status: QueryJobStatus, but line 96 includes 999 (an int) which is not a member of the enum. The annotation should be int to match the actual parameter values.

Apply this diff:

-    fail_status: QueryJobStatus,
+    fail_status: int,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fail_status: QueryJobStatus,
async def test_wait_query_completion_failure_cases(
fail_status: int,
exc_type: Exception
) -> None:
...
🤖 Prompt for AI Agents
In components/clp-mcp-server/tests/test_clp_connector.py around line 99 the test
parameter is annotated as "fail_status: QueryJobStatus" while the parametrized
values include an int (999) which is not a member of the enum; change the
annotation to "fail_status: int" (or a union like "int | QueryJobStatus" if
other cases include enum members) so the type matches the actual parameter
values.

@All-less All-less merged commit f8fcd0e into y-scope:main Oct 9, 2025
22 checks passed
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants