ZeroMQ in Node.js: Examples & Patterns
ZeroMQ in Node.js: Practical Examples
This document provides practical examples of using ZeroMQ (the `zeromq` package) in Node.js. It covers several common messaging patterns, including Publish/Subscribe, Push/Pull, Router/Dealer, and Request/Reply. Each example is presented with corrected code and explanations.
Publish/Subscribe Pattern (Pub/Sub)
This example demonstrates a publisher sending messages on different topics, and a subscriber listening to a specific topic (“Politica”).
// Publisher: emits one message per second, rotating through the topics given
// on the command line.
// Arguments: <port> <numMessages> <topic> [<topic> ...]
const zmq = require('zeromq');

const port = process.argv[2];
// argv values are strings; convert once so the stop condition below can be a
// strict numeric comparison. A missing or non-numeric value yields NaN, which
// never matches, so the publisher then runs until it is killed.
const numMensajes = Number(process.argv[3]);
// Topic queue: the head is the next topic to publish.
const msg = process.argv.slice(4);
// Per-topic send counters, kept parallel to (and rotated together with) msg.
const contTema = msg.map(() => 0);

// Without a port there is nothing to bind to, and without topics every
// message would be published under the literal topic "undefined".
if (port === undefined || msg.length === 0) {
  console.error('Usage: node <script> <port> <numMessages> <topic> [<topic> ...]');
  process.exit(1);
}

let contador = 0; // total number of messages sent so far
const pub = zmq.socket('pub');
pub.bind('tcp://*:' + port);

/**
 * Publishes one message for the topic at the head of the queue, logs it,
 * rotates that topic (and its counter) to the back of the queue, and stops
 * the process once numMensajes messages have been sent.
 */
function emite() {
  const m = msg.shift();
  const c = contTema.shift() + 1;
  contador++;
  // The topic is the message prefix, which is what subscribers filter on.
  pub.send(`${m} tras ${contador} segundos, se muestra ${c} veces.`);
  console.log(`Tras ${contador} segundo: ${contador} ${m} ${c}`);
  // Rotate: the topic just used goes to the back of the queue.
  msg.push(m);
  contTema.push(c);
  if (contador === numMensajes) {
    clearInterval(timer);
    process.exit(0);
  }
}

// Fire once every second.
const timer = setInterval(emite, 1000);
Subscriber:
// Subscriber: connects to the local publisher on port 8888 and prints every
// message whose prefix matches the 'Politica' topic.
const zmq = require('zeromq');

const subscriber = zmq.socket('sub');

/**
 * Prints a received message as text.
 * @param {*} data - message payload as delivered by the socket.
 */
function printMessage(data) {
  console.log(String(data));
}

subscriber.connect('tcp://127.0.0.1:8888');
subscriber.subscribe('Politica');
subscriber.on('message', printMessage);
Chat Application (Pub/Sub and Push/Pull)
This example shows a simple chat application using ZeroMQ. Each client pushes what the user types to the server over a Push/Pull pair, and the server broadcasts those messages to every client over Publish/Subscribe.
Client:
// Chat client: pushes each line typed on stdin to the server and prints every
// message the server broadcasts.
const zmq = require('zeromq');

const nick = 'Ana';
const sub = zmq.socket('sub'); // receives broadcasts from the server
const psh = zmq.socket('push'); // sends this user's messages to the server

sub.connect('tcp://127.0.0.1:9998');
psh.connect('tcp://127.0.0.1:9999');
sub.subscribe(''); // empty prefix: receive every broadcast

// Broadcasts arrive as two frames: sender name and text. The first parameter
// is named `sender` so it does not shadow this client's own `nick`.
sub.on('message', (sender, text) => {
  console.log('[' + sender + ']' + text);
});

process.stdin.resume();
process.stdin.setEncoding('utf8');
process.stdin.on('data', (str) => {
  // Strip only a trailing line terminator (LF or CRLF). Blindly dropping the
  // last character would truncate input that does not end with a newline and
  // would leave a stray '\r' on CRLF input.
  psh.send([nick, str.replace(/\r?\n$/, '')]);
});
process.stdin.on('end', () => {
  // Say goodbye before releasing both sockets.
  psh.send([nick, 'BYE']);
  sub.close();
  psh.close();
});
process.on('SIGINT', () => {
  // NOTE(review): presumably meant to trigger the 'end' handler above so that
  // Ctrl+C also sends BYE — confirm that end() emits 'end' on stdin.
  process.stdin.end();
});

// Announce this client to the server.
psh.send([nick, 'HI']);
Server:
// Chat server: pulls [id, text] messages from clients on port 9999 and
// publishes them to all subscribers on port 9998.
const zmq = require('zeromq');

const pub = zmq.socket('pub');
const pull = zmq.socket('pull');

pub.bind('tcp://*:9998');
pull.bind('tcp://*:9999');

pull.on('message', (id, txt) => {
  const text = txt.toString();

  // 'HI' and 'BYE' are control messages: they are announced by the server
  // itself instead of being relayed under the client's name.
  if (text === 'HI') {
    pub.send(['server', id + ' connected']);
    return;
  }
  if (text === 'BYE') {
    pub.send(['server', id + ' disconnected']);
    return;
  }

  // Anything else is relayed unchanged.
  pub.send([id, txt]);
});
Load Balancing with Router/Dealer
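This example demonstrates a load-balancing broker. It binds two ROUTER sockets, a frontend for clients and a backend for workers, and forwards each client request to the next idle worker, queueing requests while every worker is busy. The workers and clients shown below connect to it with REQ sockets.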
// Load-balancing broker: accepts client requests on a frontend ROUTER socket
// and hands each one to an idle worker connected to a backend ROUTER socket.
// Arguments: <frontendPort> <backendPort>
const zmq = require('zeromq');

const portF = process.argv[2]; // frontend port (clients)
const portB = process.argv[3]; // backend port (workers)

const cli = []; // identities of clients waiting for a free worker
const req = []; // their pending requests, parallel to cli
const workers = []; // identities of idle workers
// Requests served per worker, keyed by the worker identity as a string.
// A Map replaces the original Array that was used as a dictionary.
const workerAten = new Map();
let contResp = 0; // total replies forwarded to clients

const sc = zmq.socket('router'); // frontend
const sw = zmq.socket('router'); // backend
sc.bindSync('tcp://*:' + portF);
sw.bindSync('tcp://*:' + portB);

/**
 * Tells whether a message frame is empty when read as text.
 * @param {*} frame - frame as delivered by the socket.
 * @returns {boolean} true when the frame's string form is ''.
 */
const isEmptyFrame = (frame) => String(frame) === '';

// Client request frames: [client identity, delimiter, request].
sc.on('message', (c, sep, m) => {
  if (workers.length === 0) {
    // No idle worker: queue the request until one reports back.
    cli.push(c);
    req.push(m);
    return;
  }
  sw.send([workers.shift(), '', c, '', m]);
});

// Worker frames: [worker identity, delimiter, client identity, delimiter, reply].
sw.on('message', (w, sep, c, sep2, r) => {
  const workerId = String(w);

  // An empty client frame is the worker's initial "ready" message: register
  // the worker as idle and start its counter.
  if (isEmptyFrame(c)) {
    workers.push(w);
    workerAten.set(workerId, 0);
    return;
  }

  // The worker just finished a job: give it the oldest queued request, or
  // mark it idle when nothing is waiting.
  if (cli.length > 0) {
    sw.send([w, '', cli.shift(), '', req.shift()]);
  } else {
    workers.push(w);
  }

  // Forward the reply to its client, tagged with the running reply count.
  contResp++;
  sc.send([c, '', r + ' ' + contResp]);
  workerAten.set(workerId, (workerAten.get(workerId) || 0) + 1);
});

// Print throughput statistics every five seconds.
setInterval(() => {
  console.log('Total respuestas: ', contResp);
  console.log(workerAten);
}, 5000);
Worker (Req/Rep)
A worker using the Request/Reply pattern to process requests.
// Worker: registers with the broker, then answers every request it receives
// with a fixed reply after a one-second delay.
// Arguments: <host:port> <nick> <reply>
const zmq = require('zeromq');

const url = process.argv[2];
const nick = process.argv[3];
const respuesta = process.argv[4];

const req = zmq.socket('req');
// The nick plus the process id gives the socket its identity.
req.identity = nick + process.pid;

req.connect('tcp://' + url, (err) => {
  // Report a failure only when one is actually passed in; the original
  // logged this message unconditionally.
  // NOTE(review): confirm whether this zeromq version invokes a connect
  // callback at all, or reports connection errors by throwing instead.
  if (err) {
    console.log('Error worker');
  }
});

// Incoming frames: [client identity, delimiter, request]. The request body is
// not inspected; every request gets the same reply.
req.on('message', (c) => {
  // The one-second delay stands in for real work.
  setTimeout(() => {
    req.send([c, '', respuesta]);
  }, 1000);
});

// Initial message with an empty client frame, sent before any request arrives.
req.send(['', '', '']);
Client (Req/Rep)
A client using the Request/Reply pattern to send requests.
// Client: sends a single request and exits as soon as the reply arrives.
// Arguments: <host:port> <unused> <request>
const zmq = require('zeromq');

const url = process.argv[2];
// argv[3] is not used by the client; the request text is read from argv[4].
const peticion = process.argv[4];

const req = zmq.socket('req');
// The process id alone serves as this socket's identity.
req.identity = process.pid.toString();

req.connect('tcp://' + url, (err) => {
  // Report a failure only when one is actually passed in; the original
  // logged this message unconditionally.
  // NOTE(review): confirm whether this zeromq version invokes a connect
  // callback at all, or reports connection errors by throwing instead.
  if (err) {
    console.log('Error cliente');
  }
});

req.on('message', (msg) => {
  console.log('resp: ' + msg);
  process.exit(0);
});

req.send(peticion);