
refactor: queue pr part 1 #808

Merged
devhaozi merged 51 commits into master from haozi/queue1 on Feb 7, 2025

Conversation


@devhaozi devhaozi commented Jan 5, 2025

📑 Description

Closes goravel/goravel#153

This is the first part of the queue refactoring.

Summary by CodeRabbit

  • New Features

    • Introduced enhanced queue drivers with both synchronous and asynchronous support for immediate, delayed, and bulk job processing.
    • Enabled improved job chaining and more flexible job management with robust failure detection.
    • Added new configuration options for debugging, driver selection, and failed job queries.
  • Refactor

    • Streamlined configuration and task dispatch systems, consolidating job management into a simplified interface.
    • Updated worker processes for graceful shutdown and more consistent error reporting.
    • Enhanced mock implementations for testing job and queue functionalities.
  • Chores

    • Cleaned up dependencies and removed deprecated components for increased stability and performance.
    • Removed unused test files and irrelevant Redis dependencies from the test suite.

✅ Checks

  • Added test cases for my code

@devhaozi devhaozi requested a review from a team as a code owner January 5, 2025 07:54
@devhaozi devhaozi self-assigned this Jan 5, 2025

coderabbitai bot commented Jan 5, 2025

Walkthrough

This pull request refactors the queue facades by introducing new interfaces and updating method signatures. Major changes include defining new interfaces (e.g., Config, Driver, JobRepository), reworking the Queue, Task, and Worker implementations, and replacing strongly typed queue argument structures with variadic []any. The updates remove obsolete dependencies (such as Machinery-related code), simplify error handling through new error variables, and adjust the testing suites to remove unnecessary Redis and Docker dependencies. Both asynchronous and synchronous dispatch mechanisms have been revamped for improved clarity and control flow.
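The central signature change described above — strongly typed queue argument structs replaced by plain variadic []any — can be sketched as follows. This is an illustrative shape only, not the framework's exact contract; the Job interface and SendEmail job here are hypothetical stand-ins:

```go
package main

import "fmt"

// Hypothetical job contract in the spirit of the refactor: jobs now receive
// a heterogeneous []any instead of a typed Arg slice.
type Job interface {
	Signature() string
	Handle(args ...any) error
}

type SendEmail struct{}

func (j *SendEmail) Signature() string { return "send_email" }

func (j *SendEmail) Handle(args ...any) error {
	// Arguments arrive untyped; each handler is responsible for its own checks.
	for _, a := range args {
		fmt.Println(a)
	}
	return nil
}

func main() {
	var j Job = &SendEmail{}
	// Callers pass values directly rather than wrapping them in Arg structs.
	_ = j.Handle("user@example.com", 42)
}
```

The trade-off, noted later in the review, is flexibility at the cost of compile-time type safety.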

Changes

File(s) Change Summary
contracts/queue/config.go, contracts/queue/driver.go, contracts/queue/job.go, contracts/queue/queue.go Added new interfaces and constants, updated method signatures (e.g., variadic []any instead of specific types), introduced new methods like Chain and GetJob, removed deprecated types (e.g., Arg).
errors/list.go Introduced several new error variables for queue operations such as missing jobs, invalid drivers, and failure in saving failed jobs.
event/task.go, event/task_test.go Changed the return type from []queuecontract.Arg to []any in argument conversion functions and updated tests to reflect the generic argument type.
mail/application.go, mail/application_test.go Modified job dispatch logic to use []any instead of queuecontract.Arg, removed Redis/Docker related test fields and dependencies, and dropped logging mock references.
mocks/queue/*.go (Config.go, Driver.go, JobRepository.go, Queue.go, Worker.go) Added generated mocks for new interfaces; updated mock method signatures to reflect variadic parameters and introduced new methods like Shutdown.
queue/* (config.go, config_test.go, driver.go, driver_async.go, driver_async_test.go, driver_sync.go, driver_sync_test.go, job.go, job_test.go, machinery.go [removed], service_provider.go, task.go, task_test.go, utils.go [removed], worker.go, worker_test.go) Complete refactoring of the queue handling: introduced NewDriver, new async/sync implementations, consolidated job management (e.g., All, Chain, GetJob methods), restructured task dispatch and worker processing logic, and removed obsolete machinery and utility files.
go.mod Removed numerous third-party dependencies (including Machinery, cloud packages, and others) and added new ones (github.com/rogpeppe/go-internal, gopkg.in/check.v1), reflecting a shift towards an in-house solution.

Sequence Diagram(s)

sequenceDiagram
    participant T as Task
    participant D as NewDriver
    participant R as Driver (Async/Sync)
    participant J as Job Repository
    participant F as FailedJob Channel

    T->>D: Invoke NewDriver(connection, config)
    D-->>T: Return appropriate Driver (Async/Sync/Custom)
    T->>R: Dispatch job (via Bulk/Later/Push)
    alt Synchronous Driver
        R->>J: Directly call Handle(job, args)
    else Asynchronous Driver
        R->>T: Enqueue job (with optional delay)
    end
    alt Error in job handling
        T->>F: Push FailedJob to channel
    end
sequenceDiagram
    participant W as Worker
    participant D as NewDriver
    participant J as Job Repository
    participant F as FailedJob Channel

    W->>D: Retrieve driver instance
    loop While running and not shutdown
       W->>D: Pop job from queue
       alt Job found
         W->>J: Call(job signature, args)
         alt Job call fails
           W->>F: Send FailedJob to channel
         end
       else No job found
         W-->>W: Sleep for retry interval
       end
    end
    W->>W: Gracefully shutdown (via Shutdown method)

Assessment against linked issues

Objective Addressed Explanation
Refactor the queue facades to support in-house async/sync drivers and remove third-party dependencies (#153, #131, #96, #37)

Possibly related PRs

Suggested labels

❗ Breaking Change


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2af088c and 50d90c8.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (1)
  • go.mod (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • go.mod
⏰ Context from checks skipped due to timeout of 300000ms (1)
  • GitHub Check: test / windows (1.22)


@devhaozi devhaozi closed this Jan 5, 2025
@devhaozi devhaozi reopened this Jan 5, 2025

@github-actions github-actions bot left a comment


⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.50.

Benchmark suite: Benchmark_DecryptString
  Current (ca55bef):  4958 ns/op, 1592 B/op, 17 allocs/op
  Previous (dc45fb9): 2180 ns/op, 1592 B/op, 17 allocs/op
  Ratio: 2.27

This comment was automatically generated by workflow using github-action-benchmark.

@devhaozi devhaozi changed the title refactor: queue pr 1 refactor: queue pr part 1 Jan 5, 2025

codecov bot commented Jan 5, 2025

Codecov Report

Attention: Patch coverage is 68.52590% with 79 lines in your changes missing coverage. Please review.

Project coverage is 67.54%. Comparing base (de582d3) to head (50d90c8).

Files with missing lines Patch % Lines
queue/driver_sync.go 34.78% 14 Missing and 1 partial ⚠️
queue/job.go 54.16% 10 Missing and 1 partial ⚠️
queue/config.go 50.00% 9 Missing and 1 partial ⚠️
mail/application.go 0.00% 9 Missing ⚠️
queue/driver.go 47.05% 9 Missing ⚠️
queue/service_provider.go 0.00% 8 Missing ⚠️
queue/task.go 81.25% 4 Missing and 2 partials ⚠️
queue/worker.go 89.58% 3 Missing and 2 partials ⚠️
queue/application.go 82.60% 4 Missing ⚠️
queue/driver_async.go 95.45% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #808      +/-   ##
==========================================
+ Coverage   67.22%   67.54%   +0.32%     
==========================================
  Files         150      151       +1     
  Lines       10449    10343     -106     
==========================================
- Hits         7024     6986      -38     
+ Misses       3052     2990      -62     
+ Partials      373      367       -6     



@hwbrzzl hwbrzzl left a comment


Great PR 👍

queue/driver.go Outdated
const DriverSync string = "sync"
const DriverASync string = "async"
const DriverMachinery string = "machinery" // TODO: Will be removed in v1.17
const DriverCustom string = "custom"

Will add the DB driver in the next PR? And can we move these constants to contracts? The same with DB: https://github.com/goravel/framework/blob/master/contracts/database/config.go#L3-L10


?


Will a new redis driver be implemented?


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (40)
queue/application.go (2)

23-25: Use of NewChainTask clarifies job chaining.
This approach uses the new chain-based logic. Make sure to confirm that chaining does not break any existing concurrency or error-handling patterns.


36-36: Bulk job registration.
Invoking app.job.Register(jobs) centralizes job registration. Consider logging or error handling if registration fails.

queue/task.go (1)

66-77: DispatchSync elegantly handles sequential job execution.
If there's a large number of chained jobs, consider the possibility of partial failures interrupting subsequent tasks. If desired, you could add a rollback mechanism or partial success logging.

queue/worker.go (4)

14-21: New fields in Worker struct.

  • driver is introduced but overshadowed by local usage; consider eliminating or setting it once to avoid redundancy.
  • failedJobChan is a neat concurrency channel for handling job failures asynchronously.
- driver queue.Driver
+ // Potentially remove or populate 'driver' if you want to store it globally
🧰 Tools
🪛 golangci-lint (1.62.2)

17-17: field driver is unused

(unused)


35-59: Run method main loop.

  1. Marking isShutdown = false is straightforward.
  2. Checking driver.Driver() for DriverSync is consistent with the existing error approach.
  3. Spawning multiple goroutines (up to r.concurrent) for queue consumption is typical but consider adding more robust error logging or metrics.

61-70: Failure channel handling.
Using r.failedJobChan to push failed jobs is a solid design. Consider adding a retry mechanism if certain failures are transient.


86-87: Shutdown method toggles isShutdown.
Graceful shutdown is valuable. Potentially wait for in-progress jobs to complete if that’s desired.
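The "wait for in-progress jobs" suggestion can be sketched with a WaitGroup guarding in-flight handlers. The Worker, Run, and Shutdown names mirror the review; the internals here are an assumption, not the PR's actual implementation:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Illustrative worker: Shutdown flips a flag and then blocks until every
// job spawned by Run has finished, so no handler is cut off mid-flight.
type Worker struct {
	isShutdown atomic.Bool
	wg         sync.WaitGroup
	jobs       chan func()
}

func (w *Worker) Run() {
	for !w.isShutdown.Load() {
		select {
		case job := <-w.jobs:
			w.wg.Add(1)
			go func() {
				defer w.wg.Done()
				job()
			}()
		default:
			// No job available; back off briefly before polling again.
			time.Sleep(10 * time.Millisecond)
		}
	}
}

func (w *Worker) Shutdown() {
	w.isShutdown.Store(true)
	w.wg.Wait() // drain: block until in-progress jobs complete
}

func main() {
	w := &Worker{jobs: make(chan func(), 1)}
	done := make(chan struct{})
	w.jobs <- func() { close(done) }
	go w.Run()
	<-done
	w.Shutdown()
	fmt.Println("all jobs drained")
}
```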

contracts/queue/job.go (1)

12-17: New JobRepository interface.
Centralizing job management methods is a significant architectural improvement. Great step toward maintainability.

contracts/queue/config.go (2)

11-11: Plan for the deprecation of Redis method.

The comment suggests this method will be removed in v1.17. Make sure to track usage of this method throughout the codebase and provide a clear migration path before removal.


12-13: Consider error handling for Size and Via.

Returning an int or any without an error leaves callers no direct way to address misconfiguration. You might want to offer a more resilient interface that also reports errors if needed.

contracts/queue/queue.go (3)

4-5: Naming clarity for All.

While All is concise, consider something more descriptive like AllJobs to avoid confusion with future expansions in the interface.


10-11: Method name Job could be more expressive.

“Job(job Job, args []any) Task” might be misread. Consider renaming it to something like Enqueue or PushJob to clarify its intent.


14-15: Double-check usage of Worker(payloads ...Args) Worker.

If Args were to expand in functionality, a more direct approach (e.g., a single configuration object) might be clearer. Evaluate if there’s any confusion from multiple potential Args.

queue/driver.go (1)

15-15: Remove deprecated driver code on schedule

There's a TODO comment indicating that DriverMachinery will be removed in v1.17. Make sure to track this in the project backlog, so the deprecated code is removed at the correct version.

queue/driver_sync.go (1)

31-42: Interrupting bulk jobs on first error

If any job fails in the Bulk method, the entire batch halts. If that's intended, it’s fine; otherwise, consider accumulating errors so that other jobs can continue.

queue/job_test.go (4)

11-14: Consider zero-value initialization.
Instead of storing a pointer to JobImpl, you could store the struct value directly. This reduces pointer-chasing overhead and can simplify some test logic if you do not need to mutate the pointer itself.

- jobManager *JobImpl
+ jobManager JobImpl

24-32: Expand coverage for partial registration scenarios.
Currently, the test only registers two jobs and inspects len(registeredJobs). Consider adding a scenario where the array includes nil or duplicate jobs to confirm how the system behaves in edge cases.


43-46: Add negative test for invalid argument types.
This test verifies an unregistered job fails, but you may also want to test a registered job that receives arguments of unexpected types.


48-55: Minimal duplication.
The logic here is quite similar to CallRegisteredJobSuccessfully, so the tests are consistent. If these tests grow in complexity, consider extracting shared logic (e.g., registering a job) into helper methods.

queue/driver_machinery_test.go (2)

1-1: Reminder to remove the TODO.
The file has a TODO comment stating "Will be removed in v1.17". Please ensure that this is tracked and removed before or in v1.17 to avoid stale comments in the codebase.


26-27: Use inlined initialization for test setup if it’s short.
Consider inlining the creation of mocksconfig.Config and mockslog.Log if you want to reduce the number of lines in setups. That’s just a style preference though.

queue/driver_async.go (4)

11-11: Potential for concurrency conflicts.
asyncQueues is a global sync.Map. In high-throughput scenarios, the concurrency overhead could be considerable. Consider a more localized or instance-based approach if you foresee scaling issues.


13-16: Avoid storing non-essential fields.
If connection or size are derivable from a config object, consider removing them from the struct to reduce duplication of state.


38-52: Consider reusing worker pools or goroutines.
Spawning a goroutine inside a loop for each delayed job can be expensive under heavy load. A worker pool approach or shared scheduler might reduce overhead, but that depends on concurrency needs.


54-61: Delayed scheduling approach.
Scheduling using time.Sleep inside a goroutine is straightforward but can cause a flood of idle goroutines if many delayed jobs are queued. Consider a more centralized scheduling approach if large volumes of delayed tasks are expected.

queue/job.go (3)

31-40: Check potential performance overhead of repeatedly appending in All().

Each iteration of r.jobs.Range(...) appends to jobs, which can force repeated slice reallocation as the collection grows. For typical usage this is probably fine, but keep an eye on performance if the job list grows significantly. If the collection size is known, consider pre-allocating the slice with that capacity.


42-50: Handling of job invocation errors.

The current logic returns err if the job signature is not found, but when job.Handle(args...) itself fails, the error-handling strategy (return, log, or panic) should be made explicit. Consider adding structured logging or additional checks to differentiate between "job not found" and "job execution failed" errors.


52-59: Graceful handling of missing job signatures.

When the signature is missing from the sync.Map, errors.QueueJobNotFound.Args(...) is returned. This approach is valid for a minimal viable product. For clarity, include the missing signature in the error message to aid debugging, or log the error details more comprehensively.
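The repository shape under discussion — signatures keyed in a sync.Map, with a descriptive not-found error — can be sketched as below. JobRepo and its error text are illustrative; the real code uses errors.QueueJobNotFound:

```go
package main

import (
	"fmt"
	"sync"
)

type Job interface {
	Signature() string
	Handle(args ...any) error
}

// Minimal job repository: Register stores jobs by signature, Get surfaces
// an error that names the missing signature, Call ties the two together.
type JobRepo struct{ jobs sync.Map }

func (r *JobRepo) Register(js []Job) {
	for _, j := range js {
		r.jobs.Store(j.Signature(), j)
	}
}

func (r *JobRepo) Get(signature string) (Job, error) {
	if v, ok := r.jobs.Load(signature); ok {
		return v.(Job), nil
	}
	return nil, fmt.Errorf("queue: job %q not registered", signature)
}

func (r *JobRepo) Call(signature string, args []any) error {
	j, err := r.Get(signature)
	if err != nil {
		return err
	}
	return j.Handle(args...)
}

type noop struct{}

func (noop) Signature() string        { return "noop" }
func (noop) Handle(args ...any) error { return nil }

func main() {
	r := &JobRepo{}
	r.Register([]Job{noop{}})
	if _, err := r.Get("missing"); err != nil {
		fmt.Println(err) // error message names the missing signature
	}
}
```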

queue/driver_machinery.go (2)

1-2: Pending removal notice.

A TODO indicates this file will be removed in v1.17. Consider clarifying in the PR description if the deprecation path is clearly communicated and scheduled.


37-40: Provide an explicit implementation or a graceful fallback for Driver().

Currently, the method panics, which is acceptable for a placeholder. However, if Driver() is invoked in production code, consider returning a default driver or a structured error to avoid abrupt process termination.

queue/config.go (4)

21-23: Debug flag is essential for logging improvements.

Retrieving app.debug is standard; consider logging or returning more structured debug info if needed for diagnosing advanced queue errors.


Line range hint 58-75: Upcoming removal notice for Redis(...).

The docstring states “Will be removed in v1.17.” Confirm if the new recommended approach is documented (e.g., usage of alternative queue config). Communicating the deprecation plan helps ensure a smooth transition.


77-83: Provide context on the rationale behind default size of 100 in Size().

If queue.connections.%s.size is omitted, we fallback to 100. This is fine, but clarify whether 100 is an arbitrary guess or an informed default and consider exposing it as a top-level config param for easy tuning.


85-91: Via() usage can be more self-descriptive.

The method name “Via” is a bit ambiguous. If it stands for “which channel or driver is used,” consider a more descriptive naming (e.g., Transport(), Method()). Otherwise, add explanatory documentation around how “via” is used.

mocks/queue/Worker.go (1)

65-81: Ensure consistent return value provisioning for “Shutdown”.
If your tests never specify a return value, the code will panic. Always specify a return value in the test setup or consider defaulting to nil for better resiliency.

func (_m *Worker) Shutdown() error {
    ret := _m.Called()

-   if len(ret) == 0 {
-       panic("no return value specified for Shutdown")
-   }
+   if len(ret) == 0 {
+       return nil
+   }

    var r0 error
    ...
}
queue/driver_sync_test.go (2)

44-53: Sync dispatch test is well-structured.
Verifies that the job increments testSyncJob. Ensure edge cases (e.g., invalid job args) are also handled.


55-76: Chain dispatch logic.
The test chain verifies multiple jobs in sequence. The usage of a short sleep to allow processing is acceptable but can introduce timing flakiness if run on slower systems. Consider a more robust approach (e.g., a wait group or callback) to confirm job completion.

// Example approach using channels or wait groups (pseudo-code):
- time.Sleep(2 * time.Second)
+ waitGroup.Wait() // or channel signal
mocks/queue/JobRepository.go (1)

70-86: Consider factoring out repeated logic for a more readable test.

Multiple mock methods use the same pattern of retrieving and validating the returned error. You could introduce utility methods to reduce code duplication and improve clarity.

queue/driver_async_test.go (2)

56-78: Validate concurrency readiness.

Running jobs and listening in a goroutine is good for async simulation. However, to avoid timing-based flakiness, consider an approach that waits for job completion events rather than sleeping.


185-199: Chaining jobs is well-implemented.

Chained job tests verify sequential execution. Consider adding a test scenario where a chained job fails to ensure that the chain halts appropriately.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 273efc8 and f097ecc.

📒 Files selected for processing (34)
  • contracts/queue/config.go (1 hunks)
  • contracts/queue/driver.go (1 hunks)
  • contracts/queue/job.go (1 hunks)
  • contracts/queue/queue.go (1 hunks)
  • errors/list.go (1 hunks)
  • event/task.go (1 hunks)
  • event/task_test.go (2 hunks)
  • mail/application.go (1 hunks)
  • mail/application_test.go (7 hunks)
  • mocks/queue/Config.go (1 hunks)
  • mocks/queue/Driver.go (1 hunks)
  • mocks/queue/JobRepository.go (1 hunks)
  • mocks/queue/Queue.go (7 hunks)
  • mocks/queue/Worker.go (1 hunks)
  • queue/application.go (1 hunks)
  • queue/application_test.go (0 hunks)
  • queue/config.go (3 hunks)
  • queue/config_test.go (1 hunks)
  • queue/driver.go (1 hunks)
  • queue/driver_async.go (1 hunks)
  • queue/driver_async_test.go (1 hunks)
  • queue/driver_machinery.go (1 hunks)
  • queue/driver_machinery_log.go (1 hunks)
  • queue/driver_machinery_test.go (3 hunks)
  • queue/driver_sync.go (1 hunks)
  • queue/driver_sync_test.go (1 hunks)
  • queue/job.go (1 hunks)
  • queue/job_test.go (1 hunks)
  • queue/machinery.go (0 hunks)
  • queue/service_provider.go (2 hunks)
  • queue/task.go (1 hunks)
  • queue/task_test.go (1 hunks)
  • queue/utils_test.go (2 hunks)
  • queue/worker.go (1 hunks)
💤 Files with no reviewable changes (2)
  • queue/machinery.go
  • queue/application_test.go
✅ Files skipped from review due to trivial changes (1)
  • queue/driver_machinery_log.go
🧰 Additional context used
🪛 golangci-lint (1.62.2)
queue/worker.go

17-17: field driver is unused

(unused)

🔇 Additional comments (116)
queue/application.go (6)

9-10: Use of interfaces to decouple job management is commendable.
The switch from a slice of jobs to a JobRepository interface neatly centralizes job management and enhances scalability.


20-22: Method All() fosters consistent job retrieval.
Returning app.job.All() ensures the retrieval of all registered jobs from the repository. Nice streamlined approach.


31-32: Updated Job method aligns with new argument signature.
Switching to args []any helps pass more flexible argument types to queue tasks.


39-52: Refined Worker method logic.
The dynamic fallback to defaultConnection and concurrency defaults is clean. Be sure to handle cases where payloads[0] might be nil (though the code checks for len(payloads) == 0 first, so it seems safe).


13-16: Constructor uses a new JobRepository implementation.
The constructor now cleanly initializes job via NewJobImpl(). Verify that no other parts of the code (in or outside this file) require the old approach.

✅ Verification successful

The switch to NewJobImpl() is safe and consistent with the codebase
The codebase shows consistent usage of NewJobImpl() for job management. Array/slice operations found in the code serve different purposes (task chaining, bulk operations) and don't conflict with this implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify if any references exist to the old approach of handling jobs directly
rg -A 5 "old approach or any direct usage of job slices"

Length of output: 58


Script:

#!/bin/bash
# Search for job-related type definitions and implementations
ast-grep --pattern 'type $_ struct {
  $$$
  jobs $$$
  $$$
}'

# Look for direct slice operations and old patterns
rg -g '*.go' '(\[\].*job|jobs\s*\[|append.*jobs|len\(jobs|range.*jobs)' -i

# Find all references to job-related methods
rg -g '*.go' '(NewJob|GetJob|AddJob|RemoveJob|ProcessJob)' -i

Length of output: 9522


27-29: Convenient GetJob accessor.
Exposing direct access to app.job.Get(signature) is straightforward. Validate correctness of usage in other calls.

✅ Verification successful

Accessor implementation and usage are correct.
The GetJob method properly encapsulates access to the underlying job storage, while internal implementation details correctly use direct access within the same package.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Searching usage of GetJob for correctness
rg -A 5 "GetJob"

Length of output: 2764


Script:

#!/bin/bash
# Search for usages of the underlying Get method and test files using GetJob
echo "=== Searching for Get method usage ==="
rg "\.Get\(signature\)" -A 3

echo -e "\n=== Searching for GetJob in test files ==="
fd -e test.go -x rg "GetJob" -A 3 {}

Length of output: 561


Script:

#!/bin/bash
# Get more context around the Get usage in job.go
echo "=== Context from job.go ==="
rg -B 5 -A 10 "\.Get\(signature\)" queue/job.go

Length of output: 500

queue/task.go (6)

10-13: Refactored fields in Task struct enhance clarity.
Storing the config interface and using time.Time for delay is more direct than pointer-based types.


18-28: NewTask constructor with flexible args.
Initializing jobs inside this constructor is a neat approach. No immediate issues found.


32-38: Chain constructor parallels single-task construction.
Both constructors keep consistent patterns for config, connection, and queue. Well-maintained.


42-46: Intuitive Delay method.
Directly storing a time.Time object for future scheduling is straightforward. Remember to confirm that the queue driver supports delayed execution.


81-90: Connection & queue overrides via OnConnection and OnQueue.
Both are logical and easy to read. The pattern aligns with the rest of the refactoring.


48-63: Dispatch logic gracefully handles chain vs. single job.
Backing off if r.chain is true is correct. However, ensure that referencing r.jobs[0] is safe if jobs might be empty under any condition.

✅ Verification successful

Access to r.jobs[0] is safe by construction
The code is safe because r.jobs[0] is only accessed when chain=false, which means the Task was created via NewTask() constructor that always initializes the jobs array with exactly one job.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there's any scenario where r.jobs might be empty
rg "NewTask|NewChainTask" -A 5

Length of output: 3523

queue/worker.go (5)

4-5: Imports for time handling.
Importing "time" is consistent with the new scheduling logic. No concerns.


6-6: Addition of github.com/google/uuid.
Usage of UUIDs for fail-tracking is a good choice. Continue ensuring minimal overhead.


8-10: Relevant queue, errors, and carbon imports.
All new import references appear consistent with current usage.


24-31: NewWorker constructor.
Creating a fail channel for each worker is suitable but watch for resource management if a large number of workers is spawned.


75-84: Failed jobs consumer goroutine.
This robust approach ensures that failures don't block main job consumption. Just ensure LogFacade is defined and safe for concurrent use.

mocks/queue/Queue.go (5)

23-41: New All() mock method.
Panicking when no return value is specified is typical for mock usage, though consider a default or fallback if that better suits your tests.


43-69: Queue_All_Call struct and helpers.
These additions demonstrate a consistent pattern for mocking. Fine for standard mock usage.


118-145: GetJob mock method.
Allows returning both a queue.Job and an error. Ensure test coverage includes both success and failure scenarios.


177-185: Job mock function with flexible []interface{} parameter.
Switching to a broader type is consistent with the rest of the refactor. No immediate issues.


Line range hint 258-293: Enhanced Worker mock.
Adapting to variadic payloads ...queue.Args matches the real code changes in the queue interface. Looks good.

contracts/queue/job.go (2)

3-4: Time import for delay usage.
Importing "time" is aligned with the new Delay field in Jobs.


20-22: Extended Jobs struct with flexible Args and Delay.
Allowing []any for Args broadens usage scenarios. The Delay time.Duration is straightforward for scheduling.

contracts/queue/config.go (1)

1-4: Nice introduction of the Config interface!

Declaring a separate interface for managing queue configurations is a solid choice, as it makes the design cleaner and easier to extend.

contracts/queue/queue.go (2)

8-9: Ensure valid job retrieval.

GetJob(signature string) (Job, error) is a clean, minimal interface. Just confirm you handle an empty or invalid signature in the implementation to avoid nil returns or panics.


20-20: Great addition of Shutdown.

Providing a graceful shutdown is important for controlled job processing. Be sure to handle outstanding tasks to avoid data inconsistency or partial re-queues.

contracts/queue/driver.go (2)

5-8: Use descriptive constants or preserve a migration plan.

DriverMachinery is marked for removal in v1.17—like with Redis, ensure there’s a clear transition strategy for users before dropping support.


10-23: Interface ordering is fine; keep it consistent within the driver suite.

(Some previously suggested sorting is optional.)

queue/task_test.go (1)

30-31: Excellent demonstration of simplified argument usage.

Switching from custom arg structs to []any provides flexibility, though it relaxes type safety. Ensure you have robust internal checks if these arguments can vary in type.

queue/driver.go (3)

1-2: File Header Uniformity

All looks good here. The package declaration and imports are consistent with the project's style.


8-28: Validate initialization order & concurrency

The NewDriver function relies on external resources like LogFacade (in the case of the Machinery driver). Make sure that LogFacade is already initialized within the Boot phase of the service provider before any code path uses DriverMachinery.


21-24: Well-structured error handling

The fallback for custom drivers is well-implemented. Good job returning a clear error for invalid driver types.

queue/driver_sync.go (4)

1-2: Overall Implementation Consistency

The synchronous driver structure and constructor are clear and consistent with the rest of the codebase.


27-29: Immediate job execution is appropriate for sync driver

The Push method immediately calls job.Handle(args...). This is correct for a synchronous driver.


44-47: Potential blocking on large Delay intervals

The time.Sleep(time.Until(delay)) call in Later can block the caller’s goroutine for a potentially long duration. This is acceptable for a sync driver but confirm that it meets your production requirements.


49-52: Pop operation not supported

Good approach returning nil, nil, nil for a feature the sync driver doesn’t support. Just ensure any calling code handles the nil returns without error.

queue/service_provider.go (3)

14-17: Potentially unused global facades

Check if OrmFacade is needed. If it remains unused, consider removing it to keep the code clean.


34-38: Ensure initialization order matches usage

LogFacade = app.MakeLog() and OrmFacade = app.MakeOrm() are in Boot. If any queue driver references them at Register time, you might experience a nil reference. Verify that none of the queue drivers attempt to use these facades before Boot.


40-42: Encapsulated command registration

Creating a dedicated registerCommands method helps keep the boot logic clean. No issues spotted.

event/task.go (1)

64-67: Loss of structured argument typing

Switching from a custom struct to []any in eventArgsToQueueArgs provides flexibility but removes compile-time type checks. Confirm this trade-off is intentional across the codebase.

queue/job_test.go (4)

1-1: Good test package and suite naming.
The package queue and the JobTestSuite struct accurately reflect the tested functionality.


20-22: Test isolation confirmation.
Creating a new JobImpl in SetupTest helps ensure each test has a fresh state, which is good practice.


34-41: Confirm argument usage in job execution.
While you verify that Call can handle arguments, there's no test to confirm that arguments passed into Call are properly used by the job (Handle). Consider adding an assertion on the job’s internal state to ensure arguments are correct.


62-74: Well-structured mock job prioritizes clarity.
Using a MockJob that sets a called boolean is straightforward. This is a suitable pattern for these unit tests.

queue/driver_machinery_test.go (2)

10-11: Mock package rename is consistent.
Renaming from configmock/logmock to mocksconfig/mockslog clarifies the usage of these mocks. Ensure this naming convention is applied consistently across all mocks in the project.

Also applies to: 16-17


58-60: Confirm behavior under concurrency and error conditions.
You replaced direct server initialization with a function call server := s.machinery.server(...). It's worth confirming whether concurrent use or other driver configurations can raise unexpected errors, and ensuring those paths are covered by integration or upstream tests.

event/task_test.go (1)

34-34: Argument handling improvements.
Replacing []queuecontract.Arg with []any is consistent with the new flexible approach to argument handling. However, consider verifying that the interface usage doesn’t break type-checking logic later in the call chain.

Also applies to: 51-51

queue/driver_async.go (4)

1-1: Package name matches usage.
Naming the package queue is consistent with the async driver’s functionality.


33-36: Push blocks once the buffer fills.
Push sends to a buffered channel, so it is non-blocking only while the buffer has spare capacity; once the channel is full, the send blocks. Confirm that the buffer size is sufficient or that you have a safe strategy for handling bursts.


63-75: Potential race conditions on channel read.
If the queue is closed or replaced concurrently in the future, reading from the channel might panic. Currently, your approach is safe for the tested usage, but be mindful of potential changes in usage patterns.


78-86: Ensure correct channel capacity usage.
LoadOrStore might retrieve an existing channel with a different capacity. If you want dynamic sizing to persist, confirm that a second call to getQueue with a larger size does not overwrite the previously stored channel.

queue/job.go (3)

13-21: Consider adding a corresponding database migration for FailedJob.

This struct is annotated with GORM tags but there's no mention of a corresponding migration file to create the table for failed jobs in the database. If a migration is missing, please add one.


23-29: Validate potential concurrency implications of sync.Map.

The jobs sync.Map is a straightforward approach to concurrency-safe storage. Ensure that:

  1. The job registry won't encounter race conditions in high-concurrency scenarios (frequent reads and writes).
  2. The type-assertions from any to contractsqueue.Job in subsequent methods are safe.

61-66: Confirm no collision on multiple concurrent job registrations.

As Register() loops over the provided jobs and calls r.jobs.Store, it should be OK under concurrent usage. However, if there can be repeated signatures in the jobs slice, carefully consider how overwriting might affect job definitions.

queue/config_test.go (2)

8-9: Nice usage of the new mock naming convention.

Switching to mocksconfig and injecting via mocksconfig.NewConfig(s.T()) for a more flexible test setup is a clean approach. This ensures consistent naming and usage patterns compared to the old configmock.

Also applies to: 14-15


23-23: Validate adequate coverage of queue config test cases.

The test covers the default connection with “redis.” If additional queue types or connections exist (e.g., “sync,” “sqs”), ensure that they are also tested here or in subsequent test methods to confirm broad coverage.

queue/driver_machinery.go (2)

25-31: Configuration injection approach is consistent.

The constructor NewMachinery aligns well with dependency injection for log and config. This is good for maintainability and testability.


62-85: Validate Redis server settings in server(...).

The code constructs a Machinery server with Redis broker/backend configurations. Ensure:

  1. Proper credentials management if not using password or if partial credentials are present.
  2. A plan to handle ephemeral test environments where Redis might not be available.

queue/config.go (1)

37-41: Check error handling for FailedJobsQuery.

The method always returns a valid orm.Query, but if the connection or table is missing, queries may fail at runtime. Consider verifying the presence of these config values or returning an error if they're invalid.

queue/utils_test.go (4)

9-9: No issues found with the import alias update.
Changing the import alias to contractsqueue improves clarity.


46-46: Good use of the new import alias.
No functional or logical issues are found.


52-52: Proper alias usage.
Continues the same refactoring pattern. No concerns.


59-59: Refactored type references look good.
No regressions introduced.

mocks/queue/Worker.go (1)

83-108: Mock call struct for “Shutdown” adequately follows the pattern.
The approach is consistent with the existing Run call struct. Make sure all tests set expectations and return values correctly.

queue/driver_sync_test.go (5)

19-24: Applicative naming.
Using a clearly named struct (DriverSyncTestSuite) clarifies intent in these tests.


26-37: Registering jobs in Setup looks clean.
All relevant job types are registered before running tests, promoting clarity in the test flow.


39-42: Reset the counters for each test.
Good practice to ensure test isolation.


78-91: TestSyncJob’s signature and handler.
Implementation looks correct. The job counters are incremented as expected.


93-106: TestChainSyncJob’s signature and handler.
No specific issues identified. Code is straightforward and meets its intended functionality.

mail/application.go (1)

73-81: 🛠️ Refactor suggestion

Transition from typed arguments to []any.
The reduced structure eliminates explicit type annotation but may reduce type safety. Ensure downstream usage of these arguments handles type casting gracefully.

-job := r.queue.Job(NewSendMailJob(r.config), []any{
+// Optionally, reintroduce typed checks if clarity or safety are important:
 job := r.queue.Job(NewSendMailJob(r.config), []any{
     ...
 })

Likely invalid or redundant comment.

mocks/queue/JobRepository.go (2)

1-1: Auto-generated code by mockery.

As this is generated code, manual modifications may be overwritten. Any necessary changes should be made in the interface, with the mocks then regenerated.


27-29: Panic on missing return value.

If a test forgets to specify a return value for All(), the code will panic. While this is standard for testify mocks, consider using a default or fail-fast approach for a clearer error message.

mail/application_test.go (3)

36-36: Confirm that 465 is a valid test scenario.

Using port 465 typically implies TLS for sending mail. Ensure that the environment or test credentials support this secured channel.


79-81: Recommend verifying queue setup.

After switching to the newly refactored async queue, ensure that queue.NewApplication(mockConfig) references the correct driver. The test appears to rely on redis in some scenarios. Confirm that all environment variables for Redis are set if needed.


165-172: Good approach to return a unified mockConfig.

Centralizing the environment and config gathering in one function ensures consistency across test cases. Keep in mind that any advanced re-configuration during test runtime might require re-invoking this function or clearing existing mocks.

errors/list.go (1)

107-115: New queue-related errors align well with the refactoring.

These errors cover missing jobs, invalid drivers, etc., providing more granular handling for queue operations.

queue/driver_async_test.go (1)

31-42: Sensible test suite setup.

Initializing the application once and running the suite helps keep the tests cohesive. Ensure that each test re-initializes state as needed in SetupTest.

mocks/queue/Driver.go (17)

1-2: Auto-generated code notice.
This file was generated by mockery; typically, changes to auto-generated code aren't manually edited. Ensure that this file is either kept in sync with its source or excluded from version control if it causes noise in diffs.


12-15: Mock struct for Driver.
The struct correctly embeds mock.Mock to simulate the Driver interface in tests.


17-19: Expecter struct.
This small helper struct for setting up expectations is consistent with typical mock usage.


21-23: EXPECT() usage.
Returns the Driver_Expecter for handling chained mock calls. Straightforward approach.


25-41: Bulk method.
Properly checks the call arguments and uses default panic if no return value is set. This aligns with standard testify/mock patterns.


43-70: Driver_Bulk_Call.
Defines chained methods to set up behaviors for Bulk. Looks consistent with testify/mock chaining.


72-88: Connection method.
Retrieves a string from the mock call. Follows the same pattern as the Bulk method.


90-116: Driver_Connection_Call.
Helper struct for mocking out the Connection() behavior, consistent with the established pattern.


117-133: Driver method.
Another simple mock method returning a string. No issues identified.


135-160: Driver_Driver_Call.
Helper struct for setting test expectations on Driver() calls.


162-178: Later method.
Uses the standard approach to handle the function signature with multiple parameters. Looks correct.


180-210: Driver_Later_Call.
Maintains the same pattern for chained calls with Run, Return, and RunAndReturn.


211-248: Pop method.
Mimics the multi-valued return functionality used in typical queue pop operations. Implementation is consistent.


250-276: Driver_Pop_Call.
Helper struct for mocking Pop. Nothing unusual here.


278-294: Push method.
Handles the push operation with standard testify/mock usage.


296-324: Driver_Push_Call.
A typical call struct for the Push method. No issues detected.


326-338: NewDriver factory method.
Registers the mock with the testing.T and sets up a cleanup function. This ensures mock expectations are properly asserted.

mocks/queue/Config.go (21)

1-2: Auto-generated code notice.
Similar to Driver.go, this file is mockery-generated. Consider excluding this from manual edits.


10-13: Mock struct for Config.
Correct structure for embedding mock.Mock and simulating Config in tests.


15-17: Config_Expecter.
Helper struct, standard pattern for chaining mock calls.


19-21: EXPECT() usage.
Returns the expecter for chaining. No concerns here.


23-39: Debug method.
Correctly panics if no return is defined. This is standard for testify mocks.


41-67: Config_Debug_Call.
Helper for Debug() method. Implementation is consistent with other mocks.


68-84: DefaultConnection method.
Retrieves or panics if no return set. Typical mock approach.


86-112: Config_DefaultConnection_Call.
Helper for the DefaultConnection() call. Implementation is consistent.


113-129: Driver method.
Mocks retrieving a driver string by connection. No issues apparent.


131-158: Config_Driver_Call.
Handle for the Driver() call. Follows the same pattern.


159-177: FailedJobsQuery method.
Returns an orm.Query or panics otherwise. Mock usage is correct.


179-205: Config_FailedJobsQuery_Call.
Helper struct for setting expectations on FailedJobsQuery.


206-222: Queue method.
Mocks returning a queue name for a given connection. No issue identified.


224-251: Config_Queue_Call.
Provides chainable setup for Queue(). Implementation is fine.


253-286: Redis method.
Supports returning triple values: (dsn, database, alias). The multi-return approach is correct.


288-314: Config_Redis_Call.
Handles the chainable mock calls for Redis.


316-332: Size method.
Returns an integer for the specified connection or panics if unset.


334-360: Config_Size_Call.
Defines the chainable API for configuring the Size() mock.


362-380: Via method.
Handles a generic interface return. No logic issues apparent.


382-408: Config_Via_Call.
Same pattern for the Via() method. Straightforward chainable approach.


410-422: NewConfig factory method.
Registers the mock with the testing environment. Good practice to ensure verification.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (4)
queue/driver_machinery_test.go (4)

1-1: Consider adding more context to the TODO comment.

The TODO comment indicates removal in v1.17 but doesn't explain why. Add a brief explanation of the removal reason to help future maintainers understand the deprecation plan.

-// TODO: Will be removed in v1.17
+// TODO: Will be removed in v1.17 as part of the queue system refactoring (see issue #153)

Line range hint 251-281: Enhance error test coverage.

The error test case only verifies that the job wasn't executed. Consider adding tests for:

  • Invalid job registration
  • Job timeout scenarios
  • Redis connection failures

391-394: Add argument validation and documentation.

The job handler accepts variadic arguments without validation or documentation of expected types.

+// Handle Execute the job.
+// Args:
+//   args[0]: string - test name
+//   args[1]: int - test counter
 func (receiver *TestMachineryJob) Handle(args ...any) error {
+    if len(args) < 2 {
+        return errors.New("insufficient arguments: expected test name and counter")
+    }
     testMachineryJob++
     return nil
 }

466-475: Improve error handling in chain job.

The error case in TestChainMachineryJob could be more descriptive and include validation.

 func (receiver *TestChainMachineryJob) Handle(args ...any) error {
+    if len(args) == 0 {
+        return errors.New("missing arguments")
+    }
     if len(args) > 0 && cast.ToBool(args[0]) {
         testChainMachineryJobError++
-        return errors.New("error")
+        return errors.New("chain job failed: error flag was set")
     }
     testChainMachineryJob++
     return nil
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8e8ab94 and 25b57fd.

📒 Files selected for processing (1)
  • queue/driver_machinery_test.go (11 hunks)
⏰ Context from checks skipped due to timeout of 300000ms (2)
  • GitHub Check: test / windows (1.23)
  • GitHub Check: test / windows (1.22)

@hwbrzzl
Contributor

hwbrzzl commented Jan 18, 2025

Hey @devhaozi Do you need help with the CI error? And have a question: Will the DB, Redis drivers be implemented?

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (2)
queue/driver_async_test.go (2)

34-44: Improve test setup organization and efficiency.

Consider these improvements:

  1. Move job registration to a helper method for better readability
  2. Use a slice to store job types for easier maintenance
+func (s *DriverAsyncTestSuite) getTestJobs() []queue.Job {
+	return []queue.Job{
+		&TestAsyncJob{},
+		&TestDelayAsyncJob{},
+		&TestCustomAsyncJob{},
+		&TestErrorAsyncJob{},
+		&TestChainAsyncJob{},
+	}
+}

 func (s *DriverAsyncTestSuite) SetupTest() {
 	testAsyncJob = 0
 	testDelayAsyncJob = 0
 	testCustomAsyncJob = 0
 	testErrorAsyncJob = 0
 	testChainAsyncJob = 0
 	s.mockQueue = mocksqueue.NewQueue(s.T())
 	s.mockConfig = mocksconfig.NewConfig(s.T())
 	s.app = NewApplication(s.mockConfig)
-	s.app.Register([]queue.Job{&TestAsyncJob{}, &TestDelayAsyncJob{}, &TestCustomAsyncJob{}, &TestErrorAsyncJob{}, &TestChainAsyncJob{}})
+	s.app.Register(s.getTestJobs())
 }

165-238: Reduce duplication in test job implementations.

Consider creating a test helper to generate job implementations:

+type testJob struct {
+	signature string
+	counter   *int
+}
+
+func (j *testJob) Signature() string {
+	return j.signature
+}
+
+func (j *testJob) Handle(args ...any) error {
+	*j.counter++
+	return nil
+}
+
+func newTestJob(signature string, counter *int) queue.Job {
+	return &testJob{signature: signature, counter: counter}
+}

-type TestAsyncJob struct {}
-func (receiver *TestAsyncJob) Signature() string {
-	return "test_async_job"
-}
-func (receiver *TestAsyncJob) Handle(args ...any) error {
-	testAsyncJob++
-	return nil
-}
+var TestAsyncJob = newTestJob("test_async_job", &testAsyncJob)

This approach:

  1. Reduces code duplication
  2. Makes it easier to add new test job types
  3. Centralizes job behavior modifications
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 25b57fd and 3963d57.

⛔ Files ignored due to path filters (1)
  • tests/go.sum is excluded by !**/*.sum
📒 Files selected for processing (3)
  • queue/driver_async_test.go (1 hunks)
  • queue/driver_machinery_test.go (10 hunks)
  • queue/worker_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • queue/worker_test.go
🔇 Additional comments (7)
queue/driver_machinery_test.go (5)

1-2: Remove TODO comment with version information.

The TODO comment indicates this file will be removed in v1.17. Consider adding more context about why it will be removed and what will replace it.


24-32: Add test cleanup for global variables.

Using global variables for test state can lead to test interference. This was previously flagged in past reviews.

Add a TearDownTest method to reset the counters:

+func (s *MachineryTestSuite) TearDownTest() {
+    testMachineryJob = 0
+    testMachineryJobOfDisableDebug = 0
+    testDelayMachineryJob = 0
+    testCustomMachineryJob = 0
+    testErrorMachineryJob = 0
+    testChainMachineryJob = 0
+    testChainMachineryJobError = 0
+}

140-146: Replace sleep with proper synchronization.

Using fixed sleep durations for async operations can make tests flaky. This was previously flagged in past reviews.

Use channels or sync.WaitGroup for proper synchronization. Example implementation:

+var wg sync.WaitGroup
+wg.Add(1)
 s.Nil(s.app.Job(&TestMachineryJob{}, []any{
   "TestDefaultAsyncQueue_EnableDebug",
   1,
-}).OnQueue("debug").Dispatch())
-time.Sleep(2 * time.Second)
+}).OnQueue("debug").Dispatch())
+wg.Wait()

Then modify the job handler:

 func (receiver *TestMachineryJob) Handle(args ...any) error {
   testMachineryJob++
+  wg.Done()
   return nil
 }

Also applies to: 171-177, 204-212, 242-248, 275-281, 309-329, 358-372


58-71: LGTM: Test setup is well-structured.

The test setup properly initializes all required mocks and resets global state.


73-107: LGTM: Server configuration test is comprehensive.

The test case thoroughly verifies the server configuration with Redis connection parameters.

queue/driver_async_test.go (2)

64-163: Address commented test cases.

The file contains several commented test cases that appear to be work in progress. Either:

  1. Enable and fix these tests if they're ready to be implemented
  2. Remove them if they're no longer needed
  3. Add TODO comments explaining why they're commented out and when they'll be enabled

Let's check if these tests are tracked in any issues:


15-21: 🛠️ Refactor suggestion

Move job counters into test suite struct to prevent test pollution.

Global variables can cause issues when tests run in parallel. Consider moving these counters into the test suite struct:

-var (
-	testAsyncJob       = 0
-	testDelayAsyncJob  = 0
-	testCustomAsyncJob = 0
-	testErrorAsyncJob  = 0
-	testChainAsyncJob  = 0
-)

 type DriverAsyncTestSuite struct {
 	suite.Suite
 	app        *Application
 	mockConfig *mocksconfig.Config
 	mockQueue  *mocksqueue.Queue
+	counters   struct {
+		asyncJob       int
+		delayAsyncJob  int
+		customAsyncJob int
+		errorAsyncJob  int
+		chainAsyncJob  int
+	}
 }

Likely invalid or redundant comment.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (3)
queue/driver_async_test.go (3)

15-21: 🛠️ Refactor suggestion

Move test counters to suite struct to support parallel testing.

Global variables can cause test pollution when tests run in parallel. Consider moving these counters into the test suite struct.

-var (
-	testAsyncJob       = 0
-	testDelayAsyncJob  = 0
-	testCustomAsyncJob = 0
-	testErrorAsyncJob  = 0
-	testChainAsyncJob  = 0
-)

 type DriverAsyncTestSuite struct {
 	suite.Suite
 	app        *Application
 	mockConfig *mocksconfig.Config
 	mockQueue  *mocksqueue.Queue
+	counters   struct {
+		asyncJob       int
+		delayAsyncJob  int
+		customAsyncJob int
+		errorAsyncJob  int
+		chainAsyncJob  int
+	}
 }

46-62: 🛠️ Refactor suggestion

Replace sleep with sync primitives for reliable testing.

Using fixed sleep durations can make tests flaky. Consider using channels or sync.WaitGroup for synchronization.

 func (s *DriverAsyncTestSuite) TestDefaultAsyncQueue() {
+	done := make(chan struct{})
 	s.mockConfig.EXPECT().GetString("queue.default").Return("async").Times(3)
 	s.mockConfig.EXPECT().GetString("app.name").Return("goravel").Times(2)
 	s.mockConfig.EXPECT().GetString("queue.connections.async.queue", "default").Return("default").Twice()
 	s.mockConfig.EXPECT().GetString("queue.connections.async.driver").Return("async").Twice()
 	s.mockConfig.EXPECT().GetInt("queue.connections.async.size", 100).Return(10).Twice()

 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	go func(ctx context.Context) {
 		s.Nil(s.app.Worker().Run())
 	}(ctx)
-	time.Sleep(1 * time.Second)
+	// Wait for worker to start
+	time.Sleep(100 * time.Millisecond)
+
+	// Signal completion via a package-level hook: Go methods cannot be
+	// reassigned, so TestAsyncJob.Handle should call this hook when it
+	// is non-nil (the hook variable is assumed to live next to the job).
+	testAsyncJobDone = func() { done <- struct{}{} }
+	defer func() { testAsyncJobDone = nil }()
+
 	s.Nil(s.app.Job(&TestAsyncJob{}, []any{"TestDefaultAsyncQueue", 1}).Dispatch())
-	time.Sleep(2 * time.Second)
+	
+	select {
+	case <-done:
+		// Job completed successfully
+	case <-time.After(2 * time.Second):
+		s.Fail("Job did not complete within timeout")
+	}
+
 	s.Equal(1, testAsyncJob)
 }

132-163: ⚠️ Potential issue

Fix mock expectations and add order verification in chain test.

The test needs correct mock expectations and should verify job execution order.

 func (s *DriverAsyncTestSuite) TestChainAsyncQueue() {
-	s.mockConfig.EXPECT().GetString("queue.default").Return("async").Times(3)
+	s.mockConfig.EXPECT().GetString("queue.default").Return("async").Times(6)
 	s.mockConfig.EXPECT().GetString("app.name").Return("goravel").Times(3)
-	s.mockConfig.EXPECT().GetString("queue.connections.async.queue", "default").Return("default").Once()
+	s.mockConfig.EXPECT().GetString("queue.connections.async.queue", "default").Return("default").Times(2)
 	s.mockConfig.EXPECT().GetString("queue.connections.async.driver").Return("async").Twice()
-	s.mockConfig.EXPECT().GetInt("queue.connections.async.size", 100).Return(10).Twice()
+	s.mockConfig.EXPECT().GetInt("queue.connections.async.size", 100).Return(10).Times(3)

+	// Track job execution order
+	var executionOrder []string
+	orderMutex := &sync.Mutex{}
+	done := make(chan struct{})
+
+	// Track execution order via package-level hooks: Go methods cannot
+	// be reassigned, so each job's Handle should call its hook when set
+	// (the hook variables are assumed to live next to the job definitions).
+	testChainAsyncJobHook = func() {
+		orderMutex.Lock()
+		executionOrder = append(executionOrder, "chain")
+		orderMutex.Unlock()
+	}
+	testAsyncJobHook = func() {
+		orderMutex.Lock()
+		executionOrder = append(executionOrder, "async")
+		orderMutex.Unlock()
+		done <- struct{}{}
+	}
+	defer func() {
+		testChainAsyncJobHook = nil
+		testAsyncJobHook = nil
+	}()

 	// ... rest of the test ...

-	time.Sleep(3 * time.Second)
+	select {
+	case <-done:
+		// Verify both execution count and order
+		s.Equal(1, testChainAsyncJob, "First job in chain should execute once")
+		s.Equal(1, testAsyncJob, "Second job in chain should execute once")
+		s.Equal([]string{"chain", "async"}, executionOrder, "Jobs should execute in correct order")
+	case <-time.After(3 * time.Second):
+		s.Fail("Chain did not complete within timeout")
+	}
 }
🧹 Nitpick comments (7)
queue/task.go (1)

18-43: Simplify the args parameter in NewTask.

The current implementation accepts variadic slice args but only uses the first slice. This could be confusing for users.

Consider simplifying the signature to:

-func NewTask(config queue.Config, job queue.Job, args ...[]any) *Task {
-	if len(args) == 0 {
-		args = append(args, []any{})
-	}
+func NewTask(config queue.Config, job queue.Job, args []any) *Task {
+	if args == nil {
+		args = []any{}
+	}

And update the struct initialization:

-				Args: args[0],
+				Args: args,
contracts/queue/queue.go (3)

4-5: Complete the Chain method documentation.

The comment for the Chain method is incomplete and doesn't explain what happens after jobs are processed one by one.

-	// Chain creates a chain of jobs to be processed one by one, passing
+	// Chain creates a chain of jobs to be processed one by one, passing the result of each job to the next one in the chain.

19-21: Add documentation for the Shutdown method.

The Shutdown method in the Worker interface lacks documentation explaining its purpose and behavior.

 type Worker interface {
 	Run() error
+	// Shutdown gracefully stops the worker and waits for ongoing jobs to complete
 	Shutdown() error
 }

23-30: Improve documentation for Args struct fields.

The field comments in the Args struct should follow Go's documentation conventions by starting with the field name.

 type Args struct {
-	// Specify connection
+	// Connection specifies the queue connection to use
 	Connection string
-	// Specify queue
+	// Queue specifies the queue name to process
 	Queue string
-	// Concurrent num
+	// Concurrent specifies the number of concurrent workers
 	Concurrent int
 }
queue/application.go (2)

40-54: Simplify Worker method's default value handling.

The Worker method's default value handling could be simplified by using a helper function or struct initialization.

 func (app *Application) Worker(payloads ...queue.Args) queue.Worker {
-	defaultConnection := app.config.DefaultConnection()
-
-	if len(payloads) == 0 {
-		return NewWorker(app.config, 1, defaultConnection, app.config.Queue(defaultConnection, ""), app.job)
-	}
-	if payloads[0].Connection == "" {
-		payloads[0].Connection = defaultConnection
-	}
-	if payloads[0].Concurrent == 0 {
-		payloads[0].Concurrent = 1
-	}
-
-	return NewWorker(app.config, payloads[0].Concurrent, payloads[0].Connection, app.config.Queue(payloads[0].Connection, payloads[0].Queue), app.job)
+	payload := queue.Args{
+		Connection: app.config.DefaultConnection(),
+		Queue:     "",
+		Concurrent: 1,
+	}
+	
+	if len(payloads) > 0 {
+		if payloads[0].Connection != "" {
+			payload.Connection = payloads[0].Connection
+		}
+		if payloads[0].Queue != "" {
+			payload.Queue = payloads[0].Queue
+		}
+		if payloads[0].Concurrent > 0 {
+			payload.Concurrent = payloads[0].Concurrent
+		}
+	}
+	
+	return NewWorker(app.config, payload.Concurrent, payload.Connection, app.config.Queue(payload.Connection, payload.Queue), app.job)

28-30: Consider renaming GetJobs to All for consistency.

The GetJobs method could be renamed to All to maintain consistency with the repository's method naming.

-func (app *Application) GetJobs() []queue.Job {
+func (app *Application) All() []queue.Job {
 	return app.job.All()
 }
queue/driver_async_test.go (1)

174-178: Add input validation in Handle methods.

The Handle methods should validate their input arguments before processing.

 func (receiver *TestAsyncJob) Handle(args ...any) error {
+	if len(args) < 2 {
+		return fmt.Errorf("expected at least 2 arguments, got %d", len(args))
+	}
 	testAsyncJob++
 	return nil
 }

Apply similar changes to other job types.

Also applies to: 189-193, 204-208, 219-223, 234-238

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3963d57 and ac43ad3.

📒 Files selected for processing (5)
  • contracts/queue/queue.go (1 hunks)
  • mocks/queue/Queue.go (7 hunks)
  • queue/application.go (1 hunks)
  • queue/driver_async_test.go (1 hunks)
  • queue/task.go (1 hunks)
🔇 Additional comments (4)
queue/task.go (3)

9-16: LGTM! Good improvements to the struct fields.

The changes to use interface type for config and value type for time.Time align with Go best practices.


45-49: LGTM! Good improvements to task management methods.

The changes improve consistency and maintainability:

  • Consistent receiver naming
  • Simplified implementations
  • Proper use of config interface methods

Also applies to: 84-94


69-82: Consider handling delay in DispatchSync.

The synchronous dispatch ignores the delay setting, which could lead to unexpected behavior if users expect the delay to be honored even in synchronous execution.

Consider either:

  1. Honoring the delay using time.Sleep
  2. Documenting that delays are ignored in synchronous execution
  3. Returning an error if delay is set for synchronous execution

Would you like me to propose an implementation for any of these options?

mocks/queue/Queue.go (1)

1-346: LGTM! Auto-generated mock implementation.

The mock implementation correctly reflects the Queue interface changes and includes proper type assertions and error handling.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (5)
queue/driver_machinery_test.go (3)

134-140: ⚠️ Potential issue

Replace time.Sleep with proper synchronization.

Using fixed sleep durations for async operations can make tests flaky. Consider using channels or sync.WaitGroup for proper synchronization.

This is a duplicate of a previous review comment. The suggestion to use channels or sync.WaitGroup for proper synchronization is still valid and should be implemented.

Also applies to: 161-166, 190-196, 224-229, 253-258, 283-300, 328-340


355-359: ⚠️ Potential issue

Add input validation in job handlers.

The job handlers don't validate their input arguments before using them. This could lead to runtime panics if the wrong number or type of arguments are passed.

This is a duplicate of a previous review comment. The suggestion to add argument validation in job handlers is still valid and should be implemented.

Also applies to: 370-374, 385-389, 400-404, 415-419, 430-440


57-70: ⚠️ Potential issue

Add test cleanup for global variables.

The test setup resets global variables but there's no cleanup in TearDownTest. This could lead to test interference.

This is a duplicate of a previous review comment. The suggestion to add a TearDownTest method to reset the counters is still valid and should be implemented.
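A minimal sketch of such a `TearDownTest`, using stand-in counters and a suite-like struct instead of the real testify suite:

```go
package main

import "fmt"

// Counters shared by job handlers in the tests; TearDownTest must reset
// them so one test cannot leak state into the next.
var (
	testJob      = 0
	testDelayJob = 0
)

// suiteLike mimics the testify TearDownTest hook (the real suite would
// embed suite.Suite; this struct is only for illustration).
type suiteLike struct{}

// TearDownTest runs after each test and zeroes every shared counter.
func (s *suiteLike) TearDownTest() {
	testJob = 0
	testDelayJob = 0
}

func main() {
	testJob = 5 // state left over from a previous test
	(&suiteLike{}).TearDownTest()
	fmt.Println(testJob, testDelayJob) // both reset to 0
}
```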

queue/driver_async_test.go (2)

14-20: 🛠️ Refactor suggestion

Move test counters into test suite struct.

Global variables can lead to test pollution when tests run in parallel. Consider moving these counters into the test suite struct.

-var (
-	testAsyncJob       = 0
-	testDelayAsyncJob  = 0
-	testCustomAsyncJob = 0
-	testErrorAsyncJob  = 0
-	testChainAsyncJob  = 0
-)

 type DriverAsyncTestSuite struct {
 	suite.Suite
 	app        *Application
 	mockConfig *mocksconfig.Config
 	mockQueue  *mocksqueue.Queue
+	counters   struct {
+		asyncJob       int
+		delayAsyncJob  int
+		customAsyncJob int
+		errorAsyncJob  int
+		chainAsyncJob  int
+	}
 }

52-61: 🛠️ Refactor suggestion

Replace sleep-based synchronization with channels or WaitGroup.

Using fixed sleep durations can make tests flaky. Consider using channels or sync.WaitGroup for more reliable synchronization.

Here's an example refactor for one of the test methods:

 func (s *DriverAsyncTestSuite) TestDefaultAsyncQueue() {
+	jobDone := make(chan struct{}, 1)
+	testAsyncJobDone = jobDone // package-level hook, signaled at the end of Handle
+	defer func() { testAsyncJobDone = nil }()
+
 	worker := s.app.Worker()
 	go func() {
 		s.Nil(worker.Run())
 	}()
-	time.Sleep(1 * time.Second)
+
+	// Give the worker a moment to start polling.
+	time.Sleep(100 * time.Millisecond)
+
 	s.Nil(s.app.Job(&TestAsyncJob{}, []any{"TestDefaultAsyncQueue", 1}).Dispatch())
-	time.Sleep(2 * time.Second)
-	s.Equal(1, testAsyncJob)
+
+	// Wait for job completion with a timeout instead of a fixed sleep.
+	select {
+	case <-jobDone:
+		s.Equal(1, testAsyncJob)
+	case <-time.After(2 * time.Second):
+		s.Fail("Job did not complete within timeout")
+	}
+
 	s.NoError(worker.Shutdown())
 }

Here `testAsyncJobDone` would be a new package-level `chan struct{}` that `TestAsyncJob.Handle` signals (when non-nil) after incrementing its counter; Go does not allow reassigning methods at runtime, so the completion hook has to live in the handler itself.

Also applies to: 73-83, 98-105, 119-126, 139-158

🧹 Nitpick comments (3)
queue/driver_machinery_test.go (2)

1-2: Add more context to the TODO comment.

The TODO comment lacks context about why the file will be removed and what will replace it. This information is crucial for future maintenance.

-// TODO: Will be removed in v1.17
+// TODO: Will be removed in v1.17 as part of the queue system refactoring.
+// This implementation will be replaced by <replacement details>.
+// See issue #<issue-number> for more details.

108-142: Add negative test cases.

The test suite lacks negative test cases for error scenarios such as:

  • Invalid job registration
  • Invalid queue names
  • Network failures
  • Redis connection issues

Consider adding test cases to verify error handling in these scenarios. Would you like me to help generate these test cases?

Also applies to: 144-169, 171-200, 202-232, 234-261, 263-305, 307-344
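One way to sketch a negative test for invalid job registration, using a toy registry rather than the real job repository API (all names here are illustrative):

```go
package main

import "fmt"

// jobRegistry is a minimal stand-in for the queue's job repository,
// mapping job signatures to their handlers.
type jobRegistry map[string]func(args ...any) error

// dispatch looks up the handler for a signature and runs it, returning
// an error for signatures that were never registered.
func (r jobRegistry) dispatch(signature string, args ...any) error {
	handler, ok := r[signature]
	if !ok {
		return fmt.Errorf("job not registered: %s", signature)
	}
	return handler(args...)
}

func main() {
	reg := jobRegistry{"test_job": func(args ...any) error { return nil }}

	// Negative case: dispatching an unregistered job must fail loudly
	// rather than being silently dropped.
	if err := reg.dispatch("missing_job"); err != nil {
		fmt.Println("got expected error:", err)
	}
}
```

Analogous tests could assert on empty queue names or a driver that fails to connect.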

queue/driver_async_test.go (1)

160-173: Improve job implementations with better documentation and error handling.

The job implementations could benefit from better documentation and error handling.

 type TestAsyncJob struct {
+	// Add fields if needed for test configuration
 }
 
-// Signature The name and signature of the job.
+// Signature returns the unique identifier for this job type.
+// The queue system uses it to match jobs with their handlers.
 func (receiver *TestAsyncJob) Signature() string {
 	return "test_async_job"
 }
 
-// Handle Execute the job.
+// Handle processes the job with the given arguments. It increments the
+// test counter to track execution and returns nil on success or an
+// error on failure.
 func (receiver *TestAsyncJob) Handle(args ...any) error {
+	if len(args) < 2 {
+		return fmt.Errorf("insufficient arguments: expected at least 2, got %d", len(args))
+	}
 	testAsyncJob++
 	return nil
 }

Also applies to: 175-188, 190-203, 205-218, 220-233

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d157f3c and ca55bef.

📒 Files selected for processing (3)
  • queue/driver_async_test.go (1 hunks)
  • queue/driver_machinery_test.go (3 hunks)
  • queue/worker_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • queue/worker_test.go
⏰ Context from checks skipped due to timeout of 300000ms (2)
  • GitHub Check: test / windows (1.23)
  • GitHub Check: test / windows (1.22)

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
queue/config.go (1)

37-41: Add validation for failed jobs table configuration.

The FailedJobsQuery method should validate that the required configuration exists.

 func (r *Config) FailedJobsQuery() orm.Query {
 	connection := r.config.GetString("queue.failed.database")
 	table := r.config.GetString("queue.failed.table")
+	if table == "" {
+		table = "failed_jobs" // Default table name
+	}
 	return OrmFacade.Connection(connection).Query().Table(table)
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ca55bef and 2af088c.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (10)
  • contracts/queue/config.go (1 hunks)
  • contracts/queue/driver.go (1 hunks)
  • go.mod (2 hunks)
  • mocks/queue/Config.go (1 hunks)
  • queue/config.go (2 hunks)
  • queue/config_test.go (1 hunks)
  • queue/driver.go (1 hunks)
  • queue/utils.go (0 hunks)
  • queue/utils_test.go (0 hunks)
  • queue/worker.go (1 hunks)
💤 Files with no reviewable changes (2)
  • queue/utils_test.go
  • queue/utils.go
🚧 Files skipped from review as they are similar to previous changes (4)
  • queue/driver.go
  • contracts/queue/config.go
  • contracts/queue/driver.go
  • mocks/queue/Config.go
🔇 Additional comments (6)
queue/config_test.go (1)

8-9: LGTM!

The changes to import paths and mock usage are correct, and the test coverage remains comprehensive.

Also applies to: 14-15, 23-24

queue/config.go (1)

29-35: 🛠️ Refactor suggestion

Add error handling for missing or invalid driver configuration.

The Driver method should handle cases where the configuration is missing or invalid.

 func (r *Config) Driver(connection string) string {
 	if connection == "" {
 		connection = r.DefaultConnection()
 	}
 
-	return r.config.GetString(fmt.Sprintf("queue.connections.%s.driver", connection))
+	driver := r.config.GetString(fmt.Sprintf("queue.connections.%s.driver", connection))
+	if driver == "" {
+		return "sync" // Default to sync driver if not configured
+	}
+	return driver

Likely invalid or redundant comment.

queue/worker.go (3)

57-61: 🛠️ Refactor suggestion

Add error logging for job Pop failures.

Silent failures in job processing can make debugging difficult.

 				job, args, err := driver.Pop(r.queue)
 				if err != nil {
+					LogFacade.Error(errors.Wrap(err, "failed to pop job from queue"))
 					time.Sleep(1 * time.Second)
 					continue
 				}

Likely invalid or redundant comment.


77-85: ⚠️ Potential issue

Fix race condition with shared error variable.

Using the same error variable across goroutines can lead to race conditions.

 	r.wg.Add(1)
 	go func() {
 		defer r.wg.Done()
 		for job := range r.failedJobChan {
-			if err = r.config.FailedJobsQuery().Create(&job); err != nil {
-				LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(err))
+			if createErr := r.config.FailedJobsQuery().Create(&job); createErr != nil {
+				LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(createErr))
 			}
 		}
 	}()

Likely invalid or redundant comment.


92-96: ⚠️ Potential issue

Fix potential deadlock in shutdown sequence.

The current shutdown sequence might lead to deadlocks or lost jobs.

 func (r *Worker) Shutdown() error {
 	r.isShutdown.Store(true)
+	r.wg.Wait() // Wait for all workers to finish
 	close(r.failedJobChan)
 	return nil
 }

Likely invalid or redundant comment.
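The wait-before-close ordering matters because sending on a closed channel panics. A self-contained sketch (the `worker` type here only mimics the real one's fields):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker mirrors the suggested shutdown ordering: mark shutdown, wait
// for in-flight producers, then close the channel they write to.
type worker struct {
	isShutdown    atomic.Bool
	wg            sync.WaitGroup
	failedJobChan chan string
}

// newWorker starts n producer goroutines that each report one failed job.
// The channel is buffered to n so producers never block on send.
func newWorker(n int) *worker {
	w := &worker{failedJobChan: make(chan string, n)}
	for i := 0; i < n; i++ {
		w.wg.Add(1)
		go func(id int) {
			defer w.wg.Done()
			if !w.isShutdown.Load() {
				w.failedJobChan <- fmt.Sprintf("job-%d", id)
			}
		}(i)
	}
	return w
}

// shutdown waits for all producers before closing the channel; closing
// first could panic if a goroutine still had a send in flight.
func (w *worker) shutdown() {
	w.isShutdown.Store(true)
	w.wg.Wait()
	close(w.failedJobChan)
}

func main() {
	w := newWorker(4)
	w.shutdown()
	for job := range w.failedJobChan { // drain remaining buffered entries
		fmt.Println("saving failed job:", job)
	}
}
```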

go.mod (1)

3-3: Verify Go version compatibility.

Confirm that Go 1.22.7 is the intended minimum toolchain version and that the CI matrix (1.22, 1.23) still covers it.

hwbrzzl
hwbrzzl previously approved these changes Feb 6, 2025
Contributor

@hwbrzzl hwbrzzl left a comment

Amazing PR, we can merge it first, I'll make a deep test when I have a chance. 👍

@devhaozi devhaozi merged commit b777507 into master Feb 7, 2025
14 of 15 checks passed
@devhaozi devhaozi deleted the haozi/queue1 branch February 7, 2025 08:43


Development

Successfully merging this pull request may close these issues.

✨ [Feature] Refactor the queue facades

3 participants