
I have a requirement where I need to mask data stored in Cassandra tables using PySpark. I have a frozen collection in Cassandra that comes through as an array column in PySpark. I converted it to a string in order to mask it, and now I want to convert it back to its original array type.

I am using Spark 2.3.2 to mask data from a Cassandra table. I copied the data into a DataFrame and cast the column to a string to perform the masking. I then tried to convert it back to an array, but I am not able to recover the original structure.

from pyspark.sql.functions import regexp_replace, array
from pyspark.sql.types import StringType
from faker import Faker

faker = Faker()

table_df.createOrReplaceTempView("tmp")
networkinfos_df = sqlContext.sql('SELECT networkinfos, pid, eid, sid FROM tmp')

# cast the array column to a string, then mask IPv4, MAC and IPv6 addresses with faker values
dfn1 = networkinfos_df.withColumn('networkinfos_ntdf', regexp_replace(networkinfos_df.networkinfos.cast(StringType()), r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b', faker.ipv4_private(network=False, address_class=None))) \
    .withColumn('networkinfos_ntdf', regexp_replace('networkinfos_ntdf', r'([a-fA-F0-9]{2}[:|\-]?){6}', faker.mac_address())) \
    .withColumn('networkinfos_ntdf', regexp_replace('networkinfos_ntdf', r'(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))', faker.ipv6(network=False))) \
    .drop('networkinfos')

# wrapping the masked string in array() only yields an array of strings, not the original structs
dfn2 = dfn1.withColumn("networkinfos_ntdf", array(dfn1["networkinfos_ntdf"]))

dfn2.show(30, False)
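
For reference, array() simply wraps the existing string column, which is why the resulting schema further below comes out as an array of strings; a minimal sketch with hypothetical toy data:

from pyspark.sql.functions import array

# hypothetical toy example: wrapping a string column with array() only gives array<string>
toy_df = sqlContext.createDataFrame([("[[vendorA, productB]]",)], ["networkinfos_ntdf"])
toy_df.withColumn("networkinfos_ntdf", array("networkinfos_ntdf")).printSchema()
# root
#  |-- networkinfos_ntdf: array (nullable = false)
#  |    |-- element: string (containsNull = true)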

The original structure is as follows:

 |-- networkinfos: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- vendor: string (nullable = true)
 |    |    |-- product: string (nullable = true)
 |    |    |-- dhcp_enabled: boolean (nullable = true)
 |    |    |-- dhcp_server: string (nullable = true)
 |    |    |-- dns_servers: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- ipv4: string (nullable = true)
 |    |    |-- ipv6: string (nullable = true)
 |    |    |-- subnet_mask_obsolete: string (nullable = true)
 |    |    |-- default_ip_gateway: string (nullable = true)
 |    |    |-- mac_address: string (nullable = true)
 |    |    |-- logical_name: string (nullable = true)
 |    |    |-- dhcp_lease_obtained: timestamp (nullable = true)
 |    |    |-- dhcp_lease_expires: timestamp (nullable = true)
 |    |    |-- ip_enabled: boolean (nullable = true)
 |    |    |-- ipv4_list: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- ipv6_list: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- subnet_masks_obsolete: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- default_ip_gateways: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- wins_primary_server: string (nullable = true)
 |    |    |-- wins_secondary_server: string (nullable = true)
 |    |    |-- subnet_mask: string (nullable = true)
 |    |    |-- subnet_masks: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- interface_index: integer (nullable = true)
 |    |    |-- speed: long (nullable = true)
 |    |    |-- dhcp_servers: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

What I am getting is:

root
 |-- pid: string (nullable = true)
 |-- eid: string (nullable = true)
 |-- sid: string (nullable = true)
 |-- networkinfos_ntdf: array (nullable = false)
 |    |-- element: string (containsNull = true)

How can I convert it back to the original structure?

1 Answer

You can try using pyspark.sql.functions.to_json() and pyspark.sql.functions.from_json() to handle your task if your regexp_replace operations do not break the JSON data:

First find the schema for the field networkinfos:

from pyspark.sql.types import ArrayType
from pyspark.sql.functions import regexp_replace, from_json, to_json

# get the schema of the array field `networkinfos` in JSON
schema_data = df.select('networkinfos').schema.jsonValue()['fields'][0]['type']

# convert it into pyspark.sql.types.ArrayType:
field_schema = ArrayType.fromJson(schema_data)
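
As a side note, the same ArrayType object can also be read straight off the DataFrame schema, which avoids the round trip through jsonValue(); a small equivalent sketch, assuming the same df:

# equivalent: take the ArrayType of `networkinfos` directly from the schema fields
field_schema = df.select('networkinfos').schema.fields[0].dataType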

Once you have field_schema, you can use from_json to parse the modified JSON strings back into the original structure:

dfn1 = networkinfos_df \
        .withColumn('networkinfos', to_json('networkinfos')) \
        .withColumn('networkinfos', regexp_replace('networkinfos',...)) \
        .....\
        .withColumn('networkinfos', from_json('networkinfos', field_schema))
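
For completeness, a minimal end-to-end sketch that combines this with the masking calls from the question; ipv4_pattern, mac_pattern and ipv6_pattern are placeholder names for the regex strings already shown above, and faker is the Faker() instance from the question. Whether the masked strings remain valid JSON still depends on those patterns:

from pyspark.sql.types import ArrayType
from pyspark.sql.functions import regexp_replace, from_json, to_json

# recover the original element schema of `networkinfos`
schema_data = networkinfos_df.select('networkinfos').schema.jsonValue()['fields'][0]['type']
field_schema = ArrayType.fromJson(schema_data)

# serialize to JSON, mask inside the JSON string, then parse back to the original array<struct>
dfn1 = networkinfos_df \
    .withColumn('networkinfos', to_json('networkinfos')) \
    .withColumn('networkinfos', regexp_replace('networkinfos', ipv4_pattern, faker.ipv4_private(network=False, address_class=None))) \
    .withColumn('networkinfos', regexp_replace('networkinfos', mac_pattern, faker.mac_address())) \
    .withColumn('networkinfos', regexp_replace('networkinfos', ipv6_pattern, faker.ipv6(network=False))) \
    .withColumn('networkinfos', from_json('networkinfos', field_schema))

dfn1.printSchema()  # networkinfos should again be array<struct<...>>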