Некоторое время назад мы с вами успешно разобрались, что такое Cassandra, а также как установить и настроить Cassandra-кластер в облаке от Amazon. В чем мы не разобрались, это в том, как использовать Cassandra в наших программах. Настало время исправить столь вопиющую несправедливость!
Изучая CQL в качестве примера мы создавали базу данных, хранящую список TODO. Раз уж этот пример так хорошо изучен, нет причин не использовать его повторно при написании программы.
Добавляем зависимость в build.sbt:
«com.datastax.cassandra» % «cassandra-driver-core» % «2.1.6»
)
Драйвер использует Guava, поэтому нам понадобится implicit преобразование футур Guava в футуры Scala :
import com. datastax . driver . core . _
import com. google . common . util . concurrent . _
import scala. concurrent . _
package object utils {
implicit def futCSToScala ( f : ResultSetFuture ) : Future [ ResultSet ] = {
val promise = Promise [ ResultSet ] ( )
val callback = new FutureCallback [ ResultSet ] {
def onSuccess ( result : ResultSet ) : Unit = {
promise success result
}
def onFailure ( err : Throwable ) : Unit = {
promise failure err
}
}
Futures. addCallback ( f, callback )
promise. future
}
}
Пользоваться драйвером довольно просто:
import com. datastax . driver . core . _
import com. datastax . driver . core . querybuilder . { QueryBuilder => QB }
import me. eax . cassandra_example . utils . _
import scala. collection . JavaConverters . _
import scala. concurrent . _
case class TodoDTO ( id : Int, descr : String )
case class TodoDAO ( session : Session ) ( implicit ec : ExecutionContext ) {
private val table = «todo_list»
private val id = «id»
private val description = «description»
def createTable : Future [ Unit ] = {
val query = s «create table if not exists $table ($id int » +
s «primary key, $description text )»
session. executeAsync ( query ) . map ( _ => { } )
}
def dropTable : Future [ Unit ] = {
val query = s «drop table if exists $table»
session. executeAsync ( query ) . map ( _ => { } )
}
def insert ( dto : TodoDTO ) : Future [ Unit ] = {
val query = {
QB. insertInto ( table )
. value ( id, dto. id )
. value ( description, dto. descr )
}
session. executeAsync ( query ) . map ( _ => { } )
}
def select : Future [ Seq [ TodoDTO ] ] = {
val query = {
QB. select ( id, description )
. from ( table )
}
for {
resultSet < — session. executeAsync ( query )
} yield {
resultSet
. asScala
. map ( row => TodoDTO ( row. getInt ( id ) ,
row. getString ( description ) ) )
. toSeq
}
}
def delete ( idToDelete : Long ) : Future [ Unit ] = {
val query = {
QB. delete ( ) . all ( )
. from ( table )
. where ( QB. eq ( id, idToDelete ) )
}
session. executeAsync ( query ) . map ( _ => { } )
}
}
Наконец, имея готовый DAO, несложно написать программу, устанавливающую соединение с кластером и производящую кое-какие махинации с данными:
import com. datastax . driver . core . _
import me. eax . cassandra_example . dao . _
import me. eax . cassandra_example . utils . _
import scala. concurrent . _
import scala. concurrent . duration . _
import scala. concurrent . ExecutionContext . Implicits . global
object CassandraExample extends App {
val cluster = {
Cluster. builder ( )
. addContactPoint ( «10.110.0.10» )
// .withCredentials(«username», «password»)
. build ( )
}
val session = cluster. connect ( «test» )
val todoDao = TodoDAO ( session )
val f = {
for {
_ < — todoDao. createTable
_ = println ( «Inserting items» )
_ < — {
ftraverse ( ( 1 to 3 ) . toSeq ) { n =>
val item = TodoDTO ( n, s «Todo item $n» )
todoDao. insert ( item )
}
}
items < — todoDao. select
_ = println ( s «Items: $items» )
_ = println ( «Deleting item 2» )
_ < — todoDao. delete ( 2 )
newItems < — todoDao. select
_ = println ( s «New items: $newItems» )
_ < — todoDao. dropTable
} yield { }
}
f onFailure { case e =>
println ( s «ERROR: $e» )
e. printStackTrace ( )
}
Await. ready ( f, Duration. Inf )
cluster. close ( )
println ( «Done!» )
}
Используемая в этом примере функция ftraverse аналогичная Future.sequence. Отличие заключается в том, что Future.sequence выполняет несколько футур параллельно, а при использовании ftraverse одновременно выполняется только одна футура. В сложных приложениях использование sequence может приводить к лавинообразному созданию футур и забиванию ими трэдпула при получении одного-единственного запроса пользователя. Поэтому по возможности я бы советовал всегда использовать ftraverse вместо sequence. Исходный код функции ftraverse можно найти здесь .
Есть один важный момент, который следует учитывать при работе с драйвером к Cassandra. Метод executeAsync класса Session перегружен и может принимать как строку с запросом на языке CQL, так и наследника абстрактного класса Statement. В приведенном примере мы пользовались последним способом. Но при этом Statement имеет метод getQueryString, который вроде как возвращает строку к запросом. И поэтому возникает соблазн вызвать getQueryString, а затем executeAsync, принимающий строку. Так вот, я не знаю, считается ли это багом или нормальным поведением, но getQueryString иногда возвращает невалидные запросы . Так что, если вдруг вы словите странное исключение вроде:
… проверьте, не генерируете ли вы запросы при помощи getQueryString.
Следует отметить, что в отличие, например, от Java-клиента к Couchbase , клиент к Cassandra является очень умным. Он поддерживает пулы соединений, которые увеличиваются или уменьшаются в зависимости от текущей нагрузки, умеет автоматически восстанавливать порвавшееся соединение с экспоненциальным ростом времени реконнекта, а также многое другое. Поэтому в реальном проекте объявление переменной cluster будет больше похоже на следующее:
Cluster. builder ( )
. addContactPoint ( «172.31.0.11» )
. addContactPoint ( «172.31.0.22» )
. addContactPoint ( «172.31.0.33» )
. withPort ( port )
. withPoolingOptions (
new PoolingOptions ( )
. setConnectionsPerHost ( HostDistance. REMOTE , 5 , 8 )
. setNewConnectionThreshold ( HostDistance. REMOTE , 10 )
. setConnectionsPerHost ( HostDistance. LOCAL , 5 , 8 )
. setNewConnectionThreshold ( HostDistance. LOCAL , 10 )
. setHeartbeatIntervalSeconds ( 10 )
)
. withSocketOptions (
new SocketOptions ( )
. setConnectTimeoutMillis ( 1000 )
. setReadTimeoutMillis ( 3000 )
)
. withReconnectionPolicy (
new ExponentialReconnectionPolicy ( 300 , 3000 )
)
. build ( )
}
Понятно, что приведенный пример очень прост и многое осталось за кадром. Но со всякими нюансами типа выполнения batch запросов и указания consistency level, думаю, вы теперь без труда разберетесь самостоятельно.
Ссылки по теме:
- Список рассылки, посвященный Java-клиенту к Cassandra ;
- Phantom — асинхронный и типизированный DSL для Cassandra ;
- При написании тестов используйте Mock к Cassandra ;
- Что нужно и не нужно делать при работе с Cassandra-драйвером ;
- Полная версия исходников к этой заметке на GitHub ;
А каким драйвером к Cassandra пользуетесь вы?