
The following is an example DataFrame snippet:

+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_lid               |trace                           |message                                                                                                                                                                                                                                                                                                                                                                            |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1103960793391132675|47c10fda9b40407c998c154dc71a9e8c|[app.py:208] Prediction label: {"id": 617, "name": "CENSORED"}, score=0.3874854505062103                                                                                                                                                                                                                                                                                           |
|1103960793391132676|47c10fda9b40407c998c154dc71a9e8c|[app.py:224] Similarity values: [0.6530804801919593, 0.6359653379418201]                                                                                                                                                                                                                                                                                                           |
|1103960793391132677|47c10fda9b40407c998c154dc71a9e8c|[app.py:317] Predict=s3://CENSORED/scan_4745/scan4745_t1_r0_c9_2019-07-15-10-32-43.jpg trait_id=112 result=InferenceResult(predictions=[Prediction(label_id='230', label_name='H3', probability=0.0), Prediction(label_id='231', label_name='Other', probability=1.0)], selected=Prediction(label_id='231', label_name='Other', probability=1.0)). Took 1.3637824058532715 seconds |
+-------------------+--------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

I have millions of these log-like structures, which can all be grouped by trace, which is unique to a session.

I'm looking to transform these sets of rows into single rows, essentially mapping over them. For this example I would extract "id": 617 from the first row, the values 0.6530804801919593, 0.6359653379418201 from the second row, and the Prediction(label_id='231', label_name='Other', probability=1.0) value from the third row.

Then I would compose a new table having the columns:

| trace | id | similarity | selected |

with the values:

| 47c10fda9b40407c998c154dc71a9e8c | 617 | 0.6530804801919593, 0.6359653379418201 | 231 |

How should I implement this group-map transform over several rows in PySpark?

2 Answers


I've written the below example in Scala for my own convenience, but it should translate readily to PySpark.

1) Create the new columns in your dataframe via regexp_extract on the "message" field. This will produce the desired values if the regex matches, or empty strings if not:

scala> val dss = ds.select(
     | 'trace, 
     | regexp_extract('message, "\"id\": (\\d+),", 1) as "id", 
     | regexp_extract('message, "Similarity values: \\[(\\-?[0-9\\.]+, \\-?[0-9\\.]+)\\]", 1) as "similarity", 
     | regexp_extract('message, "selected=Prediction\\(label_id='(\\d+)'", 1) as "selected"
     | )
dss: org.apache.spark.sql.DataFrame = [trace: string, id: string ... 2 more fields]

scala> dss.show(false)
+--------------------------------+---+--------------------------------------+--------+
|trace                           |id |similarity                            |selected|
+--------------------------------+---+--------------------------------------+--------+
|47c10fda9b40407c998c154dc71a9e8c|617|                                      |        |
|47c10fda9b40407c998c154dc71a9e8c|   |0.6530804801919593, 0.6359653379418201|        |
|47c10fda9b40407c998c154dc71a9e8c|   |                                      |231     |
+--------------------------------+---+--------------------------------------+--------+

2) Group by "trace" and eliminate the cases where the regex didn't match. The quick and dirty way (shown below) is to select the max of each column, but you might need to do something more sophisticated if you expect to encounter more than one match per trace:

scala> val ds_final = dss.groupBy('trace).agg(max('id) as "id", max('similarity) as "similarity", max('selected) as "selected")
ds_final: org.apache.spark.sql.DataFrame = [trace: string, id: string ... 2 more fields]

scala> ds_final.show(false)
+--------------------------------+---+--------------------------------------+--------+
|trace                           |id |similarity                            |selected|
+--------------------------------+---+--------------------------------------+--------+
|47c10fda9b40407c998c154dc71a9e8c|617|0.6530804801919593, 0.6359653379418201|231     |
+--------------------------------+---+--------------------------------------+--------+



I ended up using something along the lines of:

import re

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

expected_schema = StructType([
    StructField("event_timestamp", TimestampType(), False),
    StructField("trace", StringType(), False),
    ...
])

@F.pandas_udf(expected_schema, F.PandasUDFType.GROUPED_MAP)
def transform(pdf):
    # Input and output are both a pandas.DataFrame
    output = {}

    for row in pdf.to_dict(orient='records'):
        x = re.findall(r'^(\[.*:\d+\]) (.*)', row['message'])[0][1]
        ...

    return pd.DataFrame(data=[output])

df.groupby('trace').apply(transform)

