GoQueue is a production-grade distributed task queue written in Go. It provides reliable job processing with priority queues, delayed jobs, automatic retries with exponential backoff, dead letter queues, and real-time monitoring.
- Distributed Processing: Scale horizontally with multiple workers across machines
- Priority Queues: High, medium, and low priority levels for job scheduling
- Delayed/Scheduled Jobs: Schedule jobs for future execution
- Automatic Retries: Exponential backoff with configurable retry limits
- Dead Letter Queue: Failed jobs are preserved for inspection and manual retry
- Real-time Updates: WebSocket support for live job status monitoring
- Persistence: BoltDB for reliable job storage (in-memory option for testing)
- Prometheus Metrics: Built-in metrics endpoint for monitoring
- REST API: Full-featured API for job management
- Client SDK: Go client library for easy integration
- CLI Tool: Command-line interface for queue management
- Graceful Shutdown: Clean worker shutdown without losing jobs
- Docker Ready: Multi-stage Dockerfile with optimized images
                                            +------------------+
                                            |    Prometheus    |
                                            |    (Metrics)     |
                                            +--------+---------+
                                                     |
                                                     | scrape
                                                     v
+-------------+     +-----------------+     +------------------+
|   Client    |---->|    REST API     |---->|                  |
|    (SDK)    |     |   + WebSocket   |     |   Queue Server   |
+-------------+     +-----------------+     |                  |
                             |              |  - Job Manager   |
                             |              |  - Scheduler     |
                             v              |  - Event Hub     |
                    +-----------------+     +--------+---------+
                    |  Web Dashboard  |              |
                    |   (Optional)    |              |
                    +-----------------+              |
                                                     |
             +-------------+------------+------------+
             |             |            |            |
             v             v            v            v
         +--------+    +--------+   +--------+   +--------+
         |Worker 1|    |Worker 2|   |Worker 3|   |Worker N|
         +---+----+    +---+----+   +---+----+   +---+----+
             |             |            |            |
             +-------------+------------+------------+
                                 |
                                 v
                         +---------------+
                         |    BoltDB     |
                         |   (Storage)   |
                         +---------------+
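The queue server hands jobs to workers, and the feature list names three priority levels. The following is a minimal in-memory sketch of priority-ordered dequeuing built on the standard library's `container/heap`. The `PriorityQueue` type, the numeric priority values, and the first-in-first-out tie-break within a level are all assumptions for illustration, not GoQueue's internal data structure.

```go
package main

import (
	"container/heap"
	"fmt"
)

// Priority levels; a lower value is served first. The names mirror the
// high/medium/low levels from the feature list, the values are assumed.
const (
	High = iota
	Medium
	Low
)

type item struct {
	id       string
	priority int
	seq      int // insertion order, used as a FIFO tie-breaker
}

// jobHeap implements heap.Interface ordered by (priority, seq).
type jobHeap []item

func (h jobHeap) Len() int { return len(h) }
func (h jobHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority < h[j].priority
	}
	return h[i].seq < h[j].seq
}
func (h jobHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *jobHeap) Push(x any)   { *h = append(*h, x.(item)) }
func (h *jobHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// PriorityQueue is a hypothetical, non-concurrent queue for illustration.
type PriorityQueue struct {
	h   jobHeap
	seq int
}

// Enqueue adds a job ID at the given priority level.
func (q *PriorityQueue) Enqueue(id string, priority int) {
	heap.Push(&q.h, item{id: id, priority: priority, seq: q.seq})
	q.seq++
}

// Dequeue removes and returns the next job ID; ok is false when empty.
func (q *PriorityQueue) Dequeue() (id string, ok bool) {
	if q.h.Len() == 0 {
		return "", false
	}
	return heap.Pop(&q.h).(item).id, true
}

func main() {
	q := &PriorityQueue{}
	q.Enqueue("report", Low)
	q.Enqueue("email-1", High)
	q.Enqueue("resize", Medium)
	q.Enqueue("email-2", High)
	for id, ok := q.Dequeue(); ok; id, ok = q.Dequeue() {
		fmt.Println(id)
	}
}
```

A real server would also need locking and persistence; the sketch only shows the ordering rule.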
# Clone the repository
git clone https://github.com/goqueue/goqueue.git
cd goqueue
# Build all binaries
make build
# Or install globally
make install
# Start server and worker
docker compose -f docker/docker-compose.yml up -d
# View logs
docker compose -f docker/docker-compose.yml logs -f
# Stop services
docker compose -f docker/docker-compose.yml down
# Start the server
./bin/goqueue-server --log-format=console
# In another terminal, start workers
./bin/goqueue-worker --concurrency=4 --log-format=console
curl -X POST http://localhost:8080/api/v1/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "type": "email.send",
    "queue": "emails",
    "priority": "high",
    "payload": {
      "to": "user@example.com",
      "subject": "Welcome!",
      "body": "Hello from GoQueue"
    },
    "max_retries": 5,
    "timeout": "5m"
  }'
curl http://localhost:8080/api/v1/jobs/{job_id}
curl "http://localhost:8080/api/v1/jobs?queue=emails&status=pending&limit=50"
curl "http://localhost:8080/api/v1/stats?queue=emails"
package main
import (
	"context"
	"log"

	"github.com/goqueue/goqueue/pkg/client"
)

func main() {
	// Create client
	c := client.New("http://localhost:8080")

	// Enqueue a job
	job, err := c.Enqueue(context.Background(), "email.send",
		map[string]interface{}{
			"to":      "user@example.com",
			"subject": "Hello!",
		},
		&client.EnqueueOptions{
			Queue:      "emails",
			Priority:   "high",
			MaxRetries: 5,
		},
	)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Created job: %s", job.ID)

	// Get queue stats (check the error before reading the result)
	stats, err := c.GetStats(context.Background(), "emails")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Pending: %d, Completed: %d", stats.Pending, stats.Completed)
}
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/goqueue/goqueue/pkg/queue"
	"github.com/goqueue/goqueue/pkg/storage"
	"github.com/goqueue/goqueue/pkg/worker"
)

type EmailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
	Body    string `json:"body"`
}

func main() {
	// Initialize storage
	store, err := storage.NewBoltStorage(&storage.BoltOptions{
		Path: "goqueue.db",
	})
	if err != nil {
		log.Fatalf("open storage: %v", err)
	}
	defer store.Close()

	// Initialize queue
	q := queue.NewQueue(store)

	// Create worker pool
	pool := worker.NewPool(q, &worker.PoolConfig{
		Workers:      4,
		Queues:       []string{"emails", "default"},
		PollInterval: time.Second,
	})

	// Register job handlers
	pool.RegisterHandler("email.send", func(ctx context.Context, job *queue.Job) (interface{}, error) {
		var payload EmailPayload
		// Returning the decode error lets the queue record the failure.
		if err := json.Unmarshal(job.Payload, &payload); err != nil {
			return nil, fmt.Errorf("decode payload: %w", err)
		}
		// Send email...
		log.Printf("Sending email to %s: %s", payload.To, payload.Subject)
		return map[string]string{"status": "sent"}, nil
	})

	// Start processing
	pool.Start(context.Background())
	// Wait for shutdown signal...
}
# Enqueue a job
goqueue enqueue email.send --queue=emails --priority=high \
--payload='{"to":"user@example.com","subject":"Hello"}'
# List jobs
goqueue list --queue=emails --status=pending --limit=10
# Get job details
goqueue get {job_id}
# Cancel a job
goqueue cancel {job_id}
# View queue statistics
goqueue stats --queue=emails
# Manage dead letter queue
goqueue dead list
goqueue dead retry {job_id}
goqueue dead purge
# Health check
goqueue health
GoQueue can be configured via a config file, environment variables, or command-line flags.
server:
  address: ":8080"
  read_timeout: 30s
  write_timeout: 30s
  enable_websocket: true
queue:
  default_queue: "default"
  max_retries: 3
  retry_delay: 5s
  job_timeout: 5m
worker:
  concurrency: 4
  queues:
    - "critical"
    - "default"
    - "low"
  poll_interval: 1s
storage:
  type: "boltdb"
  path: "goqueue.db"
logging:
  level: "info"
  format: "json"
metrics:
  enabled: true
All configuration options can be set via environment variables with the GOQUEUE_ prefix:
export GOQUEUE_SERVER_ADDRESS=":8080"
export GOQUEUE_WORKER_CONCURRENCY=8
export GOQUEUE_STORAGE_PATH="/data/goqueue.db"
export GOQUEUE_LOGGING_LEVEL="debug"
Job endpoints:
| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/jobs | Create a new job |
| GET | /api/v1/jobs | List jobs with filters |
| GET | /api/v1/jobs/:id | Get job details |
| POST | /api/v1/jobs/:id/cancel | Cancel a job |
| DELETE | /api/v1/jobs/:id | Delete a job |
| POST | /api/v1/jobs/batch | Create multiple jobs |
| POST | /api/v1/jobs/bulk | Bulk actions (cancel, delete, retry) |
Statistics endpoint:
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/stats | Get queue statistics |
Dead letter queue endpoints:
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/v1/dead | List dead jobs |
| POST | /api/v1/dead/:id/retry | Retry a dead job |
| DELETE | /api/v1/dead | Purge dead jobs |
System endpoints:
| Method | Endpoint | Description |
|---|---|---|
| GET | /health | Health check |
| GET | /ready | Readiness check |
| GET | /metrics | Prometheus metrics |
| GET | /api/v1/ws | WebSocket connection |
GoQueue exposes the following metrics at /metrics:
- goqueue_jobs_enqueued_total: Total jobs enqueued
- goqueue_jobs_processed_total: Total jobs processed successfully
- goqueue_jobs_failed_total: Total jobs failed
- goqueue_jobs_dead_total: Total jobs moved to dead letter queue
- goqueue_job_duration_seconds: Job processing duration histogram
- goqueue_queue_depth: Current queue depth by priority
- goqueue_workers_active: Number of active workers
- goqueue_websocket_clients: Number of WebSocket clients
A sample Grafana dashboard is available in docker/grafana-dashboard.json.
- Go 1.21+
- Make
- Docker (optional)
# Build all binaries
make build
# Run tests
make test
# Run tests with coverage
make test-coverage
# Run linter
make lint
# Format code
make fmt
goqueue/
├── cmd/
│ ├── server/ # Queue server binary
│ ├── worker/ # Worker binary
│ └── cli/ # CLI tool
├── pkg/
│ ├── queue/ # Core queue logic
│ ├── storage/ # Storage backends
│ ├── worker/ # Worker pool
│ ├── api/ # REST API handlers
│ └── client/ # Go client SDK
├── internal/
│ ├── config/ # Configuration
│ ├── logger/ # Logging
│ └── metrics/ # Prometheus metrics
├── examples/ # Example usage
├── docker/ # Docker configuration
└── tests/ # Integration tests
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Please ensure:
- All tests pass (make test)
- Code is formatted (make fmt)
- Linter passes (make lint)
- New features have tests
This project is licensed under the MIT License - see the LICENSE file for details.