sqliteq

Version: v0.2.3
Published: Jun 28, 2025 License: MIT Imports: 7 Imported by: 0

README

SQLiteQ: A SQLite-Based Persistent Queue for Go

SQLiteQ is a persistent queue implementation in Go using SQLite as the storage backend. It provides efficient enqueue and dequeue operations and maintains persistence across application restarts.

Features

  • Efficient enqueue and dequeue operations
  • Persistence via SQLite storage
  • Support for acknowledgment-based processing
  • Simple and clean API following the Queue interface
  • Built as a persistence adapter for varmq

Installation

go get github.com/goptics/sqliteq

Usage

package main

import (
    "fmt"
    "log"

    "github.com/goptics/sqliteq"
)

func main() {
    // Create a new SQLite queues manager
    // The parameter is the path to the SQLite database file
    queuesManager := sqliteq.New("queue.db")
    defer queuesManager.Close()

    // Create a new queue
    // The parameter is the name of the table to use for the queue
    queue, err := queuesManager.NewQueue("my_queue")
    if err != nil {
        log.Fatalf("Failed to create queue: %v", err)
    }

    // You can also create a queue with custom options
    // For example, to keep acknowledged items in the database:
    queueWithOptions, err := queuesManager.NewQueue("my_other_queue",
        sqliteq.WithRemoveOnComplete(false)) // Set to false to keep acknowledged items
    if err != nil {
        log.Fatalf("Failed to create queue: %v", err)
    }
    queueWithOptions.Enqueue([]byte("item for the second queue"))

    // Create a priority queue
    priorityQueue, err := queuesManager.NewPriorityQueue("my_priority_queue")
    if err != nil {
        log.Fatalf("Failed to create priority queue: %v", err)
    }
    // Lower priority numbers are dequeued first (0 is the highest priority)
    priorityQueue.Enqueue([]byte("urgent item"), 0)

    // Enqueue items
    queue.Enqueue([]byte("item 1"))
    queue.Enqueue([]byte("item 2"))

    // Get queue length
    fmt.Printf("Queue length: %d\n", queue.Len())

    // Get all pending items
    items := queue.Values()
    fmt.Printf("All items: %v\n", items)

    // Simple dequeue
    item, success := queue.Dequeue()
    if success {
        fmt.Printf("Dequeued item: %v\n", string(item.([]byte)))
    }

    // Dequeue with acknowledgment
    item, success, ackID := queue.DequeueWithAckId()
    if success {
        fmt.Printf("Dequeued item: %v with ack ID: %s\n", string(item.([]byte)), ackID)

        // Process the item...

        // Acknowledge the item after processing
        acknowledged := queue.Acknowledge(ackID)
        fmt.Printf("Item acknowledged: %v\n", acknowledged)

        // Note: By default, acknowledged items are removed from the database
        // With WithRemoveOnComplete(false), they would be marked as completed instead
    }

    // Purge the queue
    queue.Purge()
}

How It Works

SQLiteQ uses a SQLite database to store queue items with the following schema:

  • id: Unique identifier for each item (autoincrement primary key)
  • data: The serialized item data (stored as a JSON blob)
  • status: The status of the item ("pending", "processing", or "completed")
  • ack_id: A unique ID for acknowledging processed items
  • created_at: When the item was added to the queue
  • updated_at: When the item was last updated

NOTE: By default, when an item is acknowledged, it is removed from the database. However, you can configure the queue to keep acknowledged items by using the WithRemoveOnComplete(false) option when creating the queue. In this case, acknowledged items will be marked as "completed" but will remain in the database.
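
The status lifecycle described above can be sketched with a small in-memory model. This is only an illustration of the pending, processing, and completed transitions and of the remove-on-complete switch; the memQueue and row types, the lowercase method names, and the counter-based ack IDs are hypothetical and are not taken from the library's source.

```go
package main

import "fmt"

// Status values mirror the three states named in the schema above.
const (
	statusPending    = "pending"
	statusProcessing = "processing"
	statusCompleted  = "completed"
)

// row is a hypothetical in-memory stand-in for one table row.
type row struct {
	id     int
	data   []byte
	status string
	ackID  string
}

// memQueue models the lifecycle only; it is not the library's implementation.
type memQueue struct {
	rows             []*row
	nextID           int
	removeOnComplete bool
}

// enqueue appends a new row in the "pending" state.
func (q *memQueue) enqueue(data []byte) {
	q.nextID++
	q.rows = append(q.rows, &row{id: q.nextID, data: data, status: statusPending})
}

// dequeueWithAck moves the oldest pending row to "processing" and tags it
// with an ack ID (a counter-based string here, purely illustrative).
func (q *memQueue) dequeueWithAck() ([]byte, bool, string) {
	for _, r := range q.rows {
		if r.status == statusPending {
			r.status = statusProcessing
			r.ackID = fmt.Sprintf("ack-%d", r.id)
			return r.data, true, r.ackID
		}
	}
	return nil, false, ""
}

// acknowledge deletes the matching row when removeOnComplete is true and
// otherwise keeps it and marks it "completed".
func (q *memQueue) acknowledge(ackID string) bool {
	for i, r := range q.rows {
		if r.status == statusProcessing && r.ackID == ackID {
			if q.removeOnComplete {
				q.rows = append(q.rows[:i], q.rows[i+1:]...)
			} else {
				r.status = statusCompleted
			}
			return true
		}
	}
	return false
}

func main() {
	q := &memQueue{removeOnComplete: false}
	q.enqueue([]byte("item 1"))
	_, _, ackID := q.dequeueWithAck()
	fmt.Println(q.acknowledge(ackID), len(q.rows), q.rows[0].status)
}
```

In this model a second acknowledge call with the same ack ID returns false, because the row is either gone or no longer in the "processing" state.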

Performance Considerations

  • Enqueue and dequeue are served by indexed lookups, so their cost stays low as the queue grows
  • Because SQLite indexes are B-trees, these operations run in logarithmic time rather than true constant time
  • SQLite's WAL (Write-Ahead Logging) mode is enabled for better concurrent access
  • Proper indexing is set up on the status and creation time columns for efficient querying
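
To make the last two points concrete, here is a sketch of the kind of SQL such a setup might issue. The statements are valid SQLite, but the helper name setupStatements, the index name, and the choice of a single composite index are assumptions for illustration; the library's actual DDL may differ.

```go
package main

import "fmt"

// setupStatements returns illustrative SQL for a queue table with the given
// name. The index name and column order are assumptions, not the library's
// actual DDL. The table name is interpolated directly, so it must be trusted.
func setupStatements(table string) []string {
	return []string{
		// Switch the database to write-ahead logging.
		"PRAGMA journal_mode=WAL;",
		// Index the columns used to find the oldest row in a given status.
		fmt.Sprintf(
			"CREATE INDEX IF NOT EXISTS idx_%s_status_created ON %s (status, created_at);",
			table, table,
		),
	}
}

func main() {
	for _, stmt := range setupStatements("my_queue") {
		fmt.Println(stmt)
	}
}
```

With an index on (status, created_at), a query for the oldest pending row can be answered from the index instead of scanning the whole table.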

Documentation

Types

type Option

type Option func(*Queue)

Option is a function that configures a Queue.

func WithRemoveOnComplete

func WithRemoveOnComplete(remove bool) Option

WithRemoveOnComplete sets what happens to acknowledged items. When remove is true, they are deleted from the database; when it is false, they are kept and marked as completed.
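
Option follows the functional options pattern. The sketch below shows how such an option might be applied at construction time; the lowercase queue, option, and newQueue names are hypothetical stand-ins, and only the default (remove acknowledged items) is taken from the README.

```go
package main

import "fmt"

// queue is a hypothetical stand-in for the package's Queue type.
type queue struct {
	removeOnComplete bool
}

// option mirrors the documented shape: a function that mutates the queue.
type option func(*queue)

// withRemoveOnComplete returns an option that records the caller's choice.
func withRemoveOnComplete(remove bool) option {
	return func(q *queue) { q.removeOnComplete = remove }
}

// newQueue starts from the documented default (acknowledged items are
// removed) and then applies each option in order.
func newQueue(opts ...option) *queue {
	q := &queue{removeOnComplete: true}
	for _, opt := range opts {
		opt(q)
	}
	return q
}

func main() {
	fmt.Println(newQueue().removeOnComplete)
	fmt.Println(newQueue(withRemoveOnComplete(false)).removeOnComplete)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.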

type PriorityQueue

type PriorityQueue struct {
	*Queue
}

PriorityQueue extends Queue with priority-based dequeuing.

func (*PriorityQueue) Dequeue

func (pq *PriorityQueue) Dequeue() (any, bool)

Dequeue overrides the base Dequeue method to use priority-based dequeuing.

func (*PriorityQueue) DequeueWithAckId

func (pq *PriorityQueue) DequeueWithAckId() (any, bool, string)

DequeueWithAckId overrides the base DequeueWithAckId method to use priority-based dequeuing.

func (*PriorityQueue) Enqueue

func (pq *PriorityQueue) Enqueue(item any, priority int) bool

Enqueue adds an item to the queue with the specified priority. Items with lower priority numbers are dequeued first (0 is the highest priority). It returns true if the operation succeeded.
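
The ordering rule can be illustrated with an in-memory sketch. The memPriorityQueue type is hypothetical, and breaking ties by insertion order is an assumption; the documentation only states that lower numbers come out first.

```go
package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical queued item with its priority and arrival order.
type entry struct {
	seq      int
	priority int
	data     string
}

// memPriorityQueue is an illustrative model, not the library's type.
type memPriorityQueue struct {
	entries []entry
	nextSeq int
}

// enqueue records the item together with its priority and arrival order.
func (pq *memPriorityQueue) enqueue(data string, priority int) {
	pq.nextSeq++
	pq.entries = append(pq.entries, entry{seq: pq.nextSeq, priority: priority, data: data})
}

// dequeue returns the entry with the lowest priority number. Ties fall back
// to arrival order, which is an assumption made for this sketch.
func (pq *memPriorityQueue) dequeue() (string, bool) {
	if len(pq.entries) == 0 {
		return "", false
	}
	sort.Slice(pq.entries, func(i, j int) bool {
		a, b := pq.entries[i], pq.entries[j]
		if a.priority != b.priority {
			return a.priority < b.priority
		}
		return a.seq < b.seq
	})
	head := pq.entries[0]
	pq.entries = pq.entries[1:]
	return head.data, true
}

func main() {
	pq := &memPriorityQueue{}
	pq.enqueue("low", 5)
	pq.enqueue("urgent", 0)
	pq.enqueue("normal", 2)
	for {
		item, ok := pq.dequeue()
		if !ok {
			break
		}
		fmt.Println(item)
	}
}
```

Sorting on every dequeue keeps the sketch short; a real store would typically rely on an index or a heap instead.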

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue implements the Queue interface using SQLite as the storage backend.

func (*Queue) Acknowledge

func (q *Queue) Acknowledge(ackID string) bool

Acknowledge marks an item as completed. It returns true if the item was successfully acknowledged, false otherwise.

func (*Queue) Close

func (q *Queue) Close() error

Close closes the queue and its database connection.

func (*Queue) Dequeue

func (q *Queue) Dequeue() (any, bool)

Dequeue removes and returns the next item from the queue. It returns the item and a boolean indicating whether the operation succeeded.

func (*Queue) DequeueWithAckId

func (q *Queue) DequeueWithAckId() (any, bool, string)

DequeueWithAckId removes and returns the next item from the queue together with an acknowledgment ID. It returns the item, a boolean indicating whether the operation succeeded, and the acknowledgment ID.
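
A consumer built on DequeueWithAckId and Acknowledge might look like the sketch below. The ackQueue interface lists only the two documented method signatures; the drain helper, the errSkip sentinel, and the in-memory fakeQueue are hypothetical and exist only so the example runs without a database.

```go
package main

import (
	"errors"
	"fmt"
)

// ackQueue lists only the two documented methods the loop needs.
type ackQueue interface {
	DequeueWithAckId() (any, bool, string)
	Acknowledge(ackID string) bool
}

// errSkip is a hypothetical sentinel a handler can return to signal failure.
var errSkip = errors.New("skip item")

// drain processes items until the queue reports nothing left. Items whose
// handler fails are left unacknowledged. It returns the number acknowledged.
func drain(q ackQueue, handle func(item any) error) int {
	acked := 0
	for {
		item, ok, ackID := q.DequeueWithAckId()
		if !ok {
			return acked
		}
		if err := handle(item); err != nil {
			continue // do not acknowledge failed items
		}
		if q.Acknowledge(ackID) {
			acked++
		}
	}
}

// fakeQueue is an in-memory stand-in used only to exercise drain.
type fakeQueue struct {
	items   []any
	next    int
	pending map[string]bool
}

func (f *fakeQueue) DequeueWithAckId() (any, bool, string) {
	if f.next >= len(f.items) {
		return nil, false, ""
	}
	item := f.items[f.next]
	f.next++
	id := fmt.Sprintf("ack-%d", f.next)
	f.pending[id] = true
	return item, true, id
}

func (f *fakeQueue) Acknowledge(ackID string) bool {
	if !f.pending[ackID] {
		return false
	}
	delete(f.pending, ackID)
	return true
}

func main() {
	q := &fakeQueue{items: []any{[]byte("a"), []byte("b")}, pending: map[string]bool{}}
	n := drain(q, func(item any) error {
		if string(item.([]byte)) == "b" {
			return errSkip
		}
		return nil
	})
	fmt.Println("acknowledged:", n, "unacknowledged:", len(q.pending))
}
```

Since *Queue has both methods with these signatures, a value returned by NewQueue would satisfy ackQueue and could be passed to drain directly.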

func (*Queue) Enqueue

func (q *Queue) Enqueue(item any) bool

Enqueue adds an item to the queue. It serializes the item to JSON and stores it in the database. It returns true if the operation succeeded.
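
The usage example in the README enqueues []byte values and asserts []byte on dequeue. A caller with structured payloads might therefore encode them explicitly before enqueuing and decode them afterwards, as in this sketch. The Job type and both helpers are hypothetical, and the []byte assertion follows the README example rather than a verified guarantee about what Dequeue returns.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Job is a hypothetical payload type.
type Job struct {
	ID   int    `json:"id"`
	Task string `json:"task"`
}

// encodeJob turns a Job into bytes suitable for passing to Enqueue.
func encodeJob(j Job) ([]byte, error) {
	return json.Marshal(j)
}

// decodeJob converts a dequeued value back into a Job. It expects []byte,
// as in the README usage example, and reports any other type as an error.
func decodeJob(item any) (Job, error) {
	raw, ok := item.([]byte)
	if !ok {
		return Job{}, fmt.Errorf("unexpected item type %T", item)
	}
	var j Job
	err := json.Unmarshal(raw, &j)
	return j, err
}

func main() {
	raw, err := encodeJob(Job{ID: 1, Task: "resize"})
	if err != nil {
		panic(err)
	}
	job, err := decodeJob(any(raw))
	fmt.Println(job, err)
}
```

Checking the type assertion instead of letting it panic keeps a consumer alive if an unexpected value ever comes back from the queue.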

func (*Queue) Len

func (q *Queue) Len() int

Len returns the number of pending items in the queue.

func (*Queue) Purge

func (q *Queue) Purge()

Purge removes all items from the queue.

func (*Queue) RequeueNoAckRows (added in v0.2.1)

func (q *Queue) RequeueNoAckRows()

func (*Queue) Values

func (q *Queue) Values() []any

Values returns all pending items in the queue.

type Queues

type Queues interface {
	NewQueue(queueKey string, opts ...Option) (*Queue, error)
	NewPriorityQueue(queueKey string, opts ...Option) (*PriorityQueue, error)
	Close() error
}

func New

func New(dbPath string) Queues
