Executando verificação de segurança...
2

Integrando ActiveMq com Nest.js

Um breve tutorial mostrando como você pode integrar facilmente um broker ActiveMQ/AMQP 1.0 com Nest.js usando Decorators

Você talvez se pergunte

Por que um tutorial em vez de publicar uma biblioteca?
Simplificando, é muito simples para justificar uma biblioteca e eu não teria tempo para desenvolvê-la melhor e de uma maneira mais geral. Mas, realmente encorajo qualquer pessoa que tenha interesse e tempo. Uma dica que posso dar é usar a biblioteca golevelup nestjs-rabbitmq como inspiração, assim como fiz para alcançar a solução deste post.
Ao mesmo tempo, acho que este conteúdo que compartilho é muito valioso, já que tive dificuldades para encontrar qualquer coisa sobre isso na internet.

Você tem experiência com ActiveMQ / AMQP 1.0?
Definitivamente não. Na verdade, acabei de descobrir isso há pouco tempo, então não espere um grande conhecimento sendo compartilhado sobre este tópico. Aqui está a documentação oficial do Apache ActiveMQ Classic (o broker usado neste tutorial).

Stack Utilizada

  • Broker AMQP 1.0: ActiveMQ (classic 5.17) do AmazonMQ
  • Nest.js
  • rhea: biblioteca geral que implementa o protocolo AMQP 1.0 (v3.0.2)
    • Provavelmente a melhor (embora não perfeita) que você encontrará no momento em que escrevo isso
  • @golevelup/nestjs-discovery: facilita o trabalho com decoradores no Nest (v4.0.1)
  • Docker: bônus se você quiser rodar uma versão local do ActiveMQ

Configuração do ActiveMQ

Não vou mostrar aqui como configurar um broker ActiveMQ na AWS, pois esse não é o ponto, mas é claro que você precisará de algum broker para conectar e testar sua implementação. Faremos isso com docker compose assim:

services:
  activemq:
    image: apache/activemq-classic:5.18.3
    container_name: myapp_activemq
    restart: unless-stopped
    ports:
      - 5672:5672
      - 8161:8161
    environment:
      ACTIVEMQ_CONNECTION_USER: admin
      ACTIVEMQ_CONNECTION_PASSWORD: admin
      ACTIVEMQ_WEB_USER: admin
      ACTIVEMQ_WEB_PASSWORD: admin
    volumes:
      - ./local-infra/activemq/activemq.xml:/opt/apache-activemq/conf/activemq.xml
      - ./local-infra/activemq/data:/opt/apache-activemq/data

Agora, este trecho também é algo valioso, porque tive que pesquisar bastante para conseguir isso (e talvez ainda esteja faltando algo).
Além disso, cuidado com o seu arquivo activemq.xml, o arquivo de configuração que a AWS fornece não corresponde ao que a imagem do Docker espera. No meu caso, tive que modificar a configuração do broker para habilitar o agendamento/adiamento de mensagens, então acessei o container, copiei o conteúdo do activemq.xml, coloquei no meu arquivo activemq.xml e depois o modifiquei conforme necessário.

Integrando com Nest.js

Vou assumir aqui que você já tem um projeto Nest.js configurado e que também já instalou as bibliotecas mencionadas na stack acima.
Em qual pasta ou estrutura você implementará os arquivos a seguir não importa muito, mas recomendo encapsular os arquivos em uma pasta chamada activemq, talvez.
PS: Você verá que configuraremos uma conexão durável, que permanece aberta o tempo todo. Esse não é o comportamento padrão do rhea, mas acho que será o que a maioria das pessoas precisará.

O ActiveMqModule

Se você está familiarizado com o Nest.js, sabe que este é o nosso arquivo wrapper/raiz para esta implementação especificamente, e talvez você também ache estranho que ele seja muito anêmico:

// ./activemq/activemq.module.ts

import { DiscoveryModule } from '@golevelup/nestjs-discovery';
import { Module } from '@nestjs/common';

import { ActiveMqService } from './activemq.service';

@Module({
  imports: [DiscoveryModule],
  providers: [ActiveMqService],
  exports: [ActiveMqService],
})
export class ActiveMqModule {}

Em alguns casos, você veria muito mais código aqui com o uso de factories nos providers e certamente o handler de bootstrap do modulo.
Mas, neste caso, preferi implementar isso no ActiveMqService. Sinta-se à vontade para mudar, se achar necessário.

