Как нам с вами уже известно, Akka предоставляет множество богатых средств , существенно упрощающих разработку распределенных приложений. Сегодня мы познакомимся с одним таким средством, а именно — возможностью создавать акторы-одиночки, или синглтоны. Такие акторы присутствуют в кластере в единственном экземпляре. Также в случае падения узла, на котором крутится синглотн, этот синглтон будет перезапущен на другом узле кластера.
Все исходники к заметке вы найдете в этом репозитории .
Итак, в каких же случаях могут быть полезны акторы-синглтоны:
- Вам нужно координировать действия других акторов, например, раздавать задачи и агрегировать результаты их выполнения;
- Требуется выполнять какие-то действия по расписанию, например, строить отчеты по данным, накопленным за прошедший день, притом выполнять эти действия нужно в каком-то одном месте, а не на каждом узле в кластере;
- После раскладки приложения требуется прогнать миграцию схемы БД , что, опять таки, нежелательно делать на всех узлах одновременно;
- Просто бизнес-логика такая, например, нужно учитывать лимиты на число посланных SMS в единицу времени, что удобно делать в одном процессе;
Как вы можете помнить из предыдущей заметки про Akka Cluster, из коробки Akka предлагает нам два варианта на выбор. Когда узел кластера становится недоступен, он либо убирается из кластера вручную при помощи утилиты akka-cluster, либо автоматически после прошествия какого-то времени. Первый вариант не особо удобен, а при использовании второго в случае нетсплита в кластере может запуститься больше одного синглтона. Что необязательно критично в случае вашего конкретного приложения, но вряд ли является ожидаемым поведением в общем случае. Один из способов разрешить дилемму заключается в том, чтобы подписаться на события кластера и реализовать собственную логику. Выглядеть это может как-то так:
val minClusterSize = 2 // TODO: read from config!
val cluster = Cluster ( context. system )
var timerCancellable : Option [ Cancellable ] = None
case class CheckClusterSize ( msg : String )
def checkClusterSize ( msg : String ) {
val clusterSize = cluster. state . members . count { m =>
m. status == MemberStatus. Up ||
m. status == MemberStatus. Joining
}
log. info ( s «[Listener] event: $msg, cluster size: $clusterSize » +
s «(${cluster.state.members})» )
if ( clusterSize < minClusterSize ) {
log. info ( «[Listener] cluster size is less than » +
s «$minClusterSize, shutting down!» )
context. system . shutdown ( )
}
}
def scheduleClusterSizeCheck ( msg : String ) {
timerCancellable. foreach ( _ . cancel ( ) )
timerCancellable = Some (
context. system . scheduler . scheduleOnce (
1 . second , self, CheckClusterSize ( msg )
)
)
}
// subscribe to cluster changes, re-subscribe when restart
override def preStart ( ) {
log. info ( s «[Listener] started!» )
cluster. subscribe ( self, InitialStateAsEvents, classOf [ MemberEvent ] ,
classOf [ UnreachableMember ] )
}
override def postStop ( ) {
log. info ( «[Listener] stopped!» )
cluster. unsubscribe ( self )
}
def receive = LoggingReceive {
case msg : MemberEvent =>
scheduleClusterSizeCheck ( msg. toString )
case msg : UnreachableMember =>
scheduleClusterSizeCheck ( msg. toString )
case r : CheckClusterSize =>
checkClusterSize ( r. msg )
}
}
При этом секция akka.cluster в application.conf должна быть:
min-nr-of-members = 2
auto-down-unreachable-after = 10s
seed-nodes = [
«akka.tcp://system@127.0.0.1:2551»
]
}
Параметр min-nr-of-members = 2
означает , что узлы кластера будут висеть в состоянии Joining до тех пор, пока в кластере не появятся хотя бы два узла. После этого узлы изменят свое состояние на Up, и только тогда будут запускаться синглтоны. Однако не дайте имени этого параметра ввести вас в заблуждение. Он используется только при старте кластера. После этого в кластере может успешно работать как две и более нод, так и одна. В том числе из-за нетсплита кластер из трех узлов может рассыпаться на две части — с одним узлом и двумя. При этом в каждой части будет поднято по своей копии одного и того же синглтона.
На помощь приходит наш самопальный актор, подписанный на события кластера. Он может определить, что узел оказался в меньшей части кластера и в этом случае завершить приложение. Главное — каким-то образом поддерживать значение minClusterSize равное floor(N/2) + 1, где N — текущее число узлов в нашем кластере. Тут многое зависит от специфики вашего приложения, поэтому в приведенном примере значение просто захардкожено. Если ваше приложение работает в AWS и вы используете auto scaling groups, то размер кластера можете определять при помощи AWS SDK. Если же вы добавляете узлы вручную, то можете отправлять на несколько нод полный их список с текущим временем, а по кластеру этот список разлетится Gossip’ом.
После завершения ноды, оказавшейся в меньшинстве, она автоматически будет удалена из кластера благодаря уже знакомому нам параметру auto-down-unreachable-after = 10s
. К сожалению, аккуратная проверка того, что все это хозяйство действительно работает, потянет на отдельную заметку. Но для вас это не должно представлять большого труда, если вы умеете пользоваться Vagrant и iptables . Следует также отметить, что в случае нетсплита оказавшиеся в меньшинстве узлы не завершают работу моментально, а значит одновременно все равно может существовать два синглтона, но недолго. Если в вашем приложении это критично, вы можете добавить задержку между запуском синглтона и выполнением им какой-то реальной работы.
Оставшаяся часть приложения, где, собственно, и происходит работа с синглтонами:
case class GetTimeResponse ( time : Long )
class RemoteTimeActor extends Actor with ActorLogging {
override def preStart ( ) = {
log. info ( «[RemoteTimeActor] started!» )
}
def receive = LoggingReceive {
case GetTimeRequest =>
sender ! GetTimeResponse ( System. currentTimeMillis ( ) )
}
}
case object SyncTime
case object ScheduleSync
class LocalTimeActor extends Actor with ActorLogging {
val remoteTimeActor = context. system . actorOf (
ClusterSingletonProxy. props (
singletonPath = s «/user/$managerName/$remoteTimeActorName» ,
role = managerNodeRole
) ,
name = s «${remoteTimeActorName}Proxy»
)
val syncPeriod = 5 . seconds
var time = System. currentTimeMillis ( )
def scheduleSync ( ) = {
context. system . scheduler . scheduleOnce ( syncPeriod, self, SyncTime )
}
override def preStart ( ) = {
log. info ( «[LocalTimeActor] started!» )
scheduleSync ( )
}
// see http://remontka.com/akka-scheduler/
override def postRestart ( reason : Throwable ) = { }
def receive = LoggingReceive {
case SyncTime =>
val fResp = remoteTimeActor ? GetTimeRequest
fResp pipeTo self
fResp onComplete { case _ => self ! ScheduleSync }
case ScheduleSync =>
scheduleSync ( )
case GetTimeResponse ( remoteTime ) =>
log. info ( s «[LocalTimeActor] sync: $remoteTime» )
time = remoteTime
case GetTimeRequest =>
sender ! GetTimeResponse ( time )
}
}
object AkkaClusterSingletonExample extends App {
val system = ActorSystem ( «system» )
system. actorOf ( Props [ ClusterListener ] , name = «clusterListener» )
system. actorOf ( ClusterSingletonManager. props (
singletonProps = Props [ RemoteTimeActor ] ,
singletonName = remoteTimeActorName,
terminationMessage = PoisonPill,
role = managerNodeRole
) , name = managerName )
system. actorOf ( Props [ LocalTimeActor ] , name = «localTimeActor» )
system. awaitTermination ( )
}
Как видите, здесь решается довольно надуманная задача по синхронизации времени между узлами, притом с довольно большой погрешностью. Узел, на котором поднимается синглтон RemoteTimeActor, предоставляет всем желающим информацию о локальном времени, а локальные акторы LocalTimeActor периодически узнают время у синглтона и сообщают его всем заинтересованным локальным акторам. Отмечу, что пример демонстрационный, в реальных приложениях, скорее всего, вы не должны так синхронизировать время! Однако используя описанную технику можно вполне успешно синхронизировать, скажем, конфиги.
Запускаем первую ноду:
[LocalTimeActor] started!
Логов пишется много. В этой заметке я привожу только малую их часть.
Теперь запускаем вторую ноду, изменив номер порта в application.conf на нулевой:
[LocalTimeActor] started!
[Listener] event: MemberUp(Member(address = akka.tcp://system@127.0…
[LocalTimeActor] sync: 1422111122860
[LocalTimeActor] sync: 1422111127888
Тем временем в логах первой:
[RemoteTimeActor] started!
[LocalTimeActor] sync: 1422111121041
[LocalTimeActor] sync: 1422111126058
Как видите, синглтон был успешно запущен и ноды стали синхронизировать время. Запустите третью ноду и убедитесь, что она также успешно нашла синглтон и стала тянуть с него время.
Теперь остановим ноду, на которой был запущен синглтон. В моем случае это была первая нода. В логах второй:
[Listener] event: MemberRemoved(Member(address = akka.tcp://system@…
[RemoteTimeActor] started!
[LocalTimeActor] sync: 1422111218245
[LocalTimeActor] sync: 1422111223265
Как и ожидалось, синглтон мигрировал на другую ноду. Также теперь можно остановить третью ноду и убедиться, что вторая завершит свою работу, решив, что она осталась в меньшинстве:
[LocalTimeActor] sync: 1422111298564
[LocalTimeActor] sync: 1422111303585
[Listener] event: MemberRemoved(Member(address = akka.tcp://system@…
[Listener] cluster size is less than 2, shutting down!
[Listener] stopped!
Вот так Akka фактически из коробки дает нам leader election и service discovery!
Ну разве это не круто?
Дополнение: Еще вас могут заинтересовать посты Интересный пример с роутингом и кэшами в Akka Cluster и Колхозная реализация выбора лидера на Go и Consul .