Description
I've recently been using flinkx (Flink 1.4.0, 1 GB per slot, two slots per TaskManager, a cluster of three servers) and found that extracting MySQL data (a single table of 10 million rows, each record around 500 bytes) makes the job hang: in the openInternal method of JdbcInputFormat.java, resultSet = statement.executeQuery(); never returns. Following the way JdbcInputFormat.java reads over JDBC, I wrote a standalone Java test case; with JVM options -Xms1024m -Xmx1024m it hits an OOM exception, while raising the heap to 2 GB lets it run normally. I suspect the fetch size setting is not taking effect: changing statement.setFetchSize(databaseInterface.getFetchSize()); in openInternal to statement.setFetchSize(Integer.MIN_VALUE); makes it run fine within 1 GB.
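For reference, here is a minimal standalone sketch of that test (the connection URL, credentials and table name are hypothetical placeholders). With MySQL Connector/J, an ordinary positive fetch size is effectively ignored for buffering purposes and the driver materializes the entire result set in client memory, which matches the OOM above; Integer.MIN_VALUE, together with a forward-only, read-only statement, is the documented signal for row-by-row streaming:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class StreamingReadTest {
    public static void main(String[] args) throws Exception {
        // Hypothetical URL, credentials and table name; adjust to your environment.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "user", "pass")) {
            try (PreparedStatement stmt = conn.prepareStatement(
                    "SELECT * FROM big_table",
                    ResultSet.TYPE_FORWARD_ONLY,
                    ResultSet.CONCUR_READ_ONLY)) {
                // Connector/J buffers the whole result set client-side for any
                // ordinary fetch size; Integer.MIN_VALUE is the documented signal
                // that switches it to row-by-row streaming instead.
                stmt.setFetchSize(Integer.MIN_VALUE);
                long count = 0;
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        count++; // rows arrive one at a time, so the heap stays flat
                    }
                }
                System.out.println("read " + count + " rows");
            }
        }
    }
}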
The code that obtains descColumnTypeList also needs to be moved ahead of statement.executeQuery();, otherwise the following exception is thrown:
java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@6438a396 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
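This exception reflects a Connector/J restriction: while a streaming result set is open, no other statement may be issued on the same connection, so the metadata query behind descColumnTypeList (DBUtil.analyzeTable in the file below) has to complete before the streaming executeQuery(). A minimal sketch of the failure mode, again with hypothetical connection details:

import java.sql.*;

public class StreamingOrderTest {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "user", "pass")) {
            PreparedStatement stmt = conn.prepareStatement(
                    "SELECT * FROM big_table",
                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            stmt.setFetchSize(Integer.MIN_VALUE);
            ResultSet rs = stmt.executeQuery(); // streaming result set is now open

            // Issuing any other statement on the same connection now fails with
            // "Streaming result set ... is still active". Running the metadata
            // query BEFORE executeQuery() avoids this.
            try (Statement meta = conn.createStatement()) {
                meta.executeQuery("DESC big_table"); // throws SQLException
            }
        }
    }
}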
Attached is the modified JdbcInputFormat.java:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dtstack.flinkx.rdb.inputformat;
import com.dtstack.flinkx.rdb.DatabaseInterface;
import com.dtstack.flinkx.rdb.type.TypeConverterInterface;
import com.dtstack.flinkx.rdb.util.DBUtil;
import com.dtstack.flinkx.util.ClassUtil;
import com.dtstack.flinkx.util.DateUtil;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.Counter;
import org.apache.flink.types.Row;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.sql.Date;
import java.util.*;
import com.dtstack.flinkx.inputformat.RichInputFormat;
/**
 * InputFormat for reading data from a database and generating Rows.
 *
 * Company: www.dtstack.com
 * @author huyifan.zju@163.com
 */
public class JdbcInputFormat extends RichInputFormat {

    protected static final long serialVersionUID = 1L;

    protected DatabaseInterface databaseInterface;
    protected String username;
    protected String password;
    protected String drivername;
    protected String dbURL;
    protected String queryTemplate;
    protected int resultSetType;
    protected int resultSetConcurrency;
    protected List<String> descColumnTypeList;
    protected transient Connection dbConn;
    protected transient PreparedStatement statement;
    protected transient ResultSet resultSet;
    protected boolean hasNext;
    protected Object[][] parameterValues;
    protected int columnCount;
    protected String table;
    protected TypeConverterInterface typeConverter;
    protected List<String> column;

    public JdbcInputFormat() {
        resultSetType = ResultSet.TYPE_FORWARD_ONLY;
        resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
    }

    @Override
    public void configure(Configuration configuration) {}
    @Override
    public void openInternal(InputSplit inputSplit) throws IOException {
        try {
            ClassUtil.forName(drivername, getClass().getClassLoader());
            dbConn = DBUtil.getConnection(dbURL, username, password);

            if (drivername.equalsIgnoreCase("org.postgresql.Driver")) {
                dbConn.setAutoCommit(false);
            }

            statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);

            // Moved ahead of executeQuery(): this metadata query must not run while
            // a streaming result set is open on the same connection. |by lgm
            if (descColumnTypeList == null) {
                descColumnTypeList = DBUtil.analyzeTable(dbConn, databaseInterface, table, column);
            }

            if (inputSplit != null && parameterValues != null) {
                for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
                    Object param = parameterValues[inputSplit.getSplitNumber()][i];
                    DBUtil.setParameterValue(param, statement, i);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Executing '%s' with parameters %s",
                            queryTemplate,
                            Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
                }
            }

            //statement.setFetchSize(databaseInterface.getFetchSize());
            // Reset the fetch size: Integer.MIN_VALUE switches MySQL Connector/J
            // into row-by-row streaming mode. |by lgm
            if (drivername.equalsIgnoreCase("mysqlreader")) {
                statement.setFetchSize(Integer.MIN_VALUE);
            } else {
                statement.setFetchSize(databaseInterface.getFetchSize());
            }

            statement.setQueryTimeout(databaseInterface.getQueryTimeout());
            resultSet = statement.executeQuery();
            hasNext = resultSet.next();
            columnCount = resultSet.getMetaData().getColumnCount();
        } catch (SQLException se) {
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        }

        LOG.info("JdbcInputFormat[" + jobName + "]open: end");
    }
    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (parameterValues == null) {
            return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
        }
        GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
        for (int i = 0; i < ret.length; i++) {
            ret[i] = new GenericInputSplit(i, ret.length);
        }
        return ret;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return !hasNext;
    }

    @Override
    public Row nextRecordInternal(Row row) throws IOException {
        row = new Row(columnCount);
        try {
            if (!hasNext) {
                return null;
            }
            DBUtil.getRow(dbURL, row, descColumnTypeList, resultSet, typeConverter);
            // Update hasNext after we've read the record
            hasNext = resultSet.next();
            return row;
        } catch (SQLException se) {
            throw new IOException("Couldn't read data - " + se.getMessage(), se);
        } catch (NullPointerException npe) {
            throw new IOException("Couldn't access resultSet", npe);
        }
    }

    @Override
    public void closeInternal() throws IOException {
        DBUtil.closeDBResources(resultSet, statement, dbConn);
        parameterValues = null;
    }

}