O ActiveMqConfig

A declaração de todos os possíveis receptores e remetentes da nossa aplicação:

// ./activemq/activemq.config.ts

export const activeMqReceivers = 
  exampleTopic: {
    name: 'example-topic.receiver',
    source: {
      // A topic declaration
      address: 'topic://example.topic',
      durable: 2,
      expiry_policy: 'never',
    },
  },
  exampleQueue: {
    name: 'example-queue.receiver',
    source: {
      // A queue declaration
      address: 'example.queue',
      durable: 2,
      expiry_policy: 'never',
    },
  },
};

export const activeMqSenders = {
  exampleTopic: {
    name: 'example-topic.sender',
    target: {
      address: 'topic://example.topic',
      durable: 2,
      expiry_policy: 'never',
    },
  },
  exampleQueue: {
    name: 'example-queue.sender',
    target: {
      address: 'example.queue',
      durable: 2,
      expiry_policy: 'never',
    },
  },
};

O Decorator ActiveMqReceiverHandler

Isso marcará as funções que queremos que sejam executadas quando uma mensagem chegar em um determinado receptor:

// ./activemq/receiver-handler.decorator.ts

import { SetMetadata, applyDecorators } from '@nestjs/common';

import { activeMqReceivers } from './activemq.config';

export interface ActiveMqReceiverHandlerOptions {
  receiverKey: keyof typeof activeMqReceivers;
}

export const ACTIVE_MQ_RECEIVER_HANDLER = Symbol('ACTIVE_MQ_RECEIVER_HANDLER');

export const ActiveMqReceiverHandler = (
  options: ActiveMqReceiverHandlerOptions,
) => applyDecorators(SetMetadata(ACTIVE_MQ_RECEIVER_HANDLER, options));

O ActiveMqService

Aqui é onde estará a maior parte da nossa implementação:

// ./activemq/activemq.service.ts

import { DiscoveryService } from '@golevelup/nestjs-discovery';
import {
  Injectable,
  Logger,
  OnModuleDestroy,
  OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { create_container, Connection, Sender } from 'rhea';

import { activeMqReceivers, activeMqSenders } from './activemq.config';
import {
  ACTIVE_MQ_RECEIVER_HANDLER,
  ActiveMqReceiverHandlerOptions,
} from './receiver-handler.decorator';

@Injectable()
export class ActiveMqService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(ActiveMqService.name);

  private connection: Connection | null = null;
  private senders: Record<string, Sender> = {};
  private receiverHandlers: Record<
    string,
    (data: Record<string, any>) => void
  > = {};

  constructor(
    private config: ConfigService,
    private discover: DiscoveryService,
  ) {}

  async onModuleInit() {
    this.connect();

    await this.bindReceiverHandlers();
  }

  async onModuleDestroy() {
    this.close();
  }

  private connect() {
    try {
      this.logger.log('Connecting to ActiveMQ...');

      const containerId = `server_${Math.random().toString(16).slice(3)}`;

      const container = create_container({ id: containerId });

      container.on('error', (error) => {
        this.logger.error('General error', error);
      });

      container.on('connection_error', (error) => {
        this.logger.error('Connection error', error);
      });

      container.on('connection_open', () => {
        this.logger.log('Connection opened');
      });

      container.on('message', (context) => {
        this.logger.log(
          `Received message on ${context.receiver.name} ~ ${context.message.body}`,
        );

        if (context.message.body === 'detach') {
          // detaching leaves the subscription active, so messages sent
          // while detached are kept until we attach again
          context.receiver.detach();
          context.connection.close();
        } else if (context.message.body === 'close') {
          // closing cancels the subscription
          context.receiver.close();
          context.connection.close();
        } else {
          const bodyObject = JSON.parse(context.message.body);

          const handler = this.receiverHandlers[context.receiver.name];

          if (handler) {
            handler(bodyObject);
          }
        }
      });

      container.on('sendable', (context) => {
        this.senders[context.sender.name] = context.sender;
      });

      this.connection = container.connect({
        transport: this.config.getOrThrow<any>('activemq.transport'),
        host: this.config.getOrThrow('activemq.host'),
        port: this.config.getOrThrow('activemq.port'),
        username: this.config.getOrThrow('activemq.user'),
        password: this.config.getOrThrow('activemq.password'),
        reconnect: 10000, // 10s between reconnects
      });

      Object.values(activeMqSenders).forEach((sender) => {
        this.connection?.open_sender(sender);
      });
    } catch (error) {
      this.logger.error('Error connecting', error);
    }
  }

  private close() {
    this.logger.log('Closing connection...');

    this.connection?.close();
  }

  /** Find methods marked by `@ActiveMqReceiverHandler`, set the respective handler and opens it */
  private async bindReceiverHandlers() {
    const receiversMeta =
      await this.discover.providerMethodsWithMetaAtKey<ActiveMqReceiverHandlerOptions>(
        ACTIVE_MQ_RECEIVER_HANDLER,
      );

    receiversMeta.forEach(({ meta, discoveredMethod }) => {
      const receiverConfig = activeMqReceivers[meta.receiverKey];

      this.logger.log(
        `Binding handler ${discoveredMethod.parentClass.name}.${discoveredMethod.methodName} to receiver ${receiverConfig.name}`,
      );

      this.receiverHandlers[receiverConfig.name] =
        discoveredMethod.handler.bind(discoveredMethod.parentClass.instance);

      // When open the receiver only after its handler is set
      this.connection?.open_receiver(receiverConfig);
    });
  }

  send(senderKey: keyof typeof activeMqSenders, data: Record<string, any>) {
    const senderConfig = activeMqSenders[senderKey];

    this.logger.log('Sending message...', senderConfig.name, data);

    const sender = this.senders[senderConfig.name];

    if (sender) {
      sender.send({
        body: JSON.stringify(data),
      });
    }
  }
}

Explicando brevemente o que acontece aqui:

  • Por padrão, o Nest.js criará apenas uma instância de ActiveMqModule e ActiveMqService, então usamos os handlers the life-cycle onModuleInit e onModuleDestroy para configurar a conexão.
  • Após iniciar a conexão, abrimos os remetentes declarados em activemq.config.ts.
  • Em sequência, bindReceiverHandlers() encontrará os métodos marcados com nosso decorador, registrará eles em nossos manipuladores locais e abrirá o receptor respectivo. Dessa forma, só receberemos novas mensagens se houver um manipulador configurado para o receptor dado.
  • Com tudo isso, podemos expor um método send() mais simples para enviar mensagens para remetentes específicos.

Declarando Publishers e Subscribers

Agora, vamos usar isso em nossa aplicação. Vou assumir uma pasta chamada publisher e outra chamada subscriber, por exemplo.

O Publisher

Só precisamos declarar um módulo que importe nosso ActiveMqModule:

// ./publisher/publisher.module.ts

import { Module } from '@nestjs/common';

import { ActiveMqModule } from './activemq/activemq.module';
import { PublisherService } from './publisher.service';

@Module({
  imports: [ActiveMqModule],
  providers: [PublisherService],
  // Exporting so you can use it in another service if needed
  exports: [PublisherService],
})
export class PublisherModule {}

E então usar o ActiveMqService para enviar uma mensagem para algum sender configurado

// ./publisher/publisher.service.ts

import { Injectable, Logger } from '@nestjs/common';

import { ActiveMqService } from './activemq/activemq.service';

@Injectable()
export class PublisherService {
  private logger = new Logger(PublisherService.name);

  constructor(private readonly activeMqService: ActiveMqService) {}

  exampleEvent(msg: any) {
    try {
      this.activeMqService.send('exampleQueue', msg);
    } catch (error) {
      this.logger.error(`exampleEvent~ ${error.message}`);
    }
  }
}

O Subscriber

Neste caso, temos que usar nosso decorator para marcar qualquer método de algum service como um manipulador de um receiver específico

// ./subscriber/subscriber.service.ts

import { Injectable, Logger } from '@nestjs/common';

import { ActiveMqReceiverHandler } from './activemq/receiver-handler.decorator';

@Injectable()
export class SubscriberService {
  private logger = new Logger(SubscriberService.name);

  @ActiveMqReceiverHandler({
    receiverKey: 'exampleQueue',
  })
  async onExampleEvent(msg: any) {
    // Be sure to wrap with try-catch to avoid unexpected errors
    try {
      this.logger.log(`Message received`, msg);
    } catch (error) {
      this.logger.error(error.message);
    }
  }
}

Tudo Pronto!

E isso seria tudo para continuar mantendo a simplicidade. Claro, será sensato modificar as implementações para o seu caso de uso. Mas espero que este post tenha ajudado você de alguma forma.

Obrigado por ler até aqui.

Carregando publicação patrocinada...