Иногда требуется запланировать выполнение кода на определенное время после происшествия какого-то события. Или сказать, что некий код выполняется снова и снова с заданным интервалом времени. Если в вашем проекте используется Akka , то вы можете решать такие задачи просто элементарно… на самом деле нет. Как мы скоро убедимся, даже здесь есть определенные тонкости.

Примечание: Не путайте шедулер в Akka с шедулерами в Erlang . То, что в Erlang называется шедулерами и распределяют время между акторами, в терминах Akka называется диспетчерами. Здесь же мы говорим об аналоге таймеров из Erlang’а.

Рассмотрим следующий код:

package me. eax . akka_examples

import akka. actor . _
import akka. event . _
import akka. pattern . pipe
import com. typesafe . config . ConfigFactory

import scala. concurrent . _
import scala. concurrent . duration . _
import scala. concurrent . ExecutionContext . Implicits . global

import java. util . concurrent . atomic . AtomicInteger

object Database {
val counter = new AtomicInteger ( 0 )

def fakeRequest ( ) : Future [ Int ] = {
val result = counter. incrementAndGet ( )
Future {
if ( result == 3 ) throw new RuntimeException ( «Fake exception» )
result
}
}
}

case object PullCounter
case class PullResult ( counter : Int )
case object PullFailed

class PullActor extends Actor {
val period = 2 . seconds
var timerCancellable : Option [ Cancellable ] = None

def scheduleTimer ( ) = {
timerCancellable = Some (
context. system . scheduler . scheduleOnce (
period, context. self , PullCounter
)
)
}

override def preStart ( ) = scheduleTimer ( )

// so we don’t call preStart and schedule a new message
// see http://doc.akka.io/docs/akka/2.2.4/scala/howto.html
override def postRestart ( reason : Throwable ) = { }

def receive = LoggingReceive {
case PullCounter =>
val fReq = Database. fakeRequest ( )
fReq. map ( counter => PullResult ( counter ) ) pipeTo self
fReq. onFailure { case _ => self ! PullFailed }

case PullFailed =>
scheduleTimer ( )

case r : PullResult =>
if ( r. counter >= 5 ) {
context. system . shutdown ( )
} else {
scheduleTimer ( )
}
}
}

object Example4 extends App {
val config = ConfigFactory. parseString (
«»»
akka.loglevel = «DEBUG»
akka.actor.debug {
receive = on
lifecycle = on
}
«»»
)

val system = ActorSystem ( «system» , config )
system. actorOf ( Props [ PullActor ] , «pullActor» )
system. awaitTermination ( )
}

Здесь мы как бы имеем довольно типичную ситуацию. Есть какой-то внешний источник данных, база данных, REST-сервис или вроде того. Ходить в него дорого и долго, да и сервис может оказаться временно недоступен. Поэтому мы заводим актор, который время от времени ходит за данными и кэширует их. В приведенном примере роль источника данных выполняет такая заглушка:

object Database {
val counter = new AtomicInteger ( 0 )

def fakeRequest ( ) : Future [ Int ] = {
val result = counter. incrementAndGet ( )
Future {
if ( result == 3 ) throw new RuntimeException ( «Fake exception» )
result
}
}
}

На самом деле, нам не важно, что является источником, лишь бы при хождении в него использовались футуры . Как видите, футура из нашей заглушки не всегда завершается успешно.

При старте актор обращается к шедулеру и говорит «пошли мне сообщение PullCounter через две секунды»:

val period = 2 . seconds
var timerCancellable : Option [ Cancellable ] = None

def scheduleTimer ( ) = {
timerCancellable = Some (
context. system . scheduler . scheduleOnce (
period, context. self , PullCounter
)
)
}

override def preStart ( ) = scheduleTimer ( )

Здесь мы не используем переменную timerCancellable но в общем случае с ее помощью посылку сообщения можно попытаться отменить:

timerCancellable. foreach ( _ . cancel ( ) )

При получении PullCounter актор осуществляет асинхронное хождение в «базу»:

