What's not working?
RqueueMessageManager.deleteMessage(queueName, id) does not delete the message from the queue. After I call it, RqueueMessageManager.getMessage() still returns the message.
By contrast, RqueueMessageManager.deleteAllMessages(queueName) does work.
What are the application dependencies?
- Rqueue Version: 2.10.2-RELEASE
- Spring Boot Version: 2.6.8
- Spring Data Redis Version: 2.6.4
Sample code:
@Slf4j
public abstract class AbstractTaskScheduler {

    @Value("${auction-segment.online.rqueue.enabled:true}")
    private boolean isRqueueEnabled;

    @Value("${auction-segment.online.rqueue.slack-offset:100ms}")
    private Duration slackOffset;

    private final RqueueMessageEnqueuer rqueueMessageEnqueuer;
    private final RqueueMessageManager rqueueMessageManager;
    private final String queueName;
    protected final TaskScheduler scheduler;
    protected Map<Long, ScheduledFuture<?>> jobsMap = new HashMap<>();

    public AbstractTaskScheduler(
        TaskScheduler scheduler,
        RqueueMessageEnqueuer rqueueMessageEnqueuer,
        RqueueMessageManager rqueueMessageManager,
        String queueName
    ) {
        this.scheduler = scheduler;
        this.rqueueMessageEnqueuer = rqueueMessageEnqueuer;
        this.rqueueMessageManager = rqueueMessageManager;
        this.queueName = queueName;
    }

    @SuppressWarnings("PMD.DoNotUseThreads")
    protected abstract Runnable getTask(Long id);

    public List<Long> getTaskIds() {
        if (isRqueueEnabled) {
            List<Object> taskIds = rqueueMessageManager.getAllMessages(queueName);
            return !CollectionUtils.isEmpty(taskIds)
                ? taskIds.stream().map(t -> Long.valueOf(t.toString()))
                    .collect(Collectors.toList())
                : Collections.emptyList();
        } else {
            return jobsMap != null
                ? new ArrayList<>(jobsMap.keySet())
                : Collections.emptyList();
        }
    }

    public RqueueMessage getTaskById(Long id) {
        if (isRqueueEnabled) {
            return rqueueMessageManager.getRqueueMessage(queueName, String.valueOf(id));
        }
        return null;
    }

    public boolean taskExists(Long id) {
        return getTaskById(id) != null;
    }

    @Retryable(
        maxAttempts = 3,
        backoff = @Backoff(delay = 30),
        value = {Exception.class}
    )
    protected void addTaskToScheduler(long id, Instant startDateTime) {
        if (isRqueueEnabled) {
            log.info("adding message to {} queue - message id {} - to be started at {}", queueName, id, startDateTime);
            boolean success = rqueueMessageEnqueuer.enqueueAt(
                queueName,
                String.valueOf(id),
                id,
                startDateTime.minusMillis(slackOffset.toMillis())
            );
            if (!success) {
                throw new IllegalStateException(String.format("Failed to enqueue message %s for listing %s", queueName, id));
            }
        } else {
            ScheduledFuture<?> scheduledTask = scheduler.schedule(getTask(id), startDateTime);
            jobsMap.put(id, scheduledTask);
        }
    }

    public void removeTaskFromScheduler(long id) {
        if (isRqueueEnabled) {
            log.info("removing message from {} queue - message id {}", queueName, id);
            rqueueMessageManager.deleteMessage(queueName, String.valueOf(id));
        } else {
            ScheduledFuture<?> scheduledTask = jobsMap.get(id);
            if (scheduledTask != null) {
                scheduledTask.cancel(true);
                jobsMap.put(id, null);
            }
        }
    }

    @EventListener({ ContextRefreshedEvent.class })
    protected abstract void contextRefreshedEvent();
}
I am not sure if this is related to this part of your code:
@Override
public boolean deleteMessage(String queueName, String messageId, Duration duration) {
    String lockValue = UUID.randomUUID().toString();
    try {
        if (lockManager.acquireLock(messageId, lockValue, Duration.ofSeconds(1))) {
            String id = RqueueMessageUtils.getMessageMetaId(queueName, messageId);
            MessageMetadata messageMetadata = rqueueMessageMetadataDao.get(id);
            if (messageMetadata == null) {
                messageMetadata = new MessageMetadata(id, MessageStatus.DELETED);
            }
            messageMetadata.setDeleted(true);
            messageMetadata.setDeletedOn(System.currentTimeMillis());
            save(messageMetadata, duration);
            return true;
        }
    } finally {
        lockManager.releaseLock(messageId, lockValue);
    }
    return false;
}
It looks to me as if the lock is not released when execution enters the if block.
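A minimal sketch to check that suspicion, assuming plain Java semantics and a hypothetical in-memory stand-in. SoftDeleteSketch and its queue, deletedFlag and locks fields are made up for illustration and are not Rqueue classes. It mirrors only the control flow of the quoted method: acquire a lock, flag the metadata as deleted, return from inside the try block, and release the lock in finally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in; none of these names are Rqueue classes.
class SoftDeleteSketch {
    final List<String> queue = new ArrayList<>();             // stands in for the Redis queue
    final Map<String, Boolean> deletedFlag = new HashMap<>(); // stands in for MessageMetadata
    final Map<String, String> locks = new HashMap<>();        // stands in for lockManager

    void enqueue(String messageId) {
        queue.add(messageId);
    }

    // Same control flow as the quoted deleteMessage: lock, flag metadata, return inside try.
    boolean deleteMessage(String messageId) {
        String lockValue = UUID.randomUUID().toString();
        try {
            // Acquire succeeds only if nobody else currently holds the lock.
            if (locks.putIfAbsent(messageId, lockValue) == null) {
                deletedFlag.put(messageId, true); // only the metadata changes
                return true;                      // the finally block below still runs
            }
        } finally {
            // Runs on every exit path; removes the lock only if this call owns it.
            locks.remove(messageId, lockValue);
        }
        return false;
    }

    public static void main(String[] args) {
        SoftDeleteSketch sketch = new SoftDeleteSketch();
        sketch.enqueue("42");
        System.out.println("deleted: " + sketch.deleteMessage("42"));
        System.out.println("lock still held: " + sketch.locks.containsKey("42"));
        System.out.println("still in queue: " + sketch.queue.contains("42"));
    }
}
```

In this sketch the finally block runs even though the method returns from inside the if, so the lock ends up released. What stays behind is the queue entry, because only the metadata flag changes. If the real method behaves the same way, that might explain why the message is still returned after deleteMessage, though that would need to be confirmed against the Rqueue source.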