Я вроде как новичок в потоке Scala и Akka, и я пытаюсь получить строковые сообщения JSON из веб-магазина и перенести их в тему Кафки.
На данный момент я работаю только над частью "получать сообщения от ws".
Сообщения, поступающие с веб-сайта, выглядят следующим образом :
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
Я хочу разделить это сообщение JSON на несколько сообщений :
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
А затем переместите каждое из этих сообщений в тему кафки.
Вот чего я достиг до сих пор :
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
Это работает, я получаю ожидаемое выходное сообщение Json, но мне было интересно, могу ли я написать этого производителя в более "акка-иш" стиле, например, с помощью GraphDSL. Поэтому у меня есть несколько вопросов :
- Возможно ли непрерывно потреблять веб-сайт с помощью GraphDSL ? Если да, не могли бы вы показать мне пример, пожалуйста ?
- Хорошая ли идея использовать WS с помощью GraphDSL ?
- Должен ли я разложить полученное сообщение Json, как это делаю я, прежде чем отправлять его кафке ? Или лучше отправить его как есть для снижения задержки ?
- После создания сообщения Кафке я планирую использовать его с помощью Apache Storm, это хорошая идея ? Или мне следует придерживаться Акки ?
Спасибо, что прочитали меня, С уважением, Арес