Как ранее уже отмечалось, Akka позволяет не только создавать акторы , которые обмениваются между собой сообщениями, но и строить кластеры, состоящие из нескольких физических машин. При этом акторы, запущенные на разных машинах, все еще могут взаимодействовать друг с другом. Кроме того, Akka из коробки предоставляет ряд полезных при построении распределенных приложений примитивов. Например, возможность подписаться на события, происходящие с кластером, присваивать узлам роли или запустить актор-одиночку. Что, кстати, делает Akka намного интереснее других реализаций модели акторов ( Erlang , Cloud Haskell ). В этой заметке мы напишем очень простое приложение, использующее akka-cluster, а также ознакомимся с его поведением при различных условиях.

Исходники к заметке вы найдете в этом репозитории .

Файл build.sbt:

name := «akka-cluster-example»

version := «0.1»

scalaVersion := «2.11.4»

val akkaVersion = «2.3.8»

libraryDependencies ++ = Seq (
«com.typesafe.akka» %% «akka-actor» % akkaVersion,
«com.typesafe.akka» %% «akka-cluster» % akkaVersion
)

resolvers + = «Akka Snapshots» at «http://repo.akka.io/snapshots/»

Ничего особенного, к проекту просто подключаются пакеты akka-actor и akka-cluster.

Файл application.conf:

akka {
actor {
provider = «akka.cluster.ClusterActorRefProvider»
}

remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = «127.0.0.1»
port = 2551
}
}

cluster {
seed-nodes = [
«akka.tcp://system@127.0.0.1:2551»,
«akka.tcp://system@127.0.0.1:2552»]
}
}

Значение akka.actor.provider заменяется, так как теперь мы с вами пишем распределенное приложение. Задается интерфейс и порт, на которых будет висеть actor system. Наконец, задается список seed nodes. Этот список используется при присоединении узла к кластеру. Seed nodes не являются единой точкой отказа или чем-то в этом роде. Просто одна из seed nodes должна быть онлайн в момент запуска узла, чтобы он успешно присоединился к кластеру. Seed nodes можно задавать не только при помощи конфига, но и при помощи аргументов командной строки:

java -Dakka.cluster.seed-nodes.0=akka.tcp://system@127.0.0.1:2551
-Dakka.cluster.seed-nodes.1=akka.tcp://system@127.0.0.1:2552 …

Наконец, код самого приложения:

package me. eax . akka_examples

import akka. cluster . ClusterEvent . _
import akka. actor . _
import akka. event . _
import akka. cluster . _

class ClusterListener extends Actor with ActorLogging {

val cluster = Cluster ( context. system )

// subscribe to cluster changes, re-subscribe when restart
override def preStart ( ) {
cluster. subscribe ( self, InitialStateAsEvents, classOf [ MemberEvent ] ,
classOf [ UnreachableMember ] )
}

override def postStop ( ) {
cluster. unsubscribe ( self )
}

def receive = LoggingReceive {
case MemberUp ( member ) =>
log. info ( s «[Listener] node is up: $member» )

case UnreachableMember ( member ) =>
log. info ( s «[Listener] node is unreachable: $member» )

case MemberRemoved ( member, prevStatus ) =>
log. info ( s «[Listener] node is removed: $member» )

case ev : MemberEvent =>
log. info ( s «[Listener] event: $ev» )
}
}

object AkkaClusterExample extends App {
val system = ActorSystem ( «system» )
system. actorOf ( Props [ ClusterListener ] , «clusterListener» )
system. awaitTermination ( )
}

Здесь мы создаем единственный актор, который подписывается на все события, происходящие с кластером. Эти события, как можно было легко догадаться, будут приходить актору в виде сообщений. Давайте запустим наше приложение и посмотрим, что это будут за сообщения:

[Remoting] Starting remoting
[Remoting] Remoting started; listening on addresses: [akka.tcp://
system@127.0.0.1:2551]

