The goal is to filter the first dataframe based on how similar its x and y values are to different zones in the second dataframe. For each x and y in df1, a calculation creates boundaries of +/- a delta value (i.e. x_minus = x - 2 and x_plus = x + 2). The function then filters df2 to the rows whose x is less than x_plus and greater than x_minus, and likewise for y. The issue I'm running into is how to access two dataframes in a UDF, or how to read a dataframe inside another.
The actual datasets have grown to hundreds of GB, so Python alone isn't sufficient. The solution was initially developed in Python on smaller versions of the data and now has to be converted to PySpark. I'm currently running these processes on EMR clusters, using a Jupyter notebook to test them. Below is a sample of fake data to demonstrate the process.
df1
id ; x ; y
1 ;19.1;11.1
2 ;29.1;9.1
3 ;39.1;3.1
4 ;49.1;6.1
df2
id ; x ; y ; zone
1 ;20.1; 12.1 ; GG
2 ;30.1; 9.1 ; AA
3 ;40.1; 3.1 ; BB
4 ;50.1; 6.1 ; TT
df3 - desired results
id ; x ; y ; zone
1 ;19; 11 ; GG
2 ;29; 9 ; AA
3 ;39; 3 ; BB
4 ;49; 6 ; TT
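The matching rule described above can be checked on the sample tables with plain pandas. This is only a minimal sketch of the rule, not something that scales to the real data: it assumes a fixed delta of 2, pairs every df1 row with every df2 row via a cross merge (requires pandas 1.2 or later), and uses the illustrative names `DELTA`, `pairs`, and `in_window`, which are not part of my actual code. The x and y values are left as they appear in df1.

```python
import pandas as pd

# Sample data copied from the tables above.
df1 = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "x": [19.1, 29.1, 39.1, 49.1],
    "y": [11.1, 9.1, 3.1, 6.1],
})
df2 = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "x": [20.1, 30.1, 40.1, 50.1],
    "y": [12.1, 9.1, 3.1, 6.1],
    "zone": ["GG", "AA", "BB", "TT"],
})

DELTA = 2  # assumed window half-width, matching x_plus / x_minus above

# Pair every df1 row with every df2 row; df2's overlapping columns
# get the "_zone" suffix (id_zone, x_zone, y_zone).
pairs = df1.merge(df2, how="cross", suffixes=("", "_zone"))

# Keep the pairs where the df2 point lies strictly inside the window
# around the df1 point, on both axes.
in_window = (
    (pairs["x_zone"] > pairs["x"] - DELTA)
    & (pairs["x_zone"] < pairs["x"] + DELTA)
    & (pairs["y_zone"] > pairs["y"] - DELTA)
    & (pairs["y_zone"] < pairs["y"] + DELTA)
)
df3 = pairs.loc[in_window, ["id", "x", "y", "zone"]].reset_index(drop=True)
print(df3)
```

A cross merge is quadratic in the number of rows, so it is only useful here for confirming the expected zones on small samples.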
# x and y are passed in row by row, like .apply on a pandas dataframe
def find_zone(x, y, lookup):
    x_plus = x + 2
    x_minus = x - 2
    y_plus = y + 2
    y_minus = y - 2
    # each comparison needs its own parentheses because & binds tighter than < and >
    zone = lookup.loc[(lookup['x'] < x_plus) & (lookup['x'] > x_minus) & (lookup['y'] < y_plus) & (lookup['y'] > y_minus)]['zone']
    return zone
# zone for each row, converted to a udf for pyspark
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
# zone values are strings such as 'GG', so the return type is StringType()
find_zone = udf(find_zone, StringType())
# first way tried, with df2 as a parameter
df1 = df1.withColumn('zone', find_zone('x', 'y', df2))
# second way tried, creating df2 within the udf
def find_zone(x, y):
    x_plus = x + 2
    x_minus = x - 2
    y_plus = y + 2
    y_minus = y - 2
    # importing as spark df and converting to pandas because it was the only method that partially worked while testing
    lookup = spark.read.parquet('s3_uri').toPandas()
    zone = lookup.loc[(lookup['x'] < x_plus) & (lookup['x'] > x_minus) & (lookup['y'] < y_plus) & (lookup['y'] > y_minus)]['zone']
    return zone
# wrapped as a udf in the same way as the first attempt
find_zone = udf(find_zone, StringType())
df1 = df1.withColumn('zone', find_zone('x', 'y'))
My apologies for any syntax issues; I'm trying to reproduce something similar without revealing my actual functions or values.