Иногда требуется запланировать выполнение кода на определенное время после происшествия какого-то события. Или сказать, что некий код выполняется снова и снова с заданным интервалом времени. Если в вашем проекте используется Akka , то вы можете решать такие задачи просто элементарно… на самом деле нет. Как мы скоро убедимся, даже здесь есть определенные тонкости.
Примечание: Не путайте шедулер в Akka с шедулерами в Erlang . То, что в Erlang называется шедулерами и распределяют время между акторами, в терминах Akka называется диспетчерами. Здесь же мы говорим об аналоге таймеров из Erlang’а.
Рассмотрим следующий код:
import akka. actor . _
import akka. event . _
import akka. pattern . pipe
import com. typesafe . config . ConfigFactory
import scala. concurrent . _
import scala. concurrent . duration . _
import scala. concurrent . ExecutionContext . Implicits . global
import java. util . concurrent . atomic . AtomicInteger
object Database {
val counter = new AtomicInteger ( 0 )
def fakeRequest ( ) : Future [ Int ] = {
val result = counter. incrementAndGet ( )
Future {
if ( result == 3 ) throw new RuntimeException ( «Fake exception» )
result
}
}
}
case object PullCounter
case class PullResult ( counter : Int )
case object PullFailed
class PullActor extends Actor {
val period = 2 . seconds
var timerCancellable : Option [ Cancellable ] = None
def scheduleTimer ( ) = {
timerCancellable = Some (
context. system . scheduler . scheduleOnce (
period, context. self , PullCounter
)
)
}
override def preStart ( ) = scheduleTimer ( )
// so we don’t call preStart and schedule a new message
// see http://doc.akka.io/docs/akka/2.2.4/scala/howto.html
override def postRestart ( reason : Throwable ) = { }
def receive = LoggingReceive {
case PullCounter =>
val fReq = Database. fakeRequest ( )
fReq. map ( counter => PullResult ( counter ) ) pipeTo self
fReq. onFailure { case _ => self ! PullFailed }
case PullFailed =>
scheduleTimer ( )
case r : PullResult =>
if ( r. counter >= 5 ) {
context. system . shutdown ( )
} else {
scheduleTimer ( )
}
}
}
object Example4 extends App {
val config = ConfigFactory. parseString (
«»»
akka.loglevel = «DEBUG»
akka.actor.debug {
receive = on
lifecycle = on
}
«»» )
val system = ActorSystem ( «system» , config )
system. actorOf ( Props [ PullActor ] , «pullActor» )
system. awaitTermination ( )
}
Здесь мы как бы имеем довольно типичную ситуацию. Есть какой-то внешний источник данных, база данных, REST-сервис или вроде того. Ходить в него дорого и долго, да и сервис может оказаться временно недоступен. Поэтому мы заводим актор, который время от времени ходит за данными и кэширует их. В приведенном примере роль источника данных выполняет такая заглушка:
val counter = new AtomicInteger ( 0 )
def fakeRequest ( ) : Future [ Int ] = {
val result = counter. incrementAndGet ( )
Future {
if ( result == 3 ) throw new RuntimeException ( «Fake exception» )
result
}
}
}
На самом деле, нам не важно, что является источником, лишь бы при хождении в него использовались футуры . Как видите, футура из нашей заглушки не всегда завершается успешно.
При старте актор обращается к шедулеру и говорит «пошли мне сообщение PullCounter через две секунды»:
var timerCancellable : Option [ Cancellable ] = None
def scheduleTimer ( ) = {
timerCancellable = Some (
context. system . scheduler . scheduleOnce (
period, context. self , PullCounter
)
)
}
override def preStart ( ) = scheduleTimer ( )
Здесь мы не используем переменную timerCancellable но в общем случае с ее помощью посылку сообщения можно попытаться отменить:
При получении PullCounter актор осуществляет асинхронное хождение в «базу»:
val fReq = Database. fakeRequest ( )
fReq. map ( counter => PullResult ( counter ) ) pipeTo self
fReq. onFailure { case _ => self ! PullFailed }
При получении ответа, обернутого в PullResult, актор планирует посылку нового сообщения PullCounter (заметьте, кусок про system. shutdown ( )
нужен просто чтобы когда-нибудь остановить пример):
if ( r. counter >= 5 ) {
context. system . shutdown ( )
} else {
scheduleTimer ( )
}
Если же хождение в базу завершается неуспешно, срабатывает onFailure и актор получает сообщение PullFailed:
scheduleTimer ( )
Это очень важный момент, так как если бы мы его упустили, в случае ошибки актор перестал бы ходить в базу.
Следует отметить, что здесь мы использовали метод sheduleOnce и вручную планировали следующую посылку сообщения. Шедулер также имеет более простой метод shedule, позволяющий выполнять посылку сообщений регулярно без повторного планирования. Но в данном случае использовать его — плохая идея. Дело в том, что хождение в базу может занять больше заданного нами period’а в две секунды. В этом случае сообщения от шедулера начнут копиться в очереди актора, а в базу данных будет одновременно посылаться множество запросов, так как новый запрос будет создаваться до завершения предыдущего. Как результат, база ляжет, а очередь переполнится.
Если же в вашем приложении требуется именно shedule, соответствующий пример вы найдете здесь . Однако примите во внимание, что использование shedule не гарантирует вам регулярного получения заданного сообщения раз в N единиц времени. Во-первых, потому что при рестарте актора (например, из-за эксепшена) посылку сообщений придется запланировать снова, что как бы собьет весь ритм. Во-вторых, дэфолтная реализация шедулера проверяет, не пора ли послать какие-то сообщения, раз в некоторый интервал времени (который можно поменять в конфиге ), поэтому особой пунктуальности здесь ожидать не следует.
И кстати, раз уж мы вспомнили про эксепшены, обратите внимание на код:
Он нужен для того, чтобы при перезапуске актора повторно не вызвался preStart. В зависимости от конкретного случая вы должны или не должны этого хотеть. Как следует обдумайте конкретную ситуацию и протестируйте все граничные случаи.
Вывод программы (лишние детали опущены):
[akka://system/user/pullActor] received handled message PullResult(1)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(2)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullFailed
[akka://system/user/pullActor] received unhandled message Failure(…
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(4)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(5)
[akka://system/user] stopping
[akka://system/user/pullActor] stopped
[akka://system/user] stopped
Надеюсь, вы заметили, как ловко мы избавились от необходимости писать логи вручную, воспользовавшись LoggingReceive и правильно настроив Akka?
Как видите, при использовании шедулера приходится учитывать массу деталей. Не расслабляйтесь!
Дополнение: Делаем метрики и мониторинг для Akka при помощи Kamon