
Imagine a large dataset (a >40 GB Parquet file) containing value observations of thousands of variables, stored as triples (variable, timestamp, value).

Now think of a query in which you are interested in just a subset of 500 variables, and you want to retrieve the observations (values --> time series) for those variables within specific observation windows (timeframes), each having a start and an end time.

Without distributed computing (Spark), you could code it like this:

# Naive approach: run one filter per (variable, incident) combination.
for var_ in variables_of_interest:
    for incident in incidents:

        var_df = df_all.filter(
            (df_all.Variable == var_)
            & (df_all.Time > incident.startTime)
            & (df_all.Time < incident.endTime))

My question is: how do I do that with Spark/PySpark? I was thinking of one of the following:

  1. joining the incidents somehow with the variables and filtering the dataframe afterward.
  2. broadcasting the incident dataframe and using it within a map function when filtering the variable observations (df_all) — see the sketch after this list.
  3. using RDD.cartesian or RDD.mapPartitions somehow (remark: the parquet file was saved partitioned by variable).
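For illustration, here is a minimal sketch of option 2, assuming incidents is a small local list of objects with startTime/endTime attributes (as in the loop above) and that df_all has Variable and Time columns:

b_incidents = sc.broadcast(
    [(i, inc.startTime, inc.endTime) for i, inc in enumerate(incidents)])

def tag_with_incident(row):
    # Tag each observation with the index of every incident window
    # it falls into; rows outside all windows are dropped.
    for i, start, end in b_incidents.value:
        if start < row.Time < end:
            yield (i, row)

tagged = df_all.rdd.flatMap(tag_with_incident)  # RDD of (incident_id, Row)

The downside is that this leaves the DataFrame API (and its optimizer), so the per-incident dataframes would have to be rebuilt from the tagged RDD by hand, which is why a join might be preferable.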

The expected output should be:

incident1 --> dataframe 1
incident2 --> dataframe 2
...

Where dataframe 1 contains all variables and their observed values within the timeframe of incident 1, and dataframe 2 contains those values within the timeframe of incident 2.

I hope you got the idea.

UPDATE

I tried to code a solution based on idea #1 and the code from the answer given by zero323. It works quite well, but I wonder how to aggregate/group the result by incident in the final step. I tried adding a sequential number to each incident, but then I got errors in the last step. It would be great if you could review and/or complete the code, so I uploaded sample data and the scripts. The environment is Spark 1.4 (PySpark).
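To make that last step concrete, here is a minimal sketch of the numbering I have in mind, building on the reference frame from the answer below (the column names and the enumerate-based id are my assumptions): carry an explicit incident id through the join and select per id afterwards:

from pyspark.sql.functions import col

# Attach an explicit id to each incident while building the reference
# DataFrame, so the id survives the join and can be grouped on later.
ref = sc.parallelize([
    (var_, i, incident.startTime, incident.endTime)
    for var_ in variables_of_interest
    for i, incident in enumerate(incidents)
]).toDF(["var_", "incident_id", "startTime", "endTime"])

joined = df_all.alias("df").join(
    ref,
    (col("df.Variable") == col("var_"))
    & col("df.Time").between(col("startTime"), col("endTime")))

# One (lazy) DataFrame per incident, selected by its id:
per_incident = {i: joined.filter(col("incident_id") == i)
                for i in range(len(incidents))}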

  • Not so much of a large dataset, it's not even a tera. A big one though! :) What have you tried so far..? Commented Aug 25, 2016 at 22:58
  • I read tons of posts and examples just to get an idea of how I could solve it, but haven't implemented anything yet. My first shot would be filtering the variables with the isin function into df2, then broadcasting the incident dataframe and using map on df2. But I'm not sure how to get (yield) those dataframes (observations) for each incident. Somehow stuck. Commented Aug 25, 2016 at 23:01
  • join looks like a sensible starting point. You have enough to avoid a Cartesian product, and with 500 records this can easily be optimized to a broadcast join. Commented Aug 26, 2016 at 0:10
  • Any example? I will try to code one today. Commented Aug 26, 2016 at 10:46
  • I added sample code and script above. Please review. Commented Aug 28, 2016 at 14:03

1 Answer


Generally speaking, only the first approach looks sensible to me. The exact joining strategy depends on the number of records and their distribution, but you can either create a top-level data frame:

# Cross product of variables and incidents as a reference frame.
ref = sc.parallelize([
    (var_, incident)
    for var_ in variables_of_interest
    for incident in incidents
]).toDF(["var_", "incident"])

and simply join:

from pyspark.sql.functions import col

same_var = col("Variable") == col("var_")
same_time = col("Time").between(
    col("incident.startTime"),
    col("incident.endTime")
)

ref.join(df.alias("df"), same_var & same_time)
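Since the joined result keeps the incident struct as a column, a quick per-incident sanity check needs no extra bookkeeping (a hedged example reusing the join above):

result = ref.join(df.alias("df"), same_var & same_time)
# Number of matched observations per incident window:
result.groupBy("incident.startTime", "incident.endTime").count().show()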

or perform joins against particular partitions:

incidents_ = sc.parallelize([
    (incident, ) for incident in incidents
]).toDF(["incident"])

for var_ in variables_of_interest:
    # One directory per variable, since the Parquet file was saved
    # partitioned by Variable (on Spark 1.x use sqlContext.read.parquet).
    df = spark.read.parquet("/some/path/Variable={0}".format(var_))
    df.join(incidents_, same_time)

optionally marking one side as small enough to be broadcast.
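For example, with the broadcast hint from pyspark.sql.functions (available from Spark 1.6; on 1.4, the environment in the question, the optimizer decides on its own based on spark.sql.autoBroadcastJoinThreshold):

from pyspark.sql.functions import broadcast

# Hint that the reference frame is small enough to ship to every
# executor, turning the shuffle join into a map-side broadcast join.
df.alias("df").join(broadcast(ref), same_var & same_time)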


5 Comments

Hmm, thanks for the example. Will need some time to understand it. Could you contact me on Skype, just to discuss some details? nagilo12345
Sorry @Matthias, I don't have an account anymore.
Hi Zero. I tried using your code in a script and it works fine so far. The only thing I don't get is how to add a number to each incident, so that after the final join step I can select the data from the resulting frame by incident number. You can find the scripts in the updated question above. Please review, thanks!
Sorry, I am not much around here lately. I'll try to take a look when I have a spare moment.
Yeah, Stack Overflow is not the fastest way to communicate ;) Thanks for your support anyway.
