Queue Job AMQP (RabbitMQ) y Api web usando Nodejs (Hapi)

En este tutorial estaremos viendo como crear un job worker Cliente y Servidor del mismo, usando HapiJs (Nodejs Framework) y RabbitMQ como nuestra cola de trabajo.

Esto de las colas de trabajos es muy util a la hora de quitarle carga a nuestras aplicaciones y volverlas asíncronas en torno a cuando se realiza se realiza una petición de realizar cierta acción que podría comprometer el rendimiento de nuestra aplicación, se puede decir que es una forma de comunicar sistemas.

Las colas de trabajos suelen usarse mucho cuando nuestros sistemas tienen que interactuar con sistemas de terceros y los cuales no tenemos el control de disponibilidad / rendimiento cosa que si le agregamos la capacidad de trabajar procesos en background con una cola nuestra aplicación no sufrirá ningún inconveniente si estas aplicaciones de terceros esta dando problemas.

Entre las aplicaciones mas comunes de los jobs queue se encuentran las siguientes:

Enviar Mail:
Es lo primero que encontraran que es usado esto de las colas de trabajo, esto de enviar mail es algo que consume mucho recurso como parar realizarlo en real time desde un request web.

Genrar reportes y análisis de datos:
Para generar un reporte y/o analizar datos es requerido mucho procesamiento por lo cual es mejor que otro procesos incluso en otras maquinas diferentes a donde están nuestras aplicaciones corriendo.

Chat:
Los sistemas de chat normalmente envian esos mensajes a un terceros y este es quien los procesa y los envia a sus destinatarios.

En fin Cualquier interacción con una aplicación de terceros o cualquier proceso que consuma muchos recursos.

Manos a la obra..

Nuestro ejemplo podriamos definirlo con este diagrama:

Blank_Flowchart__Lucidchart

Un productor (Server.js) que envia los mensajes a el broker AMQP y un consumidor (Worker.js) que es quien procesa esos mensajes.

Primero hay que instalar RabbitMQ (Es bastante facil de instalar y correr este servicio)

Luego instalar las dependencia que estaremos usando que son:

    npm install hapi -g
    npm install amqp

Una vez todo lo necesario instalado, proceder a crear nuestra aplicación web que sera una web api encargada de recibir los mensajes a enviar a nuestra cola de trabajo (RabbitMQ server).

server.js


/** server.js **/

      var Hapi = require('hapi');
      var server = new Hapi.Server(4444);

      server.route({
          method: 'GET',
          path: '/',
          handler: function (request, reply) {
              reply('Hola Mundo');
          }
      });

      server.route({
          method: 'POST',
          path: '/message/',
          handler: function (request, reply) {

              var data = request.payload;

              reply('Data recibida ' + data);
          }
      });

      server.route({
          method: 'GET',
          path: '/{cola}/{mensaje}',
          handler: function (request, reply) {
              reply('Hello, ' + encodeURIComponent(request.params.name) + '!');
          }
      });

      server.start(function () {
          console.log('Servidor arriba y corriendo:', server.info.uri);
      });

/** End File **/

En estas lineas de código lo unico que tenemos es 3 metodos de los cuales 1 de ellos es un metodo dummy «/» Para decir digamos que el servicio esta funcionando. Los demás metodos hasta el momento solo están imprimiendo la data que se les esta enviando.

Asi que si realizamos los siquientes request, estan seran las respuestas: (Usando curl)

curl localhost:4444
"Hola Mundo"

curl localhost:4444/cola/mensaje
"Cola: cola, mensaje:mensaje"

curl -i -X POST -H "Content-Type: application/json" -d '{"message":"MESSAGE","cola":"COLA"}' http://localhost:3333/message/
Data recibida {"message":"MESSAGE","cola":"COLA"}

Una vez obtenido estos resultados nuestra api esta funcionando como lo esperado.

Ahora pasar a la parte de AMQP.

Por defecto AMQP (RabbitMQ) usa el usuario guest y la clave guest por lo que no tenemos que especificarlo, Pueden agregar este trozo de código al final el archivo Server.js


//Server.js
    var amqp = require('amqp');

    var connection = amqp.createConnection({
        host:'localhost',
        //login:'guest',
        //password:'guest'
    });

    connection.on('ready',function(){
        console.log('Conexión hecha con RabbitMQ y lista para ser usada.');
    });

    connection.on('error',function(){
        console.log('Error conectando con RabbitMQ',arguments);
    });

Si todo salió bien (Lo instalaron bien y el servicio esta corriendo) tendrán esto una vez inicien su servidor web.

