Skip to content

RqueueMessageManager.deleteMessage(queueName, Id) not deleting the message from the Queue #162

@pcastroadc

Description

@pcastroadc

What's not working?

RqueueMessageManager.deleteMessage(queueName, Id) not deleting the message from the Queue, I basically call it but if I call RqueueMessageManager.getMessage() is still there afterwards

While RqueueMessageManager.deleteAllMessages(queueName) does work

What're application dependencies ?

  • Rqueue Version: 2.10.2-RELEASE
  • Spring Boot Version: 2.6.8
  • Spring Data Redis Version: 2.6.4

Sample code:

@Slf4j
public abstract class AbstractTaskScheduler {
    @Value("${auction-segment.online.rqueue.enabled:true}")
    private boolean isRqueueEnabled;
    @Value("${auction-segment.online.rqueue.slack-offset:100ms}")
    private Duration slackOffset;

    private final RqueueMessageEnqueuer rqueueMessageEnqueuer;
    private final RqueueMessageManager rqueueMessageManager;
    private final String queueName;

    protected final TaskScheduler scheduler;
    protected Map<Long, ScheduledFuture<?>> jobsMap = new HashMap<>();

    public AbstractTaskScheduler(
        TaskScheduler scheduler,
        RqueueMessageEnqueuer rqueueMessageEnqueuer,
        RqueueMessageManager rqueueMessageManager,
        String queueName
    ) {
        this.scheduler = scheduler;
        this.rqueueMessageEnqueuer = rqueueMessageEnqueuer;
        this.rqueueMessageManager = rqueueMessageManager;
        this.queueName = queueName;
    }

    @SuppressWarnings("PMD.DoNotUseThreads")
    protected abstract Runnable getTask(Long id);

    public List<Long> getTaskIds() {
        if (isRqueueEnabled) {
            List<Object> taskIds = rqueueMessageManager.getAllMessages(queueName);
            return !CollectionUtils.isEmpty(taskIds)
                ? taskIds.stream().map(t -> Long.valueOf(t.toString()))
                .collect(Collectors.toList())
                : Collections.emptyList();
        } else {
            return jobsMap != null
                ? new ArrayList<>(jobsMap.keySet())
                : Collections.emptyList();
        }
    }

    public RqueueMessage getTaskById(Long id) {
        if (isRqueueEnabled) {
            return rqueueMessageManager.getRqueueMessage(queueName, String.valueOf(id));
        }
        return null;
    }

    public boolean taskExists(Long id) {
        return getTaskById(id) != null;
    }

    @Retryable(
        maxAttempts = 3,
        backoff = @Backoff(delay = 30),
        value = {Exception.class}
    )
    protected void addTaskToScheduler(long id, Instant startDateTime) {
        if (isRqueueEnabled) {
            log.info("adding message to {} queue - message id {} - to be started at {}", queueName, id, startDateTime);
            boolean success = rqueueMessageEnqueuer.enqueueAt(
                queueName,
                String.valueOf(id),
                id,
                startDateTime.minusMillis(slackOffset.toMillis())
            );

            if (!success) {
               throw new IllegalStateException(String.format("Failed to enqueue message %s for listing %s", queueName, id));
            }
        } else {
            ScheduledFuture<?> scheduledTask = scheduler.schedule(getTask(id), startDateTime);
            jobsMap.put(id, scheduledTask);
        }
    }

    public void removeTaskFromScheduler(long id) {
        if (isRqueueEnabled) {
            log.info("removing message from {} queue - message id {}", queueName, id);
            rqueueMessageManager.deleteMessage(queueName, String.valueOf(id));
        } else {
            ScheduledFuture<?> scheduledTask = jobsMap.get(id);
            if(scheduledTask != null) {
                scheduledTask.cancel(true);
                jobsMap.put(id, null);
            }
        }
    }

    @EventListener({ ContextRefreshedEvent.class })
    protected abstract void contextRefreshedEvent();
}

I am not sure if this is related to this part of your code:

  @Override
  public boolean deleteMessage(String queueName, String messageId, Duration duration) {
    String lockValue = UUID.randomUUID().toString();
    try {
      if (lockManager.acquireLock(messageId, lockValue, Duration.ofSeconds(1))) {
        String id = RqueueMessageUtils.getMessageMetaId(queueName, messageId);
        MessageMetadata messageMetadata = rqueueMessageMetadataDao.get(id);
        if (messageMetadata == null) {
          messageMetadata = new MessageMetadata(id, MessageStatus.DELETED);
        }
        messageMetadata.setDeleted(true);
        messageMetadata.setDeletedOn(System.currentTimeMillis());
        save(messageMetadata, duration);
        return true;
      }
    } finally {
      lockManager.releaseLock(messageId, lockValue);
    }
    return false;
  }

you are not releasing the lock here when it enters in the IF

Metadata

Metadata

Assignees

Labels

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions