Skip to content

Add RFC for supporting distributed procedure#12

Merged
tdcmeehan merged 3 commits into
prestodb:mainfrom
hantangwangd:main
Nov 14, 2025
Merged

Add RFC for supporting distributed procedure#12
tdcmeehan merged 3 commits into
prestodb:mainfrom
hantangwangd:main

Conversation

@hantangwangd

Copy link
Copy Markdown
Member

Propose design to expand the current procedure architecture in presto, support defining, registering and calling procedures which need to be executed in a distributed way.

Besides, in order to demonstrate as an example and to figure out the functional boundaries among the different architecture levels, also describe the design for Iceberg to support distributed procedure as well as the design of a specific distributed procedure rewrite_data_files.

@hantangwangd hantangwangd marked this pull request as draft June 11, 2024 03:44
@hantangwangd hantangwangd force-pushed the main branch 2 times, most recently from 949d74e to a01783a Compare June 13, 2024 03:51
@hantangwangd hantangwangd marked this pull request as ready for review June 13, 2024 03:54
@hantangwangd hantangwangd changed the title [WIP]Add RFC for supporting distributed procedure Add RFC for supporting distributed procedure Jun 13, 2024

@rschlussel rschlussel left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting feature. do you have thoughts on integrating this for cpp workers as well?

@hantangwangd

Copy link
Copy Markdown
Member Author

@rschlussel Thanks for the comment. Yes, of course we should integrate this for cpp workers if it's proved to be useful and needed.

I think we can take a two-step approach for this. Firstly, we support it on java workers, confirm the feasible of the entire architecture and figure out the best functional boundary division. Then, after that, we can support it on cpp workers following the design and implementation path on java workers. What's your opinion?

@aditi-pandit aditi-pandit left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hantangwangd for this RFC. Had couple of comments mainly related to Native execution related impact.

5. Similar to statements such as `create table as`/`insert`/`delete`/`refresh material view` that involve distributed processing, two related SPI methods are defined for `call distributed procedure` statement in the metadata and connector metadata interfaces. `beginCallDistributedProcedure` is used for preparation work before the start of distributed scheduling. And `finishCallDistributedProcedure` is used for transaction commitment after the completion of distributed writing.


6. As for a specific connector (such as Iceberg), in the implementation logic of method `beginCallDistributedProcedure` and `finishCallDistributedProcedure` in `ConnectorMetadata`, in addition to accomplish the common logic for this connector (such as starting a transaction, building a transaction context, committing a transaction, etc.), it should also resolve the specified distributed procedure and call its relevant method to execute the procedure-customized logic.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Native vs Java runtimes begin to diverge significantly at this point. For native runtimes its best if the distributed procedure call method is in C++. But that would mean older java procedures would need a rewrite. Do you have any particular ideas on how you will handle this for Native engine ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment. Actually, beginCallDistributedProcedure and finishCallDistributedProcedure in ConnectorMetadata are all invoked in coordinator, so native worker do not need to handle them, that is, there is no need in native worker end to resolve the distributed procedures and invoke them.

The non-coordinator worker's responsibility in call distributed procedure is similar to that of insert into or create table as. That is, local planning CallDistributedProcedureNode to a TableWriterOperator with ExecutionWriterTarget, getting corresponding ConnectorPageSink based on this writer target, executing the data writing and finally returning the fragments page to table finish stage.

So I think there is no need to consider this issue, what do you think? Any misunderstand please let me know.

