
I use this SQL to create a session_id for a dataset: if a user is inactive for more than 30 minutes (30 * 60 seconds), a new session_id is assigned. I am new to Spark SQL and trying to replicate the same procedure with a Spark SQL context, but I'm encountering some errors.

session_id follows the naming convention:
userid_1,
userid_2,
userid_3,...

SQL (date is in seconds):

CREATE TABLE tablename_with_session_id AS
SELECT *,
       userid || '_' || SUM(new_session) OVER (PARTITION BY userid ORDER BY date ASC, new_session DESC ROWS UNBOUNDED PRECEDING) AS session_id
FROM (SELECT *,
             CASE
                 WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1
                 WHEN ROW_NUMBER() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1
                 ELSE 0
             END AS new_session
      FROM tablename)
ORDER BY date;
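For a hypothetical user u1 with events at date = 1000, 1500, and 4000 (seconds), the gaps are 500 and 2500; only the 2500-second gap exceeds 30 * 60 = 1800, so new_session is 1, 0, 1, the running SUM is 1, 1, 2, and the resulting session_ids are u1_1, u1_1, u1_2.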

I tried using the same SQL in Spark-Scala with:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    val tableSessionID = sqlContext.sql(
      "SELECT *, CONCAT(userid, '_', SUM(new_session)) OVER (PARTITION BY userid ORDER BY date ASC, new_session DESC ROWS UNBOUNDED PRECEDING) AS new_session_id " +
      "FROM (SELECT *, CASE WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1 " +
      "WHEN ROW_NUMBER() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1 ELSE 0 END AS new_session FROM clickstream) ORDER BY date")

This raised an error suggesting that the SUM(new_session) expression should be wrapped within the window function, i.e. the OVER clause cannot be applied to the CONCAT expression.

I then tried splitting the query across multiple DataFrames:

val temp1 = sqlContext.sql("SELECT *, CASE WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1 WHEN row_number() over (partition by userid order by date) = 1 THEN 1 ELSE 0 END as new_session FROM clickstream")

temp1.registerTempTable("clickstream_temp1")

val temp2 = sqlContext.sql("SELECT * , SUM(new_session) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS s_id FROM clickstream_temp1")

temp2.registerTempTable("clickstream_temp2")

val temp3 = sqlContext.sql("SELECT * , CONCAT(userid,'_',s_id) OVER (PARTITION BY userid ORDER BY date asc, new_session desc rows unbounded preceding) AS new_session_id FROM clickstream_temp2")

Only the last statement ('val temp3 = ...') returns an error, saying that CONCAT(userid, '_', s_id) cannot be used within a window function.

What's the workaround? Is there an alternative?

Thanks

  • Comment: CONCAT(userid, '_', SUM(new_session) OVER (PARTITION BY ...))
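In other words, the comment suggests applying the OVER clause to SUM(new_session) and passing the result to CONCAT, rather than applying OVER to CONCAT itself. A minimal sketch of that idea against the clickstream table from the question (the CAST is a precaution for versions where CONCAT does not implicitly convert the integer sum; the variable and table names are illustrative, and this is untested rather than a verified fix):

    // Window goes inside CONCAT: the running SUM is computed per user, then concatenated
    val sessionized = sqlContext.sql(
      "SELECT *, CONCAT(userid, '_', CAST(SUM(new_session) OVER (PARTITION BY userid ORDER BY date ASC, new_session DESC ROWS UNBOUNDED PRECEDING) AS STRING)) AS session_id " +
      "FROM (SELECT *, CASE WHEN (date - LAG(date) OVER (PARTITION BY userid ORDER BY date) >= 30 * 60) THEN 1 " +
      "WHEN ROW_NUMBER() OVER (PARTITION BY userid ORDER BY date) = 1 THEN 1 ELSE 0 END AS new_session FROM clickstream) ORDER BY date")
    sessionized.registerTempTable("clickstream_with_session_id")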

1 Answer


To use concat with a Spark window function you need a user-defined aggregate function (UDAF). concat is not an aggregate function, so you can't apply it directly with an OVER clause.

// Extend UserDefinedAggregateFunction to write a custom aggregate function.
// You can also specify constructor arguments, e.g. CustomConcat(arg1: Int, arg2: String).
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class CustomConcat extends UserDefinedAggregateFunction {
  // Input data type schema
  def inputSchema: StructType = StructType(Array(StructField("description", StringType)))
  // Intermediate (buffer) schema
  def bufferSchema: StructType = StructType(Array(StructField("groupConcat", StringType)))
  // Returned data type
  def dataType: DataType = StringType
  // The same input always produces the same output
  def deterministic: Boolean = true
  // Called once per group/window to initialise the buffer (empty string, so the result has no leading space)
  def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = "" }
  // Iterate over each entry of a group
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0) = buffer.getString(0) + input.getString(0) }
  // Merge two partial aggregates
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getString(0) + buffer2.getString(0) }
  // Called after all entries are exhausted
  def evaluate(buffer: Row): String = buffer.getString(0)
}
val newdescription = new CustomConcat()
val newdesc1 = newdescription($"description").over(windowspec)
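For the snippet above to compile, windowspec and the $ column syntax have to be in scope. A minimal usage sketch, assuming the clickstream table and the per-user, by-date window from the question (the description column comes from the answer's example input schema, and all names here are illustrative):

    import org.apache.spark.sql.expressions.Window
    import sqlContext.implicits._   // enables the $"colName" syntax

    // Assumed window: one partition per user, ordered by event time, as in the question's queries
    val windowspec = Window.partitionBy($"userid").orderBy($"date")

    val newdescription = new CustomConcat()
    val withConcat = sqlContext.table("clickstream")
      .withColumn("group_concat", newdescription($"description").over(windowspec))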

You can use newdesc1 as an aggregate expression for concatenation in window functions. For more information, have a look at the Databricks UDAF documentation. I hope this answers your question.
