Handling jobs with unpredictable execution time using RabbitMQ

RabbitMQ is a great queueing server but handling tasks with an unpredictable execution time poses a few challenges.

Preface

This article assumes you already know how to set up and configure RabbitMQ and that you know what the terms AMQP, producer, consumer, exchange, queue and channel mean. You should also know that messages are published to an exchange, which then routes them to the correct queue based on the type of exchange and/or the routing key.

Processing simple jobs

The basic RabbitMQ message consumption pattern is as follows:

  1. your consumer receives a message from the queue it is subscribed to
  2. it processes the message: this is of course a custom implementation for your application
  3. depending on the outcome of step 2, it will ack, nack or reject the message.

Step 3 is important here: notify the RabbitMQ server after you have handled the message! Only then do you know the outcome, and the outcome decides whether the message should be requeued or not. You are after all using a queueing mechanism for a reason: to offload longer running tasks from the main process and to introduce retries when things go wrong.
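To make step 3 concrete, here is a minimal sketch of mapping an outcome onto an acknowledgement. The error classes errInvalid and errTransient, the settle() function and the fakeDelivery type are made up for this example; the three method signatures match those of amqp091.Delivery, so a real delivery could be passed in instead of the fake.

```go
package main

import (
	"errors"
	"fmt"
)

// acknowledger lists the three ways to settle a delivery. amqp091.Delivery
// has methods with these signatures, so it can be passed in directly.
type acknowledger interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
	Reject(requeue bool) error
}

// Hypothetical error classes, only here to drive the example.
var (
	errInvalid   = errors.New("message can never be processed")
	errTransient = errors.New("temporary failure")
)

// settle maps the outcome of step 2 onto the acknowledgement of step 3.
func settle(d acknowledger, outcome error) error {
	switch {
	case outcome == nil:
		return d.Ack(false) // handled: the server may drop the message
	case errors.Is(outcome, errInvalid):
		return d.Reject(false) // hopeless: do not requeue
	default:
		return d.Nack(false, true) // anything else: requeue for a retry
	}
}

// fakeDelivery records the last call so the sketch runs without a broker.
type fakeDelivery struct{ last string }

func (f *fakeDelivery) Ack(multiple bool) error { f.last = "ack"; return nil }

func (f *fakeDelivery) Nack(multiple, requeue bool) error {
	f.last = fmt.Sprintf("nack requeue=%t", requeue)
	return nil
}

func (f *fakeDelivery) Reject(requeue bool) error {
	f.last = fmt.Sprintf("reject requeue=%t", requeue)
	return nil
}

func main() {
	d := &fakeDelivery{}
	for _, outcome := range []error{nil, errInvalid, errTransient} {
		if err := settle(d, outcome); err != nil {
			fmt.Println("settling failed:", err)
		}
		fmt.Println(d.last)
	}
}
```

Which failures deserve a requeue is an application decision; the split used here is only one possible policy.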

Note that RabbitMQ has an automatic acknowledgement mode whereby the server considers the message acknowledged immediately after successful delivery to the consumer. If you are using that mode, the rest of this article does not apply to your setup.

RabbitMQ timeouts

Timeouts are a matter of course in computing. We don’t want things to be running forever, hogging resources and slowly grinding our system to a halt. In a queueing system, you don’t want to wait forever before a consumer accepts a message: you want to deliver it as fast as you can and you also don’t want to wait too long for the consumer to be finished with it. You want to get all of your messages dealt with ASAP.

Therefore, if a consumer is too slow, you just redeliver the message to one that does play nice and discard the obnoxious one, so you don’t have to deal with it any more (if only we could all do that sometimes).

For this reason, RabbitMQ has several timeouts allowing it to detect stale consumers, wonky connections, slow consumers and many more.

Delivery acknowledgement timeout

The most important timeout we will be dealing with in this article is the delivery acknowledgement timeout. In a default RabbitMQ setup this is set to 30 minutes. It is by far the longest default timeout I have ever encountered. It is probably a sensible default that covers a wide range of jobs. Remember that we’re using a queueing mechanism to offload long running jobs from the main process.

Long running in computing can obviously mean lots of things. Since we’re usually working at the scale of nanoseconds and milliseconds, a single second will in some cases be considered extremely slow. Think of an SQL query for example: one second immediately tells you that you have a potential problem! So in computing terms we’re dealing with really very long jobs in this article.

Note that we do not recommend changing this timeout at all!

Unpredictability

As the title of this article suggests, we are dealing with jobs that have an unpredictable execution time: some of them are dealt with in a matter of seconds, others can take up to over an hour! The reason for this is that in our use case, we are ensuring files are stored redundantly and some of these files can be several hundreds of gigabytes in size. Add a completely unpredictable network throughput to the mix (yes, we do have throughput guarantees, but you just never know what’s currently using how much bandwidth and how long it will last), and you will understand that there is no way of knowing how long it will take to process any given job. Not even when you know the filesize beforehand.
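A bit of back-of-the-envelope arithmetic shows how quickly a large file crosses the 30 minute mark. The file size and the throughput figures below are illustrative assumptions, not measurements from our setup, and transferTime() is a helper invented for this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// ackTimeout is the default delivery acknowledgement timeout mentioned above.
const ackTimeout = 30 * time.Minute

// transferTime estimates how long copying sizeBytes takes at a sustained
// throughput of bitsPerSecond. Real throughput fluctuates, which is exactly
// why the result is only a rough guess.
func transferTime(sizeBytes, bitsPerSecond float64) time.Duration {
	seconds := sizeBytes * 8 / bitsPerSecond
	return time.Duration(seconds * float64(time.Second))
}

func main() {
	const gb = 1e9
	// Assumed sustained throughputs in Mbit/s.
	for _, mbit := range []float64{10000, 1000, 200} {
		d := transferTime(500*gb, mbit*1e6)
		fmt.Printf("500 GB at %5.0f Mbit/s: %v (exceeds timeout: %t)\n",
			mbit, d.Round(time.Second), d > ackTimeout)
	}
}
```

At a sustained 1 Gbit/s, 500 GB already needs 4000 seconds, which is well over an hour, and the same file finishes in under seven minutes at 10 Gbit/s.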

If you look back at the simple job processing example above keeping the delivery acknowledgement timeout in mind as well, you should immediately realise the serious problem we are facing:

  1. it is simply impossible to set a sane delivery acknowledgement timeout that will cover all use cases
  2. not changing the timeout will cause RabbitMQ to consider your consumer to (be) s(t)uck and close its channel to be rid of it while delivering the message to another consumer which will soon face the same doom.

I just cannot help but remember a good old Simpsons quote right now:

Lisa: Oh my God! I’m losing my perspicacity!

Homer: Well it’s always in the last place you look.

And I must say, Homer Simpson was very right. As usual.

A solution to the problem

Let us turn to a practical way of tackling this problem in Go, handling all timeouts we will encounter with proper reconnection strategies and a clean exit on shutdown or simply bailing out should every attempt to play nice fail miserably.

We will be using the rabbitmq/amqp091-go package which is a fork from the excellent, albeit no longer maintained, streadway/amqp package.

We will be subscribing to a queue to receive messages, and we will open a separate channel to publish response messages so that our main process can receive status updates about the jobs we have handled.

Our setup is a distributed model where multiple nodes process jobs dispatched by a centralised (redundantly set up) process. The dispatched messages are in JSON format and are processed by a single Go binary that handles a single message. If we want more consumers, we run an additional binary.

Consumption strategy

Let us begin by deciding on the consumption strategy which will determine how we need to handle the (re)connections to the RabbitMQ server.

type Message struct {
   // Map your JSON data to fields here.
   Id string `json:"id"`
}

func handleMessage(msg *amqp091.Delivery, pubChan *amqp091.Channel) {
    log.Printf("Received new message: %d - %s", msg.DeliveryTag, msg.Body)
    data := Message{}

    err := json.Unmarshal(msg.Body, &data)
    if err != nil {
        log.Printf("Invalid JSON: %s", err)
        rejectMessage(msg)
        return
    }

    log.Println("New message received:")
    log.Printf(" %+v", data) // Adjust the logging to your needs!

    // Notify the main process we have started so it can keep track.
    updateStatus(data.Id, "started", pubChan)

    // Your custom logic goes here.
    s := "finished"
    if someCondition {
        s = "failed"
    }

    // Notify the main process of the outcome.
    updateStatus(data.Id, s, pubChan)

    ackMessage(msg)
}

func ackMessage(m *amqp091.Delivery) {
    if err := m.Ack(false); err != nil {
        log.Printf("Error acknowledging message - %d: %s", m.DeliveryTag, err)
        return
    }
    log.Printf("Acknowledged message - %d", m.DeliveryTag)
}

func rejectMessage(m *amqp091.Delivery) {
    if err := m.Reject(false); err != nil {
        log.Printf("Error rejecting message - %d: %s", m.DeliveryTag, err)
        return
    }
    log.Printf("Rejected message - %d", m.DeliveryTag)
}

So this is the simple job processing pattern as described at the very start of this article. We know that this is insufficient because we could be processing this job for a long time causing our channel to the RabbitMQ server to be closed before we can acknowledge the message.
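The listing calls updateStatus() without showing it. As a rough idea of what such a helper could look like, here is a sketch that builds a JSON status body and hands it to a publish function. The statusUpdate format, encodeStatus() and sendStatus() are assumptions made for this example, and the publish callback stands in for the actual publish on the AMQP channel.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statusUpdate is a hypothetical wire format for progress reports.
type statusUpdate struct {
	Id     string `json:"id"`
	Status string `json:"status"`
}

// encodeStatus builds the JSON body for a status update.
func encodeStatus(id, status string) ([]byte, error) {
	return json.Marshal(statusUpdate{Id: id, Status: status})
}

// sendStatus encodes the update and hands it to publish, which stands in for
// whatever puts the body on the exchange.
func sendStatus(id, status string, publish func(body []byte) error) error {
	body, err := encodeStatus(id, status)
	if err != nil {
		return fmt.Errorf("encoding status for %s: %w", id, err)
	}
	return publish(body)
}

func main() {
	// This publish function only prints; a real one would publish the body.
	err := sendStatus("42", "started", func(body []byte) error {
		fmt.Printf("%s\n", body)
		return nil
	})
	if err != nil {
		fmt.Println(err)
	}
}
```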

Preventing this timeout is simple. Just acknowledge the message sooner:

func handleMessage(msg *amqp091.Delivery, pubChan *amqp091.Channel) {
    log.Printf("Received new message: %d - %s", msg.DeliveryTag, msg.Body)
    data := Message{}

    err := json.Unmarshal(msg.Body, &data)
    if err != nil {
        log.Printf("Invalid JSON: %s", err)
        rejectMessage(msg)
        return
    }

    log.Println("New message received:")
    log.Printf(" %+v", data) // Adjust the logging to your needs!

    // Notify the main process we have started so it can keep track.
    updateStatus(data.Id, "started", pubChan)

    // Acknowledge the message as soon as we have managed to notify the main
    // process that we have started.
    ackMessage(msg)

    // Your custom logic goes here.
    s := "finished"
    if someCondition {
        s = "failed"
    }

    // Notify the main process of the outcome.
    updateStatus(data.Id, s, pubChan)
}

In this version of the function, we acknowledge the message as soon as we have notified our main process that we have started. But this brings up another problem: when we acknowledge a message, the RabbitMQ server assumes we are ready to accept another one. That is not true, because we are still processing the message we just acknowledged.

It would be easy to say: ‘so process the job in a go routine independently of this function and just spawn a go routine for each new message coming in’. But we will be receiving hundreds of messages and we cannot start an unlimited number of go routines. At some point we will have to face this problem, so we had better deal with it now.
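To illustrate why a cap on go routines only moves the problem, here is a small sketch using a buffered channel as a semaphore. The limit of two and the helper names tryStart() and release() are arbitrary choices for this example: once every slot is taken, the next delivery still has nowhere to go.

```go
package main

import (
	"fmt"
	"sync"
)

// tryStart attempts to claim one of the worker slots without blocking.
// It returns false when every slot is busy.
func tryStart(slots chan struct{}) bool {
	select {
	case slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot claimed by tryStart.
func release(slots chan struct{}) { <-slots }

func main() {
	slots := make(chan struct{}, 2) // assumed limit of two concurrent jobs
	block := make(chan struct{})
	var wg sync.WaitGroup

	for job := 1; job <= 3; job++ {
		if !tryStart(slots) {
			fmt.Printf("job %d: no free slot, the delivery has to wait\n", job)
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer release(slots)
			<-block // stands in for a long running job
		}()
	}

	close(block) // let the simulated jobs finish
	wg.Wait()
}
```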

If we do not pick up the new message being offered to us by the RabbitMQ server we will be punished and our channel will be closed, so we will have to monitor the channel and reopen it when it gets closed. Let’s make note of that for later.

Let’s think further: is there anything else that can bite us later? The really crucial part in this article is the unpredictable execution time of the jobs we are dealing with, so a lot can go wrong while we’re working. Particularly annoying is connection loss to the RabbitMQ server. So we’ll need to monitor that as well and reconnect to the server when we lose the connection. Another one for later.

Think even further, beyond the whole process running in a day-to-day setup… Nothing further? Well, what if we need to update our clients because we have a bug or want to introduce new features? We need to introduce a clean shutdown mechanism that will ensure we do not lose messages. We could wait until all jobs have been processed before shutting down. But will we wait over an hour for a job that’s only been running for a few seconds? And what if we have a really serious bug that must be solved now in a distributed system potentially running hundreds of nodes? It could take days for all nodes to be up-to-date which can wreak havoc depending on what we’re dealing with.

Because we are acknowledging the message before we have processed it, we would lose it completely if we perform a shutdown. We were stumped by this for a few minutes until a colleague simply said: ‘well, then just requeue the message before shutting down’. :facepalm: (This is also the moment I realised Homer Simpson was right.) Let’s introduce just that:

func handleMessage(msg *amqp091.Delivery, pubChan *amqp091.Channel, pending chan []byte) {
    log.Printf("Received new message: %d - %s", msg.DeliveryTag, msg.Body)
    data := Message{}

    err := json.Unmarshal(msg.Body, &data)
    if err != nil {
        log.Printf("Invalid JSON: %s", err)
        rejectMessage(msg)
        return
    }

    // Store the body of this message on the internal pending queue.
    pending <- msg.Body
    defer dropPending(pending)

    log.Println("New message received:")
    log.Printf(" %+v", data) // Adjust the logging to your needs!

    // Notify the main process we have started so it can keep track.
    updateStatus(data.Id, "started", pubChan)

    // Acknowledge the message as soon as we have managed to notify the main
    // process that we have started.
    ackMessage(msg)

    // Your custom logic goes here.
    s := "finished"
    if someCondition {
        s = "failed"
    }

    // Notify the main process of the outcome.
    updateStatus(data.Id, s, pubChan)
}

func dropPending(pending <-chan []byte) {
    <-pending
}

Notice that we added a new parameter pending to the function which is an internal channel with a size of one (1) used to store the current message we’re processing. The defer dropPending() call ensures we remove the pending message when we exit the handleMessage() function.
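The behaviour of that one-slot channel is easy to try out in isolation. The sketch below uses plain Go channels only; process() and hasPending() are names invented for this example and the job itself is simulated by a callback.

```go
package main

import "fmt"

// process stores the message body in the single pending slot for as long as
// the (simulated) job runs, and frees the slot again on return.
func process(body []byte, pending chan []byte, work func()) {
	pending <- body
	defer func() { <-pending }()
	work()
}

// hasPending reports whether the slot currently holds a message.
func hasPending(pending chan []byte) bool { return len(pending) > 0 }

func main() {
	pending := make(chan []byte, 1) // capacity one: at most one job in flight

	process([]byte(`{"id":"42"}`), pending, func() {
		fmt.Println("during job, pending:", hasPending(pending))
	})
	fmt.Println("after job, pending:", hasPending(pending))
}
```

While the job runs the slot is occupied, so a shutdown routine can find the body there; once the handler returns, the deferred receive has emptied it again.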

Our shutdown procedure will then republish all messages on the pending channel and acknowledge that the pending channel is empty, upon which we will simply call os.Exit(). We do not need cleanup inside our message handler; that will be done externally.

Be aware that calling os.Exit() will kill go routines, and none of the deferred functions they might have will be executed!
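One way to wire up such a shutdown is sketched below: close a shutdown channel, wait for the acknowledgement and only then exit. The awaitShutdown() helper, the grace period and the simulated signal are assumptions for this example; the go routine in main() merely stands in for the routine that requeues the pending message.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// awaitShutdown closes shutdown to start the requeue routine, then waits
// until that routine closes ack or the grace period runs out.
// It reports whether the acknowledgement arrived in time.
func awaitShutdown(shutdown, ack chan struct{}, grace time.Duration) bool {
	close(shutdown)
	select {
	case <-ack:
		return true
	case <-time.After(grace):
		return false
	}
}

func main() {
	shutdown, ack := make(chan struct{}), make(chan struct{})

	// Stand-in for the routine that requeues whatever is still pending.
	go func() {
		<-shutdown
		fmt.Println("requeuing pending message (simulated)")
		close(ack)
	}()

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	sig <- syscall.SIGTERM // simulate a signal so the sketch ends on its own
	<-sig

	if !awaitShutdown(shutdown, ack, 5*time.Second) {
		fmt.Println("no acknowledgement in time, exiting anyway")
		os.Exit(1) // skips deferred functions in every go routine
	}
	fmt.Println("pending work handed back, exiting")
	// Returning from main also ends the process without waiting for other
	// go routines, so the requeue has to be finished before this point.
}
```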

An interesting bit of code from the shutdown procedure is checking if we have messages on the pending channel. This function runs as a go routine:

func (c *RMQClient) handlePending() {
    <-c.shutdown
    // Check to see if we have a pending message. We're NOT closing the c.pending channel and use range to get all
    // messages since this COULD cause a panic if someone still wants to write to it.
    // The chances of this happening are extremely rare but still...
    select {
    case c.pending <- []byte{}: // Try to send to the channel.
        log.Println("No pending messages.")
    default:
        // The channel send above failed, so we have a pending message.
        msg := <-c.pending
        log.Printf("Requeuing pending message: %s", msg)
        if err := publishWithDeferredConfirm(c.subChannel, c.exchange, c.consumeQueue, msg); err != nil {
            log.Printf("Unable to requeue message: %s", err)
            // We do not keep retrying here. It is unfortunately not possible to
            // cover all bases here. The receiving end will handle jobs with a
            // 'stale' status.
        }
    }
    // Signal the main process we're done, so it can continue shutdown.
    close(c.shutdownAck)
}

We made notes of the following:

  1. watch our RabbitMQ channels and reopen them when they get closed down
  2. watch our RabbitMQ connection and reopen it when it closes down.

Let’s add one more thing to this list, which you already saw popping up in the handlePending() function above: publisher confirms. We want to be sure that when we publish a message it is received by the RabbitMQ server.
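The core of handling a publisher confirm is waiting for the server to answer and treating both a negative answer and silence as failures. The sketch below models that with the standard library only and is not the publishWithDeferredConfirm() helper used above: the confirmation type mirrors the two fields of amqp091.Confirmation that matter here, and waitForConfirm() and its timeout are assumptions for this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// confirmation mirrors the two fields of amqp091.Confirmation used here.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// waitForConfirm blocks until the server confirms the publish, refuses it,
// or stays silent for longer than timeout.
func waitForConfirm(confirms <-chan confirmation, timeout time.Duration) error {
	select {
	case c, ok := <-confirms:
		if !ok {
			return errors.New("confirm channel closed before a confirmation arrived")
		}
		if !c.Ack {
			return fmt.Errorf("server nacked message %d", c.DeliveryTag)
		}
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("no confirmation within %v", timeout)
	}
}

func main() {
	confirms := make(chan confirmation, 1)

	confirms <- confirmation{DeliveryTag: 1, Ack: true}
	fmt.Println("acked: ", waitForConfirm(confirms, time.Second))

	confirms <- confirmation{DeliveryTag: 2, Ack: false}
	fmt.Println("nacked:", waitForConfirm(confirms, time.Second))

	fmt.Println("silent:", waitForConfirm(confirms, 10*time.Millisecond))
}
```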

Step by step

First, we will need to make a connection to the RabbitMQ server for which we need a few user-definable parameters: the connection URL, a heartbeat, an AMQP exchange and a queue we will be consuming from. To keep things together let’s make a client structure to which we will also be adding our receiver functions:

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/url"
    "time"

    "github.com/rabbitmq/amqp091-go"
)

type RMQClient struct {
    connectionUrl string
    heartbeat     time.Duration
    consumerTag   string

    exchange     string
    consumeQueue string

    conn               *amqp091.Connection
    notifyConnClose    chan *amqp091.Error
}

The consumer tag is also a vital piece of the puzzle: it must be unique so the RabbitMQ server will be able to identify each and every consumer separately allowing proper deliveries to each consumer. A good way to make them unique is to get the hostname, append the process ID you’re running under and add another human-readable value that allows you to easily trace which consumer it is. You would end up with something like node1.example.com-1983-replicate.
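Building such a tag takes only a few lines. In this sketch buildConsumerTag() is a name made up for the example and "replicate" is just the human-readable label from the sample above.

```go
package main

import (
	"fmt"
	"os"
)

// buildConsumerTag joins hostname, process ID and a readable label.
func buildConsumerTag(hostname string, pid int, label string) string {
	return fmt.Sprintf("%s-%d-%s", hostname, pid, label)
}

func main() {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown-host" // fall back rather than fail on a missing hostname
	}
	fmt.Println(buildConsumerTag(host, os.Getpid(), "replicate"))
}
```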

To stick with common Go practices, add a Dial receiver that will set up the connection:

func (c *RMQClient) Dial() error {
    if err := c.connect(); err != nil {
        return err
    }

    go c.reconnect()

    return nil
}

You can already see where we’re going with this: we’ll attempt to create a connection and when successful we’ll be starting a connection monitor routine that will attempt to reconnect us when the connection is lost.

Reconnecting on a lost connection is vital since we will be making use of Go’s concurrency features, which allow us to process jobs in a separate go routine that can keep crunching away even when the connection to RabbitMQ is lost.

The connection method is quite simple:

func (c *RMQClient) connect() error {
    var err error
    log.Printf("Creating new RabbitMQ connection with a heartbeat of %d seconds for consumer %s", int(c.heartbeat.Seconds()), c.consumerTag)
    c.conn, err = amqp091.DialConfig(
        c.connectionUrl,
        amqp091.Config{
            Heartbeat: c.heartbeat,
            Locale:    "en_US", // Default in the package, but unfortunately not exposed as constant :/
        },
    )
    if err != nil {
        return err
    }

    // This channel MUST be (re)declared here since it will be closed when a connection gets closed!
    c.notifyConnClose = make(chan *amqp091.Error)
    // Allow reconnecting on fail. WARNING: you MUST read from this channel to prevent deadlocks!
    c.conn.NotifyClose(c.notifyConnClose)

    return c.openChannels()
}

As you can see we’re registering a notification channel that will inform us when the connection is closed for whatever reason. When registering such a channel it is imperative that you consume its messages to prevent deadlocks!
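The deadlock risk is easiest to see with plain Go channels. The sketch below assumes the notifying side performs an ordinary blocking send on the registered channel; deliver() and its timeout exist only so the example can report a stall instead of hanging, and errClosed is a placeholder error.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errClosed is a placeholder for the error a closed connection would report.
var errClosed = errors.New("connection closed")

// deliver tries to hand err to a listener. A plain `listener <- err` would
// block forever without a reader; the timeout only exists so this sketch can
// report the stall instead of hanging.
func deliver(listener chan error, err error, wait time.Duration) bool {
	select {
	case listener <- err:
		return true
	case <-time.After(wait):
		return false
	}
}

func main() {
	unread := make(chan error) // unbuffered and nobody is receiving
	fmt.Println("unbuffered, no reader:", deliver(unread, errClosed, 50*time.Millisecond))

	buffered := make(chan error, 1) // room for exactly one notification
	fmt.Println("buffered, no reader:  ", deliver(buffered, errClosed, 50*time.Millisecond))

	watched := make(chan error)
	go func() { <-watched }() // a monitor routine that consumes notifications
	fmt.Println("unbuffered, reader:   ", deliver(watched, errClosed, time.Second))
}
```

Only the first case stalls. A dedicated monitor routine or a buffer of one both keep the sender from blocking, and running a monitor routine has the advantage that somebody actually reacts to the notification.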

When we have the connection, we have to open the necessary channels which will allow us to communicate with the RabbitMQ server. We’ll tackle those later. Let us have a look at the reconnection handler first:

func (c *RMQClient) reconnect() {
    log.Println("Starting connection monitor thread...")
    for {
        err := <-c.notifyConnClose
        log.Printf("Reconnecting after lost RabbitMQ connection: %s", err)

        ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour)
    retry:
        for {
            select {
            case <-ctx.Done():
                if ctx.Err() == context.DeadlineExceeded {
                    log.Fatal("Timeout reached when trying to reconnect.")
                    // We do not wait for goroutines to finish and simply bail out! There is nothing else we can do
                    // without a RabbitMQ connection.
                }
                // The context was cancelled after a successful reconnect: leave the retry loop and go back to
                // waiting for the next close notification.
                break retry
            default:
                u, _ := url.Parse(c.connectionUrl)
                log.Printf("Trying to reconnect to RabbitMQ at %s", u.Host)
                if err := c.connect(); err == nil {
                    cancel()
                } else {
                    log.Printf("Error while trying to reconnect: '%s', waiting before retry", err)
                    time.Sleep(1 * time.Second)
                }
            }
        }
    }
}

As you saw earlier, this function runs in a go routine. As soon as a message is received on the c.notifyConnClose channel, we start another loop that will attempt to re-establish a connection for one full hour. If a new connection cannot be established after an hour, it will simply bail out hard. If you do need to clean up some stuff, you would obviously send a signal instead of the log.Fatal() call used here.
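The same retry-until-deadline idea can also be written as a small reusable helper that returns an error instead of exiting, which leaves the decision to bail out (or to clean up first) to the caller. This is an alternative sketch rather than the code we run: retry(), retryFor() and errNotYet are names invented for the example, and the failing attempts are simulated.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errNotYet simulates a server that is not reachable yet.
var errNotYet = errors.New("server not reachable yet")

// retry calls attempt until it succeeds or ctx expires, pausing for
// interval after every failed attempt.
func retry(ctx context.Context, interval time.Duration, attempt func() error) error {
	for {
		err := attempt()
		if err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("giving up: %w (last error: %v)", ctx.Err(), err)
		case <-time.After(interval):
		}
	}
}

// retryFor is a convenience wrapper that builds the deadline context.
func retryFor(limit, interval time.Duration, attempt func() error) error {
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()
	return retry(ctx, interval, attempt)
}

func main() {
	calls := 0
	err := retryFor(time.Second, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errNotYet // the first two attempts fail
		}
		return nil
	})
	fmt.Println("attempts:", calls, "error:", err)
}
```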

Notice that we’re calling the c.connect() function here which will make sure everything is set up as we need it to be: connection and channels.

Next up, the c.openChannels() function. But let’s handle this in the reverse way from how it will be called in code. Setting up a channel is done like this:

func (c *RMQClient) createChannel(typ string, close chan *amqp091.Error) (*amqp091.Channel, error) {
    log.Printf("Opening %s channel...", typ)
    ch, err := c.conn.Channel()
    if err != nil {
        log.Printf("Failed to open the %s channel", typ)
        return nil, err
    }

    // Subscribe to channel close messages.
    ch.NotifyClose(close)

    log.Printf("Setting %s channel to confirm mode...", typ)
    if err := ch.Confirm(false); err != nil {
        log.Printf("Failed to set %s channel to confirm mode", typ)
        return nil, err
    }

    return ch, nil
}

Having this in a separate function is convenient since we will be setting up multiple channels: one for publishing messages and one for consuming messages. The typ parameter is just for logging, so you can see which channel is being opened. The close parameter is a channel we will be watching for errors so that we can react to the channel being closed. You will also see that we are putting the channel in confirm mode since we really want to be sure that the RabbitMQ server receives our messages.

Don’t forget that we mentioned jobs running uninterrupted for over an hour. Within that hour lots of stuff can go wrong, and we want to be as resilient as we can against failures so that we can keep processing the job at hand, knowing that the connections will be re-established in the background.

Indeed, we will need a go routine that will monitor channels getting closed and reopen them when needed:

func (c *RMQClient) reopenChannel(typ string) {
    log.Printf("Starting %s channel monitor thread...", typ)
    for {
        // Look up the notification channel on every pass: it is recreated each time the AMQP channel is reopened.
        notifyChanClose := c.getNotifyChannelByType(typ)

        // We're listening for amqp091.Channel errors here. When subscribing to these, it is VITAL to consume them
        // properly in order not to create deadlocks!
        err, open := <-notifyChanClose
        // When an amqp091.Connection is closed, we also receive amqp091.Channel errors, so we must check to see if
        // our connection is still open. If it is not, we do not need to do anything.
        if !open || c.conn.IsClosed() {
            log.Printf("Channel or connection closed: stopping %s channel monitor thread!", typ)
            return
        }

        log.Printf("Channel error received: %s", err)

        ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour)
    retry:
        for {
            select {
            case <-ctx.Done():
                if ctx.Err() == context.DeadlineExceeded {
                    log.Fatal("Timeout reached when trying to reopen channel.")
                    // We do not wait for goroutines to finish and simply bail out! There is nothing else we can do
                    // without an AMQP channel.
                }
                // The context was cancelled after a successful reopen: leave the retry loop and go back to
                // monitoring the channel.
                break retry
            default:
                log.Println("Trying to reopen channel...")
                if err := c.reopenChannelByType(typ); err == nil {
                    cancel()
                } else {
                    log.Printf("Error while trying to reopen channel: '%s', waiting before retry", err)
                    time.Sleep(10 * time.Second)
                }
            }
        }
    }
}

The observant reader will have noticed that it is the exact same pattern as the connection monitor and that we snuck in two new functions that have yet to be declared: c.getNotifyChannelByType() and c.reopenChannelByType().

Here they are:

func (c *RMQClient) getNotifyChannelByType(t string) chan *amqp091.Error {
    switch t {
    case pubChan:
        return c.notifyPubChanClose
    case subChan:
        return c.notifySubChanClose
    }
    return nil
}

func (c *RMQClient) reopenChannelByType(t string) error {
    switch t {
    case pubChan:
        return c.createPubChannel()
    case subChan:
        return c.createSubChannel()
    }
    return nil
}

And we’ll need to update our RMQClient struct as well as add some constants:

const (
    pubChan = "pub"
    subChan = "sub"
)

type RMQClient struct {
    connectionUrl string
    heartbeat     time.Duration
    consumerTag   string

    exchange     string
    consumeQueue string

    conn               *amqp091.Connection
    notifyConnClose    chan *amqp091.Error
    notifyPubChanClose chan *amqp091.Error
    notifySubChanClose chan *amqp091.Error
}

And now we can finally see the c.openChannels() receiver take form:

func (c *RMQClient) openChannels() error {
    if err := c.createPubChannel(); err != nil {
        return err
    }
    go c.reopenChannel(pubChan)

    if err := c.createSubChannel(); err != nil {
        return err
    }
    go c.reopenChannel(subChan)
    go c.handleMessages()

    return nil
}

The most important parts are still missing though: actually opening the channels on which we publish and consume messages. The publish channel is the easiest one:

func (c *RMQClient) createPubChannel() error {
    var err error

    // We MUST create this channel here because it gets closed when an amqp091.Channel gets closed.
    c.notifyPubChanClose = make(chan *amqp091.Error)
    c.pubChannel, err = c.createChannel(pubChan, c.notifyPubChanClose)
    if err != nil {
        return err
    }

    return nil
}

The subscribe channel used to consume messages is more complex:


func (c *RMQClient) createSubChannel() error {
    var err error

    // We MUST create this channel here because it gets closed when an amqp091.Channel gets closed.
    c.notifySubChanClose = make(chan *amqp091.Error)
    c.subChannel, err = c.createChannel(subChan, c.notifySubChanClose)
    if err != nil {
        return err
    }

    log.Printf("Declaring new queue '%s'...", c.consumeQueue)
    queue, err := c.subChannel.QueueDeclare(c.consumeQueue, true, false, false, false, nil)
    if err != nil {
        log.Println("Failed to declare queue")
        return err
    }

    log.Println("Setting prefetch count to 1")
    // To get round-robin behavior between consumers consuming from the same queue on different connections, set the
    // prefetch count to 1 (1st parameter), and the next available message on the server will be delivered to the next
    // available consumer.
    // Global is set to true (3rd parameter). Note that RabbitMQ interprets this flag differently from the AMQP 0-9-1
    // specification: the limit is then shared by all consumers on this channel.
    err = c.subChannel.Qos(1, 0, true)
    if err != nil {
        log.Println("Failed to set QoS")
        return err
    }

    log.Printf("Binding queue '%s' to exchange '%s' with routing key '%s'...", c.consumeQueue, c.exchange, c.consumeQueue)
    if err = c.subChannel.QueueBind(queue.Name, queue.Name, c.exchange, false, nil); err != nil {
        log.Println("Failed to bind queue to exchange")
        return err
    }

    log.Println("Registering consumer...")
    c.delivery, err = c.subChannel.Consume(queue.Name, c.consumerTag, false, false, false, false, nil)
    if err != nil {
        log.Println("Failed to register a consumer")
        return err
    }

    return nil
}

And last but not least, our actual message handler routine:


func (c *RMQClient) handleMessages() {
    log.Println("Starting message handler routine...")
    tm := time.Now()
    for {
        if c.delivery == nil || c.conn == nil || c.conn.IsClosed() {
            log.Printf("Connection problem, sleeping message handler routine: delivery = %v, conn = %+v", c.delivery, c.conn)
            if time.Since(tm) > 1*time.Hour {
                log.Fatal("Been asleep for at least an hour, bailing out!")
            }
            time.Sleep(1 * time.Second)
            continue
        }
        log.Println("Waiting for new messages...")
        msg, open := <-c.delivery
        // We check if the channel is open because it will be closed when there are connection issues.
        if open {
            // Reset timer.
            tm = time.Now()
            // The publish channel is read from the client on every delivery, so a channel that was
            // reopened after a problem is the one handed to the handler.
            c.handler(&msg, c.pubChannel, c.pending)
        } else {
            log.Println("Delivery channel closed!")
            time.Sleep(1 * time.Second)
        }
    }
}

All of these new functions need changes to our RMQClient:

const (
    pubChan = "pub"
    subChan = "sub"
)

type messageHandler func(msg *amqp091.Delivery, pub *amqp091.Channel, pending chan []byte)

type RMQClient struct {
    connectionUrl string
    heartbeat     time.Duration
    consumerTag   string

    exchange     string
    consumeQueue string

    handler messageHandler

    conn               *amqp091.Connection
    notifyConnClose    chan *amqp091.Error
    pubChannel         *amqp091.Channel
    notifyPubChanClose chan *amqp091.Error
    subChannel         *amqp091.Channel
    notifySubChanClose chan *amqp091.Error
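    delivery           <-chan amqp091.Delivery

    // Used by handlePending() and the message handler shown earlier.
    pending     chan []byte   // capacity 1: holds the body of the job in flight
    shutdown    chan struct{} // signals the start of the shutdown procedure (element type assumed)
    shutdownAck chan struct{} // closed by handlePending() when it is done (element type assumed)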
}
