Trabajos en segundo plano en Node.js a escala

trabajos en segundo plano

Cómo ejecutamos trabajos en segundo plano en Node.js a escala

Este artículo lo ayudará a configurar el trabajos en segundo plano al estilo de Sidekiq y la capacidad de procesamiento cron en Node.js usando Bull mientras mantiene la simplicidad y la escala en el centro.

 

Configuración de Node.js Bull, Sidekiq Style

 


Antecedentes: Acerca de Sidekiq

Cualquiera que haya usado Sidekiq en Ruby junto con las aplicaciones Rails sabe que es una de las mejores herramientas asincrónicas, de procesamiento en segundo plano y cron que existen. Es una joya de procesamiento rápido para trabajos en segundo plano, liviana y fácil de configurar.

A continuación, se muestran los casos de uso comunes y Sidekiq proporciona soluciones listas para usar para ellos.

  • Trabajos cron fáciles de interconectar: ​​con Sidekiq + Sidekiq Scheduler, puede configurar un montón de trabajos cron describiendo el cronograma cron y los trabajadores ejecutores.
  • Activación de trabajadores asincrónicos: para activar comunicaciones/trabajos menos críticos desde las API principales para procesarlos de forma asíncrona (para reducir la latencia + cómputo). Para desencadenar trabajos retrasados.
  • Proceso secundario fácil de configurar: Ejecute la ejecución del trabajo como un proceso/servicio separado para que pueda administrarse y escalarse de forma independiente. Los servicios de activación y ejecución de trabajos se comunican a través de Redis.
  • Consola web: para ver, desencadenar y monitorear trabajos.

Sidekiq se encarga de toda la configuración, las operaciones y el cableado de las colas (solo tiene que especificar los nombres de las colas). También le brinda interfaces en su código de Rails para activar trabajos asíncronos. Eso es lo que la convierte en una gran biblioteca.

Node.js Bull

Si busca paquetes similares en Node.js, encontrará bull, node-cron, node-schedule entre los primeros. Si bien la programación de nodos es más popular, solo proporciona una solución cron. Bull es la herramienta de procesamiento de colas en segundo plano más utilizada. Pero no proporciona una interfaz tan fácil y limpia como Sidekiq y tiene que configurar muchas cosas manualmente. Debe configurar cada cola de forma independiente y conectarla en el servicio del procesador. Así que proporcionaré una guía paso a paso para cada uno de los casos de uso anteriores a continuación.

La puesta en marcha

Configuración de colas:

Configure todas las colas, tanto para crons como para trabajos asincrónicos en un solo archivo de configuración. Para compartir colas entre el servicio/API principal (para activar trabajos) y el servicio de procesador (para ejecutar trabajos), debe inicializar la cola en ambos servicios. Duplicar esto es propenso a errores.

Así que una práctica que estamos siguiendo

  1. Inicialice y exporte todas las colas (crones y trabajos asíncronos por igual) en un archivo de configuración
  2. Definir sus funciones de trabajador de procesamiento asignadas con colas
  3. Importe las colas en el servicio primario (API) para activar trabajos (“el servicio”)
  4. Importar y “procesar” todas las colas en el servicio de procesamiento (procesador en segundo plano, “el procesador”)

La configuración de colas

Todas las colas se definen en un archivo de configuración. Esta configuración se compartirá entre el servicio y el procesador.

/* bull_config.js */
const queueOpts = { prefix: `my_service_${APP_ENV}:bull` }; // prefix for Redis keys
export const notificationQueue = new Queue('notification_queue', process.env.REDIS_URL, queueOpts); // job queue
export const somethingQueue = new Queue('something_queue', process.env.REDIS_URL, queueOpts); // job queue
const reminderQueue = new Queue('reminder_cron_queue', process.env.REDIS_URL, queueOpts); // cron queue

export backgroundQueues = [
  {queue: notificationQueue, worker: notificationWorker },
  {queue: somethingQueue, worker: somethingWorker }
]

export cronQueues = [
  {queue: reminderQueue, worker: reminderCronWorker, cronExpression: '1 * * * *'} 
  // Runs on 1st minute, every hour
]

El uso

El servicio utilizará las colas exportadas para activar trabajos:

/* express_app.js */
import express from 'express';
import { notificationQueue } from  './config/bull_config';
// ... ...
const router = express.Router();

//... ...
router.get('/signup', function (req, res, next) {
  //... the business logic ...
  let user; /* ... ...*/
  await notificationQueue.add({message: `Welcome ${user.name}!`, phone: user.phone}); 
  res.json({success: true});
});