Node [akka.tcp://system@127.0.0.1:2551] is JOINING, roles []
Leader is moving node [akka.tcp://system@127.0.0.1:2551] to [Up]
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:2551, status = Up)

Как видите, было получено единственное сообщение — MemberUp, что не особо удивительно.

Теперь, не останавливая приложение, отредактируем application.conf:

netty.tcp {
hostname = «127.0.0.1»
port = 0 # 2551
}

Здесь port = 0 означает, что порт будет выбираться случайным образом. Запустим второй экземпляр приложения. В логах первого экземпляра увидим:

Node [akka.tcp://system@127.0.0.1:4982] is JOINING, roles []
Leader is moving node [akka.tcp://system@127.0.0.1:4982] to [Up]
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:4982, status = Up)

В логах второго:

[Remoting] Starting remoting
[Remoting] Remoting started; listening on addresses :[akka.tcp://
system@127.0.0.1:4982]

Cluster Node [akka.tcp://system@127.0.0.1:4982] — Welcome from
[akka.tcp://system@127.0.0.1:2551]
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:2551, status = Up)
[Listener] node is up: Member(address = akka.tcp://system@
127.0.0.1:4982, status = Up)

Теперь остановим второй экземпляр. В логах первого увидим:

[Listener] node is unreachable: Member(address = akka.tcp://system@
127.0.0.1:4982, status = Up)

Теперь попробуем кое-что новое. На сайте akka.io можно скачать так называемый akka distribution, это архив с именем вроде akka_2.11-2.3.8.zip. Помимо прочего в этом архиве можно найти утилиту akka-cluster, позволяющую получать различную информацию о кластере и управлять им при помощи JMX . Чтобы все работало, утилите нужно сделать chmod u+x , а также добавить путь до нее в $PATH. Кроме того, приложение должно быть запущено примерно с такими опциями:

java -Dcom.sun.management.jmxremote.port= 9999
-Dcom.sun.management.jmxremote.authenticate= false
-Dcom.sun.management.jmxremote.ssl= false

В IntelliJ IDEA все перечисленные опции можно прописать в Run → Edit Configurations… → AkkaClusterExample → Configuration → VM Options. Если теперь вернуть конфиг в изначальное состояние, запустить ноду и сказать:

akka-cluster 127.0.0.1 9999 cluster-status

… то вы увидите что-то вроде:

Querying cluster status
{
«self-address»: «akka.tcp://system@127.0.0.1:2551»,
«members»: [
{
«address»: «akka.tcp://system@127.0.0.1:2551»,
«status»: «Up»
}
],
«unreachable»: [

]
}

После запуска второй ноды, как мы это делали в прошлый раз (номер порта JMX также придется поменять):

Querying cluster status
{
«self-address»: «akka.tcp://system@127.0.0.1:2551»,
«members»: [
{
«address»: «akka.tcp://system@127.0.0.1:2551»,
«status»: «Up»
},
{
«address»: «akka.tcp://system@127.0.0.1:49603»,
«status»: «Up»
}
],
«unreachable»: [

]
}

После остановки второй ноды:

Querying cluster status
{
«self-address»: «akka.tcp://system@127.0.0.1:2551»,
«members»: [
{
«address»: «akka.tcp://system@127.0.0.1:2551»,
«status»: «Up»
},
{
«address»: «akka.tcp://system@127.0.0.1:49603»,
«status»: «Up»
}
],
«unreachable»: [
{
«node»: «akka.tcp://system@127.0.0.1:49603»,
«observed-by»: [
«akka.tcp://system@127.0.0.1:2551»
]
}
]
}

Если теперь заглянуть в логи первой ноды, то вы увидите, как туда сыпятся ошибки вроде следующих:

Association with remote system [akka.tcp://system@127.0.0.1:49603] has
failed, address is now gated for [5000] ms. Reason is: [Association
failed with [akka.tcp://system@127.0.0.1:49603]].

По умолчанию Akka сама не помечает ноды, как упавшие (down). Узлы всего-навсего помечаются, как недоступные (unreachable). Если теперь снова поднять второй узел, в его логах вы увидите:

Association with remote system [akka.tcp://system@127.0.0.1:49603] has
failed, address is now gated for [5000] ms. Reason is: [Association
failed with [akka.tcp://system@127.0.0.1:49603]].

Вы не сможете добавить новые узлы в кластер, до тех пор, пока в кластере есть недоступные узлы. Казалось бы, чтобы решить проблему, просто запустим второй узел с тем же номером порта, что был использован в первый раз, а именно 49603. Как вы помните, номер порта можно задать явно, отредактировав application.conf. Однако это не сработает. В логах первой ноды будет сказано, почему:

Existing member [UniqueAddress(akka.tcp://system@127.0.0.1:49603,
414446918)] is trying to join, ignoring

Что мы на самом деле должны сделать — это вручную убрать ноду из кластера:

akka-cluster 127.0.0.1 9999 down akka.tcp: // system @ 127.0.0.1: 49603
akka-cluster 127.0.0.1 9999 cluster-status

Что увидим:

Querying cluster status
{
«self-address»: «akka.tcp://system@127.0.0.1:2551»,
«members»: [
{
«address»: «akka.tcp://system@127.0.0.1:2551»,
«status»: «Up»
},
{
«address»: «akka.tcp://system@127.0.0.1:47064»,
«status»: «Up»
}
],
«unreachable»: [
{
«node»: «akka.tcp://system@127.0.0.1:47064»,
«observed-by»: [
«akka.tcp://system@127.0.0.1:2551»
]
}
]
}

У меня узел с портом 47064 оказался в списке unreachable, так как я уже остановил узел. Соответственно, его также следует вручную убрать из кластера. Если же вы не останавливали узел, у вас он, скорее всего, вполне успешно присоединится к кластеру.

Как вы догадываетесь, вручную убирать ноды из кластера не всегда удобно. Исправить ситуацию можно, добавив в application.conf после списка seed nodes опцию:

auto-down-unreachable-after = 10s

Если узел будет недоступен в течение заданного количества секунд, он автоматически будет удален из кластера. Можете повторить все проделанные нами ранее эксперименты с включенной опцией в качестве домашнего задания. Следует однако отметить, что бездумно включать эту опцию не следует, так как в ряде случаев она может привести к нежелательному поведению кластера, например, при нетсплитах . К сожалению, флага «сделать мне офигительно» в Akka не предусмотрено. Вам предлагается набор примитивов с некой семантикой, и только. Как правильно использовать эти примитивы, зависит от конкретного приложения.

Подробности о работе Akka Cluster вы найдете на страницах Cluster Specification и Cluster Usage официальной документации по Akka. Эта документация классная, не поленитесь с ней ознакомиться 🙂

А используете ли вы Akka Cluster и если да, то какие задачи с его помощью решаете?

Дополнение: Пример использования акторов-одиночек в Akka

EnglishRussianUkrainian