Integrando ActiveMq com Nest.js
Um breve tutorial mostrando como você pode integrar facilmente um broker ActiveMQ/AMQP 1.0 com Nest.js usando Decorators
Você talvez se pergunte
Por que um tutorial em vez de publicar uma biblioteca?
Simplificando, é muito simples para justificar uma biblioteca e eu não teria tempo para desenvolvê-la melhor e de uma maneira mais geral. Mas, realmente encorajo qualquer pessoa que tenha interesse e tempo. Uma dica que posso dar é usar a biblioteca golevelup nestjs-rabbitmq como inspiração, assim como fiz para alcançar a solução deste post.
Ao mesmo tempo, acho que este conteúdo que compartilho é muito valioso, já que tive dificuldades para encontrar qualquer coisa sobre isso na internet.
Você tem experiência com ActiveMQ / AMQP 1.0?
Definitivamente não. Na verdade, acabei de descobrir isso há pouco tempo, então não espere um grande conhecimento sendo compartilhado sobre este tópico. Aqui está a documentação oficial do Apache ActiveMQ Classic (o broker usado neste tutorial).
Stack Utilizada
- Broker AMQP 1.0: ActiveMQ (classic 5.17) do AmazonMQ
- Nest.js
- rhea: biblioteca geral que implementa o protocolo AMQP 1.0 (v3.0.2)
- Provavelmente a melhor (embora não perfeita) que você encontrará no momento em que escrevo isso
- @golevelup/nestjs-discovery: facilita o trabalho com decoradores no Nest (v4.0.1)
- Docker: bônus se você quiser rodar uma versão local do ActiveMQ
Configuração do ActiveMQ
Não vou mostrar aqui como configurar um broker ActiveMQ na AWS, pois esse não é o ponto, mas é claro que você precisará de algum broker para conectar e testar sua implementação. Faremos isso com docker compose assim:
services:
activemq:
image: apache/activemq-classic:5.18.3
container_name: myapp_activemq
restart: unless-stopped
ports:
- 5672:5672
- 8161:8161
environment:
ACTIVEMQ_CONNECTION_USER: admin
ACTIVEMQ_CONNECTION_PASSWORD: admin
ACTIVEMQ_WEB_USER: admin
ACTIVEMQ_WEB_PASSWORD: admin
volumes:
- ./local-infra/activemq/activemq.xml:/opt/apache-activemq/conf/activemq.xml
- ./local-infra/activemq/data:/opt/apache-activemq/data
Agora, este trecho também é algo valioso, porque tive que pesquisar bastante para conseguir isso (e talvez ainda esteja faltando algo).
Além disso, cuidado com o seu arquivo activemq.xml, o arquivo de configuração que a AWS fornece não corresponde ao que a imagem do Docker espera. No meu caso, tive que modificar a configuração do broker para habilitar o agendamento/adiamento de mensagens, então acessei o container, copiei o conteúdo do activemq.xml, coloquei no meu arquivo activemq.xml e depois o modifiquei conforme necessário.
Integrando com Nest.js
Vou assumir aqui que você já tem um projeto Nest.js configurado e que também já instalou as bibliotecas mencionadas na stack acima.
Em qual pasta ou estrutura você implementará os arquivos a seguir não importa muito, mas recomendo encapsular os arquivos em uma pasta chamada activemq, talvez.
PS: Você verá que configuraremos uma conexão durável, que permanece aberta o tempo todo. Esse não é o comportamento padrão do rhea, mas acho que será o que a maioria das pessoas precisará.
O ActiveMqModule
Se você está familiarizado com o Nest.js, sabe que este é o nosso arquivo wrapper/raiz para esta implementação especificamente, e talvez você também ache estranho que ele seja muito anêmico:
// ./activemq/activemq.module.ts
import { DiscoveryModule } from '@golevelup/nestjs-discovery';
import { Module } from '@nestjs/common';
import { ActiveMqService } from './activemq.service';
@Module({
imports: [DiscoveryModule],
providers: [ActiveMqService],
exports: [ActiveMqService],
})
export class ActiveMqModule {}
Em alguns casos, você veria muito mais código aqui com o uso de factories nos providers e certamente o handler de bootstrap do modulo.
Mas, neste caso, preferi implementar isso no ActiveMqService. Sinta-se à vontade para mudar, se achar necessário.
O ActiveMqConfig
A declaração de todos os possíveis receptores e remetentes da nossa aplicação:
// ./activemq/activemq.config.ts
export const activeMqReceivers =
exampleTopic: {
name: 'example-topic.receiver',
source: {
// A topic declaration
address: 'topic://example.topic',
durable: 2,
expiry_policy: 'never',
},
},
exampleQueue: {
name: 'example-queue.receiver',
source: {
// A queue declaration
address: 'example.queue',
durable: 2,
expiry_policy: 'never',
},
},
};
export const activeMqSenders = {
exampleTopic: {
name: 'example-topic.sender',
target: {
address: 'topic://example.topic',
durable: 2,
expiry_policy: 'never',
},
},
exampleQueue: {
name: 'example-queue.sender',
target: {
address: 'example.queue',
durable: 2,
expiry_policy: 'never',
},
},
};
O Decorator ActiveMqReceiverHandler
Isso marcará as funções que queremos que sejam executadas quando uma mensagem chegar em um determinado receptor:
// ./activemq/receiver-handler.decorator.ts
import { SetMetadata, applyDecorators } from '@nestjs/common';
import { activeMqReceivers } from './activemq.config';
export interface ActiveMqReceiverHandlerOptions {
receiverKey: keyof typeof activeMqReceivers;
}
export const ACTIVE_MQ_RECEIVER_HANDLER = Symbol('ACTIVE_MQ_RECEIVER_HANDLER');
export const ActiveMqReceiverHandler = (
options: ActiveMqReceiverHandlerOptions,
) => applyDecorators(SetMetadata(ACTIVE_MQ_RECEIVER_HANDLER, options));
O ActiveMqService
Aqui é onde estará a maior parte da nossa implementação:
// ./activemq/activemq.service.ts
import { DiscoveryService } from '@golevelup/nestjs-discovery';
import {
Injectable,
Logger,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { create_container, Connection, Sender } from 'rhea';
import { activeMqReceivers, activeMqSenders } from './activemq.config';
import {
ACTIVE_MQ_RECEIVER_HANDLER,
ActiveMqReceiverHandlerOptions,
} from './receiver-handler.decorator';
@Injectable()
export class ActiveMqService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(ActiveMqService.name);
private connection: Connection | null = null;
private senders: Record<string, Sender> = {};
private receiverHandlers: Record<
string,
(data: Record<string, any>) => void
> = {};
constructor(
private config: ConfigService,
private discover: DiscoveryService,
) {}
async onModuleInit() {
this.connect();
await this.bindReceiverHandlers();
}
async onModuleDestroy() {
this.close();
}
private connect() {
try {
this.logger.log('Connecting to ActiveMQ...');
const containerId = `server_${Math.random().toString(16).slice(3)}`;
const container = create_container({ id: containerId });
container.on('error', (error) => {
this.logger.error('General error', error);
});
container.on('connection_error', (error) => {
this.logger.error('Connection error', error);
});
container.on('connection_open', () => {
this.logger.log('Connection opened');
});
container.on('message', (context) => {
this.logger.log(
`Received message on ${context.receiver.name} ~ ${context.message.body}`,
);
if (context.message.body === 'detach') {
// detaching leaves the subscription active, so messages sent
// while detached are kept until we attach again
context.receiver.detach();
context.connection.close();
} else if (context.message.body === 'close') {
// closing cancels the subscription
context.receiver.close();
context.connection.close();
} else {
const bodyObject = JSON.parse(context.message.body);
const handler = this.receiverHandlers[context.receiver.name];
if (handler) {
handler(bodyObject);
}
}
});
container.on('sendable', (context) => {
this.senders[context.sender.name] = context.sender;
});
this.connection = container.connect({
transport: this.config.getOrThrow<any>('activemq.transport'),
host: this.config.getOrThrow('activemq.host'),
port: this.config.getOrThrow('activemq.port'),
username: this.config.getOrThrow('activemq.user'),
password: this.config.getOrThrow('activemq.password'),
reconnect: 10000, // 10s between reconnects
});
Object.values(activeMqSenders).forEach((sender) => {
this.connection?.open_sender(sender);
});
} catch (error) {
this.logger.error('Error connecting', error);
}
}
private close() {
this.logger.log('Closing connection...');
this.connection?.close();
}
/** Find methods marked by `@ActiveMqReceiverHandler`, set the respective handler and opens it */
private async bindReceiverHandlers() {
const receiversMeta =
await this.discover.providerMethodsWithMetaAtKey<ActiveMqReceiverHandlerOptions>(
ACTIVE_MQ_RECEIVER_HANDLER,
);
receiversMeta.forEach(({ meta, discoveredMethod }) => {
const receiverConfig = activeMqReceivers[meta.receiverKey];
this.logger.log(
`Binding handler ${discoveredMethod.parentClass.name}.${discoveredMethod.methodName} to receiver ${receiverConfig.name}`,
);
this.receiverHandlers[receiverConfig.name] =
discoveredMethod.handler.bind(discoveredMethod.parentClass.instance);
// When open the receiver only after its handler is set
this.connection?.open_receiver(receiverConfig);
});
}
send(senderKey: keyof typeof activeMqSenders, data: Record<string, any>) {
const senderConfig = activeMqSenders[senderKey];
this.logger.log('Sending message...', senderConfig.name, data);
const sender = this.senders[senderConfig.name];
if (sender) {
sender.send({
body: JSON.stringify(data),
});
}
}
}
Explicando brevemente o que acontece aqui:
- Por padrão, o Nest.js criará apenas uma instância de ActiveMqModule e ActiveMqService, então usamos os handlers the life-cycle onModuleInit e onModuleDestroy para configurar a conexão.
- Após iniciar a conexão, abrimos os remetentes declarados em activemq.config.ts.
- Em sequência, bindReceiverHandlers() encontrará os métodos marcados com nosso decorador, registrará eles em nossos manipuladores locais e abrirá o receptor respectivo. Dessa forma, só receberemos novas mensagens se houver um manipulador configurado para o receptor dado.
- Com tudo isso, podemos expor um método send() mais simples para enviar mensagens para remetentes específicos.
Declarando Publishers e Subscribers
Agora, vamos usar isso em nossa aplicação. Vou assumir uma pasta chamada publisher e outra chamada subscriber, por exemplo.
O Publisher
Só precisamos declarar um módulo que importe nosso ActiveMqModule:
// ./publisher/publisher.module.ts
import { Module } from '@nestjs/common';
import { ActiveMqModule } from './activemq/activemq.module';
import { PublisherService } from './publisher.service';
@Module({
imports: [ActiveMqModule],
providers: [PublisherService],
// Exporting so you can use it in another service if needed
exports: [PublisherService],
})
export class PublisherModule {}
E então usar o ActiveMqService para enviar uma mensagem para algum sender configurado
// ./publisher/publisher.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { ActiveMqService } from './activemq/activemq.service';
@Injectable()
export class PublisherService {
private logger = new Logger(PublisherService.name);
constructor(private readonly activeMqService: ActiveMqService) {}
exampleEvent(msg: any) {
try {
this.activeMqService.send('exampleQueue', msg);
} catch (error) {
this.logger.error(`exampleEvent~ ${error.message}`);
}
}
}
O Subscriber
Neste caso, temos que usar nosso decorator para marcar qualquer método de algum service como um manipulador de um receiver específico
// ./subscriber/subscriber.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { ActiveMqReceiverHandler } from './activemq/receiver-handler.decorator';
@Injectable()
export class SubscriberService {
private logger = new Logger(SubscriberService.name);
@ActiveMqReceiverHandler({
receiverKey: 'exampleQueue',
})
async onExampleEvent(msg: any) {
// Be sure to wrap with try-catch to avoid unexpected errors
try {
this.logger.log(`Message received`, msg);
} catch (error) {
this.logger.error(error.message);
}
}
}
Tudo Pronto!
E isso seria tudo para continuar mantendo a simplicidade. Claro, será sensato modificar as implementações para o seu caso de uso. Mas espero que este post tenha ajudado você de alguma forma.
Obrigado por ler até aqui.