2525import java .io .Closeable ;
2626import java .io .StringReader ;
2727import java .time .Duration ;
28- import java .util .ArrayList ;
2928import java .util .HashMap ;
3029import java .util .List ;
3130import java .util .Map ;
@@ -66,11 +65,12 @@ public class Connection implements Closeable {
6665 return thread ;
6766 });
6867 private static final AtomicLong NEXT_ID = new AtomicLong (1L );
68+ private final AtomicLong EVENT_CALLBACK_ID = new AtomicLong (1 );
6969 private WebSocket socket ;
7070 private final Map <Long , Consumer <Either <Throwable , JsonInput >>> methodCallbacks =
7171 new ConcurrentHashMap <>();
7272 private final ReadWriteLock callbacksLock = new ReentrantReadWriteLock (true );
73- private final Map <Event <?>, List < Consumer <?>>> eventCallbacks = new HashMap <>();
73+ private final Map <Event <?>, Map < Long , Consumer <?>>> eventCallbacks = new HashMap <>();
7474 private final HttpClient client ;
7575 private final AtomicBoolean underlyingSocketClosed = new AtomicBoolean ();
7676
@@ -180,17 +180,26 @@ public <X> X sendAndWait(Command<X> command, Duration timeout) {
180180 }
181181 }
182182
183- public <X > void addListener (Event <X > event , Consumer <X > handler ) {
183+ public <X > long addListener (Event <X > event , Consumer <X > handler ) {
184184 Require .nonNull ("Event to listen for" , event );
185185 Require .nonNull ("Handler to call" , handler );
186186
187+ long id = EVENT_CALLBACK_ID .getAndIncrement ();
188+
187189 Lock lock = callbacksLock .writeLock ();
188190 lock .lock ();
189191 try {
190- eventCallbacks .computeIfAbsent (event , (key ) -> new ArrayList <>()).add (handler );
192+ eventCallbacks .computeIfAbsent (
193+ event ,
194+ key -> {
195+ HashMap <Long , Consumer <?>> map = new HashMap <>();
196+ map .put (id , handler );
197+ return map ;
198+ });
191199 } finally {
192200 lock .unlock ();
193201 }
202+ return id ;
194203 }
195204
196205 public <X > void clearListener (Event <X > event ) {
@@ -203,6 +212,23 @@ public <X> void clearListener(Event<X> event) {
203212 }
204213 }
205214
215+ public void removeListener (long id ) {
216+ Lock lock = callbacksLock .writeLock ();
217+ lock .lock ();
218+ try {
219+ eventCallbacks .forEach ((k , v ) -> v .remove (id ));
220+ eventCallbacks .forEach (
221+ (k , v ) -> {
222+ v .remove (id );
223+ if (v .isEmpty ()) {
224+ eventCallbacks .remove (k );
225+ }
226+ });
227+ } finally {
228+ lock .unlock ();
229+ }
230+ }
231+
206232 public <X > boolean isEventSubscribed (Event <X > event ) {
207233 Lock lock = callbacksLock .writeLock ();
208234 lock .lock ();
@@ -354,7 +380,7 @@ private void handleEventResponse(Map<String, Object> rawDataMap) {
354380
355381 final Object finalValue = value ;
356382
357- for (Consumer <?> action : event .getValue ()) {
383+ for (Consumer <?> action : event .getValue (). values () ) {
358384 @ SuppressWarnings ("unchecked" )
359385 Consumer <Object > obj = (Consumer <Object >) action ;
360386 LOG .log (
0 commit comments