
In PySpark I have a DataFrame composed of two columns.

Assume the fields in each inner array are, in order: timestamp, email, phone number, first name, last name, address, city, country, randomId.

+-------------------------+---------------------------------------------------------------------------------------------------------------------------+
| str1                    | array_of_str                                                                                                                |
+-------------------------+---------------------------------------------------------------------------------------------------------------------------+
| random column data1     | [['2020-01-26 17:30:57.000 +0000', '', '728-802-5766', '', '', '7th street crossroads', 'seattle', '', 'randomId104'], ['2019-07-20 20:54:57.000 +0000', '[email protected]', '728-802-5766', 'Katuscha', '', '', '', 'us', 'randomId225'], ['2015-12-04 04:54:57.000 +0000', '[email protected]', '728-802-5766', '', 'Othen', '7th street crossroads', 'seattle', '', 'randomid306']] |
| random column data2     | [['2021-01-30 17:30:04.000 +0000', '[email protected]', '313-984-9692', '', '', 'th street crossroads', 'New york', 'us', 'randomId563'], ['2018-05-15 20:44:57.000 +0000', '[email protected]', '', 'Marianne', 'Allmann', '', '', 'us', 'randomId884']] |
+-------------------------+---------------------------------------------------------------------------------------------------------------------------+

I am expecting an output DataFrame like the one below:

+-------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| str1                    | array_of_str                                                                                                                                         |
+-------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+
| random column data1     | ['2020-01-26 17:30:57.000 +0000', '[email protected]', '728-802-5766', 'Katuscha', 'Othen', '7th street crossroads', 'seattle', 'us', 'randomid306'] |
| random column data2     | ['2021-01-30 17:30:04.000 +0000', '[email protected]', '313-984-9692', 'Marianne', 'Allmann', '111th Ave NE', 'New york', 'us', 'randomId884']      |
+-------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+

Optional: the existing arrays of arrays might not already be sorted in decreasing timestamp order. How do I sort each array of arrays in decreasing timestamp order?

Here I am planning to write a UDF that pulls the latest non-empty (timestamp, email, phone number, first name, last name, address, city, country) values from the array of arrays. For randomId, I will always pull the randomId associated with the oldest record in the system.

Example: for random column data1, the email [email protected] is populated from the second element of the array, since the first one has an empty email. The other columns work the same way. For randomId, randomid306 belongs to the oldest (2015) record, so it is the value populated in my output DataFrame.
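A minimal plain-Python sketch of that merge logic (assuming each inner record is a list of 9 strings in the field order above, with empty strings standing in for missing values; `merge_records` is a name invented here):

```python
def merge_records(rows):
    """Merge a list of records: newest non-empty value per field,
    except randomId (index 8), which comes from the oldest record."""
    newest_first = sorted(rows, key=lambda r: r[0], reverse=True)
    merged = []
    for i in range(8):  # fields 0-7: first non-empty value, newest first
        merged.append(next((r[i] for r in newest_first if r[i]), ''))
    merged.append(newest_first[-1][8])  # randomId from the oldest record
    return merged

# To apply it to the DataFrame column, wrap it as a UDF, e.g.:
# from pyspark.sql import functions as F
# from pyspark.sql.types import ArrayType, StringType
# merge_udf = F.udf(merge_records, ArrayType(StringType()))
```

This works because ISO-style timestamp strings sort correctly as plain strings, so no datetime parsing is needed for the ordering.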

In the UDF:

  1. How do I sort the array-of-array elements in descending timestamp order? (an optional step)

  2. How do I iterate over the array-of-array column in the DataFrame?

  3. How do I access individual items of the inner arrays in a UDF?

In plain Python I can iterate over a list of lists like this:

for item in items:
    print(item[0], item[1])

How can I achieve something similar for array-of-array columns in PySpark?
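Inside a Python UDF, an `array<array<string>>` column arrives as an ordinary list of lists, so the plain-Python loop above carries over unchanged. A small sketch (sample data abbreviated to two fields):

```python
# sample rows: [timestamp, email] only, for brevity
records = [
    ['2019-07-20 20:54:57.000 +0000', '[email protected]'],
    ['2020-01-26 17:30:57.000 +0000', ''],
]

# 1. sort descending on the timestamp (item[0]); these ISO-style
#    timestamp strings compare correctly as plain strings
records.sort(key=lambda item: item[0], reverse=True)

# 2./3. iterate and index exactly like any Python list of lists
for item in records:
    print(item[0], item[1])
```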

Can I do the above steps in PySpark without converting the data to a pandas DataFrame?

Spark version 2.4.3, Python 3.6.8.

1 Answer
You can use the built-in sort_array for this: create a new column with it, and extract the first element using getItem(0).

Built-in sort_array


from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

input_list = [('10',[
                  ['2020-01-26 17:30:57.000 +0000', '', '728-802-5766', '', '', '7th street crossroads', 'seattle', '', 'randomId104']
                , ['2019-07-20 20:54:57.000 +0000', '[email protected]', '728-802-5766', 'Katuscha', '', '', '', 'us', 'randomId225']
                , ['2015-12-04 04:54:57.000 +0000', '[email protected]', '728-802-5766', '', 'Othen', '7th street crossroads', 'seattle', '', 'randomid306']
            ])
        ,('20',[
          ['2021-01-30 17:30:04.000 +0000', '[email protected]', '313-984-9692', '', '', 'th street crossroads', 'New york', 'us', 'randomId563']
        , ['2018-05-15 20:44:57.000 +0000', '[email protected]', '', 'Marianne', 'Allmann', '', '', 'us', 'randomId884']
        ])
    ]

sparkDF = spark.createDataFrame(input_list, ['id', 'array_str'])


sparkDF = sparkDF.withColumn('sorted_array_str'
                                 ,F.sort_array(F.col('array_str'),False).getItem(0))


sparkDF.select(['id','sorted_array_str']).show(truncate=False)

UDF

# array_sort_udf sorts on the timestamp; the key can be further
# customized to give precedence to multiple elements
from pyspark.sql.types import ArrayType, StringType

array_sort_udf = F.udf(lambda x: sorted(x, key=lambda r: r[0], reverse=True),
                       ArrayType(ArrayType(StringType())))

sparkDF = sparkDF.withColumn('sorted_array_str',
                             array_sort_udf(F.col('array_str')).getItem(0))

sparkDF.select(['id','sorted_array_str']).show(truncate=False)

Output

+---+----------------------------------------------------------------------------------------------------------------------+
|id |sorted_array_str                                                                                                      |
+---+----------------------------------------------------------------------------------------------------------------------+
|10 |[2020-01-26 17:30:57.000 +0000, , 728-802-5766, , , 7th street crossroads, seattle, , randomId104]                    |
|20 |[2021-01-30 17:30:04.000 +0000, [email protected], 313-984-9692, , , th street crossroads, New york, us, randomId563]|
+---+----------------------------------------------------------------------------------------------------------------------+
