ZeroMQ in Node.js: Examples & Patterns

ZeroMQ in Node.js: Practical Examples

This document provides practical examples of using ZeroMQ (zeromq) in Node.js. It covers several common messaging patterns, including Publish/Subscribe, Push/Pull, Router/Dealer, and Request/Reply. Each example is presented with corrected code and explanations.

Publish/Subscribe Pattern (Pub/Sub)

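This example demonstrates a publisher that sends one message per second, rotating through the topics given on the command line (after the port and the total number of messages to send), and a subscriber that connects to port 8888 on localhost and listens to a single topic (“Politica”).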

const zmq = require('zeromq');

// Rotating publisher: once per second it publishes one message on the next
// topic of the list given on the command line, cycling through the topics in
// order.
//
// Arguments: <port> <numMensajes> <topic> [<topic> ...]
//   port         TCP port the PUB socket binds to (on all interfaces).
//   numMensajes  Total number of messages to publish before exiting. When it
//                is missing or not numeric the publisher never stops.
//   topic        One or more topics; every message text starts with its topic.
const port = process.argv[2];
// Number() performs the string-to-number conversion up front so the stop test
// in emite() can use strict equality.
const numMensajes = Number(process.argv[3]);
const msg = process.argv.slice(4);

if (port === undefined || msg.length === 0) {
  // Without topics every message would be prefixed with the text "undefined".
  console.error('Usage: <port> <numMensajes> <topic> [<topic> ...]');
  process.exit(1);
}

const pub = zmq.socket('pub');
// contTema[i] counts how many times msg[i] has been published.
const contTema = msg.map(() => 0);
// Messages published so far; one is sent per timer tick.
let contador = 0;

pub.bind('tcp://*:' + port);

// Publishes one message on the current topic, logs it, and stops the process
// once numMensajes messages have been sent.
function emite() {
  // Round-robin: the message index selects the topic.
  const i = contador % msg.length;
  const m = msg[i];
  const c = ++contTema[i];
  contador++;
  pub.send(m + ' ' + 'tras ' + contador + ' segundos, se muestra ' + c + ' veces.');
  console.log('Tras ' + contador + ' segundo: ' + contador + ' ' + m + ' ' + c);
  if (numMensajes === contador) {
    clearInterval(myVar);
    process.exit(0);
  }
}

// Declared after emite() but initialized before the first tick fires.
const myVar = setInterval(emite, 1000); // every second

Subscriber:

const zmq = require('zeromq');

// Subscriber: connects to a publisher, subscribes to one topic and prints
// every message it is delivered.
//
// Optional arguments: [<endpoint> [<topic>]]
//   endpoint  Publisher address; defaults to tcp://127.0.0.1:8888.
//   topic     Topic to subscribe to; defaults to Politica. An explicit empty
//             string is passed through unchanged.
const endpoint = process.argv[2] === undefined ? 'tcp://127.0.0.1:8888' : process.argv[2];
const topic = process.argv[3] === undefined ? 'Politica' : process.argv[3];

const sub = zmq.socket('sub');
sub.connect(endpoint);
sub.subscribe(topic);
sub.on('message', (m) => {
  // Convert the received message to text before printing it.
  console.log(m.toString());
});

Chat Application (Pub/Sub and Push/Pull)

This example shows a simple chat application using ZeroMQ. Each client pushes the lines typed by its user to the server over a Push/Pull pair, and the server broadcasts them to every client over Publish/Subscribe. The client comes first:

const zmq = require('zeromq');

// Chat client: prints every message received on the SUB socket and pushes each
// chunk read from standard input, tagged with this client's nick, on the PUSH
// socket. It sends 'HI' on start-up and 'BYE' when it shuts down.
const nick = 'Ana';
const sub = zmq.socket('sub');
const psh = zmq.socket('push');
sub.connect('tcp://127.0.0.1:9998');
psh.connect('tcp://127.0.0.1:9999');
sub.subscribe(''); // empty subscription string: no topic is singled out

// First frame is the sender's name, second frame the text. The parameter is
// named differently from the module-level nick to avoid shadowing it.
sub.on('message', (sender, m) => {
  console.log('[' + sender + ']' + m);
});

// Set once shutdown has run so that 'end' and SIGINT cannot both send 'BYE'.
let closed = false;

// Says goodbye, closes both sockets and stops reading stdin. Used for both
// end-of-input and Ctrl-C, so the SIGINT path does not depend on
// process.stdin.end() or on stdin emitting 'end'.
function shutdown() {
  if (closed) {
    return;
  }
  closed = true;
  psh.send([nick, 'BYE']);
  sub.close();
  psh.close();
  process.stdin.pause();
}

process.stdin.resume();
process.stdin.setEncoding('utf8');
process.stdin.on('data', (str) => {
  if (closed) {
    return;
  }
  // Strip only a trailing line break (LF or CRLF); a chunk without one is
  // sent whole instead of losing its last character.
  psh.send([nick, str.replace(/\r?\n$/, '')]);
});
process.stdin.on('end', shutdown);
process.on('SIGINT', shutdown);
psh.send([nick, 'HI']);

Server:

const zmq = require('zeromq');

// Chat server: receives [id, text] messages on a PULL socket and republishes
// them on a PUB socket. The control words HI and BYE are turned into
// announcements sent under the name 'server'.
const pub = zmq.socket('pub');
const pull = zmq.socket('pull');

pub.bind('tcp://*:9998');
pull.bind('tcp://*:9999');

// Text appended to the client id for each control word.
const announcements = new Map([
  ['HI', ' connected'],
  ['BYE', ' disconnected'],
]);

pull.on('message', (id, txt) => {
  const suffix = announcements.get(txt.toString());
  if (suffix !== undefined) {
    pub.send(['server', id + suffix]);
    return;
  }
  // Ordinary chat line: forward it unchanged under the sender's id.
  pub.send([id, txt]);
});

Load Balancing with Router/Dealer

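This example demonstrates a load-balancing broker built from two Router sockets: the frontend receives client requests and the backend forwards each one to an idle worker, queueing requests while no worker is free. Note that no Dealer socket is involved; the worker and client shown afterwards both connect with Req sockets.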

const zmq = require('zeromq');

// Load-balancing broker with two ROUTER sockets. Requests arriving on the
// frontend are forwarded to an idle worker on the backend, or queued when no
// worker is idle; worker replies are routed back to the originating client
// with the running reply count appended.
//
// Arguments: <portF> <portB>
//   portF  TCP port of the frontend socket.
//   portB  TCP port of the backend socket.
const portF = process.argv[2];
const portB = process.argv[3];

const sc = zmq.socket('router'); // frontend
const sw = zmq.socket('router'); // backend

// Requests waiting for a worker, oldest first, as { client, request }.
const pending = [];
// Identities of workers that are currently idle.
const workers = [];
// Number of replies handled per worker, keyed by the worker identity as text.
const workerAten = new Map();
let contResp = 0;

// Forwards one client request to the given worker over the backend.
function dispatch(worker, client, request) {
  sw.send([worker, '', client, '', request]);
}

// Gives a free worker the oldest pending request, or parks it as idle when
// nothing is waiting. Because every free worker passes through here, requests
// only ever queue up while there are no idle workers.
function workerReady(worker) {
  const next = pending.shift();
  if (next === undefined) {
    workers.push(worker);
  } else {
    dispatch(worker, next.client, next.request);
  }
}

sc.bindSync('tcp://*:' + portF);
sw.bindSync('tcp://*:' + portB);

// Frontend frames: client identity, delimiter, request.
sc.on('message', (c, sep, m) => {
  if (workers.length === 0) {
    pending.push({ client: c, request: m });
  } else {
    dispatch(workers.shift(), c, m);
  }
});

// Backend frames: worker identity, delimiter, client identity, delimiter,
// reply. An empty client identity marks a worker announcing itself.
sw.on('message', (w, sep, c, sep2, r) => {
  const workerId = String(w);
  if (String(c) === '') {
    workerAten.set(workerId, 0);
    // A newly registered worker immediately picks up any request that was
    // queued before it arrived, instead of leaving it waiting.
    workerReady(w);
    return;
  }
  workerReady(w);
  contResp++;
  sc.send([c, '', r + ' ' + contResp]);
  workerAten.set(workerId, (workerAten.get(workerId) || 0) + 1);
});

// Periodic statistics: total replies and replies per worker.
setInterval(() => {
  console.log('Total respuestas: ', contResp);
  console.log(Object.fromEntries(workerAten));
}, 5000);

Worker (Req/Rep)

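A worker that uses a Req socket (not Rep): it first sends an empty message to announce itself, then answers each request it receives, after a one-second delay, with the reply text given on the command line.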

const zmq = require('zeromq');

// Worker: announces itself with an empty message and then answers every
// message it receives, one second later, with a fixed reply text.
//
// Arguments: <url> <nick> <respuesta>
//   url        Address to connect to, without the tcp:// prefix.
//   nick       Name that, followed by the process id, forms the socket identity.
//   respuesta  Text sent back as the reply to every request.
const url = process.argv[2];
const nick = process.argv[3];
const respuesta = process.argv[4];

if (url === undefined || nick === undefined || respuesta === undefined) {
  console.error('Usage: <url> <nick> <respuesta>');
  process.exit(1);
}

const req = zmq.socket('req');
// The identity is assigned before connect() is called.
req.identity = nick + process.pid;

// NOTE(review): connect(addr) is documented without a callback parameter in
// the legacy zeromq API, so a callback passed to it cannot report a failure;
// confirm for the installed version. Errors thrown by connect() are reported
// here instead.
try {
  req.connect('tcp://' + url);
} catch (err) {
  console.error('Error worker', err);
  process.exit(1);
}

// The first frame of each incoming message is echoed back as the first frame
// of the reply, followed by an empty frame and the reply text.
req.on('message', (c) => {
  setTimeout(() => {
    req.send([c, '', respuesta]);
  }, 1000);
});

// Initial announcement: three empty frames.
req.send(['', '', '']);

Client (Req/Rep)

A client that uses a Req socket to send a single request, print the reply it receives, and exit.

const zmq = require('zeromq');

// One-shot client: sends a single request, prints the reply and exits.
//
// Arguments: <url> <unused> <peticion>
//   url       Address to connect to, without the tcp:// prefix.
//   unused    Ignored (the slot once held a nick); kept so that the request
//             stays at the same argument position.
//   peticion  Request text to send.
const url = process.argv[2];
const peticion = process.argv[4];

if (url === undefined || peticion === undefined) {
  console.error('Usage: <url> <unused> <peticion>');
  process.exit(1);
}

const req = zmq.socket('req');
// The process id alone serves as the socket identity.
req.identity = process.pid.toString();

// NOTE(review): connect(addr) is documented without a callback parameter in
// the legacy zeromq API, so a callback passed to it cannot report a failure;
// confirm for the installed version. Errors thrown by connect() are reported
// here instead.
try {
  req.connect('tcp://' + url);
} catch (err) {
  console.error('Error cliente', err);
  process.exit(1);
}

req.on('message', (msg) => {
  console.log('resp: ' + msg);
  process.exit(0);
});
req.send(peticion);