Сегодня мы рассмотрим простой пример организации pubsub поверх RabbitMQ . Как уже многократно отмечалось в предыдущих заметках, если вашим бэкендам нужно между собой как-то общаться, желательно взять готовую шину, а не писать по сути свою с нуля на каком-нибудь Akka Cluster . Если только, конечно, вы не пишите свой собственный RabbitMQ или там Spark.
По сути, вот что мы хотим получить:
trait PubSubClient {
def publish ( topic : String, msg : String ) : Future [ Unit ]
def subscribe ( topic : String, ref : ActorRef ) : Future [ Unit ]
def unsubscribe ( topic : String, ref : ActorRef ) : Future [ Unit ]
}
Есть топики и есть какие-то сообщения, которые мы публикуем в этих топиках. Акторы могут подписываться на топики и отписываться от них. При публикации сообщения в топике все подписанные на него акторы (в том числе акторы на других экземплярах бэкенда) получают PubSubMessage с этим сообщением.
Несмотря на простоту интерфейса, практика показывает, что с его помощью можно реализовать абсолютно все, что вы могли хотеть реализовать с использованием Akka Cluster. Гарантии на доставку сообщений будут ничем не хуже. Кроме того, приложение будет намного более предсказуемо вести себя при сетевых проблемах.
Переходим к реализации. Подцепляем к проекту клиент для RabbitMQ:
Поскольку этот пакет предоставляет блокирующий API, нам понадобится создать отдельный dispatcher, прописав в application.conf:
type = Dispatcher
executor = «fork-join-executor»
fork-join-executor {
parallelism-min = 1
parallelism-factor = 1.0
parallelism-max = 1
}
}
Соответственно, актор, реализующий весь pubsub, будет запускаться так:
Props ( new BroadcastPubSubClientActor ( /* … args … */ ) )
. withDispatcher ( «pubsub-actor-dispatcher» )
}
val pubSubActorRef = system. actorOf ( props, «pubSubClientActor» )
Основные моменты в реализации BroadcastPubSubClientActor следующие.
private var consumer : QueueingConsumer = _
private var pubsubQueueName : String = _
private var dummyQueueName : String = _
override def preStart ( ) : Unit = {
log. debug ( «preStart() called» )
context. self ! Connect
}
override def postRestart ( e : Throwable ) : Unit = {
log. warning ( s «Reconnecting in $reconnectTimeoutMs ms» )
context. system . scheduler . scheduleOnce (
reconnectTimeoutMs. millis , context. self , Connect
)
}
override def receive : Receive = {
case Connect =>
log. debug ( «Connecting…» )
val connFactory = new ConnectionFactory ( )
connFactory. setHost ( host )
connFactory. setPort ( port )
connFactory. setUsername ( login )
connFactory. setPassword ( password )
connFactory. setVirtualHost ( vhost )
val conn = connFactory. newConnection ( )
chan = conn. createChannel ( )
consumer = new QueueingConsumer ( chan )
dummyQueueName = chan. queueDeclare ( ) . getQueue
pubsubQueueName = chan. queueDeclare ( ) . getQueue
chan. basicConsume ( pubsubQueueName, true , consumer )
scheduleCheckDelivery ( )
context. become ( connected )
}
Тут все довольно просто. Устанавливается соединение с RabbitMQ. Создается новый канал. Внутри одного соединения к RabbitMQ может быть много каналов, каждый из которых имеет свои подписки. Далее создается новая очередь, которую актор будет цеплять к интересующим его exchange’ам и из которой будет получать интересующие его сообщения. Также создается dummy-очередь, которая понадобится чуть ниже. Затем актор переходит в состояние connected. Можете на досуге проверить, что в случае разрыва соединения с RabbitMQ актор действительно будет пытаться время от времени его восстановить. В том числе, после перезапуска актор действительно возвращается назад к состоянию receive.
case r : Publish => // PubSubClient.publish
val exchange = exchangeName ( r. topic )
val fullMsgJson = JArray ( List ( JString ( r. topic ) , JString ( r. msg ) ) )
val fullMsgStr = compact ( fullMsgJson )
declareExchange ( exchange )
chan. basicPublish ( exchange, r. topic , null ,
fullMsgStr. getBytes ( «UTF-8» ) )
sender ( ) ! { }
case r : Subscribe => // PubSubClient.subscribe
context. watch ( r. ref )
updateSubscribers ( r. topic , s => s + r. ref )
sender ( ) ! { }
case r : Unsubsribe => // PubSubClient.unsubscribe
context. unwatch ( r. ref )
updateSubscribers ( r. topic , s => s — r. ref )
sender ( ) ! { }
case r : Terminated =>
for ( topic < — subscriptions. keys ) {
updateSubscribers ( topic, s => s — r. actor )
}
Здесь происходит обработка основных сообщений. При получении Publish определяется имя соответствующего exchange’а и в него происходит публикация сообщения. Притом сообщение оборачивается в JSON и содержит внутри себя имя топика. Трудно сказать заранее, какая реализация будет лучше работать — с одним exchange’ом на все топики, по одному exchange’у на топик, или какой-то вариант посередине. Поэтому в данной реализации программист заранее задает желаемое количество exchange’ей, а отображение топика в exchange происходит по хэшу. Таким образом получаем общее решение.
При получении Subscribe актор делает watch подписчика и добавляет его в Map’у подписчиков на заданный топик. При получении Unsubsribe происходит обратное действие. Terminated приходит, если подписанный актор неожиданно умирает. В этом случае он отписывается от всех топиков.
val nextDelivery = Option ( consumer. nextDelivery ( waitTimeoutMs ) )
nextDelivery match {
case None => scheduleCheckDelivery ( )
case Some ( delivery ) =>
val fullMsg = new String ( delivery. getBody , «UTF-8» )
val JArray ( List ( JString ( topic ) , JString ( msg ) ) ) = parse ( fullMsg )
val pubSubMsg = PubSubMessage ( topic, msg )
for ( ref < — subscriptions. getOrElse ( topic, /* default */ ) . refs ) {
ref ! pubSubMsg
}
context. self ! CheckDelivery
}
Время от времени, например, раз в 10 мс, происходит проверка, нет ли в очереди новых сообщений. Если есть, из сообщения извлекается имя топика и происходит уведомление всех подписанных на этот топик акторов.
Интересно, как реализован метод declareExchange:
chan. exchangeDeclare ( exchange, «topic» , false , true , messageTtlArgs )
chan. queueBind ( dummyQueueName, exchange, «» , null )
}
private val messageTtlArgs : java. util . HashMap [ String, AnyRef ] = {
val messageTtlMs = 60000
val args = new java. util . HashMap [ String, AnyRef ] ( )
args. put ( «x-message-ttl» , new Integer ( messageTtlMs ) )
args
}
Во-первых, поскольку здесь мы используем RabbitMQ исключительно как in-memory шину, указывается максимальное время жизни всех сообщений, так как хранить их вечно нет смысла. Во-вторых, к новому excahnge’у тут сразу же bind’ится та самая dummy очередь. Дело в том, что здесь мы создаем самоуничтожающийся exchange. Если не осталось ни одной очереди, забайнженой к exchange’у, он будет удален. Но хотя бы одна очередь должна быть забайнжена, иначе удаления не произойдет. Вот для того, чтобы RabbitMQ гарантированно за нами все почистил, когда мы закроем соединение, и нужна dummy очередь. Наконец, в-третьих, что еще интересно, мы должны на всякий случай делать declareExchange каждый раз, когда хотим что-то сделать с exchange’ем (см обработку Publish выше), так как мы не знаем заранее, существует ли уже exchange, а если он и был кем-то когда-то создан, не был ли он удален в результате автоматической чистки или другим клиентом. Кстати, из тех же соображений клиенты должны время от времени делать вызов pubsub. subscribe
, а то вдруг сеть мигала и подписка уже потерялась.
Как видите, при использовании RabbitMQ нужно иметь ввиду некоторые нюансы его работы. Но в общем и целом, всем, кто работает с RabbitMQ, они известны и ничего супер сложного здесь нет. Полную версию исходного кода к этой заметке вы найдете в этом репозитории .
Вопросы, дополнения, а также любые мысли по теме и не очень, как обычно, приветствуются и могут быть оставлены в комментариях ниже.