
I have a DataFrame with a column called "traits", which is an integer composed of multiple bit flags.

I need to convert this column to a list of strings (for Elasticsearch indexing). The conversion looks like this:

from typing import List

TRAIT_0 = 0
TRAIT_1 = 1
TRAIT_2 = 2

def flag_to_list(flag: int) -> List[str]:
    trait_list = []
    if flag & (1 << TRAIT_0):
        trait_list.append("TRAIT_0")
    if flag & (1 << TRAIT_1):
        trait_list.append("TRAIT_1")
    if flag & (1 << TRAIT_2):
        trait_list.append("TRAIT_2")

    return trait_list
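
For example, flag_to_list(5) should return ["TRAIT_0", "TRAIT_2"], since 5 (binary 101) has bits 0 and 2 set.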

What is the most efficient way of doing this transformation in PySpark? I have seen lots of examples of concatenating and splitting strings, but not an operation like this.

Using PySpark version 2.4.5.

Input JSON looks like this: { "name": "John Doe", "traits": 5 }
Output JSON should look like this: { "name": "John Doe", "traits": ["TRAIT_0", "TRAIT_2"] }
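
For reference, a minimal sketch of loading such records into a DataFrame, assuming they are stored as newline-delimited JSON (the file name people.json is just a placeholder):

df = spark.read.json("people.json")
# DataFrame[name: string, traits: bigint]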

2 Comments
  • It would help if you could provide sample reproducible data and the required output. Also, what is your Spark version? Commented Apr 28, 2020 at 1:20
  • Edited my question: the PySpark version is 2.4.5, and I provided input and output data. Commented Apr 28, 2020 at 1:39

2 Answers


IIUC, you can use Spark SQL built-in functions: (1) use conv + split to convert the integer (base 10) into a binary string (base 2) and then into a reversed array of one-character strings, and (2) use the 0/1 values and their array indices to transform and filter the array into the corresponding array of named traits:

from pyspark.sql.functions import expr

df = spark.createDataFrame([("name1", 5),("name2", 1),("name3", 0),("name4", 12)], ['name', 'traits'])
#DataFrame[name: string, traits: bigint]

traits = [ "Traits_{}".format(i) for i in range(8) ]
traits_array = "array({})".format(",".join("'{}'".format(e) for e in traits))
# array('Traits_0','Traits_1','Traits_2','Traits_3','Traits_4','Traits_5','Traits_6','Traits_7')

sql_expr = """

    filter(
      transform(
        /* convert int -> binary -> string -> array of strings, and then reverse the array */
        reverse(split(string(conv(traits,10,2)),'(?!$)')),
        /* take the corresponding items from the traits_array when value > 0, else NULL */
        (x,i) -> {}[IF(x='1',i,NULL)]
      ),
      /* filter out NULL items from the array */
      y -> y is not NULL
    ) AS traits_list

""".format(traits_array)
# filter(
#   transform(
#     reverse(split(string(conv(traits,10,2)),'(?!$)')),
#     (x,i) -> array('Traits_0','Traits_1','Traits_2','Traits_3','Traits_4','Traits_5','Traits_6','Traits_7')[IF(x='1',i,NULL)]
#   ),
#   y -> y is not NULL
# )

df.withColumn("traits_list", expr(sql_expr)).show(truncate=False)
+-----+------+--------------------+
|name |traits|traits_list         |
+-----+------+--------------------+
|name1|5     |[Traits_0, Traits_2]|
|name2|1     |[Traits_0]          |
|name3|0     |[]                  |
|name4|12    |[Traits_2, Traits_3]|
+-----+------+--------------------+

Below is the result of running reverse(split(string(conv(traits,10,2)),'(?!$)')). Notice that the split pattern (?!$) is used to avoid a trailing empty element in the resulting array.

df.selectExpr("*", "reverse(split(string(conv(traits,10,2)),'(?!$)')) as t1").show()
+-----+------+------------+
| name|traits|          t1|
+-----+------+------------+
|name1|     5|   [1, 0, 1]|
|name2|     1|         [1]|
|name3|     0|         [0]|
|name4|    12|[0, 0, 1, 1]|
+-----+------+------------+
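
If the goal is JSON in the shape the question asks for (the traits column replaced by the array), one option, sketched here with a placeholder output path, is to rename the computed column and write it back out:

from pyspark.sql.functions import col

result = df.withColumn("traits_list", expr(sql_expr)) \
           .select("name", col("traits_list").alias("traits"))
result.write.json("output_path")
# writes one JSON record per row, e.g. {"name":"name1","traits":["Traits_0","Traits_2"]}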

1 Comment

Marking this one as the correct answer since using Spark SQL built-in functions is more performant than the UDF-based approach, even though it is a lot more complicated.

We can define a UDF to wrap your function and then call it. This is some sample code:

from typing import List

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

TRAIT_0 = 0
TRAIT_1 = 1
TRAIT_2 = 2


def flag_to_list(flag: int) -> List[str]:
    trait_list = []
    if flag & (1 << TRAIT_0):
        trait_list.append("TRAIT_0")
    if flag & (1 << TRAIT_1):
        trait_list.append("TRAIT_1")
    if flag & (1 << TRAIT_2):
        trait_list.append("TRAIT_2")
    return trait_list


flag_to_list_udf = udf(lambda x: None if x is None else flag_to_list(x),
                       ArrayType(StringType()))

# Create dummy data to test
data = [
    { "name": "John Doe", "traits": 5 },
    { "name": "Jane Doe", "traits": 2 },
    { "name": "Jane Roe", "traits": 0 },
    { "name": "John Roe", "traits": 6 },
]
df = spark.createDataFrame(data, 'name STRING, traits INT')
df.show()
# +--------+------+
# |    name|traits|
# +--------+------+
# |John Doe|     5|
# |Jane Doe|     2|
# |Jane Roe|     0|
# |John Roe|     6|
# +--------+------+

df = df.withColumn('traits_processed', flag_to_list_udf(df['traits']))
df.show()
# +--------+------+------------------+
# |    name|traits|  traits_processed|
# +--------+------+------------------+
# |John Doe|     5|[TRAIT_0, TRAIT_2]|
# |Jane Doe|     2|         [TRAIT_1]|
# |Jane Roe|     0|                []|
# |John Roe|     6|[TRAIT_1, TRAIT_2]|
# +--------+------+------------------+

If you don't want to create a new column, you can replace traits_processed with traits.
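
For example, to overwrite the existing column in place:

df = df.withColumn('traits', flag_to_list_udf(df['traits']))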

