Как ранее уже отмечалось, Akka позволяет не только создавать акторы , которые обмениваются между собой сообщениями, но и строить кластеры, состоящие из нескольких физических машин. При этом акторы, запущенные на разных машинах, все еще могут взаимодействовать друг с другом. Кроме того, Akka из коробки предоставляет ряд полезных при построении распределенных приложений примитивов. Например, возможность подписаться на события, происходящие с кластером, присваивать узлам роли или запустить актор-одиночку. Что, кстати, делает Akka намного интереснее других реализаций модели акторов ( Erlang , Cloud Haskell ). В этой заметке мы напишем очень простое приложение, использующее akka-cluster, а также ознакомимся с его поведением при различных условиях.
Исходники к заметке вы найдете в этом репозитории .
Файл build.sbt:
version := «0.1»
scalaVersion := «2.11.4»
val akkaVersion = «2.3.8»
libraryDependencies ++ = Seq (
«com.typesafe.akka» %% «akka-actor» % akkaVersion,
«com.typesafe.akka» %% «akka-cluster» % akkaVersion
)
resolvers + = «Akka Snapshots» at «http://repo.akka.io/snapshots/»
Ничего особенного, к проекту просто подключаются пакеты akka-actor и akka-cluster.
Файл application.conf:
actor {
provider = «akka.cluster.ClusterActorRefProvider»
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = «127.0.0.1»
port = 2551
}
}
cluster {
seed-nodes = [
«akka.tcp://system@127.0.0.1:2551»,
«akka.tcp://system@127.0.0.1:2552»]
}
}
Значение akka.actor.provider заменяется, так как теперь мы с вами пишем распределенное приложение. Задается интерфейс и порт, на которых будет висеть actor system. Наконец, задается список seed nodes. Этот список используется при присоединении узла к кластеру. Seed nodes не являются единой точкой отказа или чем-то в этом роде. Просто одна из seed nodes должна быть онлайн в момент запуска узла, чтобы он успешно присоединился к кластеру. Seed nodes можно задавать не только при помощи конфига, но и при помощи аргументов командной строки:
-Dakka.cluster.seed-nodes.1=akka.tcp://system@127.0.0.1:2552 …
Наконец, код самого приложения:
import akka. cluster . ClusterEvent . _
import akka. actor . _
import akka. event . _
import akka. cluster . _
class ClusterListener extends Actor with ActorLogging {
val cluster = Cluster ( context. system )
// subscribe to cluster changes, re-subscribe when restart
override def preStart ( ) {
cluster. subscribe ( self, InitialStateAsEvents, classOf [ MemberEvent ] ,
classOf [ UnreachableMember ] )
}
override def postStop ( ) {
cluster. unsubscribe ( self )
}
def receive = LoggingReceive {
case MemberUp ( member ) =>
log. info ( s «[Listener] node is up: $member» )
case UnreachableMember ( member ) =>
log. info ( s «[Listener] node is unreachable: $member» )
case MemberRemoved ( member, prevStatus ) =>
log. info ( s «[Listener] node is removed: $member» )
case ev : MemberEvent =>
log. info ( s «[Listener] event: $ev» )
}
}
object AkkaClusterExample extends App {
val system = ActorSystem ( «system» )
system. actorOf ( Props [ ClusterListener ] , «clusterListener» )
system. awaitTermination ( )
}
Здесь мы создаем единственный актор, который подписывается на все события, происходящие с кластером. Эти события, как можно было легко догадаться, будут приходить актору в виде сообщений. Давайте запустим наше приложение и посмотрим, что это будут за сообщения:
[Remoting] Remoting started; listening on addresses: [akka.tcp://
system@127.0.0.1:2551]
…
Node [akka.tcp://system@127.0.0.1:2551] is JOINING, roles []
Leader is moving node [akka.tcp://system@127.0.0.1:2551] to [Up]
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:2551, status = Up)
Как видите, было получено единственное сообщение — MemberUp, что не особо удивительно.
Теперь, не останавливая приложение, отредактируем application.conf:
hostname = «127.0.0.1»
port = 0 # 2551
}
Здесь port = 0
означает, что порт будет выбираться случайным образом. Запустим второй экземпляр приложения. В логах первого экземпляра увидим:
Leader is moving node [akka.tcp://system@127.0.0.1:4982] to [Up]
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:4982, status = Up)
В логах второго:
[Remoting] Remoting started; listening on addresses :[akka.tcp://
system@127.0.0.1:4982]
…
Cluster Node [akka.tcp://system@127.0.0.1:4982] — Welcome from
[akka.tcp://system@127.0.0.1:2551]
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:2551, status = Up)
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:4982, status = Up)
Теперь остановим второй экземпляр. В логах первого увидим:
127.0.0.1:4982, status = Up)
Теперь попробуем кое-что новое. На сайте akka.io можно скачать так называемый akka distribution, это архив с именем вроде akka_2.11-2.3.8.zip. Помимо прочего в этом архиве можно найти утилиту akka-cluster, позволяющую получать различную информацию о кластере и управлять им при помощи JMX . Чтобы все работало, утилите нужно сделать chmod u+x
, а также добавить путь до нее в $PATH. Кроме того, приложение должно быть запущено примерно с такими опциями:
-Dcom.sun.management.jmxremote.authenticate= false
-Dcom.sun.management.jmxremote.ssl= false …
В IntelliJ IDEA все перечисленные опции можно прописать в Run → Edit Configurations… → AkkaClusterExample → Configuration → VM Options. Если теперь вернуть конфиг в изначальное состояние, запустить ноду и сказать:
… то вы увидите что-то вроде:
{
«self-address»: «akka.tcp://system@127.0.0.1:2551»,
«members»: [
{
«address»: «akka.tcp://system@127.0.0.1:2551»,
«status»: «Up»
}
],
«unreachable»: [
]
}
После запуска второй ноды, как мы это делали в прошлый раз (номер порта JMX также придется поменять):
{
«self-address»: «akka.tcp://system@127.0.0.1:2551»,
«members»: [
{
«address»: «akka.tcp://system@127.0.0.1:2551»,
«status»: «Up»
},
{
«address»: «akka.tcp://system@127.0.0.1:49603»,
«status»: «Up»
}
],
«unreachable»: [
]
}
После остановки второй ноды:
{
«self-address»: «akka.tcp://system@127.0.0.1:2551»,
«members»: [
{
«address»: «akka.tcp://system@127.0.0.1:2551»,
«status»: «Up»
},
{
«address»: «akka.tcp://system@127.0.0.1:49603»,
«status»: «Up»
}
],
«unreachable»: [
{
«node»: «akka.tcp://system@127.0.0.1:49603»,
«observed-by»: [
«akka.tcp://system@127.0.0.1:2551»
]
}
]
}
Если теперь заглянуть в логи первой ноды, то вы увидите, как туда сыпятся ошибки вроде следующих:
failed, address is now gated for [5000] ms. Reason is: [Association
failed with [akka.tcp://system@127.0.0.1:49603]].
По умолчанию Akka сама не помечает ноды, как упавшие (down). Узлы всего-навсего помечаются, как недоступные (unreachable). Если теперь снова поднять второй узел, в его логах вы увидите:
failed, address is now gated for [5000] ms. Reason is: [Association
failed with [akka.tcp://system@127.0.0.1:49603]].
Вы не сможете добавить новые узлы в кластер, до тех пор, пока в кластере есть недоступные узлы. Казалось бы, чтобы решить проблему, просто запустим второй узел с тем же номером порта, что был использован в первый раз, а именно 49603. Как вы помните, номер порта можно задать явно, отредактировав application.conf. Однако это не сработает. В логах первой ноды будет сказано, почему:
414446918)] is trying to join, ignoring
Что мы на самом деле должны сделать — это вручную убрать ноду из кластера:
akka-cluster 127.0.0.1 9999 cluster-status
Что увидим:
{
«self-address»: «akka.tcp://system@127.0.0.1:2551»,
«members»: [
{
«address»: «akka.tcp://system@127.0.0.1:2551»,
«status»: «Up»
},
{
«address»: «akka.tcp://system@127.0.0.1:47064»,
«status»: «Up»
}
],
«unreachable»: [
{
«node»: «akka.tcp://system@127.0.0.1:47064»,
«observed-by»: [
«akka.tcp://system@127.0.0.1:2551»
]
}
]
}
У меня узел с портом 47064 оказался в списке unreachable, так как я уже остановил узел. Соответственно, его также следует вручную убрать из кластера. Если же вы не останавливали узел, у вас он, скорее всего, вполне успешно присоединится к кластеру.
Как вы догадываетесь, вручную убирать ноды из кластера не всегда удобно. Исправить ситуацию можно, добавив в application.conf после списка seed nodes опцию:
Если узел будет недоступен в течение заданного количества секунд, он автоматически будет удален из кластера. Можете повторить все проделанные нами ранее эксперименты с включенной опцией в качестве домашнего задания. Следует однако отметить, что бездумно включать эту опцию не следует, так как в ряде случаев она может привести к нежелательному поведению кластера, например, при нетсплитах . К сожалению, флага «сделать мне офигительно» в Akka не предусмотрено. Вам предлагается набор примитивов с некой семантикой, и только. Как правильно использовать эти примитивы, зависит от конкретного приложения.
Подробности о работе Akka Cluster вы найдете на страницах Cluster Specification и Cluster Usage официальной документации по Akka. Эта документация классная, не поленитесь с ней ознакомиться 🙂
А используете ли вы Akka Cluster и если да, то какие задачи с его помощью решаете?
Дополнение: Пример использования акторов-одиночек в Akka