# RabbitMQ

## RabbitMQ Channel Integration

The `@nestjstools/messaging-rabbitmq-extension` package integrates **RabbitMQ** into NestJS applications for asynchronous and synchronous message processing.

### 📦 Installation

Install both core messaging and the RabbitMQ extension:

```bash
npm install @nestjstools/messaging @nestjstools/messaging-rabbitmq-extension
# or
yarn add @nestjstools/messaging @nestjstools/messaging-rabbitmq-extension
```

***

### 🧩 Basic Configuration Example

```ts
import { Module } from '@nestjs/common';
import { InMemoryChannelConfig, MessagingModule } from '@nestjstools/messaging';
import { MessagingRabbitmqExtensionModule, RmqChannelConfig, ExchangeType } from '@nestjstools/messaging-rabbitmq-extension';
import { SendMessageHandler } from './handlers/send-message.handler';

@Module({
  imports: [
    MessagingRabbitmqExtensionModule,
    MessagingModule.forRoot({
      messageHandlers: [SendMessageHandler],
      buses: [
        { name: 'message.bus', channels: ['my-channel'] },
        { name: 'command-bus', channels: ['amqp-command'] },
        { name: 'event-bus', channels: ['amqp-event'] },
      ],
      channels: [
        new InMemoryChannelConfig({ name: 'my-channel' }),
        new RmqChannelConfig({
          name: 'amqp-command',
          connectionUri: 'amqp://guest:guest@localhost:5672/',
          exchangeName: 'my_app_command.exchange',
          exchangeType: ExchangeType.TOPIC,
          queue: 'my_app.command',
          bindingKeys: ['my_app.command.#'],
          autoCreate: true,
        }),
        new RmqChannelConfig({
          name: 'amqp-event',
          connectionUri: 'amqp://guest:guest@localhost:5672/',
          exchangeName: 'my_app_event.exchange',
          exchangeType: ExchangeType.TOPIC,
          queue: 'my_app.event',
          bindingKeys: ['my_app_event.#'],
          autoCreate: true,
          avoidErrorsForNotExistedHandlers: true,
        }),
      ],
      debug: true,
    }),
  ],
})
export class AppModule {}
```

***

### 🛠 Exchange Types

| Exchange Type | Description                                                                         |
| ------------- | ----------------------------------------------------------------------------------- |
| `TOPIC`       | Route messages using wildcard-based routing keys (`my_app.command.#`).              |
| `DIRECT`      | Exact routing match. You must define matching `bindingKeys`.                        |
| `FANOUT`      | Broadcasts messages to all queues bound to the exchange, regardless of routing key. |

***

### 🔁 Cross-Language Messaging

You can publish messages from other services (non-NestJS apps) by following these rules:

1. **Send a message** to the appropriate queue.
2. **Set the header** `messaging-routing-key` to the routing key declared on the target handler, for example:

```ts
@MessageHandler('my_app_command.create_user')
```
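
The two rules above can be sketched as a small helper that prepares a message for any AMQP client. This is a minimal sketch: only the `messaging-routing-key` header name comes from this page. The helper name `buildOutboundMessage`, the `OutboundMessage` shape, and the JSON body encoding are assumptions made for illustration.

```typescript
// Header name taken from the rules above; everything else is illustrative.
const ROUTING_HEADER = 'messaging-routing-key';

interface OutboundMessage {
  body: string; // JSON-encoded payload (assumed wire format)
  headers: Record<string, string>; // AMQP message headers
}

// Hypothetical helper: pairs a payload with the header that selects the handler.
function buildOutboundMessage(handlerKey: string, payload: unknown): OutboundMessage {
  if (handlerKey.trim() === '') {
    throw new Error('handlerKey must be a non-empty string');
  }
  const headers: Record<string, string> = {};
  headers[ROUTING_HEADER] = handlerKey;
  return { body: JSON.stringify(payload), headers };
}

const message = buildOutboundMessage('my_app_command.create_user', { name: 'Ada' });
console.log(message.headers, message.body);
```

With a client such as amqplib, the result would typically be sent with `channel.sendToQueue(queue, Buffer.from(message.body), { headers: message.headers })`. Clients in other languages expose the same concept as message headers or properties.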

***

### 🪦 Dead Letter Queue (DLQ) – How It Works

When `deadLetterQueueFeature: true` is set on an `RmqChannelConfig`, the system automatically routes **failed messages** to a **dedicated "dead letter" queue** instead of discarding them or crashing the application.

#### Behavior:

1. **Message Handling Fails**\
   If a message handler **throws an unhandled exception**, the message is negatively acknowledged (`nack`) and redirected to the **DLQ**.
2. **DLQ Naming Convention**\
   The DLQ is created automatically and typically named by appending `.dead_letter_queue` to the original queue name.\
   Example:\
   If your queue is `my_app.command`, the dead letter queue will be `my_app.command.dead_letter_queue`.
3. **Message Retention**\
   Failed messages remain in the DLQ until manually processed, examined, or retried.
4. **Retry Strategy**\
   You can **manually re-publish** messages from the DLQ back to the original exchange with the same routing key (or via tooling or scripts) when you're ready to retry.
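
The manual retry in step 4 could be scripted roughly as follows. This is a sketch under assumptions, not part of the extension: `republishFromDlq`, the `routingKeyOf` callback, and the `limit` parameter are hypothetical names. The small interfaces mirror the `get`, `publish`, and `ack` methods of amqplib's promise-based channel so the example compiles on its own. How the original routing key is recorded on a dead-lettered message is not specified here, so the caller supplies that logic.

```typescript
// Structural stand-ins for the parts of an amqplib-style channel used below.
interface FetchedMessage {
  content: Uint8Array;
  fields: { routingKey: string };
  properties: { headers?: Record<string, unknown> };
}

interface RepublishChannel {
  get(queue: string): Promise<FetchedMessage | false>;
  publish(
    exchange: string,
    routingKey: string,
    content: Uint8Array,
    options?: { headers?: Record<string, unknown> },
  ): boolean;
  ack(message: FetchedMessage): void;
}

// Moves up to `limit` messages from the DLQ back to the original exchange
// and returns how many were moved.
async function republishFromDlq(
  channel: RepublishChannel,
  dlq: string,
  exchange: string,
  routingKeyOf: (message: FetchedMessage) => string,
  limit = 100,
): Promise<number> {
  let moved = 0;
  while (moved < limit) {
    const message = await channel.get(dlq);
    if (message === false) break; // the DLQ is empty
    channel.publish(exchange, routingKeyOf(message), message.content, {
      headers: message.properties.headers,
    });
    channel.ack(message); // remove from the DLQ only after re-publishing
    moved += 1;
  }
  return moved;
}
```

A call might look like `await republishFromDlq(channel, 'my_app.command.dead_letter_queue', 'my_app_command.exchange', (m) => m.fields.routingKey)`, assuming the routing key survives dead-lettering in your setup. Keeping `limit` small makes it easier to retry in batches and stop if the same failure recurs.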

***

#### 🔁 Example Use Case:

```ts
new RmqChannelConfig({
  name: 'amqp-command',
  connectionUri: 'amqp://guest:guest@localhost:5672/',
  exchangeName: 'my_app_command.exchange',
  exchangeType: ExchangeType.TOPIC,
  queue: 'my_app.command',
  bindingKeys: ['my_app.command.#'],
  autoCreate: true,
  deadLetterQueueFeature: true, // ✅ Enable DLQ
});
```

### 🔧 Configuration Table: `RmqChannelConfig`

| Property                           | Description                                                                                                                                                                                                                                         | Default      |
| ---------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ |
| `name`                             | Name of the channel (e.g., `'amqp-command'`).                                                                                                                                                                                                       | *(required)* |
| `connectionUri`                    | RabbitMQ connection URI (e.g., `'amqp://guest:guest@localhost:5672/'`).                                                                                                                                                                             | *(required)* |
| `exchangeName`                     | Exchange name in RabbitMQ.                                                                                                                                                                                                                          | *(required)* |
| `bindingKeys`                      | Routing keys for queue bindings (e.g., `['my_app.command.#']`).                                                                                                                                                                                     | `[]`         |
| `exchangeType`                     | Type of RabbitMQ exchange (`TOPIC`, `FANOUT`, `DIRECT`).                                                                                                                                                                                            | *(required)* |
| `queue`                            | Name of the queue to consume from.                                                                                                                                                                                                                  | *(required)* |
| `autoCreate`                       | Automatically create exchanges, queues, and bindings if missing.                                                                                                                                                                                    | `true`       |
| `enableConsumer`                   | Enable message consumption from this channel.                                                                                                                                                                                                       | `true`       |
| `enableWorker`                     | Enables the internal worker that processes messages. If `false`, messages are ignored.                                                                                                                                                              | `true`       |
| `avoidErrorsForNotExistedHandlers` | Skip errors when no handler exists for a routed message. Useful for optional event handlers.                                                                                                                                                        | `false`      |
| `middlewares`                      | Middleware pipeline for pre-processing messages.                                                                                                                                                                                                    | `[]`         |
| `normalizer`                       | Attach a normalizer for custom serialization (e.g., Protobuf, Base64).                                                                                                                                                                              | `undefined`  |
| `deadLetterQueueFeature`           | Enables capturing failed messages into a DLQ.                                                                                                                                                                                                       | `false`      |
| **`retryMessage`**                 | Number of times to retry a message before sending it to the dead letter queue. Only applicable if `deadLetterQueueFeature` is enabled.                                                                                                              |              |
| **`retryMessageTtl`**              | Time to live for retry messages in milliseconds. After this time, messages will be moved from the retry queue back to the main exchange for reprocessing. Only applicable if `retryMessage` is set.                                                 | `1000`       |
| **`forceRecreateRetryQueue`**      | Whether to forcefully recreate the retry queue on application startup. This can be useful during development to ensure a clean state, but should be used with caution in production environments as it will delete all messages in the retry queue. |              |
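
As an illustration of how the retry-related properties combine, here is a hedged configuration sketch. Only the property names come from the table above. The values (3 retries, 3000 ms) are arbitrary examples, and the comments restate the table descriptions rather than verified runtime behavior.

```typescript
import { ExchangeType, RmqChannelConfig } from '@nestjstools/messaging-rabbitmq-extension';

// Example values only; tune them for your workload.
const commandChannel = new RmqChannelConfig({
  name: 'amqp-command',
  connectionUri: 'amqp://guest:guest@localhost:5672/',
  exchangeName: 'my_app_command.exchange',
  exchangeType: ExchangeType.TOPIC,
  queue: 'my_app.command',
  bindingKeys: ['my_app.command.#'],
  deadLetterQueueFeature: true, // retryMessage only applies when this is enabled
  retryMessage: 3, // retry a failed message 3 times before dead-lettering it
  retryMessageTtl: 3000, // keep a retry message in the retry queue for 3000 ms
  forceRecreateRetryQueue: false, // true deletes all messages in the retry queue at startup
});
```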

***

### ✉️ Custom Routing with AmqpMessageOptions

You can customize routing at dispatch time:

```ts
this.messageBus.dispatch(
  new RoutingMessage(
    new SendMessage('Hello Rabbit!'),
    'app.command.execute',
    new AmqpMessageOptions('exchange_name', 'rabbitmq_routing_key_to_queue')
  ),
);
```

### Mapping Messages in RabbitMQ Channels

RabbitMQ uses different **exchange types** to route messages based on routing keys and bindings. Here’s how message routing works for each exchange type in the context of messaging channels:

#### Topic Exchange

Topic exchanges route messages based on pattern-matching in the routing key.

* Use **wildcards** like `#` and `*` in your **binding keys** for flexible routing.
* **Example**: If you bind your queue with `my_app.command.#`, messages with routing keys such as `my_app.command.user.create` or `my_app.command.system.shutdown` will be routed to that queue.
* ✅ This is ideal for structured, hierarchical routing across many message types.
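
To make the wildcard rules concrete, the following sketch re-implements topic matching in plain TypeScript: `*` matches exactly one dot-separated word and `#` matches zero or more words. RabbitMQ performs this matching inside the broker, so the function `topicMatches` is purely illustrative and is not part of the extension.

```typescript
// Returns true when `routingKey` matches `bindingKey` under AMQP topic rules.
function topicMatches(bindingKey: string, routingKey: string): boolean {
  const pattern = bindingKey.split('.');
  const words = routingKey.split('.');

  const match = (p: number, w: number): boolean => {
    if (p === pattern.length) return w === words.length;
    if (pattern[p] === '#') {
      // Let `#` consume zero, one, or more of the remaining words.
      for (let next = w; next <= words.length; next++) {
        if (match(p + 1, next)) return true;
      }
      return false;
    }
    if (w === words.length) return false; // pattern still expects a word
    return (pattern[p] === '*' || pattern[p] === words[w]) && match(p + 1, w + 1);
  };

  return match(0, 0);
}

console.log(topicMatches('my_app.command.#', 'my_app.command.user.create')); // true
console.log(topicMatches('my_app.command.*', 'my_app.command.user.create')); // false
console.log(topicMatches('my_app.command.#', 'my_app.event.user.created')); // false
```

The second call fails because `*` covers a single word only, which is why `#` is usually the better choice for hierarchical keys of varying depth.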

#### Direct Exchange

Direct exchanges use **exact matching** between the routing key and the binding key.

* Ensure that your queue has binding keys explicitly defined.
* If no binding key is provided, the channel falls back to the **routing key specified in the message handler**.
* Use this when you need precise, one-to-one message routing.

#### Fanout Exchange

Fanout exchanges **broadcast** messages to **all queues bound to the exchange**, **ignoring routing keys** entirely.

* Every bound queue receives the message.
* Best used for scenarios like logging, notifications, or pub-sub events where all consumers should receive the message.
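
The difference between direct and fanout delivery can be sketched with a small in-memory model. This only illustrates the routing rules described above: the names `targetQueues` and `Binding` and the sample queues are hypothetical, and real routing happens inside the RabbitMQ broker.

```typescript
type SimpleExchangeType = 'direct' | 'fanout';

interface Binding {
  queue: string;
  bindingKey: string;
}

// Returns the queues that would receive a message published with `routingKey`.
function targetQueues(type: SimpleExchangeType, bindings: Binding[], routingKey: string): string[] {
  const matched = type === 'fanout'
    ? bindings // fanout: the routing key is ignored
    : bindings.filter((b) => b.bindingKey === routingKey); // direct: exact match only
  // Each queue receives at most one copy of the message.
  return matched.map((b) => b.queue).filter((q, i, all) => all.indexOf(q) === i);
}

const bindings: Binding[] = [
  { queue: 'audit', bindingKey: 'user.created' },
  { queue: 'mailer', bindingKey: 'user.created' },
  { queue: 'billing', bindingKey: 'invoice.paid' },
];

console.log(targetQueues('direct', bindings, 'user.created')); // [ 'audit', 'mailer' ]
console.log(targetQueues('direct', bindings, 'user.deleted')); // []
console.log(targetQueues('fanout', bindings, 'anything')); // [ 'audit', 'mailer', 'billing' ]
```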

### Retry Message Flow

```
          publish (routingKey = "orders.created")
Producer  ---------------------------------------------+
                                                       |
                                                       v
                                             +-------------------+
                                             | your.exchange     |
                                             | (topic/direct)    |
                                             +-------------------+
                                                       |
                                                       | bind: "orders.created" (or "#")
                                                       v
                                             +-------------------+
                                             | your_delay_queue  |
                                             | x-message-ttl=3000|
                                             | x-dead-letter-    |
                                             |   exchange=main.ex|
                                             +-------------------+
                                                       |
                             (after 3s TTL expires)    |
                                                       v
                                             +-------------------+
                                             | your.exchange     |
                                             | (topic)           |
                                             +-------------------+
                                                       |
                                                       | bind patterns:
                                                       |  "orders.*" / "orders.#"
                                                       v
                                             +-------------------+
                                             | your.queue        |
                                             +-------------------+
                                                       |
                                                       v
                                                   Consumer
```
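
The delay queue in the diagram relies on two standard RabbitMQ queue arguments, `x-message-ttl` and `x-dead-letter-exchange`. The sketch below builds them in plain TypeScript. The helper `delayQueueArguments` and its parameters are hypothetical, and this is not necessarily how the extension declares its retry queue internally.

```typescript
// Arguments for a queue that holds messages for a fixed time and then
// dead-letters them to another exchange.
interface DelayQueueArguments {
  'x-message-ttl': number; // milliseconds a message stays in the delay queue
  'x-dead-letter-exchange': string; // exchange that receives expired messages
}

function delayQueueArguments(ttlMs: number, mainExchange: string): DelayQueueArguments {
  if (!isFinite(ttlMs) || ttlMs < 0 || Math.floor(ttlMs) !== ttlMs) {
    throw new RangeError('ttlMs must be a non-negative integer');
  }
  return {
    'x-message-ttl': ttlMs,
    'x-dead-letter-exchange': mainExchange,
  };
}

// Values from the diagram: 3 s delay, expired messages go to "main.ex".
console.log(delayQueueArguments(3000, 'main.ex'));
```

With amqplib, these would typically be passed as `channel.assertQueue('your_delay_queue', { arguments: delayQueueArguments(3000, 'main.ex') })`. Because no `x-dead-letter-routing-key` is set, RabbitMQ keeps the original routing key when the message expires, so the bind patterns on the main queue still match.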
