Production-grade distributed task queue in Go with priority queues, job scheduling, worker pools, WebSocket, and REST API

AndyBodnar/GoQueue

GoQueue

A production-grade distributed task queue written in Go. GoQueue provides reliable job processing with priority queues, delayed jobs, automatic retries with exponential backoff, dead letter queues, and real-time monitoring.

Features

  • Distributed Processing: Scale horizontally with multiple workers across machines
  • Priority Queues: High, medium, and low priority levels for job scheduling
  • Delayed/Scheduled Jobs: Schedule jobs for future execution
  • Automatic Retries: Exponential backoff with configurable retry limits
  • Dead Letter Queue: Failed jobs are preserved for inspection and manual retry
  • Real-time Updates: WebSocket support for live job status monitoring
  • Persistence: BoltDB for reliable job storage (in-memory option for testing)
  • Prometheus Metrics: Built-in metrics endpoint for monitoring
  • REST API: Full-featured API for job management
  • Client SDK: Go client library for easy integration
  • CLI Tool: Command-line interface for queue management
  • Graceful Shutdown: Clean worker shutdown without losing jobs
  • Docker Ready: Multi-stage Dockerfile with optimized images
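
As an illustration of the priority levels listed above, the following sketch uses Go's standard container/heap package so that high-priority jobs are dequeued first and jobs of equal priority keep their arrival order. The job struct, the numeric ranks, and the seq tiebreaker are assumptions made for this example; they are not taken from GoQueue's source.

```go
package main

import (
	"container/heap"
	"fmt"
)

// Hypothetical numeric ranks for the three documented priority levels.
var rank = map[string]int{"high": 0, "medium": 1, "low": 2}

// job is an illustrative stand-in, not GoQueue's actual Job type.
type job struct {
	id       string
	priority string
	seq      int // arrival order, used to break ties
}

// jobHeap implements heap.Interface, ordering by rank and then by arrival.
type jobHeap []job

func (h jobHeap) Len() int { return len(h) }
func (h jobHeap) Less(i, j int) bool {
	if rank[h[i].priority] != rank[h[j].priority] {
		return rank[h[i].priority] < rank[h[j].priority]
	}
	return h[i].seq < h[j].seq
}
func (h jobHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *jobHeap) Push(x any)   { *h = append(*h, x.(job)) }
func (h *jobHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// drain pushes jobs in arrival order and returns their IDs in dequeue order.
func drain(jobs []job) []string {
	h := &jobHeap{}
	for i, j := range jobs {
		j.seq = i
		heap.Push(h, j)
	}
	var out []string
	for h.Len() > 0 {
		out = append(out, heap.Pop(h).(job).id)
	}
	return out
}

func main() {
	order := drain([]job{
		{id: "a", priority: "low"},
		{id: "b", priority: "high"},
		{id: "c", priority: "medium"},
		{id: "d", priority: "high"},
	})
	fmt.Println(order) // [b d c a]
}
```

A heap keeps both enqueue and dequeue at O(log n), which is one common way to implement priority ordering; GoQueue may use a different structure internally.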

Architecture

                                    +------------------+
                                    |    Prometheus    |
                                    |    (Metrics)     |
                                    +--------+---------+
                                             |
                                             | scrape
                                             v
+-------------+     +-----------------+     +------------------+
|   Client    |---->|   REST API      |---->|                  |
|   (SDK)     |     |   + WebSocket   |     |   Queue Server   |
+-------------+     +-----------------+     |                  |
                             |              |  - Job Manager   |
                             |              |  - Scheduler     |
                             v              |  - Event Hub     |
                    +-----------------+     +--------+---------+
                    |   Web Dashboard |              |
                    |   (Optional)    |              |
                    +-----------------+              |
                                                    |
                         +-------------+------------+------------+
                         |             |            |            |
                         v             v            v            v
                    +--------+    +--------+   +--------+   +--------+
                    |Worker 1|    |Worker 2|   |Worker 3|   |Worker N|
                    +---+----+    +---+----+   +---+----+   +---+----+
                        |             |            |            |
                        +-------------+------------+------------+
                                      |
                                      v
                              +---------------+
                              |   BoltDB      |
                              |  (Storage)    |
                              +---------------+

Quick Start

Installation

# Clone the repository
git clone https://github.com/goqueue/goqueue.git
cd goqueue

# Build all binaries
make build

# Or install globally
make install

Using Docker

# Start server and worker
docker compose -f docker/docker-compose.yml up -d

# View logs
docker compose -f docker/docker-compose.yml logs -f

# Stop services
docker compose -f docker/docker-compose.yml down

Running Locally

# Start the server
./bin/goqueue-server --log-format=console

# In another terminal, start workers
./bin/goqueue-worker --concurrency=4 --log-format=console

Usage

REST API

Create a Job

curl -X POST http://localhost:8080/api/v1/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "type": "email.send",
    "queue": "emails",
    "priority": "high",
    "payload": {
      "to": "user@example.com",
      "subject": "Welcome!",
      "body": "Hello from GoQueue"
    },
    "max_retries": 5,
    "timeout": "5m"
  }'

Get Job Status

# Replace {job_id} with an actual job ID
curl http://localhost:8080/api/v1/jobs/{job_id}

List Jobs

curl "http://localhost:8080/api/v1/jobs?queue=emails&status=pending&limit=50"
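
When the same query is issued from Go, the filters can be assembled with net/url so that values are escaped correctly. This is a minimal sketch: the helper name listJobsURL is hypothetical, and it covers only the three filters shown in the curl example (queue, status, limit).

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// listJobsURL builds the list endpoint URL. Empty filters and a
// non-positive limit are left out of the query string.
func listJobsURL(base, queue, status string, limit int) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	u.Path = "/api/v1/jobs"

	q := url.Values{}
	if queue != "" {
		q.Set("queue", queue)
	}
	if status != "" {
		q.Set("status", status)
	}
	if limit > 0 {
		q.Set("limit", strconv.Itoa(limit))
	}
	u.RawQuery = q.Encode() // Encode sorts parameters by key

	return u.String(), nil
}

func main() {
	s, err := listJobsURL("http://localhost:8080", "emails", "pending", 50)
	if err != nil {
		fmt.Println("build URL:", err)
		return
	}
	fmt.Println(s) // http://localhost:8080/api/v1/jobs?limit=50&queue=emails&status=pending
}
```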

Get Queue Statistics

curl "http://localhost:8080/api/v1/stats?queue=emails"

Go Client SDK

package main

import (
    "context"
    "log"

    "github.com/goqueue/goqueue/pkg/client"
)

func main() {
    // Create client
    c := client.New("http://localhost:8080")

    // Enqueue a job
    job, err := c.Enqueue(context.Background(), "email.send",
        map[string]interface{}{
            "to": "user@example.com",
            "subject": "Hello!",
        },
        &client.EnqueueOptions{
            Queue:      "emails",
            Priority:   "high",
            MaxRetries: 5,
        },
    )
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Created job: %s", job.ID)

    // Get queue stats
    stats, err := c.GetStats(context.Background(), "emails")
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("Pending: %d, Completed: %d", stats.Pending, stats.Completed)
}

Creating Workers

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/goqueue/goqueue/pkg/queue"
    "github.com/goqueue/goqueue/pkg/storage"
    "github.com/goqueue/goqueue/pkg/worker"
)

type EmailPayload struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

func main() {
    // Initialize storage
    store, err := storage.NewBoltStorage(&storage.BoltOptions{
        Path: "goqueue.db",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer store.Close()

    // Initialize queue
    q := queue.NewQueue(store)

    // Create worker pool
    pool := worker.NewPool(q, &worker.PoolConfig{
        Workers:      4,
        Queues:       []string{"emails", "default"},
        PollInterval: time.Second,
    })

    // Register job handlers
    pool.RegisterHandler("email.send", func(ctx context.Context, job *queue.Job) (interface{}, error) {
        var payload EmailPayload
        if err := json.Unmarshal(job.Payload, &payload); err != nil {
            // Returning the error marks the job as failed
            return nil, err
        }

        // Send email...
        log.Printf("Sending email to %s: %s", payload.To, payload.Subject)

        return map[string]string{"status": "sent"}, nil
    })

    // Start processing
    pool.Start(context.Background())

    // Wait for shutdown signal...
}

CLI Tool

# Enqueue a job
goqueue enqueue email.send --queue=emails --priority=high \
  --payload='{"to":"user@example.com","subject":"Hello"}'

# List jobs
goqueue list --queue=emails --status=pending --limit=10

# Get job details
goqueue get {job_id}

# Cancel a job
goqueue cancel {job_id}

# View queue statistics
goqueue stats --queue=emails

# Manage dead letter queue
goqueue dead list
goqueue dead retry {job_id}
goqueue dead purge

# Health check
goqueue health

Configuration

GoQueue can be configured via a config file, environment variables, or command-line flags.

Config File (config.yaml)

server:
  address: ":8080"
  read_timeout: 30s
  write_timeout: 30s
  enable_websocket: true

queue:
  default_queue: "default"
  max_retries: 3
  retry_delay: 5s
  job_timeout: 5m

worker:
  concurrency: 4
  queues:
    - "critical"
    - "default"
    - "low"
  poll_interval: 1s

storage:
  type: "boltdb"
  path: "goqueue.db"

logging:
  level: "info"
  format: "json"

metrics:
  enabled: true

Environment Variables

All configuration options can be set via environment variables with the GOQUEUE_ prefix:

export GOQUEUE_SERVER_ADDRESS=":8080"
export GOQUEUE_WORKER_CONCURRENCY=8
export GOQUEUE_STORAGE_PATH="/data/goqueue.db"
export GOQUEUE_LOGGING_LEVEL="debug"
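
The four examples above suggest a naming rule: take the dotted path of the config key, replace dots with underscores, upper-case it, and add the GOQUEUE_ prefix. The sketch below encodes that inferred rule; the helper name envName is hypothetical, and the mapping for keys not listed above is an assumption rather than confirmed behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// envName maps a dotted config key such as "server.address" to an
// environment variable name, following the pattern of the examples above.
func envName(key string) string {
	return "GOQUEUE_" + strings.ToUpper(strings.ReplaceAll(key, ".", "_"))
}

func main() {
	keys := []string{"server.address", "worker.concurrency", "storage.path", "logging.level"}
	for _, k := range keys {
		fmt.Println(k, "->", envName(k))
	}
}
```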

API Reference

Jobs

| Method | Endpoint                | Description                          |
|--------|-------------------------|--------------------------------------|
| POST   | /api/v1/jobs            | Create a new job                     |
| GET    | /api/v1/jobs            | List jobs with filters               |
| GET    | /api/v1/jobs/:id        | Get job details                      |
| POST   | /api/v1/jobs/:id/cancel | Cancel a job                         |
| DELETE | /api/v1/jobs/:id        | Delete a job                         |
| POST   | /api/v1/jobs/batch      | Create multiple jobs                 |
| POST   | /api/v1/jobs/bulk       | Bulk actions (cancel, delete, retry) |

Statistics

| Method | Endpoint      | Description          |
|--------|---------------|----------------------|
| GET    | /api/v1/stats | Get queue statistics |

Dead Letter Queue

| Method | Endpoint               | Description      |
|--------|------------------------|------------------|
| GET    | /api/v1/dead           | List dead jobs   |
| POST   | /api/v1/dead/:id/retry | Retry a dead job |
| DELETE | /api/v1/dead           | Purge dead jobs  |

Health & Metrics

| Method | Endpoint   | Description          |
|--------|------------|----------------------|
| GET    | /health    | Health check         |
| GET    | /ready     | Readiness check      |
| GET    | /metrics   | Prometheus metrics   |
| GET    | /api/v1/ws | WebSocket connection |

Monitoring

Prometheus Metrics

GoQueue exposes the following metrics at /metrics:

  • goqueue_jobs_enqueued_total - Total jobs enqueued
  • goqueue_jobs_processed_total - Total jobs processed successfully
  • goqueue_jobs_failed_total - Total jobs failed
  • goqueue_jobs_dead_total - Total jobs moved to dead letter queue
  • goqueue_job_duration_seconds - Job processing duration histogram
  • goqueue_queue_depth - Current queue depth by priority
  • goqueue_workers_active - Number of active workers
  • goqueue_websocket_clients - Number of WebSocket clients

Grafana Dashboard

A sample Grafana dashboard is available in docker/grafana-dashboard.json.

Development

Prerequisites

  • Go 1.21+
  • Make
  • Docker (optional)

Building

# Build all binaries
make build

# Run tests
make test

# Run tests with coverage
make test-coverage

# Run linter
make lint

# Format code
make fmt

Project Structure

goqueue/
├── cmd/
│   ├── server/     # Queue server binary
│   ├── worker/     # Worker binary
│   └── cli/        # CLI tool
├── pkg/
│   ├── queue/      # Core queue logic
│   ├── storage/    # Storage backends
│   ├── worker/     # Worker pool
│   ├── api/        # REST API handlers
│   └── client/     # Go client SDK
├── internal/
│   ├── config/     # Configuration
│   ├── logger/     # Logging
│   └── metrics/    # Prometheus metrics
├── examples/       # Example usage
├── docker/         # Docker configuration
└── tests/          # Integration tests

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Please ensure:

  • All tests pass (make test)
  • Code is formatted (make fmt)
  • Linter passes (make lint)
  • New features have tests

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • BoltDB - Embedded key/value database
  • Gin - HTTP web framework
  • Zerolog - Zero allocation JSON logger
  • Cobra - CLI library
  • Viper - Configuration library
