Skip to content

Conversation

@15656215623
Copy link
Contributor

Purpose of this pull request

Which issue you fix

Fixes emqx断开不能重连bug修复

Checklist:

  • I have executed the 'mvn spotless:apply' command to format my code.
  • I have a meaningful commit message (including the issue id, the template of commit message is '[label-type-#issue-id][fixed-module] a meaningful commit message.')
  • I have performed a self-review of my own code.
  • I have commented my code, particularly in hard-to-understand areas.
  • I have made corresponding changes to the documentation.
  • I have added tests that prove my fix is effective or that my feature works.
  • New and existing unit tests pass locally with my changes.
  • I have checked my code and corrected any misspellings.
  • My commit is only one. (If there are multiple commits, you can use 'git squash' to compress multiple commits into one.)

Copy link
Contributor

@mggger mggger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

您好, 提交pr相关规范可以参考下这个文档

需要完善下pr信息, commit信息

// 如果断开就重新连接
if (this.client == null || !this.client.isConnected()) {
// 如果关闭的话,就重新连接
this.openInternal(1, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

重新创建连接, 直接调用 MqttConnectUtil.getMqttClient(...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这种创建连接,有问题,我不知道底层为什么行不通,但是我测试的结果告诉我,不行!!还得是 this.openInternal(1, 1);

public static MqttClient getMqttClient(EmqxConf emqxConf, String clientId) {
MqttClient client = null;
for (int i = 0; i <= 2; i++) {
for (int i = 0; i <= 60; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

重试次数, 建议弄成可配置的属性

protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException {
try {
// 如果断开就重新连接
if (this.client == null || !this.client.isConnected()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果写入每条数据都进行client.isConnected()判断 的话,十分消耗性能

可以对client.publish()进行捕获, 产生错误的时候,再重新建立连接

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

隔了两个星期正式回复一下,我之前提交了两次代码,按照您的指导。 但是我亲自测试,最后的版本运行不超过6h就会失败,报错是未连接! 然后我不断的改地方,发现,如果在异常产生错误的时候,做判断,然后建立连接,是行不通的!!! 然后在每次写入数据的时候判断,测试几个小时,发现还是不行!!!! 然后我把{重新创建连接, 直接调用 MqttConnectUtil.getMqttClient(...) } 改成第一版本的this.openInternal(1, 1); 运行1day都没问题,现在还在运行不报错。第一个版本,之前测试了一个星期都没有问题。 这个是我两个星期测试的结果反馈

@@ -0,0 +1,19 @@
Stack trace:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

引入了额外的文件

throw new RuntimeException(interruptedException);
}
if (i == 2) {
if (i == 60) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个咋还是60呢?

Set<ConfigOption<?>> requiredOptions = new HashSet<>();
requiredOptions.add(BROKER);
requiredOptions.add(TOPIC);
requiredOptions.add(TIMES);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

重试次数不是必须的配置项, 没有配置用默认的重试次数即可

.defaultValue("writer")
.withDescription("dclient.id.pre");
/** 重连接的次数 * */
public static final ConfigOption<Integer> TIMES =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

变量命名可以规范一点, CONNECT_RETRY_TIMES

/** emq codec */
private String codec = "plain";
/** emqx reconnect times */
private int times = 10;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

变量命名建议 connectRetryTimes

private boolean isCleanSession = true;
/** emq EXACTLY_ONCE */
private int qos = 2;
private int qos = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么改这个参数的值呢?

2、设置重连次数为非必要配置项
3、把60改为变量
4、删除了额外的文件
5、把每次写入数据都判断状态这种耗资源的写法改为捕获错误的时候再重新建立连接
@15656215623
Copy link
Contributor Author

1、把写入每条数据都进行client.isConnected()判断 ,十分消耗性能

改为
对client.publish()进行捕获, 产生错误的时候,再重新建立连接
2、删除引入了额外的文件
3、参数重命名
4、修改为非必要配置选项

@15656215623
Copy link
Contributor Author

1、emqx模块 代码中文注释变英文注释

@mggger
Copy link
Contributor

mggger commented Aug 25, 2022

@15656215623 commit信息还需要完善一下

  1. 提交前对代码格式化一下mvn spotless:apply
  2. 用git rebase将这次所有的commit压缩成一个commit
  3. commit信息能够对应该问题的issue, ex: [hotfix-#xxx] fix xxxxx; 将xxx替换为issue id, xxxxx这次修改的内容

MqttConnectUtil.getMqttClient(
emqxConf,
CLIENT_ID_WRITER.defaultValue() + LocalTime.now().toSecondOfDay() + jobId);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

只重新建立了连接,没有把那条数据也publish, 上面这条数据也需要publish一下

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里行不通的,不超过几个小时就会连接失败,更不要提数据的完整性了!!!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

因为业务需求,我一直在测试emqx这个插件的稳定性,根据你的建议,反复的测试,验证,做了很多实验,发现行不通!!! 实验一、最后的版本(在抛出异常的时候判断连接状态,然后建立连接用 MqttConnectUtil.getMqttClient(,把连接次数搞成可调节的参数) 运行不超过3h 实验二、(把判断连接状态放在每次写入数据那里,和实验一相比,其他的不动) 运行不超过5h 实验三、在实验二的基础上,把连接次数改成固定的60 也失败; 实验四 、 在实验三的基础上,把MqttConnectUtil.getMqttClient( 改成this.openInternal(1, 1); 也就是完全的第一个版本。 运行没有问题,超过1d 之前测试都7d了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这些是我实践的结果,但是我不理解,为什么按照你说的行不通,我也觉得很合理!!!!!如果想明白为什么,可以告诉我,我再测试看看,

@FlechazoW FlechazoW merged commit 618f9ad into DTStack:master Sep 1, 2022
@15656215623
Copy link
Contributor Author

目前我用的是第一版本,因为稳定性是我现在最需要的

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants