feat(mcp-server): Add ClpConnector for query job submission and result retrieval. #1388
Conversation
Walkthrough
Adds a new ClpConnector class integrating MariaDB and MongoDB results cache, introduces constants/enums for job types/statuses, environment-driven DB credentials, dependency updates in pyproject, and async unit tests covering validation, polling, error handling, and results retrieval.
Sequence Diagram(s)
sequenceDiagram
autonumber
actor Client
participant CC as ClpConnector
participant MDB as MariaDB
participant MGO as MongoDB_Results
rect rgb(235,245,255)
note over CC: Initialization
Client->>CC: __init__(clp_config)
CC->>MGO: connect to results DB
CC->>MDB: configure MariaDB client
end
rect rgb(240,255,240)
note over Client,CC: Submit query
Client->>CC: submit_query(query, begin_ts, end_ts)
CC->>CC: validate timestamps & pack job (msgpack)
CC->>MDB: INSERT INTO query_jobs -> returns query_id
CC->>MGO: create collection "query_id" and insert metadata
CC-->>Client: return query_id
end
rect rgb(255,250,235)
note over Client,CC: Wait for completion (poll)
Client->>CC: wait_query_completion(query_id, timeout?)
loop every POLLING_INTERVAL_SECONDS
CC->>MDB: SELECT status WHERE id = query_id
MDB-->>CC: status
alt status == SUCCEEDED
CC-->>Client: return (success)
else status in [PENDING,RUNNING,CANCELLING]
CC->>CC: continue polling
else status in [FAILED,CANCELLED,KILLED,unknown]
CC-->>Client: raise RuntimeError
end
end
end
rect rgb(245,240,255)
note over Client,CC: Read results
Client->>CC: read_results(query_id)
CC->>MGO: find documents (limit SEARCH_MAX_NUM_RESULTS)
MGO-->>CC: docs[]
CC-->>Client: docs[]
end
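The "Wait for completion (poll)" branch of the diagram can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `wait_for_completion`, `make_fake_reader`, and the string status names are hypothetical stand-ins, and the status source is an in-memory replay rather than a MariaDB query.

```python
import asyncio

# Status names taken from the diagram; plain strings are used here purely for
# illustration (the PR itself uses an IntEnum).
WAITING_STATES = {"PENDING", "RUNNING", "CANCELLING"}


async def wait_for_completion(read_status, interval: float = 0.0) -> None:
    """Polls `read_status` (a hypothetical async callable) until the job ends."""
    while True:
        status = await read_status()
        if status == "SUCCEEDED":
            return
        if status in WAITING_STATES:
            # Still in progress: sleep, then poll again.
            await asyncio.sleep(interval)
            continue
        # FAILED, CANCELLED, KILLED, or anything unrecognised.
        raise RuntimeError(f"Query job ended with status {status!r}.")


def make_fake_reader(statuses):
    """Returns an async callable that replays a fixed sequence of statuses."""
    iterator = iter(statuses)

    async def read_status():
        return next(iterator)

    return read_status
```

Replaying `["PENDING", "RUNNING", "SUCCEEDED"]` returns normally, while a sequence ending in `"FAILED"` raises `RuntimeError`, matching the three branches of the `alt` block.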
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches
✅ Passed checks (3 passed)
Warning
There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.
🔧 Ruff (0.13.3)
components/clp-mcp-server/clp_mcp_server/clp_connector.py: ruff failed
components/clp-mcp-server/clp_mcp_server/settings.py: ruff failed
components/clp-mcp-server/tests/test_clp_connector.py: ruff failed
Actionable comments posted: 12
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📥 Commits
Reviewing files that changed from the base of the PR and between ba34c27 and 8d5209ec5b85775989b52605aee17666fbd89b6a.
📒 Files selected for processing (5)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
components/clp-mcp-server/clp_mcp_server/constants.py (1 hunks)
components/clp-mcp-server/clp_mcp_server/settings.py (1 hunks)
components/clp-mcp-server/pyproject.toml (3 hunks)
components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
QueryJobStatus (18-27)
QueryJobType (10-15)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
CLPConnector (22-176)
submit_query (40-97)
read_job_status (99-120)
wait_query_completion (122-157)
read_results (159-176)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
QueryJobStatus (18-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (2)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
5-27: LGTM: centralised constants and enums align with job-orchestration.
Values and ordering look consistent; starting at 0 matches DB usage.
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
74-83: aiomysql.connect usage: confirm async context manager support.
aiomysql.connect returns a coroutine; typical usage is conn = await aiomysql.connect(...); then async with conn.cursor().... Confirm that async with aiomysql.connect(...) is supported in your version; otherwise refactor.
Apply this safer pattern if needed:
- async with aiomysql.connect(**self.db_conf) as conn, conn.cursor() as cur:
-     await cur.execute(
+ conn = await aiomysql.connect(**self.db_conf)
+ try:
+     async with conn.cursor() as cur:
+         await cur.execute(
              "INSERT INTO query_jobs (type, job_config) VALUES (%s, %s);",
              (int(QueryJobType.SEARCH_OR_AGGREGATION), job_config),
-     )
-     await conn.commit()
-     await cur.execute("SELECT LAST_INSERT_ID();")
-     result = await cur.fetchone()
-     query_id = result[0] if result else None
+         )
+         await conn.commit()
+         await cur.execute("SELECT LAST_INSERT_ID();")
+         result = await cur.fetchone()
+         query_id = result[0] if result else None
+ finally:
+     conn.close()
    async def read_job_status(self, query_id: str) -> int:
        """
        Reads the job status of a query.

        Raises:
            aiomysql.Error: if there is an error connecting to or querying MariaDB
            ValueError: when the query is not found

        Args:
            query_id: the ID of the query

        """
        async with aiomysql.connect(**self.db_conf) as conn, conn.cursor() as cur:
            await cur.execute("SELECT status FROM query_jobs WHERE id = %s;", (query_id,))
            result = await cur.fetchone()
            status = result[0] if result else None

        if status is None:
            err_msg = f"Query job with ID {query_id} not found."
            raise ValueError(err_msg)

        return status
🧹 Nitpick | 🔵 Trivial
Consider returning a typed status and documenting it.
read_job_status returns int, later compared to IntEnum. For clarity, consider converting to QueryJobStatus with a fallback for unknown values.
Example:
status_val = result[0] if result else None
if status_val is None:
    raise ValueError(...)
try:
    return QueryJobStatus(status_val)
except ValueError:
    return status_val  # preserve unknown for caller to handle
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 99 to
121, read_job_status currently returns a raw int which callers compare to an
IntEnum; convert the retrieved int to the QueryJobStatus enum before returning
and document the return type, but preserve unknown/invalid numeric values by
catching the enum conversion error and returning the original int (or another
sentinel) so callers can handle unknown statuses; ensure QueryJobStatus is
imported/available, validate None as before, and update the docstring to state
the function returns either QueryJobStatus or int for unknown values.
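As a rough, self-contained illustration of the suggested conversion (not the code in the PR), the sketch below maps a raw integer to the enum and falls back to the raw value for unknown statuses. The helper name `to_status` is hypothetical; the enum members mirror the `QueryJobStatus` definition in constants.py.

```python
from enum import IntEnum, auto
from typing import Union


class QueryJobStatus(IntEnum):
    """Local copy of the status enum, reproduced so the example is standalone."""

    PENDING = 0
    RUNNING = auto()
    SUCCEEDED = auto()
    FAILED = auto()
    CANCELLING = auto()
    CANCELLED = auto()
    KILLED = auto()


def to_status(raw: int) -> Union[QueryJobStatus, int]:
    """Converts a raw DB value to QueryJobStatus, preserving unknown values."""
    try:
        return QueryJobStatus(raw)
    except ValueError:
        # Unknown status code: hand the raw int back so the caller can decide.
        return raw
```

Because `IntEnum` members compare equal to their integer values, existing comparisons such as `status == QueryJobStatus.SUCCEEDED` keep working whether the caller receives the enum member or the raw int.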
8d5209e to
8549a3e
Compare
Actionable comments posted: 4
♻️ Duplicate comments (1)
components/clp-mcp-server/pyproject.toml (1)
73-76: Remove I001 ignore now that package-relative imports are used.
Since clp_connector.py now uses package-relative imports (.constants, .settings), the I001 (unsorted imports) ignore is no longer necessary.
Apply this diff:
  [tool.ruff.lint.per-file-ignores]
  "tests/test_clp_connector.py" = ["INP001", "E402"]
  "tests/*.py" = ["S101"]
- "clp_mcp_server/clp_connector.py" = ["I001"]
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📥 Commits
Reviewing files that changed from the base of the PR and between 8d5209ec5b85775989b52605aee17666fbd89b6a and 8549a3e6eb3c528d855c41443aaec30ac9a87382.
📒 Files selected for processing (5)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
components/clp-mcp-server/clp_mcp_server/constants.py (1 hunks)
components/clp-mcp-server/clp_mcp_server/settings.py (1 hunks)
components/clp-mcp-server/pyproject.toml (3 hunks)
components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-20T05:38:30.720Z
Learnt from: junhaoliao
PR: y-scope/clp#1242
File: taskfile.yaml:545-547
Timestamp: 2025-08-20T05:38:30.720Z
Learning: The python component directories in the y-scope/clp repository are kept very clean and only contain essential files: source code directories, pyproject.toml, poetry.lock, README.md, and the dist/ directory. No cache directories, virtual environments, or other transient files exist in the component directories, so additional exclude patterns beyond "dist/**/*" are unnecessary.
Applied to files:
components/clp-mcp-server/pyproject.toml
🧬 Code graph analysis (2)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
CLPConnector (27-188)
submit_query (45-104)
read_job_status (106-130)
wait_query_completion (132-169)
read_results (171-188)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
QueryJobStatus (18-27)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
QueryJobStatus (18-27)
QueryJobType (10-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: package-image
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: build (ubuntu-24.04)
🔇 Additional comments (9)
components/clp-mcp-server/pyproject.toml (1)
11-13: LGTM! Dependencies support async connector.
The pymongo, aiomysql, and msgpack dependencies align with the CLPConnector implementation for async query submission and result retrieval.
components/clp-mcp-server/tests/test_clp_connector.py (3)
10-39: LGTM! Skipped tests appropriately marked.
The skipped integration tests serve as useful placeholders and are correctly marked as requiring actual database connections.
101-103: LGTM! Correct async generator type annotation.
The async_gen function is properly annotated as AsyncIterator[dict], correctly reflecting its return type.
93-109: LGTM! Effective mock structure for read_results.
The test correctly mocks the async generator pattern and validates that read_results retrieves expected documents.
components/clp-mcp-server/clp_mcp_server/constants.py (1)
1-27: LGTM! Well-defined constants and enums.
The constants and IntEnum definitions are clear, well-documented, and appropriately scoped for the connector's query submission and status polling behaviour.
components/clp-mcp-server/clp_mcp_server/clp_connector.py (4)
1-28: LGTM! Correct import patterns used.
The imports follow best practices with the public AsyncMongoClient import from pymongo and package-relative imports for constants and settings.
30-43: LGTM! Constructor initializes DB clients correctly.
The initialization correctly sets up MongoDB and MariaDB configurations. Note: For improved testability, consider allowing optional client/config injection (see test file feedback).
45-104: LGTM! Robust query submission with proper validation.
The method correctly validates timestamps, handles the case where LAST_INSERT_ID() might fail, creates the MongoDB collection, and returns the query ID.
171-188: LGTM! Results properly limited to prevent OOM.
The method correctly applies the SEARCH_MAX_NUM_RESULTS limit when reading from MongoDB, preventing potential out-of-memory issues.
        """
        waiting_states = {QueryJobStatus.PENDING, QueryJobStatus.RUNNING, QueryJobStatus.CANCELLING}
        start_time = asyncio.get_event_loop().time()
🛠️ Refactor suggestion | 🟠 Major
Use asyncio.get_running_loop() instead of asyncio.get_event_loop().
asyncio.get_event_loop() emits a deprecation warning in Python 3.10+ when called without a running event loop, and asyncio.get_running_loop() is the preferred way to access the current loop from inside a coroutine.
Apply this diff:
- start_time = asyncio.get_event_loop().time()
+ start_time = asyncio.get_running_loop().time()
And at line 153:
- if timeout and (asyncio.get_event_loop().time() - start_time) > timeout:
+ if timeout and (asyncio.get_running_loop().time() - start_time) > timeout:
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- start_time = asyncio.get_event_loop().time()
+     async def wait_query_completion(
+         self,
+         query_id: str,
+         timeout: float | None = None,
+     ) -> None:
+         """
+         Waits for the query to complete, optionally timing out.
+         """
+         waiting_states = {
+             QueryJobStatus.PENDING,
+             QueryJobStatus.RUNNING,
+             QueryJobStatus.CANCELLING,
+         }
+         start_time = asyncio.get_running_loop().time()
+         while True:
+             status = await self.read_job_status(query_id)
+             if status in waiting_states:
+                 await asyncio.sleep(POLLING_INTERVAL_SECONDS)
+                 if timeout and (asyncio.get_running_loop().time() - start_time) > timeout:
+                     raise TimeoutError(f"Timed out waiting for query job {query_id} to complete.")
+             elif status == QueryJobStatus.SUCCEEDED:
+                 break
+             elif status in {
+                 QueryJobStatus.FAILED,
+                 QueryJobStatus.CANCELLED,
+                 QueryJobStatus.KILLED,
+             }:
+                 raise RuntimeError(f"Query job {query_id} did not succeed, status={status}.")
+             else:
+                 raise RuntimeError(f"Query job with ID {query_id} has unknown status {status}.")
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 147
and 153, replace asyncio.get_event_loop().time() with
asyncio.get_running_loop().time() to avoid using the deprecated
get_event_loop(); update both occurrences so the current running loop is used
(no additional logic required unless called outside a running loop, in which
case ensure the call occurs within async context).
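The difference between the two calls can be seen in a small standalone sketch. The helper names `has_running_loop` and `measure_elapsed` are made up for this example; only `asyncio.get_running_loop()`, `loop.time()`, and `asyncio.sleep()` are real library calls.

```python
import asyncio


def has_running_loop() -> bool:
    """Returns True only when called from code running inside an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises instead of implicitly creating a loop.
        return False
    return True


async def measure_elapsed(delay: float) -> float:
    """Measures elapsed time using the running loop's monotonic clock."""
    loop = asyncio.get_running_loop()
    start_time = loop.time()
    await asyncio.sleep(delay)
    return loop.time() - start_time
```

Fetching the loop once and reusing it, as `measure_elapsed` does, also avoids repeating the lookup on every polling iteration.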
junhaoliao
left a comment
thanks for putting up the PR.
i posted a partial review - most comments are style / code organization related.
i'll take a deeper look at the exact logic once the posted comments are addressed. feel free to let me know if there's anything i can help with
8549a3e to
46a03f0
Compare
Actionable comments posted: 9
♻️ Duplicate comments (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (2)
107-130: Consider returning QueryJobStatus for improved type safety.
While returning int is functional, converting to QueryJobStatus would provide better type safety and make the API more self-documenting. Handle unknown status values by preserving the raw int as a fallback.
Example:
- async def read_job_status(self, query_id: str) -> int:
+ async def read_job_status(self, query_id: str) -> QueryJobStatus | int:
      """
      Reads the job status of a query.
      ...
-     :return: The status of the query.
-     :rtype: int
+     :return: The status of the query as QueryJobStatus, or raw int for unknown statuses.
+     :rtype: QueryJobStatus | int
      """
      async with aiomysql.connect(**self.db_conf) as conn, conn.cursor() as cur:
          await cur.execute("SELECT status FROM query_jobs WHERE id = %s;", (query_id,))
          result = await cur.fetchone()
          status = result[0] if result else None

      if status is None:
          err_msg = f"Query job with ID {query_id} not found."
          raise ValueError(err_msg)

-     return status
+     try:
+         return QueryJobStatus(status)
+     except ValueError:
+         return status  # Return raw int for unknown statuses
147-147: Replace asyncio.get_event_loop() with asyncio.get_running_loop().
asyncio.get_event_loop() emits a deprecation warning in Python 3.10+ when called without a running event loop; inside a coroutine, asyncio.get_running_loop() is the preferred call.
Apply this diff:
- start_time = asyncio.get_event_loop().time()
+ start_time = asyncio.get_running_loop().time()
And at line 153:
- if timeout and (asyncio.get_event_loop().time() - start_time) > timeout:
+ if timeout and (asyncio.get_running_loop().time() - start_time) > timeout:
Based on past review comments.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📥 Commits
Reviewing files that changed from the base of the PR and between 8549a3e6eb3c528d855c41443aaec30ac9a87382 and 46a03f0.
📒 Files selected for processing (5)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
components/clp-mcp-server/clp_mcp_server/constants.py (1 hunks)
components/clp-mcp-server/clp_mcp_server/settings.py (1 hunks)
components/clp-mcp-server/pyproject.toml (3 hunks)
components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
CLPConnector (27-188)
submit_query (45-105)
read_job_status (107-130)
wait_query_completion (132-169)
read_results (171-188)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
QueryJobStatus (18-27)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
QueryJobStatus (18-27)
QueryJobType (10-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: package-image
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: lint-check (macos-15)
- GitHub Check: rust-checks (macos-15)
🔇 Additional comments (5)
components/clp-mcp-server/tests/test_clp_connector.py (4)
42-47: LGTM! Validation logic is correctly tested.
The test properly verifies that submit_query raises ValueError when end_ts < begin_ts. The match pattern ensures the error message is meaningful.
Note: Constructor side-effects (creating real DB clients) are addressed in the previous comment about dependency injection.
50-57: LGTM! Error propagation is correctly tested.
The test properly verifies that read_job_status raises ValueError for non-existent queries with the expected error message.
60-72: LGTM! Status progression is well-tested.
The test effectively simulates the query lifecycle (PENDING → RUNNING → SUCCEEDED) and verifies that wait_query_completion correctly polls until completion. Patching asyncio.sleep avoids unnecessary delays in tests.
93-109: LGTM! Results retrieval is properly tested.
The test effectively mocks the MongoDB collection and async iteration, verifying that read_results correctly retrieves and returns documents. The async generator pattern is well-implemented.
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
171-188: LGTM! Results retrieval is correctly implemented.
The method properly limits results to SEARCH_MAX_NUM_RESULTS and returns them as a list. The implementation correctly addresses past concerns about potential OOM issues by applying a limit.
    def __init__(self) -> None:
        """Initializes the CLPConnector with MongoDB and MariaDB configurations."""
        mongo_url = f"mongodb://{CLP_RESULTS_CACHE_SERVICE_NAME}:{CLP_RESULTS_CACHE_PORT}/"
        self.mongo_client = AsyncMongoClient(mongo_url)
        self.results_cache = self.mongo_client[CLP_RESULTS_CACHE_CLP_DB_NAME]

        # Configuration to be used in `aiomysql.connect` to MariaDB.
        self.db_conf = {
            "host": CLP_DB_SERVICE_NAME,
            "port": CLP_DB_PORT,
            "user": CLP_DB_USER,
            "password": CLP_DB_PASS,
            "db": CLP_DB_NAME,
        }
🛠️ Refactor suggestion | 🟠 Major
Add explicit resource cleanup for MongoDB client.
The AsyncMongoClient is created in __init__ but never explicitly closed, potentially causing resource leaks in long-running processes. Consider:
- Add an async cleanup method (AsyncMongoClient.close() is a coroutine, so it must be awaited):
async def close(self) -> None:
    """Closes the MongoDB client connection."""
    await self.mongo_client.close()
- Document the lifecycle: Add a note in the class docstring that users should call close() when done, or use the connector as an async context manager:
async def __aenter__(self) -> "CLPConnector":
    """Async context manager entry."""
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Async context manager exit."""
    await self.close()
Then usage becomes:
async with CLPConnector() as connector:
    await connector.submit_query(...)
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 30 to
43, the AsyncMongoClient is created in __init__ but never closed; add an
explicit cleanup API and context-manager support: implement a close(self) method
that calls the Mongo client close(), add async __aenter__ and async __aexit__ so
the connector can be used with "async with" and ensure __aexit__ calls close(),
and update the class docstring to document that callers should call close() or
use the async context manager to release MongoDB resources.
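A minimal, runnable sketch of this lifecycle pattern follows. It deliberately avoids a real MongoDB connection: `FakeAsyncClient` and `Connector` are hypothetical stand-ins, and the sketch assumes the underlying client exposes an awaitable `close()`.

```python
import asyncio


class FakeAsyncClient:
    """Stand-in for an async DB client; only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class Connector:
    """Hypothetical connector that owns a client and releases it on exit."""

    def __init__(self, client: FakeAsyncClient) -> None:
        self._client = client

    async def close(self) -> None:
        await self._client.close()

    async def __aenter__(self) -> "Connector":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Runs on normal exit and when the body raises.
        await self.close()


async def demo() -> bool:
    client = FakeAsyncClient()
    async with Connector(client):
        pass  # Queries would be submitted here.
    return client.closed
```

Injecting the client through the constructor, as this sketch does, is also what makes the class testable without patching module-level globals.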
                err_msg = "Failed to retrieve the ID of the submitted query."
                raise RuntimeError(err_msg)

        await self.results_cache.create_collection(query_id)
🧹 Nitpick | 🔵 Trivial
Ensure idempotent collection creation.
create_collection(query_id) may raise an exception if the collection already exists (e.g., due to a retry after partial failure). Consider wrapping this in a try/except to ignore CollectionInvalid errors, or use list_collection_names() to check existence first.
Example:
- await self.results_cache.create_collection(query_id)
+ try:
+     await self.results_cache.create_collection(query_id)
+ except pymongo.errors.CollectionInvalid:
+     # Collection already exists, ignore
+     pass
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- await self.results_cache.create_collection(query_id)
+ # Create a collection in MongoDB where the name is the last_insert_id
+ try:
+     await self.results_cache.create_collection(query_id)
+ except pymongo.errors.CollectionInvalid:
+     # Collection already exists, ignore
+     pass
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around line 94, the
call to await self.results_cache.create_collection(query_id) can raise an
exception if the collection already exists; make creation idempotent by either
checking existence first (e.g., use results_cache.list_collection_names() or
equivalent and skip create if present) or wrapping the create_collection call in
a try/except that catches the specific "collection already exists" error (e.g.,
CollectionInvalid) and ignores it while re-raising other exceptions; ensure any
caught exception is handled silently or logged at debug level so retries are
safe.
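The try/except variant can be illustrated without a live MongoDB instance. In the sketch below, `FakeDatabase` and `CollectionExistsError` are invented stand-ins for the real database handle and for pymongo's `CollectionInvalid`; only the control flow is meant to carry over.

```python
import asyncio


class CollectionExistsError(Exception):
    """Stand-in for the driver's 'collection already exists' error."""


class FakeDatabase:
    """In-memory stand-in that rejects duplicate collection names."""

    def __init__(self) -> None:
        self.collections = set()

    async def create_collection(self, name: str) -> None:
        if name in self.collections:
            raise CollectionExistsError(name)
        self.collections.add(name)


async def ensure_collection(db: FakeDatabase, name: str) -> bool:
    """Creates the collection if needed; returns True only when it was created."""
    try:
        await db.create_collection(name)
    except CollectionExistsError:
        # Only the 'already exists' case is swallowed; other errors propagate.
        return False
    return True


async def demo():
    db = FakeDatabase()
    first = await ensure_collection(db, "42")
    second = await ensure_collection(db, "42")  # Simulated retry.
    return first, second, db.collections
```

Catching the specific exception rather than checking existence first avoids a race between the check and the create when two callers retry at once.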
"""Constants for CLP MCP server."""

from enum import auto, IntEnum

SEARCH_MAX_NUM_RESULTS = 1000

POLLING_INTERVAL_SECONDS = 1


class QueryJobType(IntEnum):
    """Matching the `QueryJobType` class in `job_orchestration.query_scheduler.constants`."""

    SEARCH_OR_AGGREGATION = 0
    EXTRACT_IR = auto()
    EXTRACT_JSON = auto()


class QueryJobStatus(IntEnum):
    """Matching the `QueryJobStatus` class in `job_orchestration.query_scheduler.constants`."""

    PENDING = 0
    RUNNING = auto()
    SUCCEEDED = auto()
    FAILED = auto()
    CANCELLING = auto()
    CANCELLED = auto()
    KILLED = auto()
🧹 Nitpick | 🔵 Trivial
🧩 Analysis chain
Verify alignment with job_orchestration constants and plan for consolidation.
The constants and enums duplicate definitions from job_orchestration.query_scheduler.constants, creating a maintenance risk if the canonical definitions evolve independently. Per past discussions, build system incompatibilities currently prevent direct imports.
Short-term: Verify that the enum values precisely match their counterparts.
Long-term: Consider migrating to a shared constants package or resolving the build system conflict to enable direct imports, eliminating duplication.
Based on past review comments.
🏁 Script executed:
#!/bin/bash
# Verify that constants match job_orchestration definitions
echo "=== Checking QueryJobType in job_orchestration ==="
ast-grep --pattern $'class QueryJobType($$$):
$$$
'
echo ""
echo "=== Checking QueryJobStatus in job_orchestration ==="
ast-grep --pattern $'class QueryJobStatus($$$):
$$$
'
echo ""
echo "=== Searching for SEARCH_MAX_NUM_RESULTS usage ==="
rg -n "SEARCH_MAX_NUM_RESULTS\s*=" --type py
echo ""
echo "=== Searching for POLLING_INTERVAL usage ==="
rg -n "POLLING_INTERVAL\w*\s*=" --type py
Length of output: 3345
Enums match canonical definitions; consider future consolidation
- QueryJobType and QueryJobStatus in clp_mcp_server/constants.py exactly match those in job_orchestration.query_scheduler.constants.
- Given current build-system constraints, duplication is acceptable; plan a future refactor to resolve the poetry vs. hatchling conflict and import these enums from a shared package.
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/constants.py lines 1-27, the
QueryJobType and QueryJobStatus enums duplicate canonical definitions in
job_orchestration.query_scheduler.constants; leave the enums as-is for now but
add a concise TODO comment above the enums stating they should be imported from
the shared package once the poetry vs. hatchling build-system conflict is
resolved, include a link or issue number for the follow-up refactor, and add a
one-line comment to remind maintainers to keep these enums in sync until the
refactor is done.
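Until the enums can be imported from one place, a small drift check could guard the duplication. The sketch below is only an illustration under stated assumptions: `CanonicalStatus` and `LocalStatus` are shortened stand-ins for the two real enums, and `enum_mismatches` is a hypothetical helper, not something in the repository.

```python
from enum import IntEnum, auto
from typing import Dict, Optional, Tuple, Type


class CanonicalStatus(IntEnum):
    """Stand-in for the enum owned by job_orchestration."""

    PENDING = 0
    RUNNING = auto()
    SUCCEEDED = auto()


class LocalStatus(IntEnum):
    """Stand-in for the duplicated copy kept in the MCP server."""

    PENDING = 0
    RUNNING = auto()
    SUCCEEDED = auto()


def enum_mismatches(
    local: Type[IntEnum], canonical: Type[IntEnum]
) -> Dict[str, Tuple[Optional[int], Optional[int]]]:
    """Maps each differing member name to its (local, canonical) values."""
    local_map = {member.name: member.value for member in local}
    canonical_map = {member.name: member.value for member in canonical}
    names = sorted(set(local_map) | set(canonical_map))
    return {
        name: (local_map.get(name), canonical_map.get(name))
        for name in names
        if local_map.get(name) != canonical_map.get(name)
    }
```

An empty result means the copies agree; a missing member shows up with `None` on the side that lacks it.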
CLP_DB_SERVICE_NAME = os.environ.get("CLP_DB_SERVICE_NAME", "db")
CLP_DB_PORT = int(os.environ.get("CLP_DB_PORT", "3306"))
CLP_DB_NAME = os.environ.get("CLP_DB_NAME", "clp-db")
CLP_DB_USER = os.environ.get("CLP_DB_USER", "clp-user")
CLP_DB_PASS = os.environ.get("CLP_DB_PASS", "<no_password_set>")

CLP_RESULTS_CACHE_SERVICE_NAME = os.environ.get("CLP_RESULTS_CACHE_SERVICE_NAME", "results-cache")
CLP_RESULTS_CACHE_PORT = int(os.environ.get("CLP_RESULTS_CACHE_PORT", "27017"))
CLP_RESULTS_CACHE_CLP_DB_NAME = os.environ.get("CLP_RESULTS_CACHE_CLP_DB_NAME", "clp-query-results")
🛠️ Refactor suggestion | 🟠 Major
Consider explicit validation for required credentials.
The sentinel default "<no_password_set>" for CLP_DB_PASS may cause confusing runtime connection errors when a password is actually required. Per past review discussions, consider:
Option 1: Fail fast if the password is unset:
import os

CLP_DB_PASS = os.environ.get("CLP_DB_PASS")
if CLP_DB_PASS is None:
    raise ValueError("CLP_DB_PASS environment variable must be set")
Option 2: Use typed configuration (dataclass/pydantic) with required fields:
import os
from dataclasses import dataclass

@dataclass
class CLPSettings:
    # Required field with no default; it must precede the defaulted fields,
    # otherwise the dataclass definition raises TypeError.
    db_pass: str
    db_service_name: str = "db"
    db_port: int = 3306
    db_name: str = "clp-db"
    db_user: str = "clp-user"

    @classmethod
    def from_env(cls) -> "CLPSettings":
        return cls(
            db_service_name=os.environ.get("CLP_DB_SERVICE_NAME", "db"),
            db_port=int(os.environ.get("CLP_DB_PORT", "3306")),
            db_name=os.environ.get("CLP_DB_NAME", "clp-db"),
            db_user=os.environ.get("CLP_DB_USER", "clp-user"),
            db_pass=os.environ["CLP_DB_PASS"],  # Raises KeyError if missing
        )
Based on past review comments.
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/settings.py around lines 5 to 13,
the CLP_DB_PASS default of "<no_password_set>" can mask missing credentials and
cause confusing runtime errors; change to require an explicit password by
reading CLP_DB_PASS without a sentinel and fail-fast if absent (raise a
ValueError or let KeyError surface), or replace module-level values with a
typed/config object (dataclass or pydantic) where db_pass is a required field
populated from the environment so the application fails immediately with a clear
message when CLP_DB_PASS is not set.
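One way to keep the fail-fast behaviour testable is to read settings through a helper that takes the environment as an argument. The sketch below is an assumption-laden illustration: `require_setting` is a hypothetical name, and treating an empty string as missing is a design choice of this example rather than something the review asks for.

```python
from typing import Mapping


def require_setting(env: Mapping[str, str], name: str) -> str:
    """Returns the named setting, or raises with a clear message if it is absent."""
    value = env.get(name)
    if not value:
        # Covers both an unset variable and an empty string (example's choice).
        raise ValueError(f"{name} environment variable must be set")
    return value
```

In application code the helper would be called as `require_setting(os.environ, "CLP_DB_PASS")`, while tests can pass a plain dict instead of mutating the process environment.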
18d8071 to
0ecb2d2
Compare
0ecb2d2 to
2cc4b14
Compare
2cc4b14 to
a46309c
Compare
f4683b2 to
22020e0
Compare
Actionable comments posted: 4
♻️ Duplicate comments (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
136-136: Fix timeout check to honour zero-second deadlines.The condition
if timeout andtreatstimeout=0as falsy, preventing immediate timeout. Useif timeout is not None andto allow zero-second deadlines while still supportingtimeout=Nonefor no timeout.Apply this diff:
- if timeout and (event_loop.time() - start_time) > timeout: + if timeout is not None and (event_loop.time() - start_time) > timeout:components/clp-mcp-server/tests/test_clp_connector.py (1)
94-98: Fix type annotation for exception class parameter.The
exc_typeparameter should be typed astype[Exception](a class type), notException(an instance type), since it's used withpytest.raiseswhich expects an exception class.Apply this diff:
async def test_wait_query_completion_failure_cases( fail_status: QueryJobStatus, - exc_type: Exception, + exc_type: type[Exception], mock_clp_config: Any ) -> None:
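The zero-second timeout issue raised for clp_connector.py comes down to Python truthiness, which the following standalone comparison makes concrete. Both function names are hypothetical and exist only for this example.

```python
from typing import Optional


def timed_out_truthy(timeout: Optional[float], elapsed: float) -> bool:
    """Original-style check: a timeout of 0 is falsy, so it never fires."""
    return bool(timeout and elapsed > timeout)


def timed_out_explicit(timeout: Optional[float], elapsed: float) -> bool:
    """Suggested check: only None disables the timeout."""
    return timeout is not None and elapsed > timeout
```

With `timeout=0` and any positive elapsed time, the first function reports no timeout while the second reports one; with `timeout=None` both agree that there is no deadline.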
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📥 Commits
Reviewing files that changed from the base of the PR and between e9e1053994043a90e08c1bdfc5b1efd4bb3dfb82 and 22020e01214f15395a9d5aaf6e39801c7a783f34.
⛔ Files ignored due to path filters (1)
components/clp-mcp-server/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
components/clp-mcp-server/pyproject.toml (3 hunks)
components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
ClpConnector (22-162)
submit_query (40-94)
read_job_status (96-115)
wait_query_completion (117-146)
read_results (148-162)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
QueryJobStatus (18-27)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
QueryJobStatus (18-27)
QueryJobType (10-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (macos-15)
🔇 Additional comments (9)
components/clp-mcp-server/pyproject.toml (2)
13-13: Verify msgpack version matches latest requirement.Based on past review comments, the suggested version was
msgpack>=1.1.2, but this line showsmsgpack>=1.0.7. Please confirm whether>=1.1.2is required or if>=1.0.7is intentional.
77-81: LGTM on per-file ignores for tests.The per-file ignores are appropriately scoped: INP001 allows implicit namespace packages for tests, and S101 permits assert statements in test cases.
components/clp-mcp-server/tests/test_clp_connector.py (4)
3-6: LGTM on imports and organization.
The imports are properly organized with standard library, third-party, and local imports clearly separated. Using from types import SimpleNamespace and proper typing annotations shows good practice.
13-19: LGTM on fixture design.
The mock_clp_config fixture using SimpleNamespace provides a clean, reusable mock configuration for tests without requiring actual database connections.
106-116: LGTM on async generator helper.
The _aiter helper cleanly converts an iterable to an async generator for testing. The TypeVar usage and type annotations are correct.
118-129: LGTM on read_results test.
The test properly mocks the MongoDB collection and uses patch.object to inject the mock into the connector's _results_cache, verifying the results are retrieved correctly.
components/clp-mcp-server/clp_mcp_server/clp_connector.py (3)
1-19: LGTM on imports and module structure.
The imports are correctly organized with standard library, third-party packages, and package-relative imports. Using from pymongo import AsyncMongoClient follows the recommended public API.
96-115: LGTM on read_job_status implementation.
The method correctly queries MariaDB for the job status and raises a clear ValueError when the query is not found. The error message is descriptive.
148-162: LGTM on read_results implementation.
The method correctly retrieves documents from the MongoDB collection with the SEARCH_MAX_NUM_RESULTS limit applied, preventing unbounded memory usage.
| class ClpConnector: | ||
| """A connector class to interact with the CLP database and results cache.""" | ||
|
|
||
| def __init__(self, clp_config: Any) -> None: | ||
| """Initializes the ClpConnector with MongoDB and MariaDB configurations.""" | ||
| mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/" | ||
| mongo_client = AsyncMongoClient(mongo_url) | ||
| self._results_cache = mongo_client[clp_config.results_cache.db_name] | ||
|
|
||
| # Configuration to be used in `aiomysql.connect` to MariaDB. | ||
| self._db_conf = { | ||
| "host": clp_config.database.host, | ||
| "port": clp_config.database.port, | ||
| "user": CLP_DB_USER, | ||
| "password": CLP_DB_PASS, | ||
| "db": clp_config.database.name, | ||
| } |
🧹 Nitpick | 🔵 Trivial
Consider adding resource cleanup for MongoDB client.
The AsyncMongoClient created in __init__ is never explicitly closed, which can cause resource leaks in long-running processes. Consider adding an async context manager protocol (__aenter__ and __aexit__) or a close() method to properly clean up the MongoDB connection.
Example implementation:
    async def close(self) -> None:
        """Closes the MongoDB client connection."""
        # AsyncMongoClient.close() is a coroutine, so it must be awaited.
        await self._mongo_client.close()

    async def __aenter__(self) -> "ClpConnector":
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.close()

Note: Store mongo_client as self._mongo_client to enable cleanup.
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 22 to
38, the locally created AsyncMongoClient is not stored on the instance nor
closed, risking resource leaks; store the client as self._mongo_client, keep
self._results_cache as before, add an async close(self) method that awaits
self._mongo_client.close(), and implement async context manager methods async
def __aenter__(self) -> "ClpConnector": return self and async def
__aexit__(self, exc_type, exc, tb) -> None: await self.close(); update callers
to use "async with ClpConnector(...)" or explicitly call await
connector.close().
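To make the suggested cleanup pattern concrete, here is a self-contained sketch. It does not use PyMongo: FakeAsyncClient is a hypothetical stand-in for a client whose close() is a coroutine, and Connector is a simplified stand-in for ClpConnector, so the block can run without a database.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for a client whose close() is a coroutine."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class Connector:
    """Simplified connector that owns its client and releases it on exit."""

    def __init__(self, client: FakeAsyncClient) -> None:
        # Keep a reference to the client so it can be closed later.
        self._client = client

    async def close(self) -> None:
        # Await the client's close(); calling it without await would only
        # create a coroutine object and never run the cleanup.
        await self._client.close()

    async def __aenter__(self) -> "Connector":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()


async def demo() -> bool:
    """Returns whether the client was closed after leaving the async with block."""
    client = FakeAsyncClient()
    async with Connector(client):
        pass
    return client.closed
```

The async with form guarantees that close() runs even when the body raises, which is the main reason to prefer it over a manual close() call at the end of a function.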
| async def submit_query(self, query: str, begin_ts: int, end_ts: int) -> str: | ||
| """ | ||
| Submits a query to the CLP database and returns the ID of the query. | ||
|
|
||
| :param query: The query string. | ||
| :param begin_ts: The beginning timestamp of the query range. | ||
| :param end_ts: The end timestamp of the query range. | ||
| :raise ValueError: If ``end_ts`` is smaller than ``begin_ts``. | ||
| :raise aiomysql.Error: If there is an error connecting to or querying MariaDB. | ||
| :raise pymongo.errors.PyMongoError: If there is an error interacting with MongoDB. | ||
| :raise Exception: For any other unexpected errors. | ||
| :return: The ID assigned to the query. | ||
|
|
||
| """ | ||
| if end_ts < begin_ts: | ||
| err_msg = f"end_ts {end_ts} is smaller than begin_ts {begin_ts}." | ||
| raise ValueError(err_msg) | ||
|
|
||
| job_config = msgpack.packb( | ||
| { | ||
| "begin_timestamp": begin_ts, | ||
| "dataset": None, | ||
| "end_timestamp": end_ts, | ||
| "ignore_case": True, | ||
| "max_num_results": SEARCH_MAX_NUM_RESULTS, | ||
| "query_string": query, | ||
| } | ||
| ) | ||
|
|
||
| async with aiomysql.connect(**self._db_conf) as conn, conn.cursor() as cur: | ||
| await cur.execute( | ||
| "INSERT INTO query_jobs (type, job_config) VALUES (%s, %s);", | ||
| (int(QueryJobType.SEARCH_OR_AGGREGATION), job_config), | ||
| ) | ||
| await conn.commit() | ||
| await cur.execute("SELECT LAST_INSERT_ID();") | ||
| result = await cur.fetchone() | ||
| query_id = str(result[0]) if result else None | ||
|
|
||
| if query_id is None: | ||
| err_msg = "Failed to retrieve the ID of the submitted query." | ||
| raise RuntimeError(err_msg) | ||
|
|
||
| await self._results_cache.create_collection(query_id) | ||
|
|
||
| results_metadata_doc = { | ||
| "_id": str(query_id), | ||
| "errorMsg": None, | ||
| "errorName": None, | ||
| "lastSignal": "resp-querying", | ||
| "queryEngine": "clp", | ||
| } | ||
| await self._results_cache["results-metadata"].insert_one(results_metadata_doc) | ||
|
|
||
| return query_id |
🧹 Nitpick | 🔵 Trivial
LGTM on submit_query implementation with a minor idempotency suggestion.
The method correctly validates timestamps, packs the job config, inserts into MariaDB, retrieves the query ID, and creates the MongoDB collection. The error handling is appropriate.
Minor suggestion: Consider making collection creation idempotent by catching pymongo.errors.CollectionInvalid (raised when the collection already exists) to support retry scenarios:
    try:
        await self._results_cache.create_collection(query_id)
    except pymongo.errors.CollectionInvalid:
        pass  # Collection already exists
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 40-94,
make the MongoDB collection creation idempotent by wrapping the await
self._results_cache.create_collection(query_id) call in a try/except that
catches pymongo.errors.CollectionInvalid and silently continues (or logs a debug
message) so retries won’t fail if the collection already exists; ensure
pymongo.errors is imported or referenced correctly.
765bbd6 to
d6676bb
junhaoliao
left a comment
the very last batch of comments (hopefully)
Actionable comments posted: 1
♻️ Duplicate comments (5)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (4)
129-130: Replace deprecated get_event_loop() with get_running_loop().
Since Python 3.10, asyncio.get_event_loop() is deprecated when called without a running event loop, and the asyncio documentation prefers get_running_loop() inside coroutines. The project requires Python >=3.10. This issue was flagged in past reviews but remains unaddressed.
Apply this diff:
-        event_loop = asyncio.get_event_loop()
+        event_loop = asyncio.get_running_loop()
         start_time = event_loop.time()
Line 136 keeps using event_loop and needs no change for this fix:
             if timeout and (event_loop.time() - start_time) > timeout:
83-83: Make collection creation idempotent for retry scenarios.
The create_collection call will raise an exception if the collection already exists, which can happen during retries. Based on past review comments, consider making this idempotent.
-        await self._results_cache.create_collection(query_id)
+        try:
+            await self._results_cache.create_collection(query_id)
+        except pymongo.errors.CollectionInvalid:
+            pass  # Collection already exists
Remember to import pymongo.errors if not already imported.
139-146: Simplify control flow by replacing elif with if.
Since the if block at line 134 contains continue and the block at line 140 contains break, control flow is already interrupted. The elif statements can be simplified to if statements for better readability.
Apply this diff:
-            elif status == QueryJobStatus.SUCCEEDED:
+            if status == QueryJobStatus.SUCCEEDED:
                 break
-            elif status in error_states:
+            if status in error_states:
                 err_msg = f"Query job with ID {query_id} ended in status {status.name}."
                 raise RuntimeError(err_msg)
-            else:
-                err_msg = f"Query job with ID {query_id} has unknown status {status}."
-                raise RuntimeError(err_msg)
+
+            err_msg = f"Query job with ID {query_id} has unknown status {status}."
+            raise RuntimeError(err_msg)
136-136: Fix timeout check to honour zero-second deadlines.
The condition if timeout and ... treats timeout=0 as falsy, so a zero-second deadline will never time out. This issue was flagged in past reviews.
Apply this diff:
-            if timeout and (event_loop.time() - start_time) > timeout:
+            if timeout is not None and (event_loop.time() - start_time) > timeout:
                 err_msg = f"Timeout waiting for query job with ID {query_id} to complete."
                 raise TimeoutError(err_msg)
components/clp-mcp-server/tests/test_clp_connector.py (1)
88-98: Fix type annotation to accept both enum and int.
The fail_status parameter is annotated as QueryJobStatus, but the parametrize decorator includes 999 (an int) to test unknown status codes. This creates a type mismatch.
Apply this diff to fix the annotation:
 async def test_wait_query_completion_failure_cases(
-    fail_status: QueryJobStatus,
+    fail_status: int,
     exc_type: type[Exception],
     mock_clp_config: Any
 ) -> None:
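The two polling-related comments (get_running_loop() and the timeout guard) can be seen together in a small standalone sketch. This is not the PR's wait_query_completion: wait_until_done, read_status, poll_interval, and the "done" status string are all hypothetical names chosen for illustration. The sketch also uses >= rather than > in the deadline check, which is a deliberate choice here so that timeout=0 fires on the first pass.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Optional


async def wait_until_done(
    read_status: Callable[[], Awaitable[str]],
    timeout: Optional[float] = None,
    poll_interval: float = 0.01,
) -> None:
    """Polls `read_status` until it returns "done" or the deadline passes."""
    # get_running_loop() is safe here because this coroutine only runs
    # inside an already running event loop.
    loop = asyncio.get_running_loop()
    start_time = loop.time()
    while True:
        if await read_status() == "done":
            return
        # `is not None` keeps timeout=0 meaningful; a bare truthiness
        # check would treat 0 the same as "no timeout".
        if timeout is not None and (loop.time() - start_time) >= timeout:
            raise TimeoutError("Timed out waiting for completion.")
        await asyncio.sleep(poll_interval)
```

With timeout=None the loop polls indefinitely, which matches the optional-timeout behaviour described in the review; callers that need a hard bound should always pass a number.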
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📥 Commits
Reviewing files that changed from the base of the PR and between 22020e01214f15395a9d5aaf6e39801c7a783f34 and d6676bbc74b83d6bb5150036cb483eed96e66ede.
⛔ Files ignored due to path filters (1)
components/clp-mcp-server/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
components/clp-mcp-server/pyproject.toml (3 hunks)
components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
QueryJobStatus (18-27)
QueryJobType (10-15)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
ClpConnector (22-162)
submit_query (40-94)
read_job_status (96-115)
wait_query_completion (117-146)
read_results (148-162)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
QueryJobStatus (18-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: rust-checks (macos-15)
🔇 Additional comments (1)
components/clp-mcp-server/tests/test_clp_connector.py (1)
118-129: LGTM on test implementation.
The test correctly mocks the MongoDB collection and uses the async generator helper to simulate document retrieval. The approach of patching _results_cache is appropriate for unit testing without requiring actual database connections.
| def __init__(self, clp_config: Any) -> None: | ||
| """Initializes the ClpConnector with MongoDB and MariaDB configurations.""" | ||
| mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/" | ||
| mongo_client = AsyncMongoClient(mongo_url) | ||
| self._results_cache = mongo_client[clp_config.results_cache.db_name] | ||
|
|
||
| # Configuration to be used in `aiomysql.connect` to MariaDB. | ||
| self._db_conf = { | ||
| "host": clp_config.database.host, | ||
| "port": clp_config.database.port, | ||
| "user": CLP_DB_USER, | ||
| "password": CLP_DB_PASS, | ||
| "db": clp_config.database.name, | ||
| } |
🧹 Nitpick | 🔵 Trivial
Consider adding resource cleanup for MongoDB client.
The AsyncMongoClient is created but not stored as an instance variable and never explicitly closed, which can cause resource leaks in long-running processes. Based on past review comments, consider adding a cleanup method or async context manager support.
Example implementation:
    def __init__(self, clp_config: Any) -> None:
        """Initializes the ClpConnector with MongoDB and MariaDB configurations."""
        mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/"
        self._mongo_client = AsyncMongoClient(mongo_url)
        self._results_cache = self._mongo_client[clp_config.results_cache.db_name]
        # ... rest of init

    async def close(self) -> None:
        """Closes the MongoDB client connection."""
        # AsyncMongoClient.close() is a coroutine, so it must be awaited.
        await self._mongo_client.close()

    async def __aenter__(self) -> "ClpConnector":
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.close()
🤖 Prompt for AI Agents
components/clp-mcp-server/clp_mcp_server/clp_connector.py lines 25-38: The
AsyncMongoClient is instantiated but not stored or closed, risking resource
leaks; store the client on the instance (e.g., self._mongo_client =
AsyncMongoClient(...)) and use it to set self._results_cache, add a close method
that closes the client (call self._mongo_client.close()), and implement async
context manager support by adding async __aenter__ (return self) and async
__aexit__ (call/await close) so callers can use "async with" or explicitly call
close to release resources.
d6676bb to
1430ba9
1430ba9 to
d57200a
junhaoliao
left a comment
there's one more suggestion (sorry for not raising it earlier), which is optional to be applied
for the title, how about:
feat(mcp-server): Add ClpConnector for query job submission and result retrieval.
45cd269 to
342a802
Actionable comments posted: 2
♻️ Duplicate comments (4)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (3)
82-82: Consider making collection creation idempotent.
If submit_query is retried (e.g., after a partial failure), create_collection will raise pymongo.errors.CollectionInvalid. Consider wrapping the call to handle existing collections gracefully.
Example:
+        try:
             await self._results_cache.create_collection(query_id)
+        except pymongo.errors.CollectionInvalid:
+            pass  # Collection already exists, safe to continue
141-141: Fix timeout check to handle zero-second deadlines.
The condition if timeout and (event_loop.time() - start_time) > timeout: treats timeout=0 as falsy, so a zero-second deadline will never fire. Change the guard to explicitly check for None.
Apply this diff:
-            if timeout and (event_loop.time() - start_time) > timeout:
+            if timeout is not None and (event_loop.time() - start_time) > timeout:
                 err_msg = f"Timeout waiting for query job with ID {query_id} to complete."
                 raise TimeoutError(err_msg)
127-128: Replace deprecated get_event_loop() with get_running_loop().
Since Python 3.10, asyncio.get_event_loop() is deprecated when called without a running event loop, and the project requires Python >=3.10. Use asyncio.get_running_loop() to access the currently running event loop.
Apply this diff:
-        event_loop = asyncio.get_event_loop()
+        event_loop = asyncio.get_running_loop()
         start_time = event_loop.time()
components/clp-mcp-server/tests/test_clp_connector.py (1)
94-98: Fix type annotation to match parametrized test values.
The parametrized test includes 999 (an int) which is not a member of the QueryJobStatus enum, but fail_status is annotated as QueryJobStatus. This causes a type mismatch.
Apply this diff:
 async def test_wait_query_completion_failure_cases(
-    fail_status: QueryJobStatus,
+    fail_status: int,
     exc_type: type[Exception],
     mock_clp_config: Any
 ) -> None:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📥 Commits
Reviewing files that changed from the base of the PR and between d57200a1d7c22ed323cd4d657eeb93dd5db3d314 and 342a802.
⛔ Files ignored due to path filters (1)
components/clp-mcp-server/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (3)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
components/clp-mcp-server/pyproject.toml (3 hunks)
components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
ClpConnector (22-160)
submit_query (40-93)
read_job_status (95-113)
wait_query_completion (115-145)
read_results (147-160)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
QueryJobStatus (18-27)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
QueryJobStatus (18-27)
QueryJobType (10-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: package-image
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-22.04)
- GitHub Check: rust-checks (macos-15)
- GitHub Check: lint-check (macos-15)
| def __init__(self, clp_config: Any) -> None: | ||
| """Initializes the ClpConnector with MongoDB and MariaDB configurations.""" | ||
| mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/" | ||
| mongo_client = AsyncMongoClient(mongo_url) | ||
| self._results_cache = mongo_client[clp_config.results_cache.db_name] |
🧹 Nitpick | 🔵 Trivial
Store mongo_client for proper resource cleanup.
The AsyncMongoClient is created but not stored on the instance, preventing explicit cleanup. Past reviews suggested adding a close() method and async context manager support.
Apply this diff to store the client and enable cleanup:
     def __init__(self, clp_config: Any) -> None:
         """Initializes the ClpConnector with MongoDB and MariaDB configurations."""
         mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/"
-        mongo_client = AsyncMongoClient(mongo_url)
-        self._results_cache = mongo_client[clp_config.results_cache.db_name]
+        self._mongo_client = AsyncMongoClient(mongo_url)
+        self._results_cache = self._mongo_client[clp_config.results_cache.db_name]

Then add cleanup methods:

    async def close(self) -> None:
        """Closes the MongoDB client connection."""
        # AsyncMongoClient.close() is a coroutine, so it must be awaited.
        await self._mongo_client.close()

    async def __aenter__(self) -> "ClpConnector":
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.close()
🤖 Prompt for AI Agents
In components/clp-mcp-server/clp_mcp_server/clp_connector.py around lines 25 to
29, the AsyncMongoClient is created but not saved to the instance so it cannot
be closed; update the __init__ to assign the client to self._mongo_client before
deriving self._results_cache, and add async cleanup/context support by
implementing async def close(self) -> None that calls
self._mongo_client.close(), plus async def __aenter__(self) -> "ClpConnector"
returning self and async def __aexit__(self, exc_type, exc_val, exc_tb) -> None
that awaits self.close(); ensure attribute names match and no blocking calls are
introduced.
| "ruff>=0.11.12", | ||
| "pytest>=8.4.1", | ||
| "pytest-env>=1.1.5", | ||
| "pytest-asyncio>=1.2.0", |
Fix impossible pytest-asyncio constraint
pytest-asyncio does not appear to have shipped any 1.x releases; the latest published wheels found on PyPI are 0.23.x. If that is accurate, requiring >=1.2.0 makes the dev environment unsatisfiable, so installs and CI will fail. Confirm the available versions on PyPI first; if 1.2.0 is not published, drop the floor to the highest tested 0.x release (e.g., >=0.23.6) or pin exactly if you prefer repeatability.
-    "pytest-asyncio>=1.2.0",
+    "pytest-asyncio>=0.23.6",
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "pytest-asyncio>=1.2.0", | |
| "pytest-asyncio>=0.23.6", |
🤖 Prompt for AI Agents
In components/clp-mcp-server/pyproject.toml around line 29, the pytest-asyncio
version constraint ">=1.2.0" is impossible because pytest-asyncio has no 1.x
releases; update the constraint to a valid released version such as ">=0.23.6"
(or pin to an exact 0.23.x like "==0.23.6" for repeatability) so the dev
environment and CI can install successfully.
Actionable comments posted: 1
♻️ Duplicate comments (4)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (3)
25-29: Store mongo_client for proper resource cleanup.
The AsyncMongoClient is created but not stored on the instance, preventing explicit cleanup. Past reviews flagged this and suggested adding a close() method and async context manager support.
Apply this diff to store the client and enable cleanup:
     def __init__(self, clp_config: Any) -> None:
         """Initializes the ClpConnector with MongoDB and MariaDB configurations."""
         mongo_url = f"mongodb://{clp_config.results_cache.host}:{clp_config.results_cache.port}/"
-        mongo_client = AsyncMongoClient(mongo_url)
-        self._results_cache = mongo_client[clp_config.results_cache.db_name]
+        self._mongo_client = AsyncMongoClient(mongo_url)
+        self._results_cache = self._mongo_client[clp_config.results_cache.db_name]
Then add cleanup methods:
    async def close(self) -> None:
        """Closes the MongoDB client connection."""
        # AsyncMongoClient.close() is a coroutine, so it must be awaited.
        await self._mongo_client.close()

    async def __aenter__(self) -> "ClpConnector":
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.close()
82-82: Consider making collection creation idempotent.
create_collection(query_id) may raise an exception if the collection already exists (e.g., due to a retry after partial failure). Wrapping this in a try-except to ignore CollectionInvalid errors improves retry safety.
Example:
-        await self._results_cache.create_collection(query_id)
+        try:
+            await self._results_cache.create_collection(query_id)
+        except pymongo.errors.CollectionInvalid:
+            pass  # Collection already exists
141-141: Fix timeout guard to honour zero-second deadlines.
Line 141 still has the bug flagged in past reviews: timeout and treats timeout=0 as falsy, so the loop never times out when timeout is 0. Update the guard to differentiate 0 from None.
Apply this diff:
-            if timeout and (event_loop.time() - start_time) > timeout:
+            if timeout is not None and (event_loop.time() - start_time) > timeout:
components/clp-mcp-server/clp_mcp_server/settings.py (1)
6-6: Consider explicit validation for required credentials.
The sentinel default "<no_password_set>" for CLP_DB_PASS may cause confusing runtime connection errors when a password is actually required. Past reviews suggested failing fast if the password is unset.
Option 1 - Fail fast if password is unset:
    CLP_DB_PASS = os.environ.get("CLP_DB_PASS")
    if CLP_DB_PASS is None:
        raise ValueError("CLP_DB_PASS environment variable must be set")
Option 2 - Use no default and let KeyError surface:
    CLP_DB_PASS = os.environ["CLP_DB_PASS"]  # Raises KeyError if missing
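The fail-fast idea for credentials can also be wrapped in a small helper so that it is easy to unit test. The following is only a sketch: require_env and its environ parameter are hypothetical names, not part of settings.py, and the injected mapping exists purely so tests do not have to mutate the real process environment.

```python
import os
from collections.abc import Mapping


def require_env(name: str, environ: Mapping[str, str] = os.environ) -> str:
    """Returns the value of a required environment variable or raises ValueError."""
    value = environ.get(name)
    if value is None:
        # Fail at import or startup time with a message that names the variable,
        # rather than surfacing later as an opaque database connection error.
        raise ValueError(f"{name} environment variable must be set")
    return value
```

A settings module could then assign CLP_DB_PASS = require_env("CLP_DB_PASS"). Note that this sketch accepts an empty string as a valid value; whether an empty password should be rejected is a separate policy decision.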
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (3)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1 hunks)
components/clp-mcp-server/clp_mcp_server/settings.py (1 hunks)
components/clp-mcp-server/tests/test_clp_connector.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
components/clp-mcp-server/clp_mcp_server/constants.py (2)
QueryJobStatus (18-27)
QueryJobType (10-15)
components/clp-mcp-server/tests/test_clp_connector.py (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (5)
ClpConnector (22-160)
submit_query (40-93)
read_job_status (95-113)
wait_query_completion (115-145)
read_results (147-160)
components/clp-mcp-server/clp_mcp_server/constants.py (1)
QueryJobStatus (18-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-24.04)
- GitHub Check: rust-checks (ubuntu-22.04)
🔇 Additional comments (2)
components/clp-mcp-server/clp_mcp_server/clp_connector.py (1)
147-160: LGTM on read_results implementation.
The method correctly applies the SEARCH_MAX_NUM_RESULTS limit and returns a bounded list, addressing past concerns about potential OOM issues.
components/clp-mcp-server/tests/test_clp_connector.py (1)
58-133: LGTM on test implementation.
The tests provide good coverage for validation, error cases, and success scenarios. The use of mocks, parametrization, and the _aiter helper is appropriate. Only a minor type annotation issue is flagged separately.
| (999, RuntimeError), # unknown status | ||
| ]) | ||
| async def test_wait_query_completion_failure_cases( | ||
| fail_status: QueryJobStatus, |
Type annotation mismatch in parametrised test.
Line 99 annotates fail_status: QueryJobStatus, but line 96 includes 999 (an int) which is not a member of the enum. The annotation should be int to match the actual parameter values.
Apply this diff:
-    fail_status: QueryJobStatus,
+    fail_status: int,
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fail_status: QueryJobStatus, | |
| async def test_wait_query_completion_failure_cases( | |
| fail_status: int, | |
| exc_type: Exception | |
| ) -> None: | |
| ... |
🤖 Prompt for AI Agents
In components/clp-mcp-server/tests/test_clp_connector.py around line 99 the test
parameter is annotated as "fail_status: QueryJobStatus" while the parametrized
values include an int (999) which is not a member of the enum; change the
annotation to "fail_status: int" (or a union like "int | QueryJobStatus" if
other cases include enum members) so the type matches the actual parameter
values.
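To illustrate why an int annotation can cover both enum members and an unknown code such as 999, here is a standalone sketch. It assumes the status enum is an IntEnum, which this page does not confirm; JobStatus, its members, and describe are hypothetical names used only for this example.

```python
from enum import IntEnum


class JobStatus(IntEnum):
    """Hypothetical status enum; an assumption, not the PR's QueryJobStatus."""

    PENDING = 0
    RUNNING = 1
    SUCCEEDED = 2


def describe(status: int) -> str:
    """Maps a raw status code to a name, tolerating codes outside the enum."""
    try:
        # IntEnum members are ints, so both members and plain ints are accepted.
        return JobStatus(status).name
    except ValueError:
        return f"unknown({status})"
```

Because every IntEnum member is also an int, a parameter annotated as int type-checks for both JobStatus.SUCCEEDED and 999, whereas an annotation of the enum type rejects the raw integer.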
Description
This PR adds the connector between the MCP server and the CLP package. It enables the MCP server to submit queries and read results.
Checklist
breaking change.
Validation performed
Summary by CodeRabbit
New Features
Tests
Chores