feat(native): Add Java planner integration, protocol, and dynamic RPC function detection [8/8] (OSS) (#27358)
Reviewer's Guide

Integrates Java planner RPCNode support with Velox C++ RPC plan conversion and dynamic RPC function discovery, enabling async RPC functions (e.g., LLM inference) to run via a dedicated RPC plan node wired through the planning, protocol, sidecar, and execution stacks.

Sequence diagram for RPC function planning and async execution via RPCNode

sequenceDiagram
actor User
participant Coordinator as CoordinatorPlanner
participant Sidecar as NativeSidecarFunctionRegistryTool
participant RpcOpt as RpcFunctionOptimizer
participant JavaRPCNode as RPCNode
participant Proto as ProtocolSerde
participant Worker as PrestoServerVelox
participant Conv as VeloxQueryPlanConverterBase
participant Registry as AsyncRPCFunctionRegistry
participant RPCOp as RPCOperator
User->>Coordinator: Submit query with rpc_function(...)
Coordinator->>Sidecar: getRpcFunctionNames()
Sidecar->>Sidecar: Fetch /v1/functions
Sidecar->>Sidecar: Filter JsonBasedUdfFunctionMetadata where isRpcFunction=true
Sidecar-->>Coordinator: Set<String> rpcFunctionNames
Coordinator->>RpcOpt: optimize(plan, rpcFunctionNames)
RpcOpt->>RpcOpt: Scan ProjectNode assignments
RpcOpt->>RpcOpt: Detect CallExpression where name in rpcFunctionNames
RpcOpt->>RpcOpt: Parse options JSON arg[3]
RpcOpt->>RpcOpt: Determine streamingMode, dispatchBatchSize
RpcOpt->>JavaRPCNode: Create RPCNode(source, functionName, arguments, argumentColumns, outputVariable, streamingMode, dispatchBatchSize)
RpcOpt->>Coordinator: Return rewritten plan
Coordinator->>Proto: Serialize plan
Proto->>Proto: Write protocol.RPCNode JSON
Proto-->>Worker: Send plan fragment with RPCNode
Worker->>Conv: toVeloxQueryPlan(protocol.RPCNode)
Conv->>Registry: isRegistered(functionName)
Registry-->>Conv: bool isRegistered
Conv->>Conv: VELOX_CHECK isRegistered
Conv->>Conv: Parse resultType from outputVariable.type
Conv->>Conv: Convert arguments to Velox expressions
Conv->>Conv: Extract constantInputs and argumentTypes
Conv->>Conv: Parse streamingMode, dispatchBatchSize
Conv->>Worker: Build core.RPCNode
Worker->>RPCOp: Initialize from core.RPCNode
RPCOp->>Registry: Instantiate AsyncRPCFunction by name
RPCOp->>RPCOp: Dispatch async RPC calls
RPCOp-->>Worker: Produce result column
Worker-->>User: Return query results with RPC function output
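
The optimizer steps in the diagram (scan ProjectNode assignments, detect calls whose name is in rpcFunctionNames, hoist them into an RPCNode) can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: `Expr`, `PlanNode`, `ScanNode`, `ProjectNode`, `RpcNode`, and the `_rpc` output naming are simplified stand-ins, not the Presto planner classes, and the sketch leaves out the options JSON, streamingMode, and dispatchBatchSize.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-ins for planner types; these are not the Presto classes.
final class RpcRewriteSketch {
    /** Either a column reference (function == null) or a function call over columns. */
    static final class Expr {
        final String function;
        final List<String> columns;

        private Expr(String function, List<String> columns) {
            this.function = function;
            this.columns = columns;
        }

        static Expr ref(String column) {
            return new Expr(null, Arrays.asList(column));
        }

        static Expr call(String function, String... argumentColumns) {
            return new Expr(function, Arrays.asList(argumentColumns));
        }
    }

    static class PlanNode {
        final PlanNode source; // null for a leaf

        PlanNode(PlanNode source) {
            this.source = source;
        }
    }

    static final class ScanNode extends PlanNode {
        ScanNode() {
            super(null);
        }
    }

    static final class ProjectNode extends PlanNode {
        final Map<String, Expr> assignments; // output column -> expression

        ProjectNode(PlanNode source, Map<String, Expr> assignments) {
            super(source);
            this.assignments = assignments;
        }
    }

    static final class RpcNode extends PlanNode {
        final String functionName;
        final List<String> argumentColumns;
        final String outputVariable;

        RpcNode(PlanNode source, String functionName, List<String> argumentColumns, String outputVariable) {
            super(source);
            this.functionName = functionName;
            this.argumentColumns = argumentColumns;
            this.outputVariable = outputVariable;
        }
    }

    /** Rewrites Project(rpc call) over a source into Project over RpcNode over the same source. */
    static ProjectNode rewrite(ProjectNode project, Set<String> rpcFunctionNames) {
        PlanNode source = project.source;
        Map<String, Expr> rewritten = new LinkedHashMap<>();
        for (Map.Entry<String, Expr> entry : project.assignments.entrySet()) {
            Expr expr = entry.getValue();
            if (expr.function != null && rpcFunctionNames.contains(expr.function)) {
                // Hoist the call into its own node; the projection then only references its output.
                String rpcOutput = entry.getKey() + "_rpc"; // hypothetical naming scheme
                source = new RpcNode(source, expr.function, expr.columns, rpcOutput);
                rewritten.put(entry.getKey(), Expr.ref(rpcOutput));
            }
            else {
                rewritten.put(entry.getKey(), expr);
            }
        }
        return new ProjectNode(source, rewritten);
    }
}
```

Non-RPC assignments pass through unchanged, so a projection with no matching call comes back with the same source and the same expressions.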
Hey - I've found 1 issue and left some high-level feedback:
- In RpcFunctionOptimizer.parseOptionsJson, using optionsArg.getValue().toString() is fragile for VARCHAR constants (e.g., Slice.toString() won’t yield the JSON payload); consider decoding the constant according to its type (e.g., via VarcharType or Slice utilities) so streaming_mode and dispatch_batch_size are parsed reliably.
- RPCNode’s Java constructor doesn’t enforce that arguments.size() matches argumentColumns.size(), which can easily drift during rewrites; adding a precondition check here would surface planner bugs earlier instead of producing invalid protocol plans.
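
To make the first point concrete, here is a minimal sketch of why `toString()` on a byte-backed constant is unreliable and what explicit decoding looks like. `Utf8Value` is a hypothetical stand-in for such a constant (with airlift's `Slice`, the decoding call would be `toStringUtf8()`), and the regex-based `readIntOption` is only a placeholder for a real JSON parser; neither is taken from the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class OptionsDecodingSketch {
    /** Hypothetical stand-in for a byte-backed VARCHAR constant value. */
    static final class Utf8Value {
        private final byte[] bytes;

        Utf8Value(String text) {
            this.bytes = text.getBytes(StandardCharsets.UTF_8);
        }

        String decodeUtf8() {
            return new String(bytes, StandardCharsets.UTF_8);
        }

        @Override
        public String toString() {
            // A debug-style representation: it does not contain the payload.
            return "Utf8Value{length=" + bytes.length + "}";
        }
    }

    /** Decodes the constant according to its runtime type instead of calling toString(). */
    static String constantAsString(Object value) {
        if (value instanceof Utf8Value) {
            return ((Utf8Value) value).decodeUtf8();
        }
        if (value instanceof String) {
            return (String) value;
        }
        throw new IllegalArgumentException("options constant is not a string value: " + value);
    }

    /** Placeholder for a JSON parser: reads a non-negative integer option, or returns the default. */
    static int readIntOption(String json, String key, int defaultValue) {
        Matcher matcher = Pattern.compile("\"" + Pattern.quote(key) + "\"\\s*:\\s*(\\d+)").matcher(json);
        return matcher.find() ? Integer.parseInt(matcher.group(1)) : defaultValue;
    }
}
```

With this shape, a payload such as `{"dispatch_batch_size":64}` is parsed from the decoded string, whereas parsing the `toString()` output would silently fall back to the default.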
## Individual Comments
### Comment 1
<location path="presto-main-base/src/main/java/com/facebook/presto/sql/planner/plan/RPCNode.java" line_range="88-89" />
<code_context>
+ super(sourceLocation, id, Optional.empty());
+ this.source = requireNonNull(source, "source is null");
+ this.functionName = requireNonNull(functionName, "functionName is null");
+ this.arguments = ImmutableList.copyOf(requireNonNull(arguments, "arguments is null"));
+ this.argumentColumns = ImmutableList.copyOf(requireNonNull(argumentColumns, "argumentColumns is null"));
+ this.outputVariable = requireNonNull(outputVariable, "outputVariable is null");
+ this.streamingMode = streamingMode != null ? streamingMode : StreamingMode.PER_ROW;
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider validating that arguments and argumentColumns have matching cardinality.
RPCNode relies on arguments[i] matching argumentColumns[i], but the constructor doesn’t currently guarantee equal list sizes. A mismatch would manifest as a runtime error on the native side. Please add a size check (e.g., checkArgument(arguments.size() == argumentColumns.size(), ...)) so inconsistencies fail fast at construction time.
Suggested implementation:
```java
this.source = requireNonNull(source, "source is null");
this.functionName = requireNonNull(functionName, "functionName is null");
this.arguments = ImmutableList.copyOf(requireNonNull(arguments, "arguments is null"));
this.argumentColumns = ImmutableList.copyOf(requireNonNull(argumentColumns, "argumentColumns is null"));
checkArgument(
this.arguments.size() == this.argumentColumns.size(),
"arguments and argumentColumns must have the same size: arguments=%s, argumentColumns=%s",
this.arguments.size(),
this.argumentColumns.size());
this.outputVariable = requireNonNull(outputVariable, "outputVariable is null");
this.streamingMode = streamingMode != null ? streamingMode : StreamingMode.PER_ROW;
```
If `checkArgument` is not already imported in `RPCNode.java`, add:
- A static import at the top of the file (among other imports):
`import static com.google.common.base.Preconditions.checkArgument;`
Adjust the import placement to match the existing import ordering conventions in the file.
</issue_to_address>
…nction detection [8/8] (OSS) (prestodb#27358)

Summary:
Pull Request resolved: prestodb#27358

X-link: https://github.com/facebookexternal/presto-facebook/pull/3595

End-to-end integration connecting the Java planner to the C++ RPCOperator.

## Java Planner (presto-main-base)
- **RPCNode.java**: Plan node for async RPC operations.
- **RpcFunctionOptimizer.java**: Rewrites ProjectNode(rpc_function(...)) into Source -> RPCNode -> ProjectNode. Uses Supplier<Set<String>> for lazy RPC function name resolution.
- Visitor integration across plan visitors.

## Dynamic RPC Function Detection
The coordinator discovers RPC functions from the sidecar without hardcoded function name lists:
1. C++ FunctionMetadata checks isRegistered(), sets isRpcFunction=true
2. presto_protocol adds isRpcFunction field to serialization
3. Java NativeSidecarFunctionRegistryTool filters for isRpcFunction=true
4. Guice Supplier<Set<String>> defers sidecar call to first query planning

## C++ Plan Converter
- Converts protocol RPCNode -> core::RPCNode (name-based)
- Validates function exists via isRegistered() (no instantiation)
- Parses result type via typeParser_.parse()
- Function instantiation deferred to RPCOperator::initialize()

## Tests
- RPCPlanConverterTest: Protocol deserialization, plan conversion with name-based validation, error paths for missing functions.

## Reading Guide
1. **RpcFunctionOptimizer.java**: Start here. The core optimizer that rewrites ProjectNode(rpc_function(...)) into Source -> RPCNode -> ProjectNode. Key methods: optimize() walks the plan, rewriteProjectWithRpcFunction() extracts the RPC call, builds RPCNode with arguments and result type.
2. **RPCNode.java**: Java plan node. Constructor, getters, serialization fields. Maps to the C++ core::RPCNode from diff 2.
3. **PrestoToVeloxQueryPlan.cpp**: C++ plan converter. Converts protocol RPCNode -> Velox RPCNode. Validates function via isRegistered().
4. **presto_protocol_core.h/cpp/yml**: Protocol serde for RPCNode. Generated-style code; review the .yml for the schema, skim the .h/.cpp for correctness.
5. **RPCPlanConverterTest.cpp**: Unit tests for the C++ plan converter. Good for understanding the expected protocol format.
6. **FunctionMetadata.cpp**: Adds isRpcFunction field based on AsyncRPCFunctionRegistry::isRegistered().
7. **NativeSidecarFunctionRegistryTool.java**: Filters sidecar function list by isRpcFunction flag to build the RPC function name set.
8. **PlanOptimizers.java + ServerMainModule.java**: Wiring: registers RpcFunctionOptimizer with Guice Supplier for lazy RPC function discovery.
9. **Visitor touchpoints** (~15 Java files, 5-20 lines each): Mechanical: adds RPCNode cases to existing plan visitors (AddExchanges, LimitPushDown, PruneUnreferencedOutputs, etc.). Safe to skim.

Differential Revision: D94996326
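
The dynamic detection steps (filter the sidecar's function list by isRpcFunction, and defer the sidecar call until first use) can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's wiring: `FunctionMetadata` is a hypothetical two-field stand-in for the sidecar metadata, and `memoize` is a hand-rolled helper standing in for whatever supplier the Guice module actually binds.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

final class LazyRpcNamesSketch {
    /** Hypothetical stand-in for one entry of the sidecar's function metadata. */
    static final class FunctionMetadata {
        final String name;
        final boolean isRpcFunction;

        FunctionMetadata(String name, boolean isRpcFunction) {
            this.name = name;
            this.isRpcFunction = isRpcFunction;
        }
    }

    /** Keeps only the names of functions flagged as RPC functions. */
    static Set<String> rpcFunctionNames(List<FunctionMetadata> functions) {
        Set<String> names = new HashSet<>();
        for (FunctionMetadata function : functions) {
            if (function.isRpcFunction) {
                names.add(function.name);
            }
        }
        return Collections.unmodifiableSet(names);
    }

    /** Wraps a supplier so the delegate runs at most once, on the first get(). */
    static <T> Supplier<T> memoize(final Supplier<T> delegate) {
        return new Supplier<T>() {
            private T value;
            private boolean loaded;

            @Override
            public synchronized T get() {
                if (!loaded) {
                    value = delegate.get();
                    loaded = true;
                }
                return value;
            }
        };
    }
}
```

Constructing the memoized supplier does not contact the sidecar; the fetch happens on the first `get()`, which matches the "defers sidecar call to first query planning" behavior described above. One design consequence worth noting is that a set cached this way does not pick up functions registered later unless the supplier is rebuilt.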
2b0b5f1 to
7610368
Compare
Thanks @zhichenxu-meta for this work. Do you have a small design doc or RFC for this feature?
@aditi-pandit Thanks, I have a Meta-internal design doc and will create a public one.
7610368 to
392c87b
Compare
392c87b to
dd84a31
Compare
dd84a31 to
875dac3
Compare
875dac3 to
6758f8d
Compare
6758f8d to
5873d4e
Compare
5873d4e to
b71b95c
Compare
b71b95c to
a07383e
Compare
| private final NodeManager nodeManager; | ||
| private final HttpClient httpClient; | ||
| private static final String FUNCTION_SIGNATURES_ENDPOINT = "/v1/functions"; | ||
| private volatile UdfFunctionSignatureMap cachedSignatureMap; |
Why do we need a cachedSignatureMap for the RPC function? Why is it special compared with other functions? cc @kevintang2022, who knows this code better.
It looks like it maintains existing behavior.
It's simple caching so that getNativeFunctionSignatureMap() is not called more than once. Otherwise it would be called twice: once in getWorkerFunctions and once in getRpcFunctionNames.
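The caching rationale in this thread can be sketched as follows. This is a simplified illustration under assumptions, not the PR's implementation: `CachedRegistryTool` is a hypothetical class, the `fetcher` stands in for the HTTP request to /v1/functions, a `Map<String, Boolean>` replaces UdfFunctionSignatureMap, and double-checked locking on the volatile field is one possible way to populate the cache (the diff only shows the field declaration).

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;

// Hypothetical sketch: two public lookups share one cached fetch.
final class CachedRegistryTool
{
    // Stand-in for the sidecar request; maps function name -> isRpcFunction.
    private final Supplier<Map<String, Boolean>> fetcher;
    private volatile Map<String, Boolean> cachedSignatureMap;

    CachedRegistryTool(Supplier<Map<String, Boolean>> fetcher)
    {
        this.fetcher = fetcher;
    }

    private Map<String, Boolean> getSignatureMap()
    {
        Map<String, Boolean> local = cachedSignatureMap;
        if (local == null) {
            synchronized (this) {
                local = cachedSignatureMap;
                if (local == null) {
                    // First caller pays for the fetch; later callers reuse it.
                    local = fetcher.get();
                    cachedSignatureMap = local;
                }
            }
        }
        return local;
    }

    Set<String> getWorkerFunctions()
    {
        return new TreeSet<>(getSignatureMap().keySet());
    }

    Set<String> getRpcFunctionNames()
    {
        Set<String> names = new TreeSet<>();
        for (Map.Entry<String, Boolean> entry : getSignatureMap().entrySet()) {
            if (Boolean.TRUE.equals(entry.getValue())) {
                names.add(entry.getKey());
            }
        }
        return names;
    }
}
```

Calling both getWorkerFunctions() and getRpcFunctionNames() on one instance triggers a single fetch, which matches the behavior described in the reply above.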
| * Whether this function is an RPC function (dispatched via the async RPC framework). | ||
| * Set by the sidecar based on AsyncRPCFunctionRegistry. | ||
| */ | ||
| private final Optional<Boolean> isRpcFunction; |
Why only add this to the JsonBasedUdf metadata? Are we expecting that it will only be registered in the JSON function namespace manager?
This JsonBasedUdf metadata stores info about the functions that are in the worker, and each worker function is represented by one of these instances. It looks like this just adds an extra field to how a worker function is represented.
Thanks Feilong for the question and Kevin for the answer!
> This JsonBasedUdf metadata is used to store info about the functions that are in worker,
So worker functions use JsonBasedUdf for registration in the coordinator? I didn't know this.
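To make the role of the `Optional<Boolean> isRpcFunction` field concrete, here is a minimal sketch of filtering worker function metadata by that flag. The `FunctionMetadata` class below is a hypothetical, stripped-down stand-in for JsonBasedUdfFunctionMetadata, and treating an absent flag as "not RPC" is an assumption about how a sidecar that does not send the field would be handled.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of selecting RPC function names by metadata flag.
final class RpcFlagFilter
{
    private RpcFlagFilter() {}

    // Simplified stand-in for the per-function metadata received from the worker.
    static final class FunctionMetadata
    {
        final String name;
        final Optional<Boolean> isRpcFunction;

        FunctionMetadata(String name, Optional<Boolean> isRpcFunction)
        {
            this.name = name;
            this.isRpcFunction = isRpcFunction;
        }
    }

    static Set<String> rpcFunctionNames(List<FunctionMetadata> functions)
    {
        return functions.stream()
                // Assumption: a missing flag means the function is not an RPC function.
                .filter(function -> function.isRpcFunction.orElse(false))
                .map(function -> function.name)
                .collect(Collectors.toSet());
    }
}
```

Keeping the field optional means metadata serialized without it still deserializes, which is one plausible reason for the Optional wrapper in the diff.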
| node.getStreamingMode(), | ||
| node.getDispatchBatchSize()); | ||
|
|
||
| return new PlanWithProperties(rewrittenNode, source.getProperties()); |
Not sure we want to pass source.getProperties() through untouched; at the least, the ordering and grouping properties should not be kept (they may not be used in AddExchanges, though; this needs a double check).
@feilong-liu Thanks for the comment! Fixed. Changed to rebaseAndDeriveProperties(node, source), the same pattern as RowNumberNode. This properly derives output properties from the rewritten node.
| @Override | ||
| public PlanWithProperties visitRPC(RPCNode node, StreamPreferredProperties parentPreferences) | ||
| { | ||
| return planAndEnforceChildren(node, parentPreferences.withDefaultParallelism(session), parentPreferences.withDefaultParallelism(session)); |
Not sure we want to enforce parentPreferences for this node.
@feilong-liu Added a comment explaining the rationale: RPCNode benefits from multiple drivers for concurrent RPC dispatch. For constant-only queries, the C++ RPCPlanNodeTranslator::maxDrivers() forces a single driver to avoid ROUND_ROBIN distribution issues.
I guess RPCNode does not need to satisfy the parent preferences; since it is only a preference, it is a nice-to-have property. Usually it is the parent node that has a special requirement and decides to add an additional node to enforce a specific input distribution.
Since this is only performance related and does not affect correctness, I guess we can land the current code and improve it later if we see inefficient plans.
| // Guard: if a new streaming mode is added that changes cardinality, | ||
| // this must be revisited. | ||
| RPCNode.StreamingMode mode = node.getStreamingMode(); | ||
| checkState( |
Looks redundant to me; it has only two modes after all.
@feilong-liu Good catch! Fixed. Removed the checkState guard. Kept the rewrite logic with a comment that RPCNode preserves cardinality (1:1) in both modes.
| * 2. The function name will be automatically discovered via the sidecar | ||
| */ | ||
| public class RpcFunctionOptimizer | ||
| implements PlanOptimizer |
I wonder if IterativeRule is better than PlanOptimizer here.
Good question. PlanOptimizer works well here because we need a bottom-up RowExpressionTreeRewriter for nested RPC extraction (e.g., rpc(rpc(col))), which doesn't map cleanly to IterativeRule's top-down Pattern matching. Happy to migrate in a follow-up if you feel strongly.
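The bottom-up extraction mentioned in the reply can be sketched on a toy expression tree. Everything here is hypothetical: `NestedRpcExtractor`, `Var`, `Call`, and the `rpc_N` variable naming are illustrative and do not mirror RowExpressionTreeRewriter or the PR's RpcFunctionOptimizer. The point is only that rewriting arguments before the enclosing call lets an inner RPC be extracted first, so the outer RPC can consume its output variable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of bottom-up extraction of nested RPC calls.
final class NestedRpcExtractor
{
    interface Expr {}

    static final class Var implements Expr
    {
        final String name;

        Var(String name)
        {
            this.name = name;
        }

        @Override
        public String toString()
        {
            return name;
        }
    }

    static final class Call implements Expr
    {
        final String function;
        final List<Expr> args;

        Call(String function, List<Expr> args)
        {
            this.function = function;
            this.args = args;
        }

        @Override
        public String toString()
        {
            return function + "(" + args.stream().map(Object::toString).collect(Collectors.joining(", ")) + ")";
        }
    }

    private final Set<String> rpcFunctionNames;
    // Extracted calls in dependency order, rendered as "variable := call".
    final List<String> extracted = new ArrayList<>();

    NestedRpcExtractor(Set<String> rpcFunctionNames)
    {
        this.rpcFunctionNames = rpcFunctionNames;
    }

    Expr rewrite(Expr expr)
    {
        if (!(expr instanceof Call)) {
            return expr;
        }
        Call call = (Call) expr;
        // Rewrite children first, so inner RPC calls are extracted before outer ones.
        List<Expr> newArgs = new ArrayList<>();
        for (Expr arg : call.args) {
            newArgs.add(rewrite(arg));
        }
        Call rewritten = new Call(call.function, newArgs);
        if (!rpcFunctionNames.contains(call.function)) {
            return rewritten;
        }
        // Replace the RPC call with a fresh output variable and record the extraction.
        Var output = new Var("rpc_" + extracted.size());
        extracted.add(output + " := " + rewritten);
        return output;
    }
}
```

For rpc(rpc(col)) this records `rpc_0 := rpc(col)` and then `rpc_1 := rpc(rpc_0)`, and the original expression collapses to the variable `rpc_1`. In the real plan, each recorded extraction would correspond to one RPCNode stacked above the previous one.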
| * 1. Register the AsyncRPCFunction with signatures in the C++ worker | ||
| * 2. The function name will be automatically discovered via the sidecar | ||
| */ | ||
| public class RpcFunctionOptimizer |
Can you add a plan test for this optimizer? Please include cases with nested RPC function calls (if supported).
Great suggestion! Will add in a follow-up; we need to set up a mock RPC function in the test query runner. The optimizer is already E2E tested with 23 SQL queries (including nested RPCs) on the verifier cluster.
| * This node contains all the RPC metadata needed to create the operator: | ||
| * - functionName: The RPC function to call (e.g., "llm_inference") | ||
| * - arguments: The original argument expressions (used by C++ plan converter | ||
| * to extract types and constant values for AsyncRPCFunction::initialize()) |
> to extract types
I assume the types can also be retrieved from argumentColumns?
Updated the comment. argumentColumns are just column name strings; they don't carry type information. The expression tree in arguments is needed for the C++ plan converter to extract types and constant values.
| @Override | ||
| public PlanNode assignStatsEquivalentPlanNode(Optional<PlanNode> statsEquivalentPlanNode) | ||
| { | ||
| return this; |
This will lose the statsEquivalentPlanNode. Perhaps refer to how ProjectNode handles this function.
@feilong-liu Good catch! Fixed. Added the two-constructor pattern following ProjectNode: the @JsonCreator constructor delegates to the full constructor with Optional.empty(), assignStatsEquivalentPlanNode creates a new RPCNode with the stats node, and replaceChildren now preserves getStatsEquivalentPlanNode().
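The two-constructor pattern described in the fix can be sketched with a toy immutable node. This is a hypothetical simplification: `StatsPreservingNode` is not the PR's RPCNode, plain strings stand in for PlanNode instances, and the Jackson annotation is only mentioned in a comment so the sketch has no external dependencies.

```java
import java.util.Optional;

// Hypothetical sketch of a plan node that keeps its stats-equivalent node across copies.
final class StatsPreservingNode
{
    private final String source;
    private final String functionName;
    private final Optional<String> statsEquivalentPlanNode;

    // Deserialization-style constructor (the one that would carry @JsonCreator):
    // delegates to the full constructor with an empty stats-equivalent node.
    StatsPreservingNode(String source, String functionName)
    {
        this(source, functionName, Optional.empty());
    }

    StatsPreservingNode(String source, String functionName, Optional<String> statsEquivalentPlanNode)
    {
        this.source = source;
        this.functionName = functionName;
        this.statsEquivalentPlanNode = statsEquivalentPlanNode;
    }

    String getSource()
    {
        return source;
    }

    Optional<String> getStatsEquivalentPlanNode()
    {
        return statsEquivalentPlanNode;
    }

    // Returns a copy carrying the given stats node instead of returning `this`.
    StatsPreservingNode assignStatsEquivalentPlanNode(Optional<String> statsEquivalentPlanNode)
    {
        return new StatsPreservingNode(source, functionName, statsEquivalentPlanNode);
    }

    // Swapping the child must not drop the stats-equivalent node.
    StatsPreservingNode replaceChildren(String newSource)
    {
        return new StatsPreservingNode(newSource, functionName, statsEquivalentPlanNode);
    }
}
```

The original `return this;` would silently discard the argument; returning a new node is what allows the stats-equivalent plan node to survive later rewrites such as replaceChildren.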
| schema, | ||
| protocol::FunctionKind::SCALAR, | ||
| *signature, | ||
| isRpcFunction)) { |
Since there is a new isRpcFunction field in the function metadata, I wonder if we can tell directly from the metadata whether a function is RPC, instead of relying on the function name check.
Both are needed, for different purposes. The metadata field (isRpcFunction) is for Java-side sidecar filtering: the coordinator uses it to build the set of RPC function names for the optimizer. The C++ registry check (isRegistered()) validates that the function is actually linked and registered in the binary. The metadata field alone can't guarantee the function exists in the C++ worker.
kevintang2022
left a comment
Approving the changes on the registry tools. Feilong will review the visitor changes.
Thanks!
feilong-liu
left a comment
LGTM except some nits.
I think we can improve it as we use it.
The CI failure on FunctionMetadata.cpp is expected: it includes velox/expression/rpc/AsyncRPCFunctionRegistry.h, which was recently merged into Velox main (via PRs #16645, #16727, #16787, #16792, #16793). The Presto Velox submodule needs to be advanced to pick up these headers. Could someone with write access bump the Velox submodule to latest main? cd presto-native-execution/velox
Thanks for the PR! Please add a release note - or
@prestodb/release-notes
Release Notes
General Changes
… function detection [8/8] (OSS) (prestodb#27358)
Reviewed By: gggrace14
Differential Revision: D94996326