Skip to content

Commit fbd0d51

Browse files
committed
Revisit changes against before I broke it.
1 parent 7292d95 commit fbd0d51

File tree

8 files changed

+78
-35
lines changed

8 files changed

+78
-35
lines changed

src/examples/java/io/nats/examples/jetstream/simple/OrderedMessageConsumerExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2023 The NATS Authors
1+
// Copyright 2025 The NATS Authors
22
// Licensed under the Apache License, Version 2.0 (the "License");
33
// you may not use this file except in compliance with the License.
44
// You may obtain a copy of the License at:

src/main/java/io/nats/client/impl/MessageManager.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,23 +141,31 @@ protected void initOrResetHeartbeatTimer() {
141141
try {
142142
ScheduledTask hbTask = heartbeatTask.get();
143143
if (hbTask != null) {
144+
// Same settings, just reuse the existing scheduled task
145+
if (hbTask.getPeriodNanos() == alarmPeriodSettingNanos.get()) {
146+
updateLastMessageReceived();
147+
return;
148+
}
149+
150+
// Replace timer since settings have changed
144151
hbTask.shutdown();
145152
}
146153

147-
// do this before so the alarm doesn't trigger to soon
154+
// so the alarm doesn't trigger too soon
148155
updateLastMessageReceived();
149156

150157
// replacement or new comes here
151-
heartbeatTask.set(new ScheduledTask(conn.getScheduledExecutor(), alarmPeriodSettingNanos.get(), TimeUnit.NANOSECONDS,
158+
heartbeatTask.set(new ScheduledTask(
159+
conn.getScheduledExecutor(),
160+
alarmPeriodSettingNanos.get(), TimeUnit.NANOSECONDS,
152161
() -> {
153162
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
154163
if (sinceLast > alarmPeriodSettingNanos.get()) {
155-
// updates lastMsgReceivedNanoTime so this doesn't so this
156-
// alarm won't be triggered to soon
157-
updateLastMessageReceived();
164+
updateLastMessageReceived(); // allow the system time to re-sub before alarming
158165
handleHeartbeatError();
159166
}
160167
}));
168+
161169
}
162170
finally {
163171
stateChangeLock.unlock();

src/main/java/io/nats/client/impl/NatsConsumerContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ private void checkState() throws IOException {
129129
}
130130
}
131131
if (lastCon.finished.get() && !lastCon.stopped.get()) {
132-
lastCon.lenientClose(); // finished, might as well make sure the sub is closed.
132+
lastCon.shutdownSub(); // finished, might as well make sure the sub is closed.
133133
}
134134
}
135135
}

src/main/java/io/nats/client/impl/NatsFetchConsumer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void pendingUpdated() {}
6767

6868
@Override
6969
public void heartbeatError() {
70-
finishAndClose();
70+
finishAndShutdownSub();
7171
}
7272

7373
@Override
@@ -85,7 +85,7 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
8585
if (m == null) {
8686
// if there are no messages in the internal cache AND there are no more pending,
8787
// they all have been read and we can go ahead and finish
88-
finishAndClose();
88+
finishAndShutdownSub();
8989
}
9090
return m;
9191
}
@@ -103,15 +103,15 @@ public Message nextMessage() throws InterruptedException, JetStreamStatusChecked
103103
Message m = sub._nextUnmanagedNoWait(pullSubject);
104104
if (m == null) {
105105
// no message and no time left, go ahead and finish
106-
finishAndClose();
106+
finishAndShutdownSub();
107107
}
108108
return m;
109109
}
110110

111111
Message m = sub._nextUnmanaged(timeLeftNanos, pullSubject);
112112
if (m == null && isNoWaitNoExpires) {
113113
// no message and no wait, go ahead and finish
114-
finishAndClose();
114+
finishAndShutdownSub();
115115
}
116116
return m;
117117
}

src/main/java/io/nats/client/impl/NatsMessageConsumer.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,36 +52,50 @@ class NatsMessageConsumer extends NatsMessageConsumerBase implements PullManager
5252
@Override
5353
public void heartbeatError() {
5454
try {
55-
lenientClose();
56-
doSub();
55+
if (stopped.get()) {
56+
finishAndShutdownSub();
57+
}
58+
else {
59+
shutdownSub();
60+
doSub();
61+
}
5762
}
5863
catch (JetStreamApiException | IOException e) {
59-
pmm.resetTracking();
60-
pmm.initOrResetHeartbeatTimer();
64+
setupHbAlarmToTrigger();
6165
}
6266
}
6367

6468
void doSub() throws JetStreamApiException, IOException {
6569
MessageHandler mh = userMessageHandler == null ? null : msg -> {
6670
userMessageHandler.onMessage(msg);
6771
if (stopped.get() && pmm.noMorePending()) {
68-
finishAndClose();
72+
finished.set(true);
6973
}
7074
};
71-
super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null));
72-
repull();
73-
stopped.set(false);
74-
finished.set(false);
75+
try {
76+
stopped.set(false);
77+
finished.set(false);
78+
super.initSub(subscriptionMaker.subscribe(mh, userDispatcher, pmm, null));
79+
repull();
80+
}
81+
catch (JetStreamApiException | IOException e) {
82+
setupHbAlarmToTrigger();
83+
}
84+
}
85+
86+
private void setupHbAlarmToTrigger() {
87+
pmm.resetTracking();
88+
pmm.initOrResetHeartbeatTimer();
7589
}
7690

