Объединение двух потоков в потоке Акка

0

Вопрос

Я пытаюсь объединить два потока и не могу объяснить результат моей реализации.

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

Я ожидаю следующего вывода из этого кода.

2
3
4
.
.
.
11
10
20
.
.
.
100

Вместо этого я вижу, что печатается только "2". Не могли бы вы, пожалуйста, объяснить, что не так в моей импликации и как я должен изменить программу, чтобы получить желаемый результат.

akka akka-stream scala
2021-10-21 17:29:00
2

Лучший ответ

3

Из документов API Akka Stream:

Concat:

Выдает, когда в текущем потоке доступен элемент; если текущий ввод завершен, он пытается выполнить следующий

Broadcast:

Излучается, когда все выходы прекращают обратное давление и доступен входной элемент

Два оператора не будут работать совместно, так как существует конфликт в том, как они работают -- Concat пытается вытащить все элементы из одного из Broadcastвыходы перед переключением на другой, в то время как Broadcast не будет излучать, если не будет спроса на ВСЕ его продукты.

Для того, что вам нужно, вы можете объединить, используяconcat как предположили комментаторы:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

или, что эквивалентно, используйтеSource.combine как показано ниже:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
2021-10-21 22:34:04
0

С помощью GraphDSL, который является упрощенной версией реализации Source.combine:

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)
2021-10-26 19:23:56

На других языках

Эта страница на других языках

Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................