
feat: support pg range type #1283

Merged
wgzhao merged 7 commits into wgzhao:master from AllstarVirgo:fte/enhancepg
Jun 12, 2025

Conversation

@AllstarVirgo
Contributor

  1. Improved exception handling for the pg upsert statement
  2. Added support for pg's special types, including range
  3. Added support for arrays of non-primitive types, such as text[]

@wgzhao
Owner

wgzhao commented May 30, 2025

The current PostgreSQL reader plugin supports the range and text data types.

@AllstarVirgo
Contributor Author

text[] is not supported now.

@AllstarVirgo
Contributor Author

@wgzhao Besides the unsupported data types mentioned earlier, PostgreSQL's upsert has a bug: conflicting columns should be excluded from updates.
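The reported issue can be illustrated with a small sketch. This is not the Addax writer's actual code; `buildUpsert` and its arguments are hypothetical, but it shows an `INSERT ... ON CONFLICT` statement whose `DO UPDATE SET` list excludes the conflict-key columns:

```java
import java.util.List;
import java.util.stream.Collectors;

public class UpsertSqlBuilder {
    // Build a PostgreSQL upsert where the conflict-key columns are left out
    // of the SET list: re-assigning the key columns is redundant, and the
    // contributor reports it triggers an error on their server.
    static String buildUpsert(String table, List<String> columns, List<String> conflictKeys) {
        String cols = String.join(",", columns);
        String placeholders = columns.stream().map(c -> "?")
                .collect(Collectors.joining(","));
        String setClause = columns.stream()
                .filter(c -> !conflictKeys.contains(c))          // exclude conflict keys
                .map(c -> c + "=EXCLUDED." + c)
                .collect(Collectors.joining(","));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + placeholders + ")"
                + " ON CONFLICT (" + String.join(",", conflictKeys) + ")"
                + " DO UPDATE SET " + setClause;
    }
}
```

For a table keyed on `id`, this yields `... ON CONFLICT (id) DO UPDATE SET col2=EXCLUDED.col2,...`, updating only the non-key columns.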

@wgzhao
Owner

wgzhao commented Jun 3, 2025

Which version did you use? In the latest version, the data type text[] is also supported. Here is my test result:

sql > CREATE TABLE tmp_t (id int , col2 int4range, col3 TEXT[]);
sql > INSERT INTO tmp_t values(1, '[10,20]', '{"breakfast", "consulting"}');
sql > select * from tmp_t;

id|col2   |col3                  |
--+-------+----------------------+
 1|[10,21)|{breakfast,consulting}|
2025-06-03 13:50:53.130 [        main] INFO  Engine               - 
  ___      _     _            
 / _ \    | |   | |           
/ /_\ \ __| | __| | __ ___  __
|  _  |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |>  < 
\_| |_/\__,_|\__,_|\__,_/_/\_\
:: Addax version ::    (v5.1.3-SNAPSHOT)
2025-06-03 13:50:53.226 [        main] INFO  Engine               - 
{
	"setting":{
		"speed":{
			"byte":-1,
			"channel":1
		}
	},
	"content":{
		"reader":{
			"name":"postgresqlreader",
			"parameter":{
				"username":"pguser",
				"password":"*****",
				"autoPk":true,
				"column":[
					"*"
				],
				"connection":{
					"table":[
						"tmp_t"
					],
					"jdbcUrl":"jdbc:postgresql://etl02:5432/dolphinscheduler"
				}
			}
		},
		"writer":{
			"name":"streamwriter",
			"parameter":{
				"print":true
			}
		}
	}
}

2025-06-03 13:50:53.237 [        main] INFO  JobContainer         - The jobContainer begins to process the job.
2025-06-03 13:50:53.683 [       job-0] WARN  OriginalConfPretreatmentUtil - There are some risks in the column configuration. Because you did not configure the columns to read the database table, changes in the number and types of fields in your table may affect the correctness of the task or even cause errors.
2025-06-03 13:50:53.683 [       job-0] INFO  CommonRdbmsReader$Job - The split key is not configured, try to guess the split key.
2025-06-03 13:50:54.060 [       job-0] WARN  CommonRdbmsReader$Job - There is no primary key or unique key in the table, and the split key cannot be guessed.
2025-06-03 13:50:54.064 [       job-0] INFO  JobContainer         - The Reader.Job [postgresqlreader] perform prepare work .
2025-06-03 13:50:54.065 [       job-0] INFO  JobContainer         - The Writer.Job [streamwriter] perform prepare work .
2025-06-03 13:50:54.065 [       job-0] INFO  JobContainer         - Job set Channel-Number to 1 channel(s).
2025-06-03 13:50:54.068 [       job-0] INFO  JobContainer         - The Reader.Job [postgresqlreader] is divided into [1] task(s).
2025-06-03 13:50:54.068 [       job-0] INFO  JobContainer         - The Writer.Job [streamwriter] is divided into [1] task(s).
2025-06-03 13:50:54.084 [       job-0] INFO  JobContainer         - The Scheduler launches [1] taskGroup(s).
2025-06-03 13:50:54.088 [ taskGroup-0] INFO  TaskGroupContainer   - The taskGroupId=[0] started [1] channels for [1] tasks.
2025-06-03 13:50:54.089 [ taskGroup-0] INFO  Channel              - The Channel set byte_speed_limit to -1, No bps activated.
2025-06-03 13:50:54.089 [ taskGroup-0] INFO  Channel              - The Channel set record_speed_limit to -1, No tps activated.
2025-06-03 13:50:54.094 [  reader-0-0] INFO  CommonRdbmsReader$Task - Begin reading records by executing SQL query: [SELECT * FROM tmp_t ].
2025-06-03 13:50:54.474 [  reader-0-0] INFO  CommonRdbmsReader$Task - Finished reading records by executing SQL query: [SELECT * FROM tmp_t ].

1       [10,21) {breakfast,consulting}

2025-06-03 13:50:57.098 [       job-0] INFO  AbstractScheduler    - The scheduler has completed all tasks.
2025-06-03 13:50:57.098 [       job-0] INFO  JobContainer         - The Writer.Job [streamwriter] perform post work.
2025-06-03 13:50:57.099 [       job-0] INFO  JobContainer         - The Reader.Job [postgresqlreader] perform post work.
2025-06-03 13:50:57.105 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 1 records, 30 bytes | Speed 10B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2025-06-03 13:50:57.106 [       job-0] INFO  JobContainer         - 
Job start  at             : 2025-06-03 13:50:53
Job end    at             : 2025-06-03 13:50:57
Job took secs             :                  3s
Average   bps             :               10B/s
Average   rps             :              0rec/s
Number of rec             :                   1
Failed record             :                   0

@AllstarVirgo
Contributor Author

> Which version did you use? In the latest version, the data type text[] is also supported. Here is my test result: […]

I compiled the source code from the master branch, but the writer component is not functioning. It seems you tested the PostgreSQL reader instead.

@AllstarVirgo
Contributor Author

The reader is fine. What doesn't work is the pg writer.

@AllstarVirgo
Contributor Author

@wgzhao Thanks for your advice. I used the PostGIS driver at first because I wanted to distinguish between the geometry types and PGobject, but I found another way to avoid the PostGIS driver by using the column type name.

@wgzhao
Owner

wgzhao commented Jun 11, 2025

In my local env, the PR cannot pass the following test:

-- for read
CREATE TABLE tmp_t (id int , col2 int4range, col3 TEXT[], col4 varchar(100), col5 json);
INSERT INTO tmp_t values(1, '[10,20]', '{"breakfast", "consulting"}', '[10,21)', '{"breakfast":"consulting"}');
-- for write
CREATE TABLE tmp_write (id int , col2 int4range, col3 TEXT[], col4 varchar(100), col5 json);

The task json is the following:

{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": {
      "reader": {
        "name": "postgresqlreader",
        "parameter": {
          "username": "pguser",
          "password": "",
          "column": ["*"],
          "connection": {
            "table": ["tmp_t"],
            "jdbcUrl": "jdbc:postgresql://localhost:5432/test"
          }
        }
      },
      "writer": {
        "name": "postgresqlwriter",
        "parameter": {
          "username": "pguser",
          "password": "",
          "column": ["*"],
          "connection": {
            "table": ["tmp_write"],
            "jdbcUrl": "jdbc:postgresql://localhost:5432/test"
          }
        }
      }
    }
  }
}
2025-06-11 10:36:18.946 [        main] INFO  Engine               - 
  ___      _     _            
 / _ \    | |   | |           
/ /_\ \ __| | __| | __ ___  __
|  _  |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |>  < 
\_| |_/\__,_|\__,_|\__,_/_/\_\
:: Addax version ::    (v6.0.1-SNAPSHOT)
2025-06-11 10:36:19.059 [        main] INFO  Engine               - 

......

2025-06-11 10:36:19.072 [        main] INFO  JobContainer         - The jobContainer begins to process the job.
2025-06-11 10:36:19.564 [       job-0] WARN  OriginalConfPretreatmentUtil - There are some risks in the column configuration. Because you did not configure the columns to read the database table, changes in the number and types of fields in your table may affect the correctness of the task or even cause errors.
2025-06-11 10:36:20.040 [       job-0] INFO  OriginalConfPretreatmentUtil - The table [tmp_write] has columns [id,col2,col3,col4,col5].
2025-06-11 10:36:20.040 [       job-0] WARN  OriginalConfPretreatmentUtil - There are some risks in the column configuration. Because you did not configure the columns to read the database table, changes in the number and types of fields in your table may affect the correctness of the task or even cause errors.
2025-06-11 10:36:20.043 [       job-0] INFO  OriginalConfPretreatmentUtil - Writing data using [INSERT INTO %s ( id,col2,col3,col4,col5) VALUES ( ?,?,?,?,? )].
2025-06-11 10:36:20.044 [       job-0] INFO  JobContainer         - The Reader.Job [postgresqlreader] perform prepare work .
2025-06-11 10:36:20.044 [       job-0] INFO  JobContainer         - The Writer.Job [postgresqlwriter] perform prepare work .
2025-06-11 10:36:20.044 [       job-0] INFO  JobContainer         - Job set Channel-Number to 1 channel(s).
2025-06-11 10:36:20.048 [       job-0] INFO  JobContainer         - The Reader.Job [postgresqlreader] is divided into [1] task(s).
2025-06-11 10:36:20.048 [       job-0] INFO  JobContainer         - The Writer.Job [postgresqlwriter] is divided into [1] task(s).
2025-06-11 10:36:20.066 [       job-0] INFO  JobContainer         - The Scheduler launches [1] taskGroup(s).
2025-06-11 10:36:20.071 [ taskGroup-0] INFO  TaskGroupContainer   - The taskGroupId=[0] started [1] channels for [1] tasks.
2025-06-11 10:36:20.072 [ taskGroup-0] INFO  Channel              - The Channel set byte_speed_limit to -1, No bps activated.
2025-06-11 10:36:20.072 [ taskGroup-0] INFO  Channel              - The Channel set record_speed_limit to -1, No tps activated.
2025-06-11 10:36:20.078 [  reader-0-0] INFO  CommonRdbmsReader$Task - Begin reading records by executing SQL query: [SELECT * FROM tmp_t ].
2025-06-11 10:36:20.860 [  reader-0-0] INFO  CommonRdbmsReader$Task - Finished reading records by executing SQL query: [SELECT * FROM tmp_t ].
2025-06-11 10:36:21.222 [  writer-0-0] WARN  CommonRdbmsWriter$Task - Rolling back the write, try to write one line at a time. because: Batch entry 0 INSERT INTO tmp_write ( id,col2,col3,col4,col5) VALUES ( ('1'::int8),('{"columnTypeName":"int4range","rawData":"{\"null\":false,\"type\":\"int4range\",\"value\":\"[10,21)\"}"}'),('{"breakfast","consulting"}'),('[10,21)'),('{"columnTypeName":"json","rawData":"{\"null\":false,\"type\":\"json\",\"value\":\"{\\\"breakfast\\\":\\\"consulting\\\"}\"}"}') ) was aborted: ERROR: malformed range literal: "{"columnTypeName":"int4range","rawData":"{\"null\":false,\"type\":\"int4range\",\"value\":\"[10,21)\"}"}"
  Detail: Missing left parenthesis or bracket.
  Where: unnamed portal parameter $2 = '...'  Call getNextException to see other errors in the batch.
2025-06-11 10:36:21.237 [  writer-0-0] ERROR StdoutPluginCollector - 
org.postgresql.util.PSQLException: ERROR: malformed range literal: "{"columnTypeName":"int4range","rawData":"{\"null\":false,\"type\":\"int4range\",\"value\":\"[10,21)\"}"}"
  Detail: Missing left parenthesis or bracket.
  Where: unnamed portal parameter $2 = '...'
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:490)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:408)
	at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:167)
	at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:156)
	at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:95)
	at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:95)
	at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.doOneInsert(CommonRdbmsWriter.java:447)
	at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.doBatchInsert(CommonRdbmsWriter.java:426)
	at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.startWriteWithConnection(CommonRdbmsWriter.java:326)
	at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.startWrite(CommonRdbmsWriter.java:344)
	at com.wgzhao.addax.plugin.writer.postgresqlwriter.PostgresqlWriter$Task.startWrite(PostgresqlWriter.java:246)
	at com.wgzhao.addax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:74)
	at java.base/java.lang.Thread.run(Thread.java:840)
2025-06-11 10:36:21.243 [  writer-0-0] ERROR StdoutPluginCollector - The dirty data: {"exception":"错误: 有缺陷的范围字串:\"{\"columnTypeName\":\"int4range\",\"rawData\":\"{\\\"null\\\":false,\\\"type\\\":\\\"int4range\\\",\\\"value\\\":\\\"[10,21)\\\"}\"}\"\n  详细:缺少一个左大括弧或左方括弧.\n  在位置:unnamed portal parameter $2 = '...'","record":[{"byteSize":1,"rawData":1,"type":"LONG"},{"byteSize":104,"rawData":[123,34,99,111,108,117,109,110,84,121,112,101,78,97,109,101,34,58,34,105,110,116,52,114,97,110,103,101,34,44,34,114,97,119,68,97,116,97,34,58,34,123,92,34,110,117,108,108,92,34,58,102,97,108,115,101,44,92,34,116,121,112,101,92,34,58,92,34,105,110,116,52,114,97,110,103,101,92,34,44,92,34,118,97,108,117,101,92,34,58,92,34,91,49,48,44,50,49,41,92,34,125,34,125],"type":"BYTES"},{"byteSize":60,"rawData":[123,34,99,111,108,117,109,110,84,121,112,101,78,97,109,101,34,58,34,116,101,120,116,34,44,34,114,97,119,68,97,116,97,34,58,34,123,98,114,101,97,107,102,97,115,116,44,99,111,110,115,117,108,116,105,110,103,125,34,125],"type":"BYTES"},{"byteSize":7,"rawData":"[10,21)","type":"STRING"},{"byteSize":125,"rawData":[123,34,99,111,108,117,109,110,84,121,112,101,78,97,109,101,34,58,34,106,115,111,110,34,44,34,114,97,119,68,97,116,97,34,58,34,123,92,34,110,117,108,108,92,34,58,102,97,108,115,101,44,92,34,116,121,112,101,92,34,58,92,34,106,115,111,110,92,34,44,92,34,118,97,108,117,101,92,34,58,92,34,123,92,92,92,34,98,114,101,97,107,102,97,115,116,92,92,92,34,58,92,92,92,34,99,111,110,115,117,108,116,105,110,103,92,92,92,34,125,92,34,125,34,125],"type":"BYTES"}],"type":"writer"}
2025-06-11 10:36:23.083 [       job-0] INFO  AbstractScheduler    - The scheduler has completed all tasks.
2025-06-11 10:36:23.084 [       job-0] INFO  JobContainer         - The Writer.Job [postgresqlwriter] perform post work.
2025-06-11 10:36:23.084 [       job-0] INFO  JobContainer         - The Reader.Job [postgresqlreader] perform post work.
2025-06-11 10:36:23.093 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 1 records, 297 bytes | Speed 99B/s, 0 records/s | Error 1 records, 297 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2025-06-11 10:36:23.095 [       job-0] INFO  JobContainer         - 
Job start  at             : 2025-06-11 10:36:19
Job end    at             : 2025-06-11 10:36:23
Job took secs             :                  4s
Average   bps             :               99B/s
Average   rps             :              0rec/s
Number of rec             :                   1
Failed record             :                   1

@wgzhao
Owner

wgzhao commented Jun 11, 2025

You can also use the following JSON to test:

{
  "job": {
    "content": 
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "1",
                "type": "long"
              },
              {
                "value": "[10,20]",
                "type": "string"
              },
              {
                "value": "{\"breakfast\", \"consulting\"}",
                "type": "string"
              },
              {
                "value": "[10,21)",
                "type": "string"
              },
              {
                "value": "{\"breakfast\": \"consulting\"}",
                "type": "string"
              }
            ],
            "sliceRecordCount": 1
          }
        },
        "writer": {
          "name": "postgresqlwriter",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://localhost:5432/test",
                "table": ["tmp_write"]
              }
            ],
            "username": "pguser",
            "password": ""
          }
        }
      },
    "setting": {
      "speed": {
        "bytes": -1,
        "channel": 1
      }
    }
  }
}

@AllstarVirgo
Contributor Author

I fixed my code and it works. Here is my log:

==================== DEPRECATED WARNING ========================
addax.py is deprecated, It's going to be removed in future release.
As a replacement, you can use addax.sh to run job
==================== DEPRECATED WARNING ========================

2025-06-11 15:35:10.677 [        main] INFO  Engine               - 
  ___      _     _            
 / _ \    | |   | |           
/ /_\ \ __| | __| | __ ___  __
|  _  |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |>  < 
\_| |_/\__,_|\__,_|\__,_/_/\_\
:: Addax version ::    (v5.1.3-SNAPSHOT)
2025-06-11 15:35:10.793 [        main] INFO  Engine               - 
{
	"content":{
		"reader":{
			"name":"streamreader",
			"parameter":{
				"column":[
					{
						"value":"1",
						"type":"long"
					},
					{
						"value":"[10,20]",
						"type":"string"
					},
					{
						"value":"{\"breakfast\", \"consulting\"}",
						"type":"string"
					},
					{
						"value":"[10,21)",
						"type":"string"
					},
					{
						"value":"{\"breakfast\": \"consulting\"}",
						"type":"string"
					}
				],
				"sliceRecordCount":1
			}
		},
		"writer":{
			"name":"postgresqlwriter",
			"parameter":{
				"column":[
					"*"
				],
				"connection":{
					"jdbcUrl":"jdbc:postgresql://10.1.0.139:5432/test",
					"table":[
						"deprecated.tmp_write"
					]
				},
				"username":"xxxxxxx",
				"password":"*****"
			}
		}
	},
	"setting":{
		"speed":{
			"bytes":-1,
			"channel":1
		}
	}
}

2025-06-11 15:35:10.809 [ main] INFO JobContainer - The jobContainer begins to process the job.
2025-06-11 15:35:11.769 [ job-0] INFO OriginalConfPretreatmentUtil - The table [deprecated.tmp_write] has columns [id,col2,col3,col4,col5].
2025-06-11 15:35:11.770 [ job-0] WARN OriginalConfPretreatmentUtil - There are some risks in the column configuration. Because you did not configure the columns to read the database table, changes in the number and types of fields in your table may affect the correctness of the task or even cause errors.
2025-06-11 15:35:11.772 [ job-0] INFO OriginalConfPretreatmentUtil - Writing data using [INSERT INTO %s ( id,col2,col3,col4,col5) VALUES ( ?,?,?,?,? )].
2025-06-11 15:35:11.772 [ job-0] INFO JobContainer - The Reader.Job [streamreader] perform prepare work .
2025-06-11 15:35:11.772 [ job-0] INFO JobContainer - The Writer.Job [postgresqlwriter] perform prepare work .
2025-06-11 15:35:11.772 [ job-0] INFO JobContainer - Job set Channel-Number to 1 channel(s).
2025-06-11 15:35:11.773 [ job-0] INFO JobContainer - The Reader.Job [streamreader] is divided into [1] task(s).
2025-06-11 15:35:11.773 [ job-0] INFO JobContainer - The Writer.Job [postgresqlwriter] is divided into [1] task(s).
2025-06-11 15:35:11.786 [ job-0] INFO JobContainer - The Scheduler launches [1] taskGroup(s).
2025-06-11 15:35:11.790 [ taskGroup-0] INFO TaskGroupContainer - The taskGroupId=[0] started [1] channels for [1] tasks.
2025-06-11 15:35:11.791 [ taskGroup-0] INFO Channel - The Channel set byte_speed_limit to -1, No bps activated.
2025-06-11 15:35:11.792 [ taskGroup-0] INFO Channel - The Channel set record_speed_limit to -1, No tps activated.
2025-06-11 15:35:14.802 [ job-0] INFO AbstractScheduler - The scheduler has completed all tasks.
2025-06-11 15:35:14.804 [ job-0] INFO JobContainer - The Writer.Job [postgresqlwriter] perform post work.
2025-06-11 15:35:14.805 [ job-0] INFO JobContainer - The Reader.Job [streamreader] perform post work.
2025-06-11 15:35:14.812 [ job-0] INFO StandAloneJobContainerCommunicator - Total 1 records, 69 bytes | Speed 23B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2025-06-11 15:35:14.814 [       job-0] INFO  JobContainer         - 
Job start  at             : 2025-06-11 15:35:10
Job end    at             : 2025-06-11 15:35:14
Job took secs             :                  3s
Average   bps             :               23B/s
Average   rps             :              0rec/s
Number of rec             :                   1
Failed record             :                   0

- First box: choose the JDK version; currently version 17 is supported, so version 17 is recommended
- Second box: choose the `addax-core` module
- `Main class`: enter `com.wgzhao.addax.core.Engine`
- Click `Modify options`, and in the drop-down that appears choose `Add VM Options`; in the added `VM Options` field, enter `-Daddax.home=/opt/app/addax/4.0.3`
Contributor Author


Since the default scope of the addax-rdbms dependency is `provided`, adding the library path to the classpath is necessary when debugging the code.

preparedStatement.setObject(columnIndex, pgObject);
return preparedStatement;
}
String columnTypeName = getColumnTypeName(columnIndex);
Owner


Since you have already obtained the MetaData information of the database table, it implies that you are aware of the field type names (data type label names) for each field. Therefore, you only need to directly use setObject(index, value, dataType) based on this type, or convert the data to the corresponding PGObject according to type requirements. If there are any issues with the conversion, simply throw an exception. Relying on the representation of data to infer types can easily lead to oversights and cannot exhaustively enumerate all possible types.
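A minimal sketch of that metadata-driven routing, assuming a hypothetical helper (`pgObjectTypeFor` is not the PR's actual method name): the declared column type from the metadata decides whether the value must be wrapped in a PGobject of that type, rather than inferring the type from the value's representation:

```java
public class PgTypeRouter {
    // Decide, from the declared column type name, whether the value should be
    // wrapped in a PGobject of that type before setObject(), instead of
    // guessing the type from how the value's string happens to look.
    static String pgObjectTypeFor(String columnTypeName) {
        switch (columnTypeName) {
            case "int4range": case "int8range": case "numrange":
            case "tsrange": case "tstzrange": case "daterange":
            case "json": case "jsonb":
                return columnTypeName; // wrap the value in a PGobject of this type
            default:
                return null; // a plain setObject/setString is sufficient
        }
    }
}
```

With the driver on the classpath, the non-null case would become `PGobject pg = new PGobject(); pg.setType(type); pg.setValue(value);` followed by `setObject`; a failed conversion should simply throw.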

Contributor Author


Thanks for your advice. I removed the handling for all types except two kinds.
The first is arrays: an array column's type name starts with the prefix "_", and it should be transformed into the element type with "[]" appended.
The second is an empty geometry with ZM dimensions: the query result is a string in the format "xxx EMPTY", and it must be rewritten as "xxx ZM EMPTY", or the PostgreSQL driver will throw an exception.
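The two rewrites can be sketched as plain string transformations (hypothetical helper names; the PR's actual code may differ):

```java
public class PgTypeNameFixups {
    // PostgreSQL reports array column types with a leading underscore
    // (e.g. "_text" for a text[] column); strip it and append "[]".
    static String normalizeArrayType(String columnTypeName) {
        if (columnTypeName.startsWith("_")) {
            return columnTypeName.substring(1) + "[]";
        }
        return columnTypeName;
    }

    // An empty geometry with Z/M dimensions is read back as "<TYPE> EMPTY";
    // rewrite it to "<TYPE> ZM EMPTY" so the driver can parse it.
    static String normalizeEmptyGeometry(String wkt) {
        if (wkt.endsWith(" EMPTY") && !wkt.contains(" ZM ")) {
            return wkt.replace(" EMPTY", " ZM EMPTY");
        }
        return wkt;
    }
}
```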

@wgzhao
Owner

wgzhao commented Jun 12, 2025

Thank you for your PR! I'll make a small modification to align it with the project's overall code style after merging.

@wgzhao wgzhao merged commit 6f95151 into wgzhao:master Jun 12, 2025
2 checks passed
@AllstarVirgo
Contributor Author

AllstarVirgo commented Jun 12, 2025 via email

@AllstarVirgo
Contributor Author

AllstarVirgo commented Jun 12, 2025

I noticed that you have removed some code about upserting data which I think should be preserved. The conflict keys should not be updated; otherwise, the PostgreSQL driver will throw an exception.
My pg version is: PostgreSQL 10.0 @ TBase_v5.21.8.8 (commit: 983bd7692) 2025-02-14 13:50:50 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 7.3.0, 64-bit

I will show you an example.
The config is:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "username": "rds_user",
                        "password": "Changeme_123",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:postgresql://10.1.0.139:5432/silas-resource?options=-c%20search_path=public,postgis,topology"],
                                "driver": "org.postgresql.Driver",
                                "table": ["deprecated.spatial_events"]
                            }
                        ],
                        "column":[
                            "id",
                            "event_name",
                            "event_number",
                            "event_time",
                            "event_location",
                            "event_area",
                            "event_path",
                            "event_tags",
                            "event_scores",
                            "traj",
                            "e_int4range",
                            "e_int8range",
                            "e_tsrange",
                            "e_tstzrange",
                            "e_daterange"
                        ]
                    }
                },
                "writer": {
                    "name": "postgresqlwriter",
                    "parameter": {
                        "writeMode": "update (id)",
                        "username": "dbadmin",
                        "password": "IdeaRoot@2023",
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:postgresql://10.1.3.183:5432/silas-warehouse?currentSchema=dim",
                                "driver": "org.postgresql.Driver",
                                "table": ["deprecated.spatial_events_test"]
                            }
                        ],
                        "hasZColumns":[10],
                        "column": [
                            "id",
                            "event_name",
                            "event_number",
                            "event_time",
                            "event_location",
                            "event_area",
                            "event_path",
                            "event_tags",
                            "event_scores",
                            "traj"
                            "e_int4range",
                            "e_int8range",
                            "e_tsrange",
                            "e_tstzrange",
                            "e_daterange"
                        ],
                        "uniqueKey":["id"],
                        "preSql": [],
                        "postSql": [],
                        "batchSize": 1000
                    }
                }
            }
        ]
    }
}

And the output is :

==================== DEPRECATED WARNING ========================
addax.py is deprecated, It's going to be removed in future release.
As a replacement, you can use addax.sh to run job
==================== DEPRECATED WARNING ========================

2025-06-12 12:51:19.019 [        main] INFO  Engine               - 
  ___      _     _            
 / _ \    | |   | |           
/ /_\ \ __| | __| | __ ___  __
|  _  |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |>  < 
\_| |_/\__,_|\__,_|\__,_/_/\_\
:: Addax version ::    (v6.0.1-SNAPSHOT)
2025-06-12 12:51:19.121 [        main] INFO  Engine               - 
{
        "setting":{
                "speed":{
                        "channel":2
                }
        },
        "content":{
                "reader":{
                        "name":"postgresqlreader",
                        "parameter":{
                                "username":"****",
                                "password":"*****",
                                "connection":{
                                        "driver":"org.postgresql.Driver",
                                        "jdbcUrl":"jdbc:postgresql://10.1.0.139:5432/silas-resource?options=-c%20search_path=public,postgis,topology",
                                        "table":[
                                                "deprecated.spatial_events"
                                        ]
                                },
                                "column":[
                                        "id",
                                        "event_name",
                                        "event_number",
                                        "event_time",
                                        "event_location",
                                        "event_area",
                                        "event_path",
                                        "event_tags",
                                        "event_scores",
                                        "traj",
                                        "e_int4range",
                                        "e_int8range",
                                        "e_tsrange",
                                        "e_tstzrange",
                                        "e_daterange"
                                ]
                        }
                },
                "writer":{
                        "name":"postgresqlwriter",
                        "parameter":{
                                "writeMode":"update (id)",
                                "username":"****",
                                "password":"*****",
                                "connection":{
                                        "driver":"org.postgresql.Driver",
                                        "jdbcUrl":"jdbc:postgresql://10.1.3.183:5432/silas-warehouse?currentSchema=dim",
                                        "table":[
                                                "deprecated.spatial_events_test"
                                        ]
                                },
                                "hasZColumns":[
                                        10
                                ],
                                "column":[
                                        "id",
                                        "event_name",
                                        "event_number",
                                        "event_time",
                                        "event_location",
                                        "event_area",
                                        "event_path",
                                        "event_tags",
                                        "event_scores",
                                        "traj",
                                        "e_int4range",
                                        "e_int8range",
                                        "e_tsrange",
                                        "e_tstzrange",
                                        "e_daterange"
                                ],
                                "uniqueKey":[
                                        "id"
                                ],
                                "preSql":[],
                                "postSql":[],
                                "batchSize":1000
                        }
                }
        }
}

2025-06-12 12:51:19.136 [        main] INFO  JobContainer         - The jobContainer begins to process the job.
2025-06-12 12:51:19.142 [       job-0] WARN  OriginalConfPretreatmentUtil - Use specified driver class: org.postgresql.Driver
2025-06-12 12:51:20.711 [       job-0] INFO  OriginalConfPretreatmentUtil - The table [deprecated.spatial_events] has columns [id,event_name,event_number,event_time,event_location,event_area,event_path,event_tags,event_scores,traj,e_int4range,e_int8range,e_tsrange,e_tstzrange,e_daterange].
2025-06-12 12:51:20.716 [       job-0] WARN  OriginalConfPretreatmentUtil - Use specified driver class [org.postgresql.Driver]
2025-06-12 12:51:21.679 [       job-0] INFO  OriginalConfPretreatmentUtil - The table [deprecated.spatial_events_test] has columns [id,event_name,event_number,event_time,event_location,event_area,event_path,event_tags,event_scores,traj,e_int4range,e_int8range,e_tsrange,e_tstzrange,e_daterange].
2025-06-12 12:51:21.701 [       job-0] INFO  OriginalConfPretreatmentUtil - Writing data using [INSERT INTO %s (id,event_name,event_number,event_time,event_location,event_area,event_path,event_tags,event_scores,traj,e_int4range,e_int8range,e_tsrange,e_tstzrange,e_daterange) VALUES ( ?,?,?,?,?,?,?,?,?,?,?,?,?,?,? ) ON CONFLICT (id) DO UPDATE SET id=excluded.id,event_name=excluded.event_name,event_number=excluded.event_number,event_time=excluded.event_time,event_location=excluded.event_location,event_area=excluded.event_area,event_path=excluded.event_path,event_tags=excluded.event_tags,event_scores=excluded.event_scores,traj=excluded.traj,e_int4range=excluded.e_int4range,e_int8range=excluded.e_int8range,e_tsrange=excluded.e_tsrange,e_tstzrange=excluded.e_tstzrange,e_daterange=excluded.e_daterange].
2025-06-12 12:51:21.701 [       job-0] INFO  JobContainer         - The Reader.Job [postgresqlreader] perform prepare work .
2025-06-12 12:51:21.701 [       job-0] INFO  JobContainer         - The Writer.Job [postgresqlwriter] perform prepare work .
2025-06-12 12:51:21.702 [       job-0] INFO  JobContainer         - Job set Channel-Number to 2 channel(s).
2025-06-12 12:51:21.704 [       job-0] INFO  JobContainer         - The Reader.Job [postgresqlreader] is divided into [1] task(s).
2025-06-12 12:51:21.705 [       job-0] INFO  JobContainer         - The Writer.Job [postgresqlwriter] is divided into [1] task(s).
2025-06-12 12:51:21.721 [       job-0] INFO  JobContainer         - The Scheduler launches [1] taskGroup(s).
2025-06-12 12:51:21.725 [ taskGroup-0] INFO  TaskGroupContainer   - The taskGroupId=[0] started [1] channels for [1] tasks.
2025-06-12 12:51:21.727 [ taskGroup-0] INFO  Channel              - The Channel set byte_speed_limit to -1, No bps activated.
2025-06-12 12:51:21.727 [ taskGroup-0] INFO  Channel              - The Channel set record_speed_limit to -1, No tps activated.
2025-06-12 12:51:21.735 [  reader-0-0] INFO  CommonRdbmsReader$Task - Begin reading records by executing SQL query: [SELECT id,event_name,event_number,event_time,event_location,event_area,event_path,event_tags,event_scores,traj,e_int4range,e_int8range,e_tsrange,e_tstzrange,e_daterange FROM deprecated.spatial_events ].
2025-06-12 12:51:23.119 [  reader-0-0] INFO  CommonRdbmsReader$Task - Finished reading records by executing SQL query: [SELECT id,event_name,event_number,event_time,event_location,event_area,event_path,event_tags,event_scores,traj,e_int4range,e_int8range,e_tsrange,e_tstzrange,e_daterange FROM deprecated.spatial_events ].
2025-06-12 12:51:23.665 [  writer-0-0] WARN  CommonRdbmsWriter$Task - Rolling back the write, try to write one line at a time. because: Batch entry 0 INSERT INTO deprecated.spatial_events_test (id,event_name,event_number,event_time,event_location,event_area,event_path,event_tags,event_scores,traj,e_int4range,e_int8range,e_tsrange,e_tstzrange,e_daterange) VALUES ( ('1'::int8)::INT,('Sample Event'),('[10,50]'),('["2025-05-27 10:00:00","2025-05-27 12:00:00"]'),('0101000020E610000050FC1873D79A5EC0D0D556EC2FE34240'),('0103000020E610000001000000050000007B14AE47E19A5EC0C3F5285C8FE242407B14AE47E19A5EC0A4703D0AD7E342400AD7A3703D9A5EC0A4703D0AD7E342400AD7A3703D9A5EC0C3F5285C8FE242407B14AE47E19A5EC0C3F5285C8FE24240'),('0102000020E61000000200000050FC1873D79A5EC0D0D556EC2FE342405F29CB10C79A5EC0B37BF2B050E34240'),('{conference,tech,public}'),('{85,90,95}'),(NULL),(NULL),(NULL),(NULL),(NULL),(NULL) ) ON CONFLICT (id) DO UPDATE SET id=excluded.id,event_name=excluded.event_name,event_number=excluded.event_number,event_time=excluded.event_time,event_location=excluded.event_location,event_area=excluded.event_area,event_path=excluded.event_path,event_tags=excluded.event_tags,event_scores=excluded.event_scores,traj=excluded.traj,e_int4range=excluded.e_int4range,e_int8range=excluded.e_int8range,e_tsrange=excluded.e_tsrange,e_tstzrange=excluded.e_tstzrange,e_daterange=excluded.e_daterange was aborted: ERROR: Distributed column "id" can't be updated in current version  Call getNextException to see other errors in the batch.
2025-06-12 12:51:23.777 [  writer-0-0] ERROR StdoutPluginCollector - 
org.postgresql.util.PSQLException: ERROR: Distributed column "id" can't be updated in current version
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:490)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:408)
        at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:167)
        at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:156)
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:95)
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:95)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.doOneInsert(CommonRdbmsWriter.java:447)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.doBatchInsert(CommonRdbmsWriter.java:426)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.startWriteWithConnection(CommonRdbmsWriter.java:326)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.startWrite(CommonRdbmsWriter.java:344)
        at com.wgzhao.addax.plugin.writer.postgresqlwriter.PostgresqlWriter$Task.startWrite(PostgresqlWriter.java:255)
        at com.wgzhao.addax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:74)
        at java.base/java.lang.Thread.run(Thread.java:833)
2025-06-12 12:51:23.789 [  writer-0-0] ERROR StdoutPluginCollector - The dirty data: {"exception":"ERROR: Distributed column \"id\" can't be updated in current version","record":[{"byteSize":1,"rawData":1,"type":"LONG"},{"byteSize":12,"rawData":"Sample Event","type":"STRING"},{"byteSize":7,"rawData":"[10,50]","type":"STRING"},{"byteSize":45,"rawData":"[\"2025-05-27 10:00:00\",\"2025-05-27 12:00:00\"]","type":"STRING"},{"byteSize":50,"rawData":"0101000020E610000050FC1873D79A5EC0D0D556EC2FE34240","type":"STRING"},{"byteSize":194,"rawData":"0103000020E610000001000000050000007B14AE47E19A5EC0C3F5285C8FE242407B14AE47E19A5EC0A4703D0AD7E342400AD7A3703D9A5EC0A4703D0AD7E342400AD7A3703D9A5EC0C3F5285C8FE242407B14AE47E19A5EC0C3F5285C8FE24240","type":"STRING"},{"byteSize":90,"rawData":"0102000020E61000000200000050FC1873D79A5EC0D0D556EC2FE342405F29CB10C79A5EC0B37BF2B050E34240","type":"STRING"},{"byteSize":24,"rawData":"{conference,tech,public}","type":"STRING"},{"byteSize":10,"rawData":"{85,90,95}","type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"}],"type":"writer"}
2025-06-12 12:51:23.857 [  writer-0-0] ERROR StdoutPluginCollector - The dirty data: {"exception":"ERROR: Distributed column \"id\" can't be updated in current version","record":[{"byteSize":1,"rawData":2,"type":"LONG"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":26,"rawData":"01020000E0E610000000000000","type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"},{"byteSize":0,"type":"STRING"}],"type":"writer"}
2025-06-12 12:51:24.735 [       job-0] INFO  AbstractScheduler    - The scheduler has completed all tasks.
2025-06-12 12:51:24.736 [       job-0] INFO  JobContainer         - The Writer.Job [postgresqlwriter] perform post work.
2025-06-12 12:51:24.736 [       job-0] INFO  JobContainer         - The Reader.Job [postgresqlreader] perform post work.
2025-06-12 12:51:24.742 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 460 bytes | Speed 153B/s, 0 records/s | Error 2 records, 460 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2025-06-12 12:51:24.743 [       job-0] INFO  JobContainer         - 
Job start  at             : 2025-06-12 12:51:19
Job end    at             : 2025-06-12 12:51:24
Job took secs             :                  5s
Average   bps             :              153B/s
Average   rps             :              0rec/s
Number of rec             :                   2
Failed record             :                   2

@wgzhao
Owner

wgzhao commented Jun 12, 2025

can you provide the table DDL ? and it's better to provide sample data

@wgzhao
Owner

wgzhao commented Jun 12, 2025

Thank you for reviewing my PR! I appreciate your guidance on aligning the code style. Could you also let me know if there are other areas in the project where I could contribute? I’d be happy to help further.


Oh, there are addax-admin and the corresponding addax-ui.

They are currently in the early stages of development.

@AllstarVirgo
Contributor Author

AllstarVirgo commented Jun 12, 2025

can you provide the table DDL ? and it's better to provide sample data

create table tmp_t
(
    id   integer not null
        constraint pk
            primary key,
    col2 int4range,
    col3 text[],
    col4 varchar(100),
    col5 json
);
INSERT INTO tmp_t (id, col2, col3, col4, col5) VALUES (2, '[10,21)', '{breakfastv1,consultingv1}', '[10,21)', '{"breakfast":"consulting"}');
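A common way for a JDBC writer to accept plain string literals for such non-primitive columns (range, array, json) is to append an explicit cast to each placeholder, so values bound via setString() are converted server-side. The sketch below is hypothetical (class and method names are illustrative, not Addax's actual implementation), but it mirrors the cast-wrapped values visible in the writer's generated SQL in the logs:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: build a VALUES placeholder list where non-primitive
// PostgreSQL types (range, array, json) get an explicit cast, so plain
// string parameters bound with setString() are parsed by the server.
public class PlaceholderSketch {
    static String placeholders(List<String> pgTypes) {
        return pgTypes.stream()
                // an empty entry marks a primitive column that needs no cast
                .map(t -> t.isEmpty() ? "?" : "?::" + t)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // column types of tmp_t: id (primitive), col2 int4range,
        // col3 text[], col4 (primitive), col5 json
        System.out.println("INSERT INTO tmp_t VALUES ("
                + placeholders(List.of("", "int4range", "text[]", "", "json")) + ")");
        // -> INSERT INTO tmp_t VALUES (?,?::int4range,?::text[],?,?::json)
    }
}
```

An equivalent alternative is binding an org.postgresql.util.PGobject per value, but the cast approach keeps the writer driver-agnostic at the statement level.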

{
  "job": {
    "content": 
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "2",
                "type": "long"
              },
              {
                "value": "[10,20]",
                "type": "string"
              },
              {
                "value": "{\"breakfast\", \"consulting\"}",
                "type": "string"
              },
              {
                "value": "[10,21)",
                "type": "string"
              },
              {
                "value": "{\"breakfast\": \"consulting\"}",
                "type": "string"
              }
            ],
            "sliceRecordCount": 1
          }
        },
        "writer": {
          "name": "postgresqlwriter",
          "mode": "update (id)",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://10.1.0.139:5432/silas-resource",
                "table": ["deprecated.tmp_t"]
              }
            ],
            "username": "***",
            "password": "***"
          }
        }
      },
    "setting": {
      "speed": {
        "bytes": -1,
        "channel": 1
      }
    }
  }
}

I ran the job and the result is:

  ___      _     _            
 / _ \    | |   | |           
/ /_\ \ __| | __| | __ ___  __
|  _  |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |>  < 
\_| |_/\__,_|\__,_|\__,_/_/\_\
:: Addax version ::    (v6.0.1-SNAPSHOT)
2025-06-12 14:11:39.705 [        main] INFO  Engine               - 
{
        "content":{
                "reader":{
                        "name":"streamreader",
                        "parameter":{
                                "column":[
                                        {
                                                "value":"2",
                                                "type":"long"
                                        },
                                        {
                                                "value":"[10,20]",
                                                "type":"string"
                                        },
                                        {
                                                "value":"{\"breakfast\", \"consulting\"}",
                                                "type":"string"
                                        },
                                        {
                                                "value":"[10,21)",
                                                "type":"string"
                                        },
                                        {
                                                "value":"{\"breakfast\": \"consulting\"}",
                                                "type":"string"
                                        }
                                ],
                                "sliceRecordCount":1
                        }
                },
                "writer":{
                        "name":"postgresqlwriter",
                        "mode":"update (id)",
                        "parameter":{
                                "column":[
                                        "*"
                                ],
                                "connection":{
                                        "jdbcUrl":"jdbc:postgresql://10.1.0.139:5432/silas-resource",
                                        "table":[
                                                "deprecated.tmp_t"
                                        ]
                                },
                                "username":"rds_user",
                                "password":"*****"
                        }
                }
        },
        "setting":{
                "speed":{
                        "bytes":-1,
                        "channel":1
                }
        }
}

2025-06-12 14:11:39.722 [        main] INFO  JobContainer         - The jobContainer begins to process the job.
2025-06-12 14:11:40.471 [       job-0] INFO  OriginalConfPretreatmentUtil - The table [deprecated.tmp_t] has columns [id,col2,col3,col4,col5].
2025-06-12 14:11:40.472 [       job-0] WARN  OriginalConfPretreatmentUtil - There are some risks in the column configuration. Because you did not configure the columns to read the database table, changes in the number and types of fields in your table may affect the correctness of the task or even cause errors.
2025-06-12 14:11:40.475 [       job-0] INFO  OriginalConfPretreatmentUtil - Writing data using [INSERT INTO %s ( id,col2,col3,col4,col5) VALUES ( ?,?,?,?,? )].
2025-06-12 14:11:40.475 [       job-0] INFO  JobContainer         - The Reader.Job [streamreader] perform prepare work .
2025-06-12 14:11:40.475 [       job-0] INFO  JobContainer         - The Writer.Job [postgresqlwriter] perform prepare work .
2025-06-12 14:11:40.475 [       job-0] INFO  JobContainer         - Job set Channel-Number to 1 channel(s).
2025-06-12 14:11:40.476 [       job-0] INFO  JobContainer         - The Reader.Job [streamreader] is divided into [1] task(s).
2025-06-12 14:11:40.476 [       job-0] INFO  JobContainer         - The Writer.Job [postgresqlwriter] is divided into [1] task(s).
2025-06-12 14:11:40.488 [       job-0] INFO  JobContainer         - The Scheduler launches [1] taskGroup(s).
2025-06-12 14:11:40.491 [ taskGroup-0] INFO  TaskGroupContainer   - The taskGroupId=[0] started [1] channels for [1] tasks.
2025-06-12 14:11:40.492 [ taskGroup-0] INFO  Channel              - The Channel set byte_speed_limit to -1, No bps activated.
2025-06-12 14:11:40.492 [ taskGroup-0] INFO  Channel              - The Channel set record_speed_limit to -1, No tps activated.
2025-06-12 14:11:41.777 [  writer-0-0] WARN  CommonRdbmsWriter$Task - Rolling back the write, try to write one line at a time. because: Batch entry 0 INSERT INTO deprecated.tmp_t ( id,col2,col3,col4,col5) VALUES ( ('2'::int8),('[10,20]'),('{"breakfast", "consulting"}'),('[10,21)'),('{"breakfast": "consulting"}') ) was aborted: ERROR: duplicate key value violates unique constraint "pk"
  Detail: Key (id)=(2) already exists.  Call getNextException to see other errors in the batch.
2025-06-12 14:11:41.897 [  writer-0-0] ERROR StdoutPluginCollector - 
org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "pk"
  Detail: Key (id)=(2) already exists.
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:490)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:408)
        at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:167)
        at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:156)
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:95)
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:95)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.doOneInsert(CommonRdbmsWriter.java:447)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.doBatchInsert(CommonRdbmsWriter.java:426)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.startWriteWithConnection(CommonRdbmsWriter.java:326)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.startWrite(CommonRdbmsWriter.java:344)
        at com.wgzhao.addax.plugin.writer.postgresqlwriter.PostgresqlWriter$Task.startWrite(PostgresqlWriter.java:255)
        at com.wgzhao.addax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:74)
        at java.base/java.lang.Thread.run(Thread.java:833)
2025-06-12 14:11:41.911 [  writer-0-0] ERROR StdoutPluginCollector - The dirty data: {"exception":"ERROR: duplicate key value violates unique constraint \"pk\"\n  Detail: Key (id)=(2) already exists.","record":[{"byteSize":1,"rawData":2,"type":"LONG"},{"byteSize":7,"rawData":"[10,20]","type":"STRING"},{"byteSize":27,"rawData":"{\"breakfast\", \"consulting\"}","type":"STRING"},{"byteSize":7,"rawData":"[10,21)","type":"STRING"},{"byteSize":27,"rawData":"{\"breakfast\": \"consulting\"}","type":"STRING"}],"type":"writer"}
2025-06-12 14:11:43.504 [       job-0] INFO  AbstractScheduler    - The scheduler has completed all tasks.
2025-06-12 14:11:43.505 [       job-0] INFO  JobContainer         - The Writer.Job [postgresqlwriter] perform post work.
2025-06-12 14:11:43.506 [       job-0] INFO  JobContainer         - The Reader.Job [streamreader] perform post work.
2025-06-12 14:11:43.513 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 1 records, 69 bytes | Speed 23B/s, 0 records/s | Error 1 records, 69 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2025-06-12 14:11:43.515 [       job-0] INFO  JobContainer         - 
Job start  at             : 2025-06-12 14:11:39
Job end    at             : 2025-06-12 14:11:43
Job took secs             :                  3s
Average   bps             :               23B/s
Average   rps             :              0rec/s
Number of rec             :                   1
Failed record             :                   1

@AllstarVirgo
Contributor Author

Thank you for reviewing my PR! I appreciate your guidance on aligning the code style. Could you also let me know if there are other areas in the project where I could contribute? I’d be happy to help further.


Oh, there are addax-admin and the corresponding addax-ui.

They are currently in the early stages of development.

Thank you for pointing me to the addax-admin project! I'm excited to contribute to it, especially since it's in the early stages of development. Could you share more details about the current priorities or specific areas where I could help, such as open issues, feature development, or documentation? I'm happy to dive into tasks that align with the project's needs.

Given my experience with [Java/databases], could you suggest specific tasks or open issues where I could make an impact?

@wgzhao
Owner

wgzhao commented Jun 12, 2025

You made a mistake:

  1. the key for the write mode is writeMode instead of mode
  2. writeMode should be placed under parameter

you can try the following task:

{
  "job": {
    "content": 
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "2",
                "type": "long"
              },
              {
                "value": "[10,20]",
                "type": "string"
              },
              {
                "value": "{\"breakfast\", \"consulting\"}",
                "type": "string"
              },
              {
                "value": "[10,21)",
                "type": "string"
              },
              {
                "value": "{\"breakfast\": \"consulting\"}",
                "type": "string"
              }
            ],
            "sliceRecordCount": 1
          }
        },
        "writer": {
          "name": "postgresqlwriter",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://10.1.0.139:5432/silas-resource",
                "table": ["deprecated.tmp_t"]
              }
            ],
            "writeMode": "update (id)",
            "username": "rds_user",
            "password": "Changeme_123"
          }
        }
      },
    "setting": {
      "speed": {
        "bytes": -1,
        "channel": 1
      }
    }
  }
}

@AllstarVirgo
Contributor Author

AllstarVirgo commented Jun 12, 2025

You made a mistake:

  1. the key for the write mode is writeMode instead of mode
  2. writeMode should be placed under parameter

you can try the following task:

{
  "job": {
    "content": 
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {
                "value": "2",
                "type": "long"
              },
              {
                "value": "[10,20]",
                "type": "string"
              },
              {
                "value": "{\"breakfast\", \"consulting\"}",
                "type": "string"
              },
              {
                "value": "[10,21)",
                "type": "string"
              },
              {
                "value": "{\"breakfast\": \"consulting\"}",
                "type": "string"
              }
            ],
            "sliceRecordCount": 1
          }
        },
        "writer": {
          "name": "postgresqlwriter",
          "parameter": {
            "column": ["*"],
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://10.1.0.139:5432/silas-resource",
                "table": ["deprecated.tmp_t"]
              }
            ],
            "writeMode": "update (id)",
            "username": "rds_user",
            "password": "Changeme_123"
          }
        }
      },
    "setting": {
      "speed": {
        "bytes": -1,
        "channel": 1
      }
    }
  }
}

I apologize for the incorrect configuration in my setup; I was referencing my prior experience with DataX. The config did work on PostgreSQL 16. However, it didn't work on PostgreSQL 10 with a distributed deployment.

Besides that, the conflict key columns should not be updated at all, since their values are identical to the existing row anyway.

The exception:

2025-06-12 14:47:28.592 [  writer-0-0] ERROR StdoutPluginCollector - 
org.postgresql.util.PSQLException: ERROR: Distributed column "id" can't be updated in current version
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
        at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:490)
        at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:408)
        at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:167)
        at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:156)
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:95)
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.execute(DelegatingPreparedStatement.java:95)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.doOneInsert(CommonRdbmsWriter.java:447)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.doBatchInsert(CommonRdbmsWriter.java:426)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.startWriteWithConnection(CommonRdbmsWriter.java:326)
        at com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter$Task.startWrite(CommonRdbmsWriter.java:344)
        at com.wgzhao.addax.plugin.writer.postgresqlwriter.PostgresqlWriter$Task.startWrite(PostgresqlWriter.java:255)
        at com.wgzhao.addax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:74)
        at java.base/java.lang.Thread.run(Thread.java:833)
2025-06-12 14:47:28.606 [  writer-0-0] ERROR StdoutPluginCollector - The dirty data: {"exception":"ERROR: Distributed column \"id\" can't be updated in current version","record":[{"byteSize":1,"rawData":2,"type":"LONG"},{"byteSize":7,"rawData":"[10,20]","type":"STRING"},{"byteSize":27,"rawData":"{\"breakfast\", \"consulting\"}","type":"STRING"},{"byteSize":7,"rawData":"[10,21)","type":"STRING"},{"byteSize":27,"rawData":"{\"breakfast\": \"consulting\"}","type":"STRING"}],"type":"writer"}
2025-06-12 14:47:30.321 [       job-0] INFO  AbstractScheduler    - The scheduler has completed all tasks.
2025-06-12 14:47:30.323 [       job-0] INFO  JobContainer         - The Writer.Job [postgresqlwriter] perform post work.
2025-06-12 14:47:30.324 [       job-0] INFO  JobContainer         - The Reader.Job [streamreader] perform post work.
2025-06-12 14:47:30.333 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 1 records, 69 bytes | Speed 23B/s, 0 records/s | Error 1 records, 69 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2025-06-12 14:47:30.334 [       job-0] INFO  JobContainer         - 
Job start  at             : 2025-06-12 14:47:26
Job end    at             : 2025-06-12 14:47:30
Job took secs             :                  3s
Average   bps             :               23B/s
Average   rps             :              0rec/s
Number of rec             :                   1
Failed record             :                   1

@wgzhao
Owner

wgzhao commented Jun 12, 2025

Can it pass on PostgreSQL 10 with the distributed deployment after adding the code .filter(column -> !conflictColumns.contains(column)) in the WriterUtil#doPostgresqlUpdate method?
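The suggested one-line fix can be illustrated with a standalone sketch (hypothetical class and method names; the real change would live in Addax's WriterUtil#doPostgresqlUpdate): columns named in the ON CONFLICT clause are filtered out of the DO UPDATE SET list, so a distributed primary key such as id is never assigned.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical standalone sketch of the proposed fix: exclude the
// conflict columns from the DO UPDATE SET clause of the generated upsert.
public class UpsertSketch {
    static String buildUpsertSuffix(List<String> columns, List<String> conflictColumns) {
        String setClause = columns.stream()
                .filter(column -> !conflictColumns.contains(column))  // the suggested filter
                .map(column -> column + "=excluded." + column)
                .collect(Collectors.joining(","));
        return "ON CONFLICT (" + String.join(",", conflictColumns)
                + ") DO UPDATE SET " + setClause;
    }

    public static void main(String[] args) {
        System.out.println(buildUpsertSuffix(
                List.of("id", "col2", "col3"), List.of("id")));
        // -> ON CONFLICT (id) DO UPDATE SET col2=excluded.col2,col3=excluded.col3
    }
}
```

With this filter the statement no longer contains id=excluded.id, which is exactly the assignment the distributed PostgreSQL 10 deployment rejected; skipping it also loses nothing, since the conflict column's value is unchanged by definition.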

wgzhao added a commit that referenced this pull request Jun 12, 2025
… conflict columns in an update statement

In a PostgreSQL distributed deployment, if the primary key field appears in the update clause, the following error occurs:

 Distributed column "id" can't be updated in the current version

It refers to #1283.
@AllstarVirgo
Contributor Author

AllstarVirgo commented Jun 12, 2025 via email
