Я пытаюсь написать фрагмент кода, который делает следующее:-
- Считывает большой csv-файл из удаленного источника, такого как s3.
- Обрабатывайте файл запись за записью.
- Отправить уведомление пользователю
- Запишите выходные данные в удаленное расположение
Запись образца во входном формате csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
Мой входной класс case, который представляет запись во входном csv:
case class InputRecord(recordId: String, name: String, salary: Long)
Пример записи в выходном формате csv (который необходимо записать):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
Мой выходной класс вариантов, который представляет запись во входном csv:
case class OutputRecord(recordId: String, name: String, designation: String)
Чтение записи с помощью csv-файла akka stream (использует Alpakka reactive s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
Теперь у меня есть функция для обработки записей:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
Функция записи выходной записи в формате csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
Функция отправки уведомлений по электронной почте:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Сшить все это вместе
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
В строке 15 и 16 я получаю ошибку, я могу добавить строку 15 или строку 16, но не обе, так как обе notify
& writeOutput
потребности outputRecord
. Как только уведомление вызвано, я теряю outputRecord
.
Есть ли способ, которым я могу добавить оба notify
и writeOutput
к тому же графику?
Я не ищу параллельное выполнение, так как хочу сначала позвонить notify
и то только writeOutput
. Так что это не поможет: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
Вариант использования кажется мне очень простым, но я не могу найти чистое решение.