Вход потока Акка (`Вход") как выход (`Выход")

0

Вопрос

Я пытаюсь написать фрагмент кода, который делает следующее:-

  1. Считывает большой csv-файл из удаленного источника, такого как s3.
  2. Обрабатывайте файл запись за записью.
  3. Отправить уведомление пользователю
  4. Запишите выходные данные в удаленное расположение

Запись образца во входном формате csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

Мой входной класс case, который представляет запись во входном csv:

case class InputRecord(recordId: String, name: String, salary: Long)

Пример записи в выходном формате csv (который необходимо записать):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

Мой выходной класс вариантов, который представляет запись во входном csv:

case class OutputRecord(recordId: String, name: String, designation: String)

Чтение записи с помощью csv-файла akka stream (использует Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

Теперь у меня есть функция для обработки записей:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

Функция записи выходной записи в формате csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

Функция отправки уведомлений по электронной почте:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Сшить все это вместе

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

В строке 15 и 16 я получаю ошибку, я могу добавить строку 15 или строку 16, но не обе, так как обе notify & writeOutput потребности outputRecord. Как только уведомление вызвано, я теряю outputRecord.

Есть ли способ, которым я могу добавить оба notify и writeOutput к тому же графику?

Я не ищу параллельное выполнение, так как хочу сначала позвонить notify и то только writeOutput. Так что это не поможет: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

Вариант использования кажется мне очень простым, но я не могу найти чистое решение.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

Лучший ответ

1

Выход изnotify это PushResult, но вклад writeOutput является ByteString. Как только вы измените это, он будет скомпилирован. На случай, если вам понадобится ByteString, получите то же самое от OutputRecord.

Кстати, в приведенном вами примере кода аналогичная ошибка существует в readCSV и process.

2021-11-24 03:36:16

На других языках

Эта страница на других языках

Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................