Wie kann ich manuell das Publishing von Messages an den RabbitMQ Broker testen, wenn ich mein RabbitTemplate1 wie folgt dargestellt für Reliable Messaging konfiguriert habe?

new CachingConnectionFactory(
    //...
    publisherConfirms: true,
    publisherReturns: true)

new RabbitTemplate(
    //...
    mandatory: true)

Was macht mandatory und publisherReturns?

Die Kombination aus mandatory=true und publisherReturns=true sorgt dafür, dass geprüft wird, ob die gesendete Message an eine Queue weitergeleitet wird. Ist dies nicht der Fall, wird die Message per return an den Aufrufer zurückgesendet.

To ensure messages are routed to a single known queue, the producer can just declare a destination queue and publish directly to it. If messages may be routed in more complex ways but the producer still needs to know if they reached at least one queue, it can set the mandatory flag on a basic.publish, ensuring that a basic.return (containing a reply code and some textual explanation) will be sent back to the client if no queues were appropriately bound.

Quelle: https://www.rabbitmq.com/reliability.html

Der Aufrufer muss für den Fall des return einen ReturnCallback am RabbitTemplate registrieren und erhält über diese Implementierung jede nicht routebare Nachricht und kann dann entsprechend darauf reagieren.

Wie kann ich das testen?

Um dieses Verhalten zu testen genügt es also eine Message an einen nicht vorhandenen Exchange oder einen Exchange ohne definierte Bindings zu senden.

Dabei ist zu beachten, dass z.B durch Policies auch ein alternativer Exchange definiert sein kann, wodurch die Message dorthin geroutet wird und es nicht zu einem return kommt.

Was bewirken publisher confirms?

Publisher confirms sind eine Erweiterung in RabbitMQ und stellen sicher, dass eine Message in eine Queue erfolgreich zugestellt wird (leichtgewichtige Alternative zu Transactions).

Once a channel is in confirm mode, both the broker and the client count messages (counting starts at 1 on the first confirm.select). The broker then confirms messages as it handles them by sending a basic.ack on the same channel. The delivery-tag field contains the sequence number of the confirmed message. The broker may also set the multiple field in basic.ack to indicate that all messages up to and including the one with the sequence number have been handled. In exceptional cases when the broker is unable to handle messages successfully, instead of a basic.ack, the broker will send a basic.nack. In this context, fields of the basic.nack have the same meaning as the corresponding ones in basic.ack and the requeue field should be ignored. By nack’ing one or more messages, the broker indicates that it was unable to process the messages and refuses responsibility for them; at that point, the client may choose to re-publish the messages.

Quelle: https://www.rabbitmq.com/confirms.html

Der Aufrufer muss für den Fall des confirms einen ConfirmCallback am RabbitTemplate registrieren und erhält über diese Implementierung für jede Nachricht ein ack oder nack und kann dann entsprechend darauf reagieren.

Eine einfache Variante ist es, auf die Bestätigung für alle versendeten Nachrichten mittels waitForConfirmsOrDie2 wie im folgendem Beispiel zu warten.

rabbitTemplate.invoke({ RabbitOperations operations ->
  hochzuladeneDokumente.each { dokument ->
    operations.convertAndSend(DOKUMENTE_UPLOAD_EXCHANGE, routingKey, dokument, messagePostProcessor)
  }
  operations.waitForConfirmsOrDie(WAIT_TIMEOUT_IN_MS)
  return true
})

Wie kann ich das testen?

Um confirms zu testen, muss der Broker die Message in eine Queue weiterleiten können, diese dann jedoch ablehnen. Dies lässt sich erreichen, indem man z.B. per Policy die Länge einer Queue begrenzt (siehe unten) und dann mehrere Messages sendet.

max-length: 1
overflow: reject-publish

Fazit

Um Reliable Messaging korrekt umzusetzen muss die CachingConnectionFactory und das RabbitTemplate richtig konfiguriert werden. Weiterhin müssen confirms und returns über die ensprechenden Callbacks behandelt werden. Ob alles richtig funktioniert lässt sich dann einfach testen, indem man sich eine RabbitMQ Instanz wie oben beschrieben konfiguriert.


  1. org.springframework.amqp.rabbit.core.RabbitTemplate ↩︎

  2. verfügbar ab spring-amqp:2.0 ↩︎