Servidor arriba y corriendo: http://localhost:4444
Conexión hecha con RabbitMQ y lista para ser usada.

Ahora llega la parte mas difícil de todas y es como poblar nuestra cola de trabajo


    var manejardorDeCola = function(connection,cola,mensaje){
        connection.publish(cola,mensaje);
    }

Creamos una función «manejardorDeCola» la cual se encargara de hacer agregar mensajes a nuestra cola.

Nuestro archivo Server.js quedaria de la siguiente forma:

    /** server.js **/

        var Hapi = require('hapi');
        var server = new Hapi.Server(4444);

        var amqp = require('amqp');

        var connection = amqp.createConnection({
            host:'localhost',
            //login:'guest',
            //password:'guest'
        });

        connection.on('ready',function(){
            console.log('Connection on hermes made and ready to use.');
        });

        connection.on('error',function(){
            console.log('Error connecting to hermes',arguments);
        });

        var manejardorDeCola = function(connection,cola,mensaje){

            connection.publish(cola,mensaje);

            console.log("Mensaje enviado a la cola");
        }

        server.route({
            method: 'GET',
            path: '/',
            handler: function (request, reply) {
                  reply('Hola Mundo');
            }
        });

      server.route({
          method: 'POST',
          path: '/message/',
          handler: function (request, reply) {

            var data = request.payload;
            var cola = data.cola;
            var mensaje = data.mensaje;

            manejardorDeCola(connection,cola,mensaje);

            reply('Data recibida ' + JSON.stringify(data) );
          }
      });

        server.route({
            method: 'GET',
            path: '/{cola}/{mensaje}',
            handler: function (request, reply) {

                var cola = request.params.cola;
                var mensaje = request.params.mensaje;

                manejardorDeCola(connection,cola,mensaje);

                reply('Cola: ' + request.params.cola + ', mensaje:'+request.params.mensaje);
            }
        });

        server.start(function () {
            console.log('Servidor arriba y corriendo:', server.info.uri);
        });

/** End File **/

Ejecutan su server con

    node server.js

Luego realizan algunos request

curl -i -X POST -H "Content-Type: application/json" -d '{"message":"MESSAGE","cola":"COLA"}' http://localhost:3333/message/

Y si verifican en su Broker (RabbitMQ) podran observar los nuevos mensaje enviados ahora mismo.

Para adminsitrar su servidor RabbitMQ yo recomiendo que instalen un «web managment» que ellos brindan y no tendrán que usar el cli.
(Instalar)

rabbitmq-plugins enable rabbitmq_management

Todo esto listo es necesario ahora crear un consumidor de esos mensajes el cual se encargara de procesar esos mensajes y realizar la tarea que tenga que hacer.

En este tutorial muestro como crear un worker que podría ser util para este caso.

Crear un archivo llamado worker.js y guardarlo en el mismo directorio de nuestro server.js. Nuestro worker.js estaria de esta forma.

//worker.js
      var amqp = require('amqp');

      var connection = amqp.createConnection({
          host:'localhost',
          //login:'guest',
          //password:'guest'
      });

      connection.on('ready',function(){
          console.log('Conexion lista worker');
      });

      connection.on('error',function(){
          console.log('Error conectando con el broker',arguments);
      });

      (function(connection){
          setTimeout(function() {

              //nombre de la cola que estaremos consumiendo los mensajes
              var cola = 'cola';

              connection.queue(cola, function (cola) {

                  // comodin para capturar todos los mensajes
                  cola.bind('#');

                  cola.subscribe(function (message) {
                      //{ data:<buffer>,contentType:'application/octet-stream' }
                      var buffer = new Buffer(message.data);

                      console.log("Mensaje optenido ",buf.toString());
                  });
              });
          }, 5000);
      })(connection);

Para que todo funcione tenemos que iniciar este worker (worker.js) podemos hacerlo directamente o podemos iniciarlo desde nuestro server.js. Para iniciarlo desde nuestro server.js tenemos que agregar estas lineas a nuestro archivo server.js

//server.js
        var process = require('child_process');

        // se realiza el fork del nuevo proceso
        var child = process.fork('worker.js');

Este seria nuestro worker el cual toma los mensajes que publiquemos por el api service y lo mostrara en pantalla. Los mensaje que llegan están en un buffer y tenemos que convertir eso a un strong y poder procesarlos o hacer lo que queramos con el mismo.

Referencia

Projecto Completo

Para quienes no les gusta usar curl, pueden usar este cliente web para probar rest apies Postman

Documentación de AMQP Nodejs (Varios push por el mismo creador de nodejs 🙂 )

Documentación de HapiJs