extends WriterTarget
{
private final QualifiedObjectName procedureName;
private final Object[] procedureArguments;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presume the procedureArguments have types. There could be a difference in the types supported by native vs java execution. Would be great to clarify about that.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand, CallDistributedProcedureTarget is a subclass of WriterTarget which is used only in coordinator. That is, it's always used in java execution environment.

ExecuteProcedureHandle which is a subclass of ExecutionWriterTarget will be sent to worker. The handling of it is similar to InsertHandle or CreateHandle.


Among them, `TableScanNode -> FilterNode` defines the data to be processed. It's based on the target table determined by `schema` and `table_name` in the parameters, as well as the possibly existing filter conditions.

The `CallDistributedProcedureNode -> TableFinishNode` structure is similar to the `TableWriterNode -> TableFinishNode` structure, used to perform distributed data manipulation and final unified submission behavior.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be great to give more details on what row/control messages are exchanged between the CallDistricutedProcedureNode and TableFinishNode. e.g. TableWriteNode provides protocol of PartitionUpdates and CommitContext that is shared between the two.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it's a good suggestion. I will supplement this part of the content.


#### 6. Iceberg connector's support for distributed procedure

In Iceberg, we often need to record the original data files that have been scanned and rewritten during the execution of table data operations (including deleted files that have been fully applied), and in the final submission, combine the newly generated data files due to rewriting to make some changes and transaction submissions at the metadata level.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iceberg in Native engine uses HiveConnector itself. The HiveConnector in Native engine handles both Hive and Iceberg splits. Does HiveConnector support distributed procedure ? How will it be reused or enhanced for Iceberg ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As beginCallDistributedProcedure and finishCallDistributedProcedure in ConnectorMetadata are all invoked in coordinator, I think the main change for hive to support distributed procedures is in java implementation.

Hive should implement beginCallDistributedProcedure and finishCallDistributedProcedure in ConnectorMetadata, customize it's own preparation and submission logic, and then implement and register its own distributed procedures. All these jobs could be done in java only.

So I think the main change in worker end logic which need to support native c++ is that it should generate and provide ConnectorPageSink based on the newly added ConnectorDistributedProcedureHandle. Please let me know if there are any omissions.

3. Add a new plan node type: `CallDistributedProcedureNode`. During the analyzing and logical planning phase, construct a logical plan with the following shape for `call distributed procedure` statement:

```text
TableScanNode -> FilterNode -> CallDistributedProcedureNode -> TableFinishNode -> OutputNode

@jaystarshot jaystarshot Jun 26, 2024

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand correctly, but if we are going to support a general distributed procedure then why does it have to be just above TableScanNode or table finish?
If its not for a general case but specific to table layouts, then why not just a custom operator implementation?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comment. As described in this RFC, compared to existing coordinator-only procedures, these newly expanded kind of procedures which need to be executed distributively involve operations on table's data other than metadata. For example, these procedures can rewrite table data, merge small data files, sort table data, repartition table data etc. So the common action process of them includes reading data from the target table, doing some filtering and translation, writing the translated data to files and submitting the changes in metadata level. So I think the general distributed procedure could be planned to a plan tree like this. (Or adding some other plan nodes like JoinNode/AggregationNode etc. for functional expansion in future)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just a custom operator implementation?

After optimization and plan fragmentation, we should be able to shuffle data between stages to utilize the capabilities of the entire cluster.


## Summary

Propose design to expand the current procedure architecture in presto, support defining, registering and calling procedures which need to be executed in a distributed way.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also define what you mean by procedure?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. Strictly speaking, maybe they should be called connector specific system procedures. I'll add some explanations here.

@hantangwangd

Copy link
Copy Markdown
Member Author

This RFC is ready for review, the comments are all handled, and the relevant implementation can be viewed here: prestodb/presto#22659. Please take a look when convenient, thanks! cc: @rschlussel @aditi-pandit @jaystarshot @tdcmeehan @ZacBlanco @yingsu00

* Acquire and use procedure registry in `presto-analyzer` and `connectors` module


2. Define a new query type `CALL_DISTRIBUTED_PROCEDURE`, and associate the call distributed procedure statement with this type during preparer phase. So that this kind of statements would fall into the path of `SqlQueryExecutionFactory.createQueryExecution(...)`. The generated `SqlQueryExecution` object can utilize the existing distributed querying, writing, and final committing mechanism to achieve distributed execution for procedures.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not clear what is the query syntax proposed. Are you proposing a CALL statement or ALTER TABLE ? Can you give some sample examples with their statements here. It would be great to specify the statement syntax specifying what parameters will be and their behavior/limitations.

In the future sections a table and filter are used. How are they specified in the statement used by the end user ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your suggestion. I will supplement this part of the content. The following is a brief explanation.

We propose the syntax of Call statement for the reason described in line 24 to line 28 (https://github.com/prestodb/rfcs/pull/12/files#diff-7655a74167c417d09fba4cc48ec39232f65d7056a449e1cb09d51664f4fe1b7eR24-R28), for example:

CALL iceberg.system.rewrite_data_files('db', 'sample');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample');
CALL iceberg.system.rewrite_data_files('db', 'sample', 'partition_key = 1');
CALL iceberg.system.rewrite_data_files(schema => 'db', table_name => 'sample', filter => 'partition_key = 1');

For all distributed procedures, the implementation of their base class defaults to including parameters schema, table_name and filter. Among them, schema and table_name are required parameters, which specify the target table to be processed by this distributed procedure. While filter is an optional parameter, which indicates the filtering conditions for the data to be processed.

The specific procedure implementations can expand the parameter list, but they must include the required parameters schema and table_name.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3. Add a new plan node type: `CallDistributedProcedureNode`. During the analyzing and logical planning phase, construct a logical plan with the following shape for `call distributed procedure` statement:

```text
TableScanNode -> FilterNode -> CallDistributedProcedureNode -> TableFinishNode -> OutputNode

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its interesting that this is rooted in a TableScanNode. Is this of a regular table or something else ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a regular table representing the target table. As mentioned above, the target table is specified through the parameters schema and table_name.

![Distributed_procedure_architecture](RFC-0006/distributed_procedure.png)


4. The optimizing, segmenting, group execution tagging, and local planning of `CallDistributedProcedureNode` are similar to `TableWriterNode`. And it would be ultimately local planned to a `TableWriterOperator` (which holds a specific type of `ExecutionWriterTarget` subclass related to `call distributed procedure` statement). When creating a `PageSink` to execute data writing, a corresponding `ConnectorPageSink` will be generated based on the specific subclass and property values of `ExecutionWriterTarget` that are actually held by `TableWriterOperator`.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConnectorPageSink will need a Prestissimo implementation as well.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not so familiar with Prestissimo implementation. Are we currently reusing the implementation of ConnectorPageSink from Hive in native worker? If so, yes maybe it's better to implement an Iceberg own ConnectorPageSink.

![Distributed_procedure_architecture](RFC-0006/distributed_procedure.png)


4. The optimizing, segmenting, group execution tagging, and local planning of `CallDistributedProcedureNode` are similar to `TableWriterNode`. And it would be ultimately local planned to a `TableWriterOperator` (which holds a specific type of `ExecutionWriterTarget` subclass related to `call distributed procedure` statement). When creating a `PageSink` to execute data writing, a corresponding `ConnectorPageSink` will be generated based on the specific subclass and property values of `ExecutionWriterTarget` that are actually held by `TableWriterOperator`.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be the fields and members of ExecutionWriterTarget ?

TableWriter partitions and bucketizes input rows to files. This is a very specfic action whereas ExecutionWriterTarget is very generic. Are there specific actions like say a rewrite action or optimize action that we will implement in code that could be specified by ExecutionWriterTarget ? If yes, then we will need to implement the same logic in Native engine as well.

@hantangwangd hantangwangd Aug 14, 2024

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The detailed description for ExecuteProcedureHandle (which is an implementation class of interface ExecutionWriterTarget) and it's members is in line 252 to line 273 (https://github.com/prestodb/rfcs/pull/12/files#diff-7655a74167c417d09fba4cc48ec39232f65d7056a449e1cb09d51664f4fe1b7eR252-R273) and line 357 to line 372 (https://github.com/prestodb/rfcs/pull/12/files#diff-7655a74167c417d09fba4cc48ec39232f65d7056a449e1cb09d51664f4fe1b7eR357-R372). The definitions of its fields and members are similar to InsertHandle.

Currently there is no specific logic that needs to be implemented through code in ExecuteProcedureHandle, so in order to support ExecuteProcedureHandle in native, we can basically refer to the implementation of InsertHandle. The only difference is that an additional field will appear in ExecuteProcedureHandle: QualifiedObjectName procedureName.

4. The optimizing, segmenting, group execution tagging, and local planning of `CallDistributedProcedureNode` are similar to `TableWriterNode`. And it would be ultimately local planned to a `TableWriterOperator` (which holds a specific type of `ExecutionWriterTarget` subclass related to `call distributed procedure` statement). When creating a `PageSink` to execute data writing, a corresponding `ConnectorPageSink` will be generated based on the specific subclass and property values of `ExecutionWriterTarget` that are actually held by `TableWriterOperator`.


5. Similar to statements such as `create table as`/`insert`/`delete`/`refresh material view` that involve distributed processing, two related SPI methods are defined for `call distributed procedure` statement in the metadata and connector metadata interfaces. `beginCallDistributedProcedure` is used for preparation work before the start of distributed scheduling. And `finishCallDistributedProcedure` is used for transaction commitment after the completion of distributed writing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its good to clarify if beginCallDistributedProcedure and finishCallDistributedProcedure will happen at the co-ordinator.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, will add this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public static class CallDistributedProcedureTarget
extends WriterTarget
{
private final QualifiedObjectName procedureName;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this procedure need to be dynamically loaded ? It might be worth thinking how you will do it in C++. Are you planning to use dlopen call ? That can come with lots of security implications.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Firstly, in the current design, distributed procedures serve as Connector specific built-in system procedures, therefore they do not need to be dynamically loaded, but only need to be statically pre-defined and loaded. Dynamic loading is more like what need to be done when supporting UDF or UDP, which is beyond the scope of this proposal. And we can consider how to support it if necessary in the future.

Secondly, as discussed above, the methods of a DistributedProcedure will always be called in Java environment. Even when executing the final submission, the method FinishCallDistributedProcedure.finish(...) will be executed in operator TableFinishOperator in the Java worker inside coordinator as well. So it seems that there is currently no need to consider dynamic loading on C++ environment. Please let me know if there are any omissions or misunderstandings.

@yingsu00 yingsu00 left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantangwangd Is there a working prototype or a PR now?

3. Add a new plan node type: `CallDistributedProcedureNode`. During the analyzing and logical planning phase, construct a logical plan with the following shape for `call distributed procedure` statement:

```text
TableScanNode -> FilterNode -> CallDistributedProcedureNode -> TableFinishNode -> OutputNode

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The operation may not necessarily update the real data files. For example, We may want to run CALL system.clean_data_cache('ALL_WORKERS'); to just call AsyncDataCache::shutdown() on all Presto C++ workers. What would the plan look like?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The operation may not necessarily update the real data files. For example, We may want to run CALL system.clean_data_cache('ALL_WORKERS'); to just call AsyncDataCache::shutdown() on all Presto C++ workers. What would the plan look like?

Thanks for the question, @yingsu00. When we need to realize the functionality you mentioned, I think we can extend the subtypes of DistributedProcedure to, for example, DistributedDataProcedure and DistributedMetadataProcedure. For the type of DistributedMetadataProcedure, we can analyze and plan it to a logical plan with the following shape:

OutputNode <- ProcedureFinishNode <- CallDistributedProcedureNode

Wherein, ProcedureFinishNode is a newly added plan node. And after being segmented, the plan only needs to include two stages. Do you think it's reasonable?

@hantangwangd

Copy link
Copy Markdown
Member Author

Is there a working prototype or a PR now?

Yes, there is already a PR, #22659. Will rebase it to the newest commit list as soon as possible.

@tdcmeehan tdcmeehan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hantangwangd thanks for your perseverance on this RFC.

It would be nice to sketch out the work required to execute distributed procedures in C++ clusters. It would seem like it's the following:

  1. Add CallDistributedProcedureNode to the protocol
  2. Add a new method PrestoToVeloxConnector.toVeloxInsertTableHandle that interprets the handle and creates a Velox insert handle

@hantangwangd

Copy link
Copy Markdown
Member Author

@tdcmeehan Thanks for the review and suggestion. Sure, I'll add this part soon!

@hantangwangd

Copy link
Copy Markdown
Member Author

Hi @tdcmeehan, I have updated the RFC document to include the work required for native workers to support distributed procedures. Furthermore, I have extended the current distributed procedure types to position the currently designed table-data-rewrite feature as a subtype. This could ensure the extensibility required for other types of distributed procedures in the future. Please take a look when you got a chance. Any feedback would be greatly appreciated!

@hantangwangd

hantangwangd commented Oct 14, 2025

Copy link
Copy Markdown
Member Author

@rschlussel @aditi-pandit @yingsu00 @jaystarshot thanks for your reviews and comments.

Currently, we have expanded the type system for distributed procedures. The table-data-rewrite functionality we mainly focus on in this RFC is one specific subtype of DistributedProcedure now. Under this architecture, we can extend another subtype (e.g., DistributedMaintenanceProcedure) to support the system.clean_data_cache('ALL_WORKERS') mentioned above. During the analysis and logical planning phases, different subtypes of DistributedProcedure will follow different branching logic based on their subtypes, ultimately being planned to plan trees with different shape (or even include newly defined plan nodes).

For example, the logical plan ultimately compiled for the DistributedMaintenanceProcedure type may look like the following:

DistributedMaintenanceProcedureNode -> ProcedureFinishNode -> OutputNode

After being segmented, the plan only needs to include two stages, and the leaf one's partitioning handle should be set to ALL_WORKERS_DISTRIBUTION (a newly defined system partitioning handle).

In this way, we retain the flexibility to extend distributed procedures in the future in a structured way.

Besides, description has been added for the work required to support distributed procedures (with table-data-rewrite type) on native workers. Please take another look when you have a chance. Any feedback would be greatly appreciated!

@tdcmeehan tdcmeehan left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Can you update the RFC number in the file name and also add it to the title, similar to other RFCs?

@hantangwangd

Copy link
Copy Markdown
Member Author

@tdcmeehan Sure, I have updated the RFC number of the file name to RFC-0021 and added it to the title. Please take a look when you have a chance. Thanks a lot!

### Detailed description

#### 1. Re-factor Procedure/ProcedureRegistry

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think its clear about how we discover these procedures on server startup... Are these like plugins ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aditi-pandit thanks for your feedback. In summary, it extends and reuses the existing procedure registration and discovery mechanism, so it follows the same pattern as existing Procedures. See the registration of the newly added RewriteDataFilesProcedure here.

@tdcmeehan tdcmeehan merged commit efafb61 into prestodb:main Nov 14, 2025
hantangwangd added a commit to prestodb/presto that referenced this pull request Nov 15, 2025
…#26373)

## Description

This PR is the first part of many PRs to support distributed procedure
into Presto. It is a split of the original entire PR which is located
here: #22659.

The whole work in this PR includes the following parts:

1. Re-factor `ProcedureRegistry`/`Procedure` data structure to support
the creation and register of `DistributedProcedure`. And make sure
`ProcedureRegistry` be available in `presto-analyzer` module and
connectors, so that we can recognize distributed procedures in call
statement during prepare analyze stages.

2. Handle call statement on distributed procedures in preparer stage. In
this stage, we figure out the procedure's type in call statement, and
define a new query type `CALL_DISTRIBUTED_PROCEDURE` for `call
distributed procedure` in `BuiltInPreparedQuery`. In this way, `call
distributed procedure` statement would be handled by
`SqlQueryExecutionFactory`, then be created and handled as a
`SqlQueryExecution`.

3. Analyze and plan the `call distributed procedure` statement based on
the subtype of the distributed procedure. For subtype
`TableDataRewriteDistributedProcedure`, ultimately generate a logical
plan for it as follows:

```
OutputNode <- TableFinishNode <- CallDistributedProcedureNode <- FilterNode <- TableScanNode
```

4. Optimize, segmentation, grouped tag and local plan for the logical
plan generated above. The handle logical for
`CallDistributedProcedureNode` is similar as `TableWriterNode`. Besides,
a new optimizer `RewriteWriterTarget` is added, which is placed after
all optimization rules. It is used to update the `TableHandle` held in
`TableFinishNode` and `CallDistributedProcedureNode` based on the
underlying `TableScanNode` after the entire optimization is completed,
considering the possible filter pushing down.

## Motivation and Context

prestodb/rfcs#12

## Impact

N/A

## Test Plan

- Add test cases in each phase involving the procedure architecture
expansion, including creating and registering for distributed
procedures, preparing for call distributed procedure, analyzing for call
distributed procedure, logical planning and optimizing for call
distributed procedure

## Contributor checklist

- [x] Please make sure your submission complies with our [contributing
guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md),
in particular [code
style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style)
and [commit
standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [x] PR description addresses the issue accurately and concisely. If
the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax,
functions, or other functionality.
- [x] If release notes are required, they follow the [release notes
guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [x] Adequate tests were added if applicable.
- [x] CI passed.
- [ ] If adding new dependencies, verified they have an [OpenSSF
Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or
higher (or obtained explicit TSC approval for lower scores).

## Release Notes

```
== RELEASE NOTES ==

General Changes
 * Upgrade the procedure architecture to support distributed executing procedures

```
tdcmeehan pushed a commit to prestodb/presto that referenced this pull request Dec 8, 2025
…26717)

## Description

This PR add the developer documentation for procedures architecture in
Presto, covering the development of both normal procedures and
distributed procedures.

## Motivation and Context

#26679
prestodb/rfcs#12

## Impact

N/A

## Test Plan

N/A

## Contributor checklist

- [ ] Please make sure your submission complies with our [contributing
guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md),
in particular [code
style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style)
and [commit
standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [ ] PR description addresses the issue accurately and concisely. If
the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax,
functions, or other functionality.
- [ ] If release notes are required, they follow the [release notes
guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [ ] Adequate tests were added if applicable.
- [ ] CI passed.
- [ ] If adding new dependencies, verified they have an [OpenSSF
Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or
higher (or obtained explicit TSC approval for lower scores).

## Release Notes

```
== NO RELEASE NOTE ==
```
hantangwangd added a commit to prestodb/presto that referenced this pull request Dec 8, 2025
## Description

This PR is the second part of many PRs to support distributed procedure
into Presto. It is a split of the original entire PR which is located
here: #22659.

The whole work in this PR includes the following parts:

1. Re-factor Iceberg connector to support `call distributed procedure`.
Introduce Iceberg's procedure context and expand `IcebergSplitManager`
to support split source planned by
`IcebergAbstractMetadata.beginCallDistributedProcedure(...)`. This split
source will be set to procedure context, and use procedure context to
hold all the files to be rewritten as well.

2. Support Iceberg `rewrite_data_files` procedure. It build a customized
split source, set the split source to procedure context in order to be
used in `IcebergSplitManager`. And register a file scan task consumer to
collector and hold all the scanned files into procedure context. Then
finally in the commit stage, get all the data files and delete files
that has been rewritten, and all the files that has been newly
generated, change and commit their metadata through Iceberg table's
`RewriteFiles` transaction.

## Motivation and Context

prestodb/rfcs#12

## Impact

N/A

## Test Plan

- Add test cases for validating the result and plan tree shape of
iceberg specific distributed procedure: `rewrite_data_files`

## Contributor checklist

- [ ] Please make sure your submission complies with our [contributing
guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md),
in particular [code
style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style)
and [commit
standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [ ] PR description addresses the issue accurately and concisely. If
the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax,
functions, or other functionality.
- [ ] If release notes are required, they follow the [release notes
guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [ ] Adequate tests were added if applicable.
- [ ] CI passed.
- [ ] If adding new dependencies, verified they have an [OpenSSF
Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or
higher (or obtained explicit TSC approval for lower scores).

## Release Notes

```
== NO RELEASE NOTE ==
```
gggrace14 pushed a commit to gggrace14/rfcs that referenced this pull request Feb 10, 2026
gggrace14 pushed a commit to gggrace14/rfcs that referenced this pull request Feb 10, 2026
hantangwangd added a commit to prestodb/presto that referenced this pull request Apr 15, 2026
## Description

This PR is the third part of many PRs to support distributed procedure
into Presto. It is a split of the original entire PR which is located
here: #22659.

The whole work in this PR includes the necessary presto c++ protocol and
native changes to support calling distributed procedures in native
workers.

## Motivation and Context

prestodb/rfcs#12

## Impact

N/A

## Test Plan

N/A

## Contributor checklist

- [ ] Please make sure your submission complies with our [contributing
guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md),
in particular [code
style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style)
and [commit
standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [ ] PR description addresses the issue accurately and concisely. If
the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax,
functions, or other functionality.
- [ ] If release notes are required, they follow the [release notes
guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [ ] Adequate tests were added if applicable.
- [ ] CI passed.
- [ ] If adding new dependencies, verified they have an [OpenSSF
Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or
higher (or obtained explicit TSC approval for lower scores).

## Release Notes

```
== NO RELEASE NOTE ==
```

## Summary by Sourcery

Add native execution and connector support for invoking distributed
procedures, including Iceberg-specific handling, and cover the Iceberg
rewrite_data_files procedure with native worker tests.

New Features:
- Support converting CallDistributedProcedure plan nodes into Velox
TableWriteNodes in the native query plan converter.
- Introduce connector-level conversion from ExecuteProcedureHandle to
Velox insert table handles for distributed procedures, with an
Iceberg-specific implementation for distributed procedure handles.

Enhancements:
- Extend the Iceberg connector protocol to carry distributed procedure
handles used by native workers.

Tests:
- Add end-to-end native worker tests for the Iceberg
system.rewrite_data_files procedure, including
partitioned/non-partitioned tables, filters, invalid parameters, and
partition evolution scenarios.
Copilot AI pushed a commit to Joe-Abraham/presto that referenced this pull request Apr 23, 2026
## Description

This PR is the third part of many PRs to support distributed procedure
into Presto. It is a split of the original entire PR which is located
here: prestodb#22659.

The whole work in this PR includes the necessary presto c++ protocol and
native changes to support calling distributed procedures in native
workers.

## Motivation and Context

prestodb/rfcs#12

## Impact

N/A

## Test Plan

N/A

## Contributor checklist

- [ ] Please make sure your submission complies with our [contributing
guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md),
in particular [code
style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style)
and [commit
standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [ ] PR description addresses the issue accurately and concisely. If
the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax,
functions, or other functionality.
- [ ] If release notes are required, they follow the [release notes
guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [ ] Adequate tests were added if applicable.
- [ ] CI passed.
- [ ] If adding new dependencies, verified they have an [OpenSSF
Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or
higher (or obtained explicit TSC approval for lower scores).

## Release Notes

```
== NO RELEASE NOTE ==
```

## Summary by Sourcery

Add native execution and connector support for invoking distributed
procedures, including Iceberg-specific handling, and cover the Iceberg
rewrite_data_files procedure with native worker tests.

New Features:
- Support converting CallDistributedProcedure plan nodes into Velox
TableWriteNodes in the native query plan converter.
- Introduce connector-level conversion from ExecuteProcedureHandle to
Velox insert table handles for distributed procedures, with an
Iceberg-specific implementation for distributed procedure handles.

Enhancements:
- Extend the Iceberg connector protocol to carry distributed procedure
handles used by native workers.

Tests:
- Add end-to-end native worker tests for the Iceberg
system.rewrite_data_files procedure, including
partitioned/non-partitioned tables, filters, invalid parameters, and
partition evolution scenarios.
Co-authored-by: Joe-Abraham <53977252+Joe-Abraham@users.noreply.github.com>
msmygit pushed a commit to msmygit/presto that referenced this pull request Jun 3, 2026
## Description

This PR is the third part of many PRs to support distributed procedure
into Presto. It is a split of the original entire PR which is located
here: prestodb#22659.

The whole work in this PR includes the necessary presto c++ protocol and
native changes to support calling distributed procedures in native
workers.

## Motivation and Context

prestodb/rfcs#12

## Impact

N/A

## Test Plan

N/A

## Contributor checklist

- [ ] Please make sure your submission complies with our [contributing
guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md),
in particular [code
style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style)
and [commit
standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards).
- [ ] PR description addresses the issue accurately and concisely. If
the change is non-trivial, a GitHub Issue is referenced.
- [ ] Documented new properties (with its default value), SQL syntax,
functions, or other functionality.
- [ ] If release notes are required, they follow the [release notes
guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines).
- [ ] Adequate tests were added if applicable.
- [ ] CI passed.
- [ ] If adding new dependencies, verified they have an [OpenSSF
Scorecard](https://securityscorecards.dev/#the-checks) score of 5.0 or
higher (or obtained explicit TSC approval for lower scores).

## Release Notes

```
== NO RELEASE NOTE ==
```

## Summary by Sourcery

Add native execution and connector support for invoking distributed
procedures, including Iceberg-specific handling, and cover the Iceberg
rewrite_data_files procedure with native worker tests.

New Features:
- Support converting CallDistributedProcedure plan nodes into Velox
TableWriteNodes in the native query plan converter.
- Introduce connector-level conversion from ExecuteProcedureHandle to
Velox insert table handles for distributed procedures, with an
Iceberg-specific implementation for distributed procedure handles.

Enhancements:
- Extend the Iceberg connector protocol to carry distributed procedure
handles used by native workers.

Tests:
- Add end-to-end native worker tests for the Iceberg
system.rewrite_data_files procedure, including
partitioned/non-partitioned tables, filters, invalid parameters, and
partition evolution scenarios.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants