Skip to content

Commit 864a908

Browse files
authored
Fix insertOrUpdate plugin may failed due to concurrent operation (#11471)
1 parent d833a28 commit 864a908

2 files changed

Lines changed: 29 additions & 10 deletions

File tree

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,7 @@ public void run(ApplicationReadyEvent readyEvent) {
6868
String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
6969

7070
PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson);
71-
int count = pluginDao.addOrUpdatePluginDefine(pluginDefine);
72-
if (count <= 0) {
73-
throw new TaskPluginException("Failed to update task plugin: " + taskPluginName);
74-
}
71+
pluginDao.addOrUpdatePluginDefine(pluginDefine);
7572
}
7673
}
7774
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/PluginDao.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,17 @@
2121

2222
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
2323
import org.apache.dolphinscheduler.dao.mapper.PluginDefineMapper;
24+
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
25+
26+
import java.util.Objects;
27+
28+
import lombok.NonNull;
29+
import lombok.extern.slf4j.Slf4j;
2430

2531
import org.springframework.beans.factory.annotation.Autowired;
2632
import org.springframework.stereotype.Component;
2733

34+
@Slf4j
2835
@Component
2936
public class PluginDao {
3037

@@ -44,21 +51,36 @@ public boolean checkPluginDefineTableExist() {
4451
* add or update plugin define
4552
*
4653
* @param pluginDefine new pluginDefine
54+
* @return plugin id
4755
*/
48-
public int addOrUpdatePluginDefine(PluginDefine pluginDefine) {
49-
requireNonNull(pluginDefine, "pluginDefine is null");
56+
public int addOrUpdatePluginDefine(@NonNull PluginDefine pluginDefine) {
5057
requireNonNull(pluginDefine.getPluginName(), "pluginName is null");
5158
requireNonNull(pluginDefine.getPluginType(), "pluginType is null");
5259

5360
PluginDefine currPluginDefine =
5461
pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(), pluginDefine.getPluginType());
5562
if (currPluginDefine == null) {
56-
if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() != null) {
57-
return pluginDefine.getId();
63+
try {
64+
if (pluginDefineMapper.insert(pluginDefine) == 1 && pluginDefine.getId() != null) {
65+
return pluginDefine.getId();
66+
}
67+
throw new TaskPluginException(
68+
String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s",
69+
pluginDefine.getPluginName(), pluginDefine.getPluginType()));
70+
} catch (TaskPluginException ex) {
71+
throw ex;
72+
} catch (Exception ex) {
73+
log.error("Insert plugin definition error, there may already exist a plugin", ex);
74+
currPluginDefine = pluginDefineMapper.queryByNameAndType(pluginDefine.getPluginName(),
75+
pluginDefine.getPluginType());
76+
if (currPluginDefine == null) {
77+
throw new TaskPluginException(
78+
String.format("Failed to insert plugin definition, pluginName: %s, pluginType: %s",
79+
pluginDefine.getPluginName(), pluginDefine.getPluginType()));
80+
}
5881
}
59-
throw new IllegalStateException("Failed to insert plugin definition");
6082
}
61-
if (!currPluginDefine.getPluginParams().equals(pluginDefine.getPluginParams())) {
83+
if (!Objects.equals(currPluginDefine.getPluginParams(), pluginDefine.getPluginParams())) {
6284
currPluginDefine.setUpdateTime(pluginDefine.getUpdateTime());
6385
currPluginDefine.setPluginParams(pluginDefine.getPluginParams());
6486
pluginDefineMapper.updateById(currPluginDefine);

0 commit comments

Comments
 (0)