I’ve been doing some work recently with ZeroMQ and Go and found the pattern for designing multi-threaded ZeroMQ servers quite interesting. ZeroMQ sockets are not thread-safe except under specific circumstances, but you can use features of ZeroMQ to work around this and also improve scalability.

A single threaded server

Here’s a snippet of code from a single-threaded implementation of the server.

const client_bind string = "tcp://*:5555"

func main() {
  zmqs, _ := zmq.NewSocket(zmq.REP)
  defer zmqs.Close()
  zmqs.Bind(client_bind)

  for {
    log.Println("Waiting for message")
    data, _ := zmqs.RecvMessage(0)
    log.Printf("Received message: %s", data)
    reply := Reverse(data[0])
    zmqs.SendMessage(reply)
    log.Printf("Sent reply: %s", reply)
  }
}

In this example, I open a REP-type socket bound to the address defined in client_bind, then receive requests and send replies in an infinite loop.
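The Reverse helper called in the loop isn't shown in the snippet. A minimal, rune-aware sketch (my own illustration, not the original code) might look like this:

```go
package main

import "fmt"

// Reverse returns s with its runes in reverse order. Working on runes
// rather than bytes keeps multi-byte UTF-8 characters intact.
func Reverse(s string) string {
	runes := []rune(s)
	for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
		runes[i], runes[j] = runes[j], runes[i]
	}
	return string(runes)
}

func main() {
	fmt.Println(Reverse("hello")) // prints "olleh"
}
```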

Moving to a multi-threaded implementation

In the multi-threaded implementation I split the functionality of the single-threaded main function out into a separate function, which can be started as a separate goroutine.

const worker_bind string = "inproc://workers"

func worker(worker_id int) {
  log.Printf("Worker %d: Started", worker_id)

  zmqs, _ := zmq.NewSocket(zmq.REP)
  defer zmqs.Close()
  zmqs.Connect(worker_bind)

  for {
    log.Printf("Worker %d: Waiting for message", worker_id)
    data, _ := zmqs.RecvMessage(0)
    log.Printf("Worker %d: Received message: %s", worker_id, data)
    reply := Reverse(data[0])
    zmqs.SendMessage(reply)
    log.Printf("Worker %d: Sent reply: %s", worker_id, reply)
  }
}

In this example, worker is passed an integer which is used as a goroutine identifier for logging purposes. The code is very similar to the single-threaded implementation: there is still a REP-type socket being opened, but this time it connects rather than binds, and to a different endpoint.

The important detail in here is the "inproc://workers" part. This tells ZeroMQ to connect to an in-process endpoint, which carries messages between threads of the same process without touching the network stack. In the main function we have the Bind counterpart…

const client_bind string = "tcp://*:5555"

func main() {

  log.Println("Starting workers")
  for i := 0; i < 5; i++ {
    log.Printf("Starting worker %d", i)
    go worker(i)
  }

  clients, _ := zmq.NewSocket(zmq.ROUTER)
  defer clients.Close()
  clients.Bind(client_bind)
  log.Println("Client listener started")

  workers, _ := zmq.NewSocket(zmq.DEALER)
  defer workers.Close()
  workers.Bind(worker_bind)
  log.Println("Worker listener started")

  log.Println("Starting proxy")
  err := zmq.Proxy(clients, workers, nil)
  log.Fatalf("Proxy interrupted: %s", err)
  
}

Here we can see a ROUTER type socket being created, then a DEALER type socket and finally a Proxy being used to connect the two together. There’s some good documentation in the ZeroMQ guide about why it’s necessary to use a ROUTER-DEALER socket combination to forward the requests on to the workers and the replies back to the clients. The short version is that the ROUTER type adds the required identity information to the data sent to the worker so that the responses are routed back to the correct client.

The source code for these examples is available at github.com/aviancarrier/0mq-multithreaded.