El procesamiento

El mapa de colas será para configurar el procesador:

/* bull_processor.js */
import { backgroundQueues, cronQueues } from './config/bull_config';
/* cronQueues & backgroundQueues are the lists of the queues we defined in the config file
  They're the pair of "queue" & it's "worker" function. We tie them together */

backgroundQueues.forEach(function (e) {
  e.queue.process(4 /* concurrency */, e.worker);
})

cronQueues.forEach(function (e) {
  e.queue.process(e.worker);
  e.queue.add({}, { repeat: { cron: e.cronExpression } }); 
// Note that -- for the cron -- you add a single repeating job
})


/* Optional, wrapper for worker: This will allow us setting up the common activities for all queues
  -- better logging, error handling & reporting -- only once */
function workersWrapper(worker) = () => (async(job, done) => {
        try { var r = await worker(job, done); } catch(e) { /* stuff here; */ done(e); }
    })

Ahora hemos visto el procesamiento básico de trabajos y la configuración de cron. Agregar nuevos crons y colas es bastante fácil. Debe definir nuevas colas en la lista y establecer sus funciones de trabajo.

Ejecutándose como un proceso independiente:

node bull_processor.js: Ejecutar esto iniciará un nuevo proceso que escucha y procesa todas las colas.

Escalada

Estos procesos se ejecutan de forma independiente. También se pueden ejecutar en un contenedor docker para escalar fácil y rápidamente. WE puede activar 10s, 100s de procesos/contenedores con bastante rapidez según las necesidades. También puede publicar las longitudes de la cola y usar métricas/alarmas para escalar automáticamente sus máquinas/contenedores fácilmente con ECS, EKS, Kubernetes, etc. en función del retraso de la cola.


La configuración básica se realiza aquí. El siguiente código es para un caso de uso especial.

Trabajos y trabajadores asíncronos: (los poco comunes)

Hemos visto cómo configurar colas y desencadenar trabajos. Estos son útiles cuando tiene casos de uso comunes, como enviar notificaciones/comunicaciones, ejecutar un trabajo de limpieza, etc. Pero, ¿qué sucede si solo desea ejecutar cualquier función de forma asíncrona? ¡Hacer colas, trabajadores cada vez es un dolor!

Hay una forma complicada de configurar esto. Puede tener una cola dedicada para todos esos trabajos en segundo plano y elegir funciones dinámicamente.

/* The queue definition in bull_config.js */
export const genericAsyncQueue = new Queue('generic_async_queue', process.env.REDIS_URL, queueOpts); 

/* --------------------------------------------------------* /
/* Worker function definition in asyncWorker.js */
import { doSomething } from './somewhere';
import { doSomethingElse } from './somewhere_else';
import { generateUniqueTokenForUser } from './users';
const functionsList = [
  doSomething, doSomethingElse, generateUniqueTokenForUser, /* all such async functions */
]
var functions = {}
functionsList.forEach(function (e) { return functions[e.name] = e })
// We initialise functions as {"functionName": function} map

// This worker takes function name & function args as job data, and executes the functions
async function genericAsyncWorker(job, done) {
  const fname = job.data.function_name;
  await functions[fname].apply(1, job.data.function_args);
  done()
}

/* --------------------------------------------------------* /
/* Trigger job from inside the app */
await genericAsyncQueue.add({
  function_name: 'generateUniqueTokenForUser', function_args: [userId, phoneNumber]
});
await genericAsyncQueue.add({ function_name: 'doSomething', function_args: [] });

Otras opciones útiles

/* Other configurations & options */

// Trigger job with 30 second delay
queue.add({ someKey: "Some Value" }, { delay: 30 * 1000 })

// Trigger job with exponential retries, 30s: 0, 30, 90, 150...
queue.add({ someKey: "Some Value" }, { attempts: 3, backoff: { type: 'exponential', delay: 30000 } })

// Rate limiting a queue (5 per second)
queue = new Queue('some_queue', process.env.REDIS_URL, { /* other opts, */ limiter: { max: 5, duration: 1000 } });

// Retries & delays, for the queue
queue = new Queue('some_queue', process.env.REDIS_URL, { /* other opts, */ defaultJobOptions: { delay: 30 * 1000 } });

¡Eso es todo! Espero que esto lo ayude a configurar y poner en funcionamiento su aplicación rápidamente.

Recent Post