
I'm reading a CSV file. I'm using Akka Streams so that I can build a graph of actions to perform on each line. I've got the following toy example up and running:

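  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.Sink
  import scala.io.Source

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyAkkaSystem")
    implicit val materializer = ActorMaterializer()

    // akka.stream.scaladsl.Source wraps the line iterator produced by scala.io.Source;
    // the factory opens the file each time the stream is materialized
    val source = akka.stream.scaladsl.Source.fromIterator(() => Source.fromFile("a.csv").getLines())
    val sink = Sink.foreach(println)
    source.runWith(sink)
  }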

The two Source types don't sit easy with me. Is this idiomatic, or is there a better way to write this?

4 Answers


Actually, Akka Streams provides a function, FileIO.fromPath, to read directly from a file:

// needs an implicit materializer in scope
FileIO.fromPath(Paths.get("a.csv"))
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true).map(_.utf8String))
  .runForeach(println)
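Framing.delimiter is what turns FileIO's output into lines: FileIO.fromPath emits the file as ByteString chunks whose boundaries fall wherever a read happens to end, so one line can be split across two chunks, and the framing stage buffers bytes until it sees the delimiter. The plain-Scala sketch below models that buffering on String chunks. `FramingSketch.frame` is a hypothetical illustration, not Akka's implementation; it ignores maximumFrameLength (Akka fails the stream when a frame grows past it), and its end-of-input behavior mirrors allowTruncation = true, whereas with false Akka fails the stream if the last line has no trailing delimiter.

```scala
object FramingSketch {
  // Splits arbitrary text chunks into lines, carrying any partial line over
  // to the next chunk (a simplified model of Framing.delimiter)
  def frame(chunks: Seq[String], delimiter: String = "\n"): Vector[String] = {
    val out = Vector.newBuilder[String]
    var buffer = ""
    for (chunk <- chunks) {
      buffer += chunk
      var idx = buffer.indexOf(delimiter)
      while (idx >= 0) {
        out += buffer.substring(0, idx)               // a complete line
        buffer = buffer.substring(idx + delimiter.length)
        idx = buffer.indexOf(delimiter)
      }
    }
    // Like allowTruncation = true: emit a final line that has no trailing delimiter
    if (buffer.nonEmpty) out += buffer
    out.result()
  }

  def main(args: Array[String]): Unit =
    // The header and the first row are each split across two chunks
    println(frame(Seq("id,na", "me\n1,Al", "ice\n2,Bob"))) // Vector(id,name, 1,Alice, 2,Bob)
}
```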

In the FileIO.fromPath pipeline, runForeach just prints each line. If you have a proper Sink to process the lines, use it instead. For example, to split each line on commas and print the number of fields it contains:

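import akka.Done
import scala.concurrent.Future

// Sink takes two type parameters: the element type and the materialized value
val sink: Sink[String, Future[Done]] = Sink.foreach(x => println(x.split(",").size))

FileIO.fromPath(Paths.get("a.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
  .to(sink)
  .run()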
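Be aware that x.split(",") calls Java's regex-based String.split, which drops trailing empty strings, so the count is too low for rows whose last field is empty; it also splits inside quoted fields such as "Smith, John". A small plain-Scala check of the first point, using the hypothetical helpers naiveCount and fullCount (a negative limit tells split to keep trailing empties):

```scala
object FieldCount {
  // Default split: trailing empty strings are discarded
  def naiveCount(line: String): Int = line.split(",").length
  // Negative limit: every field is kept, including a trailing empty one
  def fullCount(line: String): Int = line.split(",", -1).length

  def main(args: Array[String]): Unit = {
    val row = "1,Alice,"       // three fields, the last one empty
    println(naiveCount(row))   // 2
    println(fullCount(row))    // 3
  }
}
```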

1 Comment

Yeah, I came across this the other day and will probably use it. I tried the PureCSV library, but it loads the entire file into memory before processing, which defeats the purpose of a stream-based approach.

The idiomatic way to read a CSV file with Akka Streams is to use the Alpakka CSV connector. The following example reads a CSV file, converts it to a map of column names (assumed to be the first line in the file) and ByteString values, transforms the ByteString values to String values, and prints each line:

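import java.nio.file.Paths
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.FileIO

FileIO.fromPath(Paths.get("a.csv"))
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMap())
  .map(_.map { case (k, v) => k -> v.utf8String }) // ByteString values -> String
  .runForeach(println)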
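Conceptually, CsvToMap.toMap() takes the first parsed row as column names and pairs every following row with them. Below is a plain-Scala sketch of just that step, working on rows that are already split into fields (the job CsvParsing.lineScanner() does, including the quote handling this sketch skips). `CsvToMapSketch.toMaps` is hypothetical; it uses zip, so a row with too many or too few fields is silently truncated, which is not necessarily how the Alpakka stage behaves.

```scala
object CsvToMapSketch {
  // First row = header; each later row becomes a Map from column name to value.
  // Rows are consumed lazily, one at a time, as in a stream.
  def toMaps(rows: Iterator[List[String]]): Iterator[Map[String, String]] =
    if (!rows.hasNext) Iterator.empty
    else {
      val header = rows.next()
      rows.map(fields => header.zip(fields).toMap)
    }

  def main(args: Array[String]): Unit = {
    val rows = Iterator(List("id", "name"), List("1", "Alice"), List("2", "Bob"))
    toMaps(rows).foreach(println)
  }
}
```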



Try this:

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

object ReadStreamApp extends App {
  implicit val actorSystem = ActorSystem()
  import actorSystem.dispatcher // ExecutionContext for the onComplete callback below
  implicit val flowMaterializer = ActorMaterializer()

  val logFile = Paths.get("src/main/resources/a.csv")

  // Emits the file's bytes in chunks that do not line up with line boundaries
  val source = FileIO.fromPath(logFile)

  // Re-frames the chunks into lines. Splitting on "\n" and stripping a trailing "\r"
  // handles both Unix and Windows files, whereas System.lineSeparator() only matches
  // files that use the line endings of the platform the program runs on
  val flow = Framing
    .delimiter(ByteString("\n"), maximumFrameLength = 512, allowTruncation = true)
    .map(_.utf8String.stripSuffix("\r"))

  val sink = Sink.foreach(println)

  source
    .via(flow)
    .runWith(sink)
    // Shut the actor system down once the stream completes or fails
    .onComplete(_ => actorSystem.terminate())
}
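Whichever delimiter you pick has to match the line endings actually stored in the file. System.lineSeparator() reports those of the machine running the program ("\r\n" on Windows, "\n" elsewhere), so it can miss lines in a file produced on another platform; splitting on "\n" and stripping a trailing "\r" copes with both. A plain-Scala sketch of that rule, without Akka; `LineEndings.lines` is a hypothetical helper that works on a whole string rather than on a stream:

```scala
object LineEndings {
  // Splits on '\n', removes a trailing '\r' from each piece, and drops the
  // empty piece that follows a final newline
  def lines(text: String): List[String] = {
    val pieces = text.split("\n", -1).toList.map(_.stripSuffix("\r"))
    if (pieces.lastOption.contains("")) pieces.init else pieces
  }

  def main(args: Array[String]): Unit = {
    println(lines("id,name\r\n1,Alice\r\n")) // Windows line endings
    println(lines("id,name\n1,Alice\n"))     // Unix line endings, same result
  }
}
```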



Yes, it's fine, because these are two different Source types. But if you don't like scala.io.Source, you can open the file yourself (which is sometimes necessary, e.g. when the source CSV file is zipped) and then parse the resulting InputStream like this:

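import akka.stream.scaladsl.{Framing, StreamConverters}
import akka.util.ByteString

// `input` is a java.io.InputStream you opened yourself, e.g. a GZIPInputStream
val lines = StreamConverters.fromInputStream(() => input)
  // allowTruncation = true keeps a last line that has no trailing newline
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = true))
  .map(_.utf8String)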

Having said that, consider using Apache Commons CSV with akka-stream. You may end up writing less code :)
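For the zipped case, the JDK already supplies the decompression: wrap the raw file stream in java.util.zip.GZIPInputStream (for a .gz file) and pass the result to StreamConverters.fromInputStream. The sketch below shows only the JDK half, in plain Scala so it runs without Akka. The object and method names are hypothetical, the gzip helper exists only to build test data in memory, and for a .zip archive a ZipInputStream (after calling getNextEntry) would take GZIPInputStream's place.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, ByteArrayOutputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipLines {
  // Wraps a compressed stream so callers see plain CSV bytes; this is the
  // InputStream you would hand to StreamConverters.fromInputStream(() => ...)
  def decompressed(raw: InputStream): InputStream = new GZIPInputStream(raw)

  // Demo helper: gzip some text in memory
  def gzip(text: String): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bytes)
    try gz.write(text.getBytes(StandardCharsets.UTF_8)) finally gz.close()
    bytes.toByteArray
  }

  // Reads every line from the stream with plain JDK classes, then closes it
  def readLines(in: InputStream): List[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    finally reader.close()
  }

  def main(args: Array[String]): Unit = {
    val compressed = gzip("id,name\n1,Alice\n")
    println(readLines(decompressed(new ByteArrayInputStream(compressed))))
  }
}
```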

1 Comment

Hi, I tried to run this example for a few minutes but didn't succeed (implicits and all). Could you please provide the missing imports and setup?
