
I am quite new to PySpark (and Spark) and have a concrete task to solve that is currently beyond my knowledge :).

I have a bunch of files of the following structure:

'File_A.dtx':

## Animal
# Header Start
Name, Type, isMammal
# Body Start
Hasi, Rabbit, yes
Birdi, Bird, no
Cathi, Cat, yes
## House
# Header Start
Street, Number
# Body Start
Main Street, 32
Buchengasse, 11

'File_B.dtx':

## Animal
# Header Start
Name, Type, isMammal
# Body Start
Diddi, Dog, yes
Eli, Elephant, yes
## House
# Header Start
Street, Number
# Body Start
Strauchweg, 13
Igelallee, 22

My anticipated result is two dataframes, as follows:

Animals:

| Filename   | Name    | Type     | isMammal    | 
| ---------- | ------- | -------- | ----------- | 
| File_A.dtx | Hasi    | Rabbit   | yes         | 
| File_A.dtx | Birdi   | Bird     | no          | 
| File_A.dtx | Cathi   | Cat      | yes         | 
| File_B.dtx | Diddi   | Dog      | yes         | 
| File_B.dtx | Eli     | Elephant | yes         | 

House:

| Filename   | Street       | Number   | 
| ---------- | ------------ | -------- | 
| File_A.dtx | Main Street  | 32       | 
| File_A.dtx | Buchengasse  | 11       | 
| File_B.dtx | Strauchweg   | 13       | 
| File_B.dtx | Igelallee    | 22       | 

The solution should work in parallel. It can operate per file, since each file is small (around 3 MB), but I have a lot of them.

Thanks so much for hints.

What I currently have is just:

from pyspark.sql.functions import input_file_name
df1 = spark.read.text(filelist).withColumn("Filename", input_file_name())

Now my main problem is: how do I split the dataframe according to the rows ## Animal and ## House and aggregate it again into dataframes to fulfill my task?


1 Answer


Assuming you know the format of the files beforehand, and that no two sections have the same number of columns, you can do the following:

  1. Remove comment lines (those starting with #) from the dataset
  2. Remove the header rows from the dataset
  3. Remove empty lines
  4. Split the remaining lines on , (also consuming the whitespace that follows it)
  5. Create animals_df as the subset of rows from step 4 whose split array has size 3, extracting the array values as columns
  6. Create house_df as the subset of rows from step 4 whose split array has size 2, extracting the array values as columns
from pyspark.sql.functions import element_at, input_file_name, length, col as c, split, size

filelist = ["File_A.dtx", "File_B.dtx"]

df1 = spark.read.text(filelist).withColumn("Filename", input_file_name())

# STEP 1: remove comment lines (those starting with '#')
comment_removed = df1.filter(~(c("value").startswith("#")))

# STEP 2: remove the known header rows
header_removed = comment_removed.filter(~(c("value").isin("Name, Type, isMammal", "Street, Number")))

# STEP 3
remove_empty_lines = header_removed.filter(length("value") > 0)

# STEP 4: split each line on "," (consuming any whitespace after the comma,
# so values carry no leading spaces) and reduce the full input path to the
# bare file name
processed_df = (
    remove_empty_lines
    .withColumn("value", split("value", ",\\s*"))
    .withColumn("Filename", element_at(split("Filename", "/"), -1))
    .cache()
)

# STEP 5: rows with three values belong to the Animal section
animals_df = processed_df.filter(size("value") == 3).selectExpr("Filename", "value[0] as Name", "value[1] as Type", "value[2] as isMammal")
animals_df.show()

"""
+----------+-----+---------+--------+
|  Filename| Name|     Type|isMammal|
+----------+-----+---------+--------+
|File_A.dtx| Hasi|   Rabbit|     yes|
|File_A.dtx|Birdi|     Bird|      no|
|File_A.dtx|Cathi|      Cat|     yes|
|File_B.dtx|Diddi|      Dog|     yes|
|File_B.dtx|  Eli| Elephant|     yes|
+----------+-----+---------+--------+
"""

# STEP 6: rows with two values belong to the House section; cast Number to int
house_df = processed_df.filter(size("value") == 2).selectExpr("Filename", "value[0] as Street", "cast(value[1] as int) as Number")
house_df.show()
"""
+----------+-----------+------+
|  Filename|     Street|Number|
+----------+-----------+------+
|File_A.dtx|Main Street|    32|
|File_A.dtx|Buchengasse|    11|
|File_B.dtx| Strauchweg|    13|
|File_B.dtx|  Igelallee|    22|
+----------+-----------+------+
"""

3 Comments

Thanks heaps... this is an excellent introduction to the topic! Exactly what I needed. For me, having worked only with 'normal' dataframes, getting used to the new syntax and handling is harder than expected!
You have used the nice trick that the number of columns differs between the two types (animals and house)... However, would it also be possible to use the ## Animal and ## House tags directly?
If the files have line numbers, or if you read the files as an RDD and add an index, then ## Animal and ## House can be used to identify the range of rows belonging to each section, as sketched below.
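To illustrate that comment, here is a minimal sketch of the index-based approach (not from the answer above: the window-based forward fill and the names indexed, tagged, body, and Section are assumptions of this sketch). It relies on zipWithIndex numbering lines in the order spark.read.text produced them, so the lines of one file stay contiguous:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

raw = spark.read.text(["File_A.dtx", "File_B.dtx"]) \
    .withColumn("Filename", F.element_at(F.split(F.input_file_name(), "/"), -1))

# Attach a global line index so row order within each file is preserved.
indexed = raw.rdd.zipWithIndex() \
    .map(lambda pair: (*pair[0], pair[1])) \
    .toDF(["value", "Filename", "idx"])

# On '## <Section>' lines extract the section name, otherwise leave null ...
section = F.when(F.col("value").startswith("## "), F.expr("substring(value, 4)"))

# ... and forward-fill the last seen section name within each file.
w = Window.partitionBy("Filename").orderBy("idx") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
tagged = indexed.withColumn("Section", F.last(section, ignorenulls=True).over(w))

# Keep only data rows, then slice per section by tag instead of column count.
body = tagged.filter(~F.col("value").startswith("#")) \
    .filter(F.length("value") > 0) \
    .filter(~F.col("value").isin("Name, Type, isMammal", "Street, Number")) \
    .withColumn("value", F.split("value", ",\\s*"))

animals_df = body.filter(F.col("Section") == "Animal") \
    .selectExpr("Filename", "value[0] as Name", "value[1] as Type", "value[2] as isMammal")
house_df = body.filter(F.col("Section") == "House") \
    .selectExpr("Filename", "value[0] as Street", "cast(value[1] as int) as Number")

Because the window is partitioned by Filename, the forward fill still runs in parallel across files, which matches the per-file parallelism asked for in the question.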
