9

I have a zipped file containing multiple text files. I want to read each file and build a list of RDDs containing the content of each file.

val test = sc.textFile("/Volumes/work/data/kaggle/dato/test/5.zip")

will just read the entire file. How do I iterate through each entry of the zip and save its content in an RDD using Spark?

I am fine with Scala or Python.

A possible starting point in Python for use with Spark:

import zipfile

# archive_path is the path to the .zip file
archive = zipfile.ZipFile(archive_path, 'r')
file_paths = archive.namelist()
for file_path in file_paths:
    urls = file_path.split("/")
    urlId = urls[-1].split('_')[0]
  • Hi @AbhishekChoudhary - which of the solutions below worked best for you? Thanks. Commented Oct 29, 2017 at 10:25
  • I used the Spark API to read all files into a single RDD, and then used different filter mechanisms to partition the data. Commented Oct 29, 2017 at 16:40
  • Unzipping a file is inherently a single-threaded process -- isn't doing this in Spark a waste of resources? Commented Apr 4, 2019 at 9:54
  • It was, but there are now APIs to read zipped files in Spark as well. Commented Apr 4, 2019 at 10:59

5 Answers

10

Apache Spark default compression support

I have written up all the necessary theory in another answer, which you might want to refer to: https://stackoverflow.com/a/45958182/1549135

Read zip containing multiple files

I followed the advice given by @Herman and used ZipInputStream. This led to the following solution, which returns the zip content as an RDD[String].

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.endsWith(".zip")) {
      // Each zip archive arrives as one (path, stream) pair
      sc.binaryFiles(path, minPartitions)
        .flatMap { case (name: String, content: PortableDataStream) =>
          val zis = new ZipInputStream(content.open)
          // Advance entry by entry; close the stream once the entries run out
          Stream.continually(zis.getNextEntry)
            .takeWhile {
              case null => zis.close(); false
              case _ => true
            }
            .flatMap { _ =>
              // Read the current entry line by line
              val br = new BufferedReader(new InputStreamReader(zis))
              Stream.continually(br.readLine()).takeWhile(_ != null)
            }
        }
    } else {
      sc.textFile(path, minPartitions)
    }
  }
}

Use it by importing the implicit class and calling the readFile method on SparkContext:

import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)

10 Comments

You are not closing the connections.
@Programmer I tried closing it but then this method fails on me. So I left it to Spark.
@Atais Spark didn't close the streams in my case. I attempted to read thousands of files from S3, and it failed because the connection thread pool was exhausted, but it worked once I closed the streams in the code. Anyhow, it's always a good idea to clean up promptly.
Can you post an answer showing how you handled it? Or post it somewhere?
I am having other issues. Will post once done. It's something like this stackoverflow.com/questions/35746539/close-a-stream
4

If you are reading binary files use sc.binaryFiles. This will return an RDD of tuples containing the file name and a PortableDataStream. You can feed the latter into a ZipInputStream.
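
For Python users, the same idea can be sketched with the standard zipfile module. This is a minimal sketch, not a definitive implementation: the helper names zip_bytes_to_lines and make_zip are hypothetical, and it assumes UTF-8 text entries and an archive small enough to hold in memory.

```python
import io
import zipfile


def zip_bytes_to_lines(data, encoding="utf-8"):
    """Yield every text line from every file entry in an in-memory zip archive."""
    # The with-blocks close the archive and each entry promptly
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # directory entries carry no content
            with archive.open(info) as entry:
                for line in io.TextIOWrapper(entry, encoding=encoding):
                    yield line.rstrip("\r\n")


def make_zip(files):
    """Build a small zip archive in memory (hypothetical sample data)."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        for name, text in files.items():
            archive.writestr(name, text)
    return buffer.getvalue()


sample = make_zip({"a.txt": "one\ntwo\n", "b.txt": "three\n"})
print(list(zip_bytes_to_lines(sample)))
```

In PySpark, sc.binaryFiles yields (path, bytes) pairs, so the helper should plug in as sc.binaryFiles(path).flatMap(lambda kv: zip_bytes_to_lines(kv[1])). Each archive is still decompressed by a single task.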


2

Here's a working version of @Atais's solution, enhanced to close the streams:

import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    // Note: this matches any path that contains "zip", not only a .zip extension
    if (path.toLowerCase.contains("zip")) {

      sc.binaryFiles(path, minPartitions)
        .flatMap {
          case (zipFilePath, zipContent) =>
            val zipInputStream = new ZipInputStream(zipContent.open())
            // One output string per zip entry, holding that entry's full text
            Stream.continually(zipInputStream.getNextEntry)
              .takeWhile(_ != null)
              .map { _ =>
                scala.io.Source.fromInputStream(zipInputStream, "UTF-8").getLines.mkString("\n")
              } #::: { zipInputStream.close; Stream.empty[String] } // close once all entries are read
        }
    } else {
      sc.textFile(path, minPartitions)
    }
  }
}

Then all you have to do to read a zip file is the following:

sc.readFile(path)

2 Comments

How can I add the file name to the output so that I can filter on it? Imagine one zip file contains files with multiple schemas. I could use Spark's input_file_name virtual column if I could get the file name into the RDD. @mahmoud mehdi
This will give the file names too: .map { x ⇒ val filename1 = x.getName; scala.io.Source.fromInputStream(zipInputStream, "UTF-8").getLines.mkString(s"~${filename1}\n")+s"~${filename1}" } #::: { zipInputStream.close; Stream.empty[String] }
1

This reads only the first line of each entry. Can anyone share some insights? I am trying to read a zipped CSV file and create a JavaRDD for further processing.

JavaPairRDD<String, PortableDataStream> zipData =
        sc.binaryFiles("hdfs://temp.zip");
JavaRDD<Record> newRDDRecord = zipData.flatMap(
    new FlatMapFunction<Tuple2<String, PortableDataStream>, Record>() {
        public Iterator<Record> call(Tuple2<String, PortableDataStream> content) throws Exception {
            List<Record> records = new ArrayList<Record>();
            ZipInputStream zin = new ZipInputStream(content._2.open());
            ZipEntry zipEntry;
            while ((zipEntry = zin.getNextEntry()) != null) {
                if (!zipEntry.isDirectory()) {
                    InputStreamReader streamReader = new InputStreamReader(zin);
                    BufferedReader bufferedReader = new BufferedReader(streamReader);
                    // readLine() is called once, so only the first line of each entry is parsed
                    String line = bufferedReader.readLine();
                    // Named "fields" so it does not clash with the "records" list above
                    String[] fields = new CSVParser().parseLineMulti(line);
                    Record sd = new Record(TimeBuilder.convertStringToTimestamp(fields[0]),
                            getDefaultValue(fields[1]),
                            getDefaultValue(fields[22]));
                    records.add(sd);
                }
            }
            return records.iterator();
        }
    });
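
To get every row rather than just the first, the reader has to keep pulling lines until the current entry is exhausted. Below is a minimal Python sketch of that loop using only the standard library. The function name zip_bytes_to_csv_rows and the sample data are hypothetical, and it assumes UTF-8 CSV entries; mapping rows to a Record type is left out.

```python
import csv
import io
import zipfile


def zip_bytes_to_csv_rows(data, encoding="utf-8"):
    """Yield one parsed row (a list of strings) per CSV line, across all entries."""
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # skip directory entries
            with archive.open(info) as entry:
                text = io.TextIOWrapper(entry, encoding=encoding, newline="")
                # csv.reader keeps reading lines until the entry is exhausted
                for row in csv.reader(text):
                    yield row


# Hypothetical sample archive with a single two-line CSV file
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("data/part1.csv", "2017-01-01,a,1\n2017-01-02,b,2\n")

rows = list(zip_bytes_to_csv_rows(buffer.getvalue()))
print(len(rows))
```

The equivalent change in the Java code above would be to wrap the readLine() call in a while loop that runs until it returns null.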


-1

Here is another working solution that also emits the file name, which can later be split off and used to create a separate schema per file.

import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

  def readFile(path: String,
               minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

    if (path.toLowerCase.contains("zip")) {

      sc.binaryFiles(path, minPartitions)
        .flatMap {
          case (zipFilePath, zipContent) =>
            val zipInputStream = new ZipInputStream(zipContent.open())
            Stream.continually(zipInputStream.getNextEntry)
              .takeWhile(_ != null)
              .map { x =>
                val filename1 = x.getName
                // Append "~<file name>" to every line of the entry, including the last one
                scala.io.Source.fromInputStream(zipInputStream, "UTF-8").getLines.mkString(s"~${filename1}\n") + s"~${filename1}"
              } #::: { zipInputStream.close; Stream.empty[String] } // close once all entries are read
        }
    } else {
      sc.textFile(path, minPartitions)
    }
  }
}

The full code is here:

https://github.com/kali786516/Spark2StructuredStreaming/blob/master/src/main/scala/com/dataframe/extraDFExamples/SparkReadZipFiles.scala
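
As an alternative to appending the file name to each line as a suffix, the name can travel alongside the line as a pair, which avoids having to split it back out later. This is a minimal Python sketch of that variation, not the approach used in the Scala code above. The function name zip_bytes_to_named_lines and the sample files are hypothetical, and it assumes UTF-8 text entries.

```python
import io
import zipfile


def zip_bytes_to_named_lines(data, encoding="utf-8"):
    """Yield (entry_name, line) pairs for every file entry in a zip archive."""
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # skip directory entries
            text = archive.read(info).decode(encoding)
            for line in text.splitlines():
                yield (info.filename, line)


# Hypothetical in-memory archive with two differently shaped files
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("users.csv", "id,name\n1,ann\n")
    archive.writestr("orders.csv", "id,total\n7,9.50\n")

pairs = list(zip_bytes_to_named_lines(buffer.getvalue()))
users_only = [line for name, line in pairs if name == "users.csv"]
print(users_only)
```

With PySpark, the same function could be passed to flatMap over sc.binaryFiles(path), and the resulting pair RDD filtered by entry name to separate the schemas.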
