I have a dataset with a column of array type. From this column, we need to create another column that contains the list of unique elements and their counts.

For example, [a, b, e, b] should produce [[b, a, e], [2, 1, 1]], sorted by count. Even a key-value map where the value is the count would do. I created a UDF (please see below) for this purpose, but it is very slow, so I need to do this with PySpark built-in functions. The data looks like this:

id  col_a  collected_col_a
1   a      [a, b, e, b]
1   b      [a, b, e, b]
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, IntegerType

struct_schema1 = StructType([
    StructField('elements', ArrayType(StringType()), nullable=True),
    StructField('count', ArrayType(IntegerType()), nullable=True)
])

# udf: returns unique elements and their counts, sorted by count descending
@udf(returnType=struct_schema1)
def func1(x, top=10):
    y, z = np.unique(x, return_counts=True)
    z_y = zip(z.tolist(), y.tolist())
    y = [i for _, i in sorted(z_y, reverse=True)]  # elements ordered by count
    z = sorted(z.tolist(), reverse=True)           # counts, descending
    if len(y) > top:
        return {'elements': y[:top], 'count': z[:top]}
    else:
        return {'elements': y, 'count': z}

2 Answers

You can use a combination of the transform and filter functions along with array_distinct and size to get the desired output. Here's an example:

from pyspark.sql import functions as F

# example of input dataframe
df = spark.createDataFrame([(1, ["a", "b", "e", "b"]), (2, ["a", "a", "c", "b"])], ["id", "arrayCol"])


df1 = df.withColumn(
    "uniqueCount",
    F.transform(
        F.array_distinct("arrayCol"),   # one struct per distinct element
        lambda x: F.struct(
            x.alias("value"),
            # count occurrences of x in the original array
            F.size(F.filter("arrayCol", lambda y: x == y)).alias("count")
        )
    )
)
df1.show(truncate=False)
#+---+------------+------------------------+
#|id |arrayCol    |uniqueCount             |
#+---+------------+------------------------+
#|1  |[a, b, e, b]|[{a, 1}, {b, 2}, {e, 1}]|
#|2  |[a, a, c, b]|[{a, 2}, {c, 1}, {b, 1}]|
#+---+------------+------------------------+

1 Comment

Thanks, and I was able to sort this struct by its count by reversing the same approach.
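
Presumably that means building the struct with the count first and sorting the array on it. A minimal sketch of that idea, reusing the column and field names from the answer above (the df2 name is just for illustration):

# Sketch: put the count first in the struct so sort_array orders by it,
# then sort descending to get the most frequent elements first.
df2 = df.withColumn(
    "uniqueCount",
    F.sort_array(
        F.transform(
            F.array_distinct("arrayCol"),
            lambda x: F.struct(
                F.size(F.filter("arrayCol", lambda y: x == y)).alias("count"),
                x.alias("value")
            )
        ),
        asc=False
    )
)
df2.show(truncate=False)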

An approach that builds a map, using aggregate and map_zip_with. The other approach seems clearer, though.

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1, 'a', ['a', 'b', 'e', 'b']),
     (1, 'b', ['a', 'b', 'e', 'b'])],
    ['id', 'col_a', 'collected_col_a']
)
df = df.withColumn('elem_count',
    F.aggregate(
        'collected_col_a',
        F.lit(None).cast('map<string,int>'),  # accumulator: map of element -> count
        lambda m, x: F.map_zip_with(
            # seed the accumulator with the current element on the first iteration
            F.coalesce(m, F.create_map(x, F.lit(0))),
            F.create_map(x, F.lit(1)),  # {x: 1} for the current element
            lambda k, v1, v2: F.coalesce(v1, F.lit(0)) + F.coalesce(v2, F.lit(0))
        )
    )
)
df.show(truncate=0)
# +---+-----+---------------+------------------------+
# |id |col_a|collected_col_a|elem_count              |
# +---+-----+---------------+------------------------+
# |1  |a    |[a, b, e, b]   |{a -> 1, b -> 2, e -> 1}|
# |1  |b    |[a, b, e, b]   |{a -> 1, b -> 2, e -> 1}|
# +---+-----+---------------+------------------------+

Sorry, I couldn't figure out how to sort based on map values.
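
For what it's worth, a possible sketch for that last step, assuming Spark 3.x where the SQL array_sort comparator is available: convert the map to an array of key/value structs and sort it by value, descending (the elem_count_sorted column name is just for illustration).

# Sketch: map_entries turns the map into an array of {key, value} structs,
# which array_sort can then order by the value (count), highest first.
df = df.withColumn(
    'elem_count_sorted',
    F.expr("array_sort(map_entries(elem_count), (l, r) -> r.value - l.value)")
)
df.show(truncate=0)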
