Redis delayed tasks with Go

Use Redis as a queue of delayed tasks

If you work on a distributed system, chances are you need some container or queue for sharing information and tasks between components of your system, or even between instances of the same component. Let’s assume that you need not only queueing but delaying as well.

Probably the first thing that comes to mind is Amazon Simple Queue Service (SQS). SQS eliminates the complexity and overhead associated with managing and operating message-oriented middleware, and lets developers focus on differentiating work. But if you already use Redis and have some expertise in it, you can consider another option: delayed tasks with Redis.

Delayed task

The main idea is to use a ZSET (sorted set). Use the time when an item should be executed as its score, which is the sort key. When enqueueing, you add the item to the ZSET with a score equal to the time at which it becomes due. When dequeueing, you check whether there is any item whose score is less than or equal to the current time.
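The scoring rule can be tried out without a Redis server. The following is a minimal sketch that uses a hypothetical in-memory type, delayedSet, as a stand-in for the ZSET; the type and its method names are assumptions made for illustration and are not part of any Redis client.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// item pairs a member with its score, as a sorted set entry does.
type item struct {
	member string
	score  int64 // Unix time (seconds) at which the task becomes due
}

// delayedSet is a hypothetical in-memory stand-in for a Redis ZSET.
type delayedSet struct {
	items []item // kept sorted by ascending score
}

// add inserts a new member at the position given by its score.
// Unlike ZADD, it does not update the score of an existing member.
func (s *delayedSet) add(member string, score int64) {
	i := sort.Search(len(s.items), func(i int) bool { return s.items[i].score > score })
	s.items = append(s.items, item{})
	copy(s.items[i+1:], s.items[i:])
	s.items[i] = item{member: member, score: score}
}

// due returns up to batch members whose score is not later than now.
func (s *delayedSet) due(now int64, batch int) []string {
	var out []string
	for _, it := range s.items {
		if it.score > now || len(out) == batch {
			break
		}
		out = append(out, it.member)
	}
	return out
}

func main() {
	now := time.Now().Unix()
	s := &delayedSet{}
	s.add("send-email", now+60)  // due in a minute
	s.add("resize-image", now-5) // already due
	fmt.Println(s.due(now, 10))  // only the task whose score has passed
}
```

Because the set is ordered by score, the scan can stop at the first item that is not yet due.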

Another thing you need is a distributed lock, so that two consumers do not pick up the same item. This is also possible with Redis, which documents both a description and an algorithm for it.
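For a single Redis instance, the documented locking pattern is to acquire with SET key token NX PX ttl and to release only if the stored token still matches. The sketch below imitates those semantics with a hypothetical in-memory lockTable; the type, its methods, and the explicit time parameters are assumptions made so the example runs without Redis.

```go
package main

import "fmt"

// entry records who holds a lock and when it expires.
type entry struct {
	token   string
	expires int64 // Unix milliseconds
}

// lockTable is a hypothetical in-memory stand-in for the Redis keyspace.
type lockTable map[string]entry

// acquire imitates SET key token NX PX ttl: it succeeds only if the key
// is absent or the previous holder's TTL has elapsed.
func (t lockTable) acquire(key, token string, nowMs, ttlMs int64) bool {
	if e, ok := t[key]; ok && e.expires > nowMs {
		return false
	}
	t[key] = entry{token: token, expires: nowMs + ttlMs}
	return true
}

// release deletes the key only when it is still live and the caller's
// token matches, so a worker whose lock expired cannot release a lock
// that now belongs to another worker.
func (t lockTable) release(key, token string, nowMs int64) bool {
	if e, ok := t[key]; ok && e.expires > nowMs && e.token == token {
		delete(t, key)
		return true
	}
	return false
}

func main() {
	t := lockTable{}
	fmt.Println(t.acquire("task-1", "worker-a", 0, 1000))    // first worker takes the lock
	fmt.Println(t.acquire("task-1", "worker-b", 500, 1000))  // lock is still held
	fmt.Println(t.acquire("task-1", "worker-b", 1500, 1000)) // TTL elapsed, lock changes hands
	fmt.Println(t.release("task-1", "worker-a", 1600))       // stale token, nothing is released
}
```

The TTL is what keeps a crashed consumer from holding an item forever: once it elapses, another consumer can take the lock.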

Sequence diagram

Go implementation

To implement it in Go, I need a Go Redis client and a Go Redis distributed lock.

The struct needs a Redis client, a locker, the key of the ZSET, batch (the maximum number of items to dequeue at once), and ttl (the time after which a lock is released automatically):

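// RedisQueue is a delayed task queue backed by a Redis sorted set.
type RedisQueue struct {
	client *redis.ClusterClient // Redis cluster client
	locker *redislock.Client    // distributed lock client
	key    string               // key of the ZSET
	batch  int                  // maximum number of items to dequeue at once
	ttl    time.Duration        // time after which a lock is released automatically
}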

// NewQueue builds a RedisQueue and returns it as a Queue.
func NewQueue(client *redis.ClusterClient, locker *redislock.Client, key string, batch int, ttl time.Duration) Queue {
	return &RedisQueue{client: client, locker: locker, key: key, batch: batch, ttl: ttl}
}

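// Enqueue schedules uuid to become available after delay.
// The score is the due time in Unix seconds; the result of ZADD is ignored here.
func (rq *RedisQueue) Enqueue(uuid string, delay time.Duration) {
	_ = rq.client.ZAdd(rq.key, &redis.Z{Member: uuid, Score: float64(time.Now().Add(delay).Unix())})
}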

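// Dequeue returns up to rq.batch due items, each locked for this consumer.
func (rq *RedisQueue) Dequeue() ([]Message, error) {
	var ms []Message
	start := int64(0) // rank of the next ZSET member to inspect
	for i := rq.batch; i > 0; {
		vals, err := rq.client.ZRangeWithScores(rq.key, start, start).Result()
		if err != nil {
			return nil, errors.Wrap(err, "cannot get range from zset")
		}
		// Stop when the set is exhausted or the next item is not due yet.
		if len(vals) == 0 || vals[0].Score > float64(time.Now().Unix()) {
			break
		}
		start++

		id := vals[0].Member.(string)
		// acquireLock is a helper from the full code; nil means another consumer holds the lock.
		lock := rq.acquireLock(id)
		if lock == nil {
			continue
		}
		i--
		ms = append(ms, Message{Message: id, OnProcessed: func() {
			_ = rq.client.ZRem(rq.key, id)
			if err := lock.Release(); err != nil {
				fmt.Printf("release lock error = %+v\n", err)
			}
		}})
	}
	return ms, nil
}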

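On the consumer side, each dequeued message is handled and then acknowledged through OnProcessed, which removes the item and releases its lock. The sketch below shows one polling pass. The Queue interface is an assumed shape inferred from the methods above, and fakeQueue, newFakeQueue and drain are hypothetical names used only so the example runs without Redis.

```go
package main

import (
	"fmt"
	"time"
)

// Message mirrors the two fields that Dequeue fills in.
type Message struct {
	Message     string
	OnProcessed func()
}

// Queue is an assumed shape for the interface returned by NewQueue.
type Queue interface {
	Enqueue(uuid string, delay time.Duration)
	Dequeue() ([]Message, error)
}

// fakeQueue is a hypothetical in-memory Queue that keeps the sketch runnable.
type fakeQueue struct {
	due map[string]time.Time
}

func newFakeQueue() *fakeQueue {
	return &fakeQueue{due: map[string]time.Time{}}
}

func (q *fakeQueue) Enqueue(uuid string, delay time.Duration) {
	q.due[uuid] = time.Now().Add(delay)
}

func (q *fakeQueue) Dequeue() ([]Message, error) {
	var ms []Message
	now := time.Now()
	for id, at := range q.due {
		if at.After(now) {
			continue // not due yet
		}
		id := id // capture the per-iteration value for the closure
		ms = append(ms, Message{Message: id, OnProcessed: func() { delete(q.due, id) }})
	}
	return ms, nil
}

// drain performs one polling pass: dequeue, handle, then acknowledge.
// It returns the number of messages that were handled successfully.
func drain(q Queue, handle func(id string) error) (int, error) {
	ms, err := q.Dequeue()
	if err != nil {
		return 0, err
	}
	done := 0
	for _, m := range ms {
		if err := handle(m.Message); err != nil {
			continue // not acknowledged, so the item stays in the queue
		}
		m.OnProcessed()
		done++
	}
	return done, nil
}

func main() {
	q := newFakeQueue()
	q.Enqueue("task-1", 0)         // due immediately
	q.Enqueue("task-2", time.Hour) // due in an hour
	n, err := drain(q, func(id string) error {
		fmt.Println("processing", id)
		return nil
	})
	fmt.Println(n, err)
}
```

With the Redis-backed queue, an item whose handler fails is not removed either; it should become visible to consumers again once its lock TTL expires.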
You can test it with Docker Compose:

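version: "2.1"
services:
  test: # service name is an assumption; the original name was lost
    image: golang:1.12
    working_dir: /go/src/
    volumes:
      - $PWD:/go/src/
      - go-modules:/go/pkg/mod # Put modules cache into a separate volume
    depends_on: # key is an assumption; it may have been "links"
      - testredis
    command: ["/bin/sh", "-c", "GO111MODULE=on go test -v -timeout 30s"]

  testredis:
    image: grokzen/redis-cluster:latest
    logging:
      driver: "none"

volumes:
  go-modules: # Define the volume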

Just run from the terminal:

make test-redis-queue 

Check the full code on GitHub.