Иногда требуется запланировать выполнение кода на определенное время после происшествия какого-то события. Или сказать, что некий код выполняется снова и снова с заданным интервалом времени. Если в вашем проекте используется Akka , то вы можете решать такие задачи просто элементарно… на самом деле нет. Как мы скоро убедимся, даже здесь есть определенные тонкости.

Примечание: Не путайте шедулер в Akka с шедулерами в Erlang . То, что в Erlang называется шедулерами и распределяют время между акторами, в терминах Akka называется диспетчерами. Здесь же мы говорим об аналоге таймеров из Erlang’а.

Рассмотрим следующий код:

package me. eax . akka_examples

import akka. actor . _
import akka. event . _
import akka. pattern . pipe
import com. typesafe . config . ConfigFactory

import scala. concurrent . _
import scala. concurrent . duration . _
import scala. concurrent . ExecutionContext . Implicits . global

import java. util . concurrent . atomic . AtomicInteger

object Database {
val counter = new AtomicInteger ( 0 )

def fakeRequest ( ) : Future [ Int ] = {
val result = counter. incrementAndGet ( )
Future {
if ( result == 3 ) throw new RuntimeException ( «Fake exception» )
result
}
}
}

case object PullCounter
case class PullResult ( counter : Int )
case object PullFailed

class PullActor extends Actor {
val period = 2 . seconds
var timerCancellable : Option [ Cancellable ] = None

def scheduleTimer ( ) = {
timerCancellable = Some (
context. system . scheduler . scheduleOnce (
period, context. self , PullCounter
)
)
}

override def preStart ( ) = scheduleTimer ( )

// so we don’t call preStart and schedule a new message
// see http://doc.akka.io/docs/akka/2.2.4/scala/howto.html
override def postRestart ( reason : Throwable ) = { }

def receive = LoggingReceive {
case PullCounter =>
val fReq = Database. fakeRequest ( )
fReq. map ( counter => PullResult ( counter ) ) pipeTo self
fReq. onFailure { case _ => self ! PullFailed }

case PullFailed =>
scheduleTimer ( )

case r : PullResult =>
if ( r. counter >= 5 ) {
context. system . shutdown ( )
} else {
scheduleTimer ( )
}
}
}

object Example4 extends App {
val config = ConfigFactory. parseString (
«»»
akka.loglevel = «DEBUG»
akka.actor.debug {
receive = on
lifecycle = on
}
«»»
)

val system = ActorSystem ( «system» , config )
system. actorOf ( Props [ PullActor ] , «pullActor» )
system. awaitTermination ( )
}

Здесь мы как бы имеем довольно типичную ситуацию. Есть какой-то внешний источник данных, база данных, REST-сервис или вроде того. Ходить в него дорого и долго, да и сервис может оказаться временно недоступен. Поэтому мы заводим актор, который время от времени ходит за данными и кэширует их. В приведенном примере роль источника данных выполняет такая заглушка:

object Database {
val counter = new AtomicInteger ( 0 )

def fakeRequest ( ) : Future [ Int ] = {
val result = counter. incrementAndGet ( )
Future {
if ( result == 3 ) throw new RuntimeException ( «Fake exception» )
result
}
}
}

На самом деле, нам не важно, что является источником, лишь бы при хождении в него использовались футуры . Как видите, футура из нашей заглушки не всегда завершается успешно.

При старте актор обращается к шедулеру и говорит «пошли мне сообщение PullCounter через две секунды»:

val period = 2 . seconds
var timerCancellable : Option [ Cancellable ] = None

def scheduleTimer ( ) = {
timerCancellable = Some (
context. system . scheduler . scheduleOnce (
period, context. self , PullCounter
)
)
}

override def preStart ( ) = scheduleTimer ( )

Здесь мы не используем переменную timerCancellable но в общем случае с ее помощью посылку сообщения можно попытаться отменить:

timerCancellable. foreach ( _ . cancel ( ) )

При получении PullCounter актор осуществляет асинхронное хождение в «базу»:

case PullCounter =>
val fReq = Database. fakeRequest ( )
fReq. map ( counter => PullResult ( counter ) ) pipeTo self
fReq. onFailure { case _ => self ! PullFailed }

При получении ответа, обернутого в PullResult, актор планирует посылку нового сообщения PullCounter (заметьте, кусок про system. shutdown ( ) нужен просто чтобы когда-нибудь остановить пример):

case r : PullResult =>
if ( r. counter >= 5 ) {
context. system . shutdown ( )
} else {
scheduleTimer ( )
}

Если же хождение в базу завершается неуспешно, срабатывает onFailure и актор получает сообщение PullFailed:

case PullFailed =>
scheduleTimer ( )

Это очень важный момент, так как если бы мы его упустили, в случае ошибки актор перестал бы ходить в базу.

Следует отметить, что здесь мы использовали метод sheduleOnce и вручную планировали следующую посылку сообщения. Шедулер также имеет более простой метод shedule, позволяющий выполнять посылку сообщений регулярно без повторного планирования. Но в данном случае использовать его — плохая идея. Дело в том, что хождение в базу может занять больше заданного нами period’а в две секунды. В этом случае сообщения от шедулера начнут копиться в очереди актора, а в базу данных будет одновременно посылаться множество запросов, так как новый запрос будет создаваться до завершения предыдущего. Как результат, база ляжет, а очередь переполнится.

Если же в вашем приложении требуется именно shedule, соответствующий пример вы найдете здесь . Однако примите во внимание, что использование shedule не гарантирует вам регулярного получения заданного сообщения раз в N единиц времени. Во-первых, потому что при рестарте актора (например, из-за эксепшена) посылку сообщений придется запланировать снова, что как бы собьет весь ритм. Во-вторых, дэфолтная реализация шедулера проверяет, не пора ли послать какие-то сообщения, раз в некоторый интервал времени (который можно поменять в конфиге ), поэтому особой пунктуальности здесь ожидать не следует.

И кстати, раз уж мы вспомнили про эксепшены, обратите внимание на код:

override def postRestart ( reason : Throwable ) = { }

Он нужен для того, чтобы при перезапуске актора повторно не вызвался preStart. В зависимости от конкретного случая вы должны или не должны этого хотеть. Как следует обдумайте конкретную ситуацию и протестируйте все граничные случаи.

Вывод программы (лишние детали опущены):

[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(1)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(2)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullFailed
[akka://system/user/pullActor] received unhandled message Failure(…
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(4)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(5)
[akka://system/user] stopping
[akka://system/user/pullActor] stopped
[akka://system/user] stopped

Надеюсь, вы заметили, как ловко мы избавились от необходимости писать логи вручную, воспользовавшись LoggingReceive и правильно настроив Akka?

Как видите, при использовании шедулера приходится учитывать массу деталей. Не расслабляйтесь!

Дополнение: Делаем метрики и мониторинг для Akka при помощи Kamon

EnglishRussianUkrainian