Conversation
AllstarVirgo commented on May 29, 2025
- Improved exception handling for the pg upsert statement
- Added support for PostgreSQL special types, including range
- Added support for arrays of non-primitive types such as text[]
---
The current PostgreSQL reader plugin supports the …
---
@wgzhao Besides the unsupported data types mentioned earlier, PostgreSQL's upsert has a bug: conflicting columns should be excluded from updates.
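A minimal sketch of what "exclude conflicting columns from updates" means when generating an upsert statement. The class and method names (`UpsertSketch`, `buildUpsert`) are illustrative, not the actual Addax `WriterUtil` API; only the filtering idea is taken from this discussion.

```java
import java.util.List;
import java.util.stream.Collectors;

public class UpsertSketch {
    // Build an INSERT ... ON CONFLICT ... DO UPDATE statement where the
    // conflict key columns are filtered out of the SET list. Updating the
    // conflict (or distribution) key is what triggers driver errors.
    static String buildUpsert(String table, List<String> columns, List<String> conflictColumns) {
        String cols = String.join(", ", columns);
        String placeholders = columns.stream()
                .map(c -> "?")
                .collect(Collectors.joining(", "));
        String setClause = columns.stream()
                .filter(column -> !conflictColumns.contains(column))
                .map(c -> c + " = EXCLUDED." + c)
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + placeholders + ")"
                + " ON CONFLICT (" + String.join(", ", conflictColumns) + ")"
                + " DO UPDATE SET " + setClause;
    }

    public static void main(String[] args) {
        // The conflict key "id" appears in the column list but not in SET.
        System.out.println(buildUpsert("tmp_write", List.of("id", "col2", "col3"), List.of("id")));
    }
}
```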
|
Which version did you use? In the latest version, these data types work:
sql > CREATE TABLE tmp_t (id int , col2 int4range, col3 TEXT[]);
sql > INSERT INTO tmp_t values(1, '[10,20]', '{"breakfast", "consulting"}');
sql > select * from tmp_t;
id|col2 |col3 |
--+-------+----------------------+
 1|[10,21)|{breakfast,consulting}|
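Note that the inserted literal `'[10,20]'` reads back as `[10,21)`: PostgreSQL canonicalizes discrete ranges such as int4range to lower-inclusive/upper-exclusive form. A small sketch reproducing that rule at the string level (`canonicalInt4Range` is a hypothetical helper, not part of the driver or plugin):

```java
public class RangeDemo {
    // Rewrite an int4range literal into PostgreSQL's canonical form:
    // lower bound inclusive '[', upper bound exclusive ')'.
    static String canonicalInt4Range(String literal) {
        String body = literal.substring(1, literal.length() - 1);
        String[] parts = body.split(",");
        int lower = Integer.parseInt(parts[0].trim());
        int upper = Integer.parseInt(parts[1].trim());
        if (literal.charAt(0) == '(') {
            lower += 1; // make lower bound inclusive
        }
        if (literal.charAt(literal.length() - 1) == ']') {
            upper += 1; // make upper bound exclusive
        }
        return "[" + lower + "," + upper + ")";
    }

    public static void main(String[] args) {
        System.out.println(canonicalInt4Range("[10,20]")); // prints [10,21)
    }
}
```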
I compiled the source code from the master branch, but the writer component is not functioning. It seems you tested the PostgreSQL reader instead.
---
Reader is fine. What doesn't work is the pg writer.
lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/writer/util/WriterUtil.java
---
@wgzhao Thanks for your advice. I used the driver for PostGIS at first because I wanted to distinguish between the geometry types and the PGobject. But I have found another way to avoid using the PostGIS driver, by using the column type name.
---
In my local env, the PR cannot pass the following test:
-- for read
CREATE TABLE tmp_t (id int , col2 int4range, col3 TEXT[], col4 varchar(100), col5 json);
INSERT INTO tmp_t values(1, '[10,20]', '{"breakfast", "consulting"}', '[10,21)', '{"breakfast":"consulting"}');
-- for write
CREATE TABLE tmp_write (id int , col2 int4range, col3 TEXT[], col4 varchar(100), col5 json);
The task json is the following:
{
"job": {
"setting": {
"speed": {
"byte": -1,
"channel": 1
}
},
"content": {
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "pguser",
"password": "",
"column": ["*"],
"connection": {
"table": ["tmp_t"],
"jdbcUrl": "jdbc:postgresql://localhost:5432/test"
}
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "pguser",
"password": "",
"column": ["*"],
"connection": {
"table": ["tmp_write"],
"jdbcUrl": "jdbc:postgresql://localhost:5432/test"
}
}
}
}
}
}
---
You can also use the following json to test:
{
"job": {
"content":
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "1",
"type": "long"
},
{
"value": "[10,20]",
"type": "string"
},
{
"value": "{\"breakfast\", \"consulting\"}",
"type": "string"
},
{
"value": "[10,21)",
"type": "string"
},
{
"value": "{\"breakfast\": \"consulting\"}",
"type": "string"
}
],
"sliceRecordCount": 1
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://localhost:5432/test",
"table": ["tmp_write"]
}
],
"username": "pguser",
"password": ""
}
}
},
"setting": {
"speed": {
"bytes": -1,
"channel": 1
}
}
}
}
---
I fixed my code and it works. This is my log (excerpt):
==================== DEPRECATED WARNING ========================
2025-06-11 15:35:10.677 [ main] INFO Engine -
2025-06-11 15:35:10.809 [ main] INFO JobContainer - The jobContainer begins to process the job.
- First box: select the JDK version; version 17 is currently supported and recommended
- Second box: select the `addax-core` module
- `Main class`: enter `com.wgzhao.addax.core.Engine`
- Click `Modify options`, select `Add VM Options` in the dropdown, and in the added `VM Options` field enter `-Daddax.home=/opt/app/addax/4.0.3`
preparedStatement.setObject(columnIndex, pgObject);
return preparedStatement;
}
String columnTypeName = getColumnTypeName(columnIndex);
Since you have already obtained the MetaData information of the database table, you know the type name (data type label) of each field. Therefore, you only need to call setObject(index, value, dataType) directly based on this type, or convert the data to the corresponding PGObject according to the type's requirements. If the conversion fails, simply throw an exception. Relying on the representation of the data to infer types can easily lead to oversights and cannot exhaustively enumerate all possible types.
Thanks for your advice. I removed the handling for all types except two kinds.
The first is arrays: the type name of an array column starts with the prefix "_", and it should be transformed into the element type name with "[]" appended.
The second is an empty geometry with ZM dimensions: the query result is a string in the format "xxx EMPTY", which must be transformed to "xxx ZM EMPTY", or the PostgreSQL driver will throw an exception.
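The two transformations described above can be sketched as plain string rewrites. This is an illustration of the described behavior only; `TypeNameFix`, `arrayTypeName`, and `fixEmptyGeometryZm` are hypothetical names, not the methods in the PR.

```java
public class TypeNameFix {
    // PostgreSQL's internal name for an array type prefixes the element
    // type with "_" (e.g. "_text"); the JDBC-facing name wants "text[]".
    static String arrayTypeName(String internalName) {
        if (internalName.startsWith("_")) {
            return internalName.substring(1) + "[]";
        }
        return internalName;
    }

    // Per the comment above, an empty ZM geometry reads back as "xxx EMPTY"
    // and must be rewritten to "xxx ZM EMPTY" before writing, or the
    // PostgreSQL driver throws an exception.
    static String fixEmptyGeometryZm(String wkt) {
        if (wkt.endsWith(" EMPTY") && !wkt.contains(" ZM ")) {
            return wkt.substring(0, wkt.length() - " EMPTY".length()) + " ZM EMPTY";
        }
        return wkt;
    }

    public static void main(String[] args) {
        System.out.println(arrayTypeName("_text"));            // prints text[]
        System.out.println(fixEmptyGeometryZm("POINT EMPTY")); // prints POINT ZM EMPTY
    }
}
```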
---
Thank you for your PR! I'll make a small modification to align it with the project's overall code style after merging.
---
Thank you for reviewing my PR! I appreciate your guidance on aligning the
code style. Could you also let me know if there are other areas in the
project where I could contribute? I’d be happy to help further.
---
I noticed that you have removed some code about upserting data which I think should be preserved. The conflict keys should not be updated; otherwise, the PostgreSQL driver will throw an exception. I will show you an example.
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"username": "rds_user",
"password": "Changeme_123",
"connection": [
{
"jdbcUrl": ["jdbc:postgresql://10.1.0.139:5432/silas-resource?options=-c%20search_path=public,postgis,topology"],
"driver": "org.postgresql.Driver",
"table": ["deprecated.spatial_events"]
}
],
"column":[
"id",
"event_name",
"event_number",
"event_time",
"event_location",
"event_area",
"event_path",
"event_tags",
"event_scores",
"traj",
"e_int4range",
"e_int8range",
"e_tsrange",
"e_tstzrange",
"e_daterange"
]
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"writeMode": "update (id)",
"username": "dbadmin",
"password": "IdeaRoot@2023",
"connection": [
{
"jdbcUrl": "jdbc:postgresql://10.1.3.183:5432/silas-warehouse?currentSchema=dim",
"driver": "org.postgresql.Driver",
"table": ["deprecated.spatial_events_test"]
}
],
"hasZColumns":[10],
"column": [
"id",
"event_name",
"event_number",
"event_time",
"event_location",
"event_area",
"event_path",
"event_tags",
"event_scores",
"traj",
"e_int4range",
"e_int8range",
"e_tsrange",
"e_tstzrange",
"e_daterange"
],
"uniqueKey":["id"],
"preSql": [],
"postSql": [],
"batchSize": 1000
}
}
}
]
}
}
And the output is:
---
Can you provide the table DDL? And it's better to provide sample data as well.
Oh, there are also addax-admin and the corresponding addax-ui. They are currently in the early stages of development.
I ran the job and the result is:
Thank you for pointing me to the addax-admin project! I'm excited to contribute to it, especially since it's in the early stages of development. Could you share more details about the current priorities or specific areas where I could help, such as open issues, feature development, or documentation? I'm happy to dive into tasks that align with the project's needs. Given my experience with [Java/databases], could you suggest specific tasks or open issues where I could make an impact?
---
You made a mistake:
You can try the following task:
{
"job": {
"content":
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "2",
"type": "long"
},
{
"value": "[10,20]",
"type": "string"
},
{
"value": "{\"breakfast\", \"consulting\"}",
"type": "string"
},
{
"value": "[10,21)",
"type": "string"
},
{
"value": "{\"breakfast\": \"consulting\"}",
"type": "string"
}
],
"sliceRecordCount": 1
}
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://10.1.0.139:5432/silas-resource",
"table": ["deprecated.tmp_t"]
}
],
"writeMode": "update (id)",
"username": "rds_user",
"password": "Changeme_123"
}
}
},
"setting": {
"speed": {
"bytes": -1,
"channel": 1
}
}
}
}
I apologize for the incorrect configuration in my setup; I was referencing my prior experience with DataX. The config did work on PG 16. However, it didn't work on PG 10 with a distributed deployment. Besides that, the conflict keys should not be updated, same as before. The exception:
---
Can it pass on PostgreSQL 10 with a distributed deployment after adding the code `.filter(column -> !conflictColumns.contains(column))` in the `WriterUtil#doPostgresqlUpdate` method?
… With conflict columns in an update statement on a PostgreSQL distributed deployment, if the primary key field appears in the update clause, the following error occurs: `Distributed column "id" can't be updated in current version` (see #1283).
---
Yes, I updated the code in the master branch, and it works now.
wgzhao ***@***.***> wrote on Thu, Jun 12, 2025, 15:32:
> *wgzhao* left a comment (wgzhao/Addax#1283)
> can it pass on Postgresql 10 with distribution after adding the code `.filter(column -> !conflictColumns.contains(column))` in the `WriterUtil#doPostgresqlUpdate` method?
---


