I have a streaming DataFrame and I'm trying to cast its 'customer_ids' column to a simple string.

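from pyspark.sql.types import StructType, MapType, StringType, TimestampType

# customer_ids is a map<string,string>; date is a timestamp
schema = StructType()\
    .add("customer_ids", MapType(StringType(), StringType()))\
    .add("date", TimestampType())

original_sdf = spark.readStream.option("maxFilesPerTrigger", 800)\
    .load(path=source, format="parquet", schema=schema)\
    .select('customer_ids', 'date')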

The intent of this conversion is to group by this column and aggregate by max(date), like this:

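from pyspark.sql import functions as F

# F.max is Spark's aggregate function, not the Python builtin max
original_sdf.groupBy('customer_ids')\
  .agg(F.max('date')) \
  .writeStream \
  .trigger(once=True) \
  .format("memory") \
  .queryName('query') \
  .outputMode("complete") \
  .start()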

but I got this exception:

AnalysisException: u'expression `customer_ids` cannot be used as a grouping expression because its data type map<string,string> is not an orderable data type.

How can I cast this kind of streaming DataFrame column, or is there any other way to group by this column?

  • Please provide some input/output data. Commented Dec 13, 2017 at 13:57

1 Answer

TL;DR Use the getItem method to access the values per key in a MapType column.

The real question is which key(s) you want to group by, since a MapType column can hold a variety of keys. Each key can become its own column holding the values from the map column.

You can access the value stored under a key using the Column.getItem method (or similar Python voodoo):

getItem(key: Any): Column
An expression that gets an item at position ordinal out of an array, or gets a value by key key in a MapType.

(I use Scala and am leaving converting it to pyspark as a home exercise)

scala> val ds = Seq(Map("hello" -> "world")).toDF("m")

scala> ds.show(false)
+-------------------+
|m                  |
+-------------------+
|Map(hello -> world)|
+-------------------+

scala> ds.select($"m".getItem("hello") as "hello").show
+-----+
|hello|
+-----+
|world|
+-----+