7791
@Override
7892
public void pendingUpdated() {
7993
if (stopped.get()) {
8094
if (pmm.noMorePending()) {
81-
finishAndClose();
95+
finishAndShutdownSub();
8296
}
8397
}
84-
else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes))
98+
else if ((pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.pendingBytes <= thresholdBytes)))
8599
{
86100
repull();
87101
}

src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,21 +101,19 @@ public void stop() {
101101

102102
@Override
103103
public void close() throws Exception {
104-
lenientClose();
104+
stopped.set(true);
105+
shutdownSub();
105106
}
106107

107-
protected void finishAndClose() {
108-
if (pmm != null) {
109-
pmm.shutdownHeartbeatTimer();
110-
}
108+
protected void finishAndShutdownSub() {
109+
stopped.set(true);
111110
finished.set(true);
112-
lenientClose();
111+
shutdownSub();
113112
}
114113

115-
protected void lenientClose() {
114+
protected void shutdownSub() {
116115
try {
117-
if (!stopped.get() || sub.isActive()) {
118-
stopped.set(true);
116+
if (sub.isActive()) {
119117
if (sub.getNatsDispatcher() != null) {
120118
sub.getDispatcher().unsubscribe(sub);
121119
}
@@ -127,5 +125,13 @@ protected void lenientClose() {
127125
catch (Throwable ignore) {
128126
// nothing to do
129127
}
128+
if (pmm != null) {
129+
try {
130+
pmm.shutdownHeartbeatTimer();
131+
}
132+
catch (Throwable ignore) {
133+
// nothing to do
134+
}
135+
}
130136
}
131137
}

src/main/java/io/nats/client/support/ScheduledTask.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class ScheduledTask implements Runnable {
3636

3737
protected final AtomicBoolean notShutdown;
3838
protected final AtomicBoolean executing;
39+
protected final long initialDelayNanos;
40+
protected final long periodNanos;
3941

4042
public ScheduledTask(ScheduledExecutorService ses, long initialAndPeriodMillis, Runnable runnable) {
4143
this(null, ses, initialAndPeriodMillis, initialAndPeriodMillis, TimeUnit.MILLISECONDS, runnable);
@@ -53,17 +55,27 @@ public ScheduledTask(String id, ScheduledExecutorService ses, long initialAndPer
5355
this(id, ses, initialAndPeriod, initialAndPeriod, unit, runnable);
5456
}
5557

56-
public ScheduledTask(ScheduledExecutorService ses, long initialDelay, long initialAndPeriod, TimeUnit unit, Runnable runnable) {
57-
this(null, ses, initialDelay, initialAndPeriod, unit, runnable);
58+
public ScheduledTask(ScheduledExecutorService ses, long initialDelay, long period, TimeUnit unit, Runnable runnable) {
59+
this(null, ses, initialDelay, period, unit, runnable);
5860
}
5961

60-
public ScheduledTask(String id, ScheduledExecutorService ses, long initialDelay, long initialAndPeriod, TimeUnit unit, Runnable runnable) {
62+
public ScheduledTask(String id, ScheduledExecutorService ses, long initialDelay, long period, TimeUnit unit, Runnable runnable) {
6163
this.id = id == null || id.isEmpty() ? "st-" + ID_GENERATOR.getAndIncrement() : id;
6264
this.runnable = runnable;
6365
notShutdown = new AtomicBoolean(true);
6466
executing = new AtomicBoolean(false);
67+
this.initialDelayNanos = unit.toNanos(initialDelay);
68+
this.periodNanos = unit.toNanos(period);
6569
scheduledFutureRef = new AtomicReference<>(
66-
ses.scheduleAtFixedRate(this, initialDelay, initialAndPeriod, unit));
70+
ses.scheduleAtFixedRate(this, initialDelay, period, unit));
71+
}
72+
73+
public long getInitialDelayNanos() {
74+
return initialDelayNanos;
75+
}
76+
77+
public long getPeriodNanos() {
78+
return periodNanos;
6779
}
6880

6981
@Override

src/test/java/io/nats/client/support/ScheduledTaskTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@ public void testScheduledTask() throws InterruptedException {
2020
AtomicInteger counter400 = new AtomicInteger();
2121
SttRunnable sttr400 = new SttRunnable(100, counter400);
2222
ScheduledTask task400 = new ScheduledTask(stpe, 0, 400, TimeUnit.MILLISECONDS, sttr400);
23+
assertEquals(TimeUnit.MILLISECONDS.toNanos(400), task400.getPeriodNanos());
2324

2425
AtomicInteger counter200 = new AtomicInteger();
2526
SttRunnable sttr200 = new SttRunnable(300, counter200);
2627
ScheduledTask task200 = new ScheduledTask(stpe, 0, 200, TimeUnit.MILLISECONDS, sttr200);
28+
assertEquals(TimeUnit.MILLISECONDS.toNanos(200), task200.getPeriodNanos());
2729

2830
AtomicInteger counter100 = new AtomicInteger();
2931
SttRunnable sttr100 = new SttRunnable(400, counter100);
3032
ScheduledTask task100 = new ScheduledTask(stpe, 0, 100, TimeUnit.MILLISECONDS, sttr100);
33+
assertEquals(TimeUnit.MILLISECONDS.toNanos(100), task100.getPeriodNanos());
3134

3235
validateState(task400, false, false, null);
3336
validateState(task200, false, false, null);

0 commit comments

Comments
 (0)