
I'm working on a problem where I have a dataset in the following format (replaced real data for example purposes):

session  activity        timestamp
1        enter_store     2022-03-01 23:25:11
1        pay_at_cashier  2022-03-01 23:31:10
1        exit_store      2022-03-01 23:55:01
2        enter_store     2022-03-02 07:15:00
2        pay_at_cashier  2022-03-02 07:24:00
2        exit_store      2022-03-02 07:35:55
3        enter_store     2022-03-05 11:07:01
3        exit_store      2022-03-05 11:22:51

I would like to be able to compute counting statistics for these events based on the pattern observed within each session. For example, based on the table above, the count of each pattern observed would be as follows:

{
    'enter_store -> pay_at_cashier -> exit_store': 2, 
    'enter_store -> exit_store': 1
}

I'm trying to do this in PySpark, but I'm having some trouble figuring out the most efficient way to do this kind of pattern matching where some steps are missing. The real problem involves a much larger dataset of ~15M+ events like this.

I've tried filtering the full DataFrame for unique sessions where 'enter_store' is observed, then filtering that result for unique sessions where 'pay_at_cashier' is observed. That works fine; the issue is that I can't think of a way to count sessions like 3, where there is only a starting step and a final step but no middle step.
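Roughly, that approach looks like this (a simplified sketch, not my exact code):

from pyspark.sql import functions as F

# distinct sessions containing each step
entered = df.filter(F.col("activity") == "enter_store").select("session").distinct()
paid = df.filter(F.col("activity") == "pay_at_cashier").select("session").distinct()

# sessions that both entered and paid
entered_and_paid = entered.join(paid, "session")
# sessions like 3 that entered but never paid would need something like an anti join
entered_no_pay = entered.join(paid, "session", "left_anti")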

Obviously, one brute-force way would be to iterate over each session, assign it a pattern, and increment a counter, but I'm looking for more efficient and scalable approaches.

Would appreciate any suggestions or insights.

4 Comments
  • Do you need the activities themselves or just the count of activities per session? Commented Apr 21, 2022 at 21:50
  • Do you have some columns to order upon? Spark's dataframe is unordered, so counting the sequence of activities only by grouping is non-deterministic. Commented Apr 21, 2022 at 23:05
  • @pltc The session is just a way to identify a group of activities - so I'm looking for a count of unique sessions for each distinct pattern like we see in the JSON. Commented Apr 22, 2022 at 5:35
  • @Emma yes, we would have a timestamp column to sort values by in order to determine the order of events, I forgot to add that to the description. Commented Apr 22, 2022 at 5:35

1 Answer


For Spark 2.4+, you could do:

from pyspark.sql import functions as F

df = (df
      # per session, collect (timestamp, activity) structs sorted by timestamp
      .withColumn("flow", F.expr("sort_array(collect_list(struct(timestamp, activity)) over (partition by session))"))
      # keep only the activity names and join them into one pattern string
      .withColumn("flow", F.expr("concat_ws(' -> ', transform(flow, v -> v.activity))"))
      # count distinct sessions per pattern
      .groupBy("flow").agg(F.countDistinct("session").alias("total_session"))
      )
df.show(truncate=False)

# +-------------------------------------------+-------------+
# |flow                                       |total_session|
# +-------------------------------------------+-------------+
# |enter_store -> pay_at_cashier -> exit_store|2            |
# |enter_store -> exit_store                  |1            |
# +-------------------------------------------+-------------+

The first withColumn collects the (timestamp, activity) pairs for each session into an array sorted by timestamp (make sure timestamp is an actual timestamp type so it sorts chronologically; sort_array orders by the first struct field). The second keeps only the activity values from the array using the transform function and combines them into a single string with concat_ws. Grouping by that string then counts the distinct sessions per pattern.
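If you prefer the DataFrame API over SQL expression strings, the same logic can be written like this (a sketch; F.transform with a Python lambda requires Spark 3.1+):

from pyspark.sql import Window
from pyspark.sql import functions as F

w = Window.partitionBy("session")

result = (df
          # same logic as the expr version, using DataFrame functions
          .withColumn("flow", F.sort_array(F.collect_list(F.struct("timestamp", "activity")).over(w)))
          .withColumn("flow", F.concat_ws(" -> ", F.transform("flow", lambda v: v["activity"])))
          .groupBy("flow")
          .agg(F.countDistinct("session").alias("total_session"))
          )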


2 Comments

  • Thank you, this is a great direction to move in. I'm able to replicate what your code is doing and it works. If I were to add another condition or column to base some of the grouping on, would that be possible within the flow of your logic? For example, say there is another column called door_number, and we only want to aggregate these results over sessions where the customer has an "exit_door" event with a "door_number" of 5?
  • I'm not quite sure what you mean, but you could also collect the door numbers as an array, keeping the collection of activities as an array too. That way you could filter the arrays using the exists function before aggregating.
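A rough sketch of that suggestion (untested; door_number and the "exit_door" activity are the hypothetical column and event from the comment above; exists is a Spark 2.4+ higher-order function):

from pyspark.sql import functions as F

df2 = (df
       .withColumn("flow", F.expr("sort_array(collect_list(struct(timestamp, activity)) over (partition by session))"))
       # also collect (activity, door_number) pairs per session
       .withColumn("doors", F.expr("collect_list(struct(activity, door_number)) over (partition by session)"))
       # keep only sessions with an exit_door event at door 5
       .filter(F.expr("exists(doors, d -> d.activity = 'exit_door' and d.door_number = 5)"))
       .withColumn("flow", F.expr("concat_ws(' -> ', transform(flow, v -> v.activity))"))
       .groupBy("flow").agg(F.countDistinct("session").alias("total_session"))
       )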