case PullCounter =>
val fReq = Database. fakeRequest ( )
fReq. map ( counter => PullResult ( counter ) ) pipeTo self
fReq. onFailure { case _ => self ! PullFailed }

При получении ответа, обернутого в PullResult, актор планирует посылку нового сообщения PullCounter (заметьте, кусок про system. shutdown ( ) нужен просто чтобы когда-нибудь остановить пример):

case r : PullResult =>
if ( r. counter >= 5 ) {
context. system . shutdown ( )
} else {
scheduleTimer ( )
}

Если же хождение в базу завершается неуспешно, срабатывает onFailure и актор получает сообщение PullFailed:

case PullFailed =>
scheduleTimer ( )

Это очень важный момент, так как если бы мы его упустили, в случае ошибки актор перестал бы ходить в базу.

Следует отметить, что здесь мы использовали метод sheduleOnce и вручную планировали следующую посылку сообщения. Шедулер также имеет более простой метод shedule, позволяющий выполнять посылку сообщений регулярно без повторного планирования. Но в данном случае использовать его — плохая идея. Дело в том, что хождение в базу может занять больше заданного нами period’а в две секунды. В этом случае сообщения от шедулера начнут копиться в очереди актора, а в базу данных будет одновременно посылаться множество запросов, так как новый запрос будет создаваться до завершения предыдущего. Как результат, база ляжет, а очередь переполнится.

Если же в вашем приложении требуется именно shedule, соответствующий пример вы найдете здесь . Однако примите во внимание, что использование shedule не гарантирует вам регулярного получения заданного сообщения раз в N единиц времени. Во-первых, потому что при рестарте актора (например, из-за эксепшена) посылку сообщений придется запланировать снова, что как бы собьет весь ритм. Во-вторых, дэфолтная реализация шедулера проверяет, не пора ли послать какие-то сообщения, раз в некоторый интервал времени (который можно поменять в конфиге ), поэтому особой пунктуальности здесь ожидать не следует.

И кстати, раз уж мы вспомнили про эксепшены, обратите внимание на код:

override def postRestart ( reason : Throwable ) = { }

Он нужен для того, чтобы при перезапуске актора повторно не вызвался preStart. В зависимости от конкретного случая вы должны или не должны этого хотеть. Как следует обдумайте конкретную ситуацию и протестируйте все граничные случаи.

Вывод программы (лишние детали опущены):

[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(1)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(2)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullFailed
[akka://system/user/pullActor] received unhandled message Failure(…
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(4)
[akka://system/user/pullActor] received handled message PullCounter
[akka://system/user/pullActor] received handled message PullResult(5)
[akka://system/user] stopping
[akka://system/user/pullActor] stopped
[akka://system/user] stopped

Надеюсь, вы заметили, как ловко мы избавились от необходимости писать логи вручную, воспользовавшись LoggingReceive и правильно настроив Akka?

Как видите, при использовании шедулера приходится учитывать массу деталей. Не расслабляйтесь!

Дополнение: Делаем метрики и мониторинг для Akka при помощи Kamon

admin

Share
Published by
admin

Recent Posts

Консоль удаленного рабочего стола(rdp console)

Клиент удаленного рабочего стола (rdp) предоставляет нам возможность войти на сервер терминалов через консоль. Что…

1 месяц ago

Настройка сети в VMware Workstation

В VMware Workstation есть несколько способов настройки сети гостевой машины: 1) Bridged networking 2) Network…

1 месяц ago

Логи брандмауэра Windows

Встроенный брандмауэр Windows может не только остановить нежелательный трафик на вашем пороге, но и может…

1 месяц ago

Правильный способ отключения IPv6

Вопреки распространенному мнению, отключить IPv6 в Windows Vista и Server 2008 это не просто снять…

1 месяц ago

Ключи реестра Windows, отвечающие за параметры экранной заставки

Параметры экранной заставки для текущего пользователя можно править из системного реестра, для чего: Запустите редактор…

1 месяц ago

Как управлять журналами событий из командной строки

В этой статье расскажу про возможность просмотра журналов событий из командной строки. Эти возможности можно…

1 месяц ago