2

I want to read many CSV files in a folder asynchronously and return an Iterable of a custom case class.

Can I achieve this with Akka Streams, and if so, how?

I have tried to distribute the work with Balance, following the documentation, but it is a little hard to manage.

Alternatively, is it good practice to use Actors instead? For example, a parent Actor with one child per file: each child reads its file and returns an Iterable to the parent, which then combines all the Iterables.

1
  • The question is not very clear. 1. Do you want to return a single Iterable of the custom case class for all the CSV files, or one per CSV file? 2. What if there are thousands of files: do you want to read them all at the same time, or do you just want some level of parallelism? Commented Aug 18, 2017 at 7:50

2 Answers

3

Mostly the same as @paul's answer, but with small improvements:

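import java.nio.file.Paths
import akka.stream.scaladsl.{FileIO, Framing, Source}
import akka.util.ByteString

// "" stands for the folder that holds the CSV files
def files = new java.io.File("").listFiles().map(_.getAbsolutePath).to[scala.collection.immutable.Iterable]

// This only describes the stream; attach a Sink (e.g. runWith) to actually run it
Source(files).flatMapConcat( filename => // you could use flatMapMerge if you don't care about line ordering
    FileIO.fromPath(Paths.get(filename))
      // split the bytes into lines of at most 256 bytes and decode each as UTF-8
      .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true).map(_.utf8String))
  ).map { csvLine =>
    // parse csv here
    println(csvLine)
  }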
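The "parse csv here" step could be filled in roughly as follows. This is only a sketch: the `Person` case class, the `CsvParsing` object, and the two-column `name,age` layout are hypothetical stand-ins for your own type and file format, and the naive `split` does not handle quoted fields or embedded commas.

```scala
import scala.util.Try

// Hypothetical target type; replace with your own case class.
final case class Person(name: String, age: Int)

object CsvParsing {
  // Parses one CSV line of the assumed form "name,age".
  // Returns None for malformed lines instead of throwing.
  def parseLine(line: String): Option[Person] =
    line.split(",", -1).map(_.trim) match {
      case Array(name, age) if name.nonEmpty =>
        Try(Person(name, age.toInt)).toOption // None if age is not an integer
      case _ =>
        None // wrong number of columns or empty name
    }
}
```

In the stream above, something like `.map(CsvParsing.parseLine).collect { case Some(p) => p }` followed by `.runWith(Sink.seq)` should then yield a `Future` holding the parsed values, assuming a materializer is in scope.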

1

First of all, you need to learn how Akka Streams works, with its Source, Flow and Sink building blocks. Then you can start learning the operators.

To run several actions in parallel you can use the mapAsync operator, whose first argument sets the level of parallelism.

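  import akka.actor.ActorSystem
  import akka.stream.scaladsl.{Sink, Source}
  import scala.concurrent.{ExecutionContext, Future}

  /**
    * With the mapAsync operator we pass a function that returns a Future. The number of futures
    * running in parallel is capped by the first argument; results are emitted in upstream order.
    */
  @Test def readAsync(): Unit = {
    // Create one ActorSystem for the whole stream, not one per element.
    // On Akka versions before 2.6, also declare an implicit ActorMaterializer().
    implicit val system: ActorSystem = ActorSystem()
    implicit val ec: ExecutionContext = system.dispatcher
    Source(0 to 10) // --> your files
      .mapAsync(5) { value => // --> runs up to 5 reads in parallel
        Future {
          // Read your file here
          Thread.sleep(500)
          println(s"Process in Thread:${Thread.currentThread().getName}")
          value
        }
      }
      .runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}")))
  }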
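Applied to real files, the same idea might look like the sketch below. It assumes Akka 2.6 or later (where an implicit `ActorSystem` is enough to run a stream) and files small enough to load into memory; `ReadCsvFolder` and `readAll` are hypothetical names, not part of Akka.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{ExecutionContext, Future}

object ReadCsvFolder {
  // Reads every file in `paths`, at most `parallelism` files at a time,
  // and returns all non-empty lines in the order of `paths`.
  def readAll(paths: List[Path], parallelism: Int)(
      implicit system: ActorSystem): Future[Seq[String]] = {
    // Blocking file I/O runs on the default dispatcher here for brevity;
    // a dedicated dispatcher is usually preferable for blocking work.
    implicit val ec: ExecutionContext = system.dispatcher

    Source(paths)
      .mapAsync(parallelism) { path =>
        Future {
          new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
            .split("\\r?\\n")
            .toList
            .filter(_.nonEmpty)
        }
      }
      .mapConcat(lines => lines) // flatten each file's lines into the stream
      .runWith(Sink.seq)         // collect everything into one Future[Seq[String]]
  }
}
```

Because mapAsync keeps the upstream order, the lines of the first file come before those of the second even when the second finishes reading first; mapAsyncUnordered trades that guarantee for emitting results as soon as they are ready.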

You can learn more about Akka and Akka Streams here: https://github.com/politrons/Akka
