
The code below maps values and column names from my reference df to my actual dataset, and returns the OutputValue when an exact match is found. However, I'm trying to add a rule: when PrimaryValue = DEFAULT, also return the OutputValue.

The solution I'm trying out is to create a new dataframe holding the null values left behind by the code below (i.e. the rows with no match). The next step would be to target the null values whose corresponding PrimaryValue = DEFAULT and replace the null with the OutputValue.

  from itertools import chain
  from pyspark.sql.functions import concat_ws, collect_set, array, create_map, lit, col

  #create a map key/value based on columns from reference_df
  map_key = concat_ws('\0', final_reference.PrimaryName, final_reference.PrimaryValue)
  map_value = final_reference.OutputValue

  #list of [concatenated key, OutputValue] pairs from the reference table
  d = final_reference.agg(collect_set(array(concat_ws('\0', 'PrimaryName', 'PrimaryValue'), 'OutputValue')).alias('m')).first().m
  #display(d)

  #flatten the pairs into a single map expression
  mappings = create_map([lit(i) for i in chain.from_iterable(d)])

  #dataframe with the corresponding matched OutputValues
  dataset = datasetM.select("*", *[mappings[concat_ws('\0', lit(c), col(c))].alias(c_name) for c, c_name in matched_List.items()])
  display(dataset)
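For intuition, here is a plain-Python sketch of the lookup that Spark code performs; the sample reference rows below are assumptions for illustration, not the actual data:

```python
# Sample reference rows: (PrimaryName, PrimaryValue, OutputValue) -- assumed data.
reference_rows = [
    ("LeaseStatus", "Abandoned", "Active"),
    ("LeaseStatus", "DEFAULT", "Pending"),
    ("LeaseRecoveryType", "Gross", "Gross"),
]

# Build the same '\0'-joined key -> OutputValue mapping that create_map produces.
mapping = {f"{name}\0{value}": out for name, value, out in reference_rows}

# An exact match returns the OutputValue; a miss returns None (a null column in Spark).
print(mapping.get("LeaseStatus\0Abandoned"))  # Active
print(mapping.get("LeaseStatus\0Archive"))    # None -> null in the result column
```

This is why unmatched rows come back null: the map lookup simply has no entry for that concatenated key, and the DEFAULT row is never consulted.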
  • Do you mean when the requested col-name from primaryLookupAttributeName_List does not exist in datasetMatchedPortfolio, which will yield an ERROR? So you want to add a default name to get past the ERROR? Commented May 22, 2020 at 21:45
  • @jxc, so there's no error. It simply populates the column with null. The dataset will never include DEFAULT; it will have a regular value. When PrimaryLookupAttributeName is DEFAULT, then I would like to replace those nulls (no match found) with the corresponding OutputItemNameByValue. I will update my question with more info! Commented May 22, 2020 at 21:49
  • Very likely, you just need coalesce, for example: coalesce(mappings[concat_ws('\0', lit(c), col(c))], lit("DEFAULT")).alias(c_name). Make sure to import pyspark.sql.functions.coalesce. Commented May 22, 2020 at 21:54
  • @jxc, sorry, are you proposing to do this to the null df or to include it as part of my initial datasetPrimaryAttributes_False = Commented May 22, 2020 at 21:57
  • @jxc, would you recommend including a udf here with an if/elif/else chain? I have 3 cases for matching: 1) if a match is found, copy the OutputValue; 2) if DEFAULT, copy the OutputValue; 3) if no match at all and null, "Lookup not found". Otherwise, my idea so far is to keep building filtered-down dataframes until my last case, so all dataset values have a corresponding updated value. Commented May 22, 2020 at 22:01

1 Answer


From the discussion in the comments, I think you just need to derive a default mapping from the existing one and then use the coalesce() function to find the first non-null value, see below:

from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map, coalesce

# skip some old code

d    
#[['LeaseStatus\x00Abandoned', 'Active'],
# ['LeaseStatus\x00DEFAULT', 'Pending'],
# ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross'],
# ['LeaseStatus\x00Archive', 'Expired'],
# ['LeaseStatus\x00Terminated', 'Terminated'],
# ['LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'],
# ['LeaseRecoveryType\x00Gross', 'Gross']]

# original mapping
mappings = create_map([ lit(j) for i in d for j in i ])

# default mapping
mappings_default = create_map([ lit(j.split('\0')[0]) for i in d if i[0].upper().endswith('\x00DEFAULT') for j in i ])
#Column<b'map(LeaseStatus, Pending)'>

# a set of available PrimaryLookupAttributeName
available_list = set([ i[0].split('\0')[0] for i in d ])
# {'LeaseRecoveryType', 'LeaseStatus'}

# use coalesce to find the first non-null value from mappings, mappings_default etc.
datasetPrimaryAttributes_False = datasetMatchedPortfolio.select("*",*[ 
  coalesce(
    mappings[concat_ws('\0', lit(c), col(c))],
    mappings_default[c],
    lit("Not Specified at Source" if c in available_list else "Lookup not found")
  ).alias(c_name) for c,c_name in matchedAttributeName_List.items()])
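For intuition, the three-way fallback that coalesce() implements can be emulated in plain Python with dicts; the sample rows below mirror the d shown above (an assumed subset, not the full data):

```python
# Assumed subset of the [key, OutputValue] pairs shown in d above.
d = [
    ["LeaseStatus\x00Abandoned", "Active"],
    ["LeaseStatus\x00DEFAULT", "Pending"],
    ["LeaseRecoveryType\x00Gross", "Gross"],
]
mappings = {k: v for k, v in d}
mappings_default = {k.split("\x00")[0]: v for k, v in d
                    if k.upper().endswith("\x00DEFAULT")}
available = {k.split("\x00")[0] for k, _ in d}

def lookup(col_name, cell_value):
    # 1) exact match, 2) DEFAULT fallback for that column, 3) sentinel literal
    exact = mappings.get(f"{col_name}\x00{cell_value}")
    if exact is not None:
        return exact
    default = mappings_default.get(col_name)
    if default is not None:
        return default
    return "Not Specified at Source" if col_name in available else "Lookup not found"

print(lookup("LeaseStatus", "Abandoned"))  # Active (exact match)
print(lookup("LeaseStatus", "Archive"))    # Pending (DEFAULT fallback)
print(lookup("Unknown", "x"))              # Lookup not found (column not in reference)
```

Each branch of lookup() corresponds to one argument of the coalesce() call: the exact-match map, the default map keyed by column name alone, and the literal fallback.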

Some explanation:

(1) d is a list of lists retrieved from reference_df; we use the list comprehension [ lit(j) for i in d for j in i ] to flatten it into a single list and pass the flattened list to the create_map function.
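In plain Python (without the lit() wrapper), that flattening looks like this, with a couple of sample pairs assumed:

```python
# Assumed sample of the nested [key, value] pairs in d.
nested = [["LeaseStatus\x00Abandoned", "Active"],
          ["LeaseStatus\x00DEFAULT", "Pending"]]

# [j for i in nested for j in i] walks each inner list and emits its items in order,
# producing the alternating key, value, key, value... sequence create_map expects.
flat = [j for i in nested for j in i]
print(flat)
# ['LeaseStatus\x00Abandoned', 'Active', 'LeaseStatus\x00DEFAULT', 'Pending']
```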

(2) mappings_default is built the same way, but adds an if condition as a filter to keep only entries whose map key (the first item of the inner list, i[0]) ends with \x00DEFAULT, and then uses split to strip the \x00DEFAULT suffix off the map key, leaving just the attribute name as the key.
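The same filter-and-strip comprehension in plain Python, with sample data assumed:

```python
# Assumed sample pairs; only the DEFAULT entry should survive the filter.
d = [["LeaseStatus\x00Abandoned", "Active"],
     ["LeaseStatus\x00DEFAULT", "Pending"]]

# Keep inner lists whose key ends with \x00DEFAULT, then split each item on \x00
# and take the first piece: the key becomes 'LeaseStatus', the value stays 'Pending'.
pairs = [j.split("\x00")[0] for i in d
         if i[0].upper().endswith("\x00DEFAULT") for j in i]
print(pairs)
# ['LeaseStatus', 'Pending']
```

Note the split is harmless for the OutputValue ('Pending' contains no \x00, so split returns it unchanged), which is why one expression handles both key and value.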


Comments

Can you explain mappings?
I understand it now, actually; I'll rewrite them with descriptive variable names, thanks! Also, I just realized that I have another scenario. 1) if a match is found, copy the OutputValue; 2) if DEFAULT, copy the OutputValue; 3) if no match at all and null, "Lookup not found"; 4) if there are null values in the dataset. For example, dataset.LeaseRecoveryType has nulls, so there is no match, not because there is no reference_table value but because the actual dataset does not provide a value. So if the dataset value = null, then return "Not Specified at Source". @jxc
I don't think it's working. I still only get "Lookup not found" when dataset.LeaseRecoveryType = null. Playing around with it right now. @jxc
How about changing if c in available_list to if c not in available_list?
Yes! I tried that too and it works. However, I'm struggling to understand why exactly.
