Поток Akka непрерывно потребляет websocket

0

Вопрос

Я вроде как новичок в потоке Scala и Akka, и я пытаюсь получить строковые сообщения JSON из веб-магазина и перенести их в тему Кафки.

На данный момент я работаю только над частью "получать сообщения от ws".

Сообщения, поступающие с веб-сайта, выглядят следующим образом :

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

Я хочу разделить это сообщение JSON на несколько сообщений :

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

А затем переместите каждое из этих сообщений в тему кафки.

Вот чего я достиг до сих пор :

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

Это работает, я получаю ожидаемое выходное сообщение Json, но мне было интересно, могу ли я написать этого производителя в более "акка-иш" стиле, например, с помощью GraphDSL. Поэтому у меня есть несколько вопросов :

  • Возможно ли непрерывно потреблять веб-сайт с помощью GraphDSL ? Если да, не могли бы вы показать мне пример, пожалуйста ?
  • Хорошая ли идея использовать WS с помощью GraphDSL ?
  • Должен ли я разложить полученное сообщение Json, как это делаю я, прежде чем отправлять его кафке ? Или лучше отправить его как есть для снижения задержки ?
  • После создания сообщения Кафке я планирую использовать его с помощью Apache Storm, это хорошая идея ? Или мне следует придерживаться Акки ?

Спасибо, что прочитали меня, С уважением, Арес

akka akka-stream apache-kafka scala
2021-11-20 14:01:02
1

Лучший ответ

1

Этот код очень похож на Акка-иш: scaladsl это так же Акка, как и GraphDSL или реализация пользовательского GraphStage. Единственная причина, ИМО/Е, пойти в GraphDSL является, если фактическая форма графика не легко выражается в scaladsl.

Я бы лично пошел по пути определения CoinPrice класс, чтобы сделать модель явной

case class CoinPrice(coin: String, price: BigDecimal)

А затем выпейте Flow[Message, CoinPrice, NotUsed] который анализирует 1 входящее сообщение на ноль или более CoinPrices. Что-то (используя здесь Play JSON), например:

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

Вы можете, в зависимости от размера JSON в сообщении, разбить его на различные этапы потока, чтобы обеспечить асинхронную границу между анализом JSON и извлечением в CoinPrices. Например,

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

В приведенном выше примере этапы по обе стороны от async граница будет выполняться отдельными участниками и, таким образом, возможно, одновременно (если доступно достаточно ядер процессора и т. Д.), За счет дополнительных накладных расходов для участников для координации и обмена сообщениями. Эти дополнительные накладные расходы на координацию/связь (см. Универсальный закон масштабируемости Гюнтера) будут оправданы только в том случае, если объекты JSON достаточно велики и поступают достаточно быстро (последовательно поступают до того, как предыдущий завершит обработку).

Если вы намерены использовать websocket до тех пор, пока программа не остановится, возможно, вам будет проще просто использовать Source.never[Message].

2021-11-21 12:42:30

Спасибо за ваш ответ, все очень ясно, у меня только один вопрос. Как я могу разбить свой ответ на различные этапы потока ? Не могли бы вы просто показать мне небольшой пример, пожалуйста ? Или сориентируйте меня на соответствующую часть документации ?
Arès

На других языках

Эта страница на других языках

Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................