I'm trying to parallelize R code that pulls data from a Snowflake table, but when I do, I get an error saying that I have an invalid connection. I don't get this error when the code is not parallelized, so I suspect that the table in my environment (a Spark table) isn't making it into the `foreach` loop. However, I'm explicitly exporting my entire environment to it.
Here's a snippet of the code. If it makes any difference, I'm running this within Databricks. Thank you!
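A likely explanation, sketched under the assumption that `makeCluster()` creates its default PSOCK cluster (as in the code below): every worker is a separate R process, and objects reach it as serialized copies. A `sparklyr` connection such as `sc`, and a `tbl_spark` such as `all_survey_sql` that is built on it, are handles to a connection that is open only in the main session, so the copies the workers receive are most likely not usable connections. The block below shows only the separate-process part, using base R's `parallel` package:

```r
library(parallel)

# Start a small PSOCK cluster (the default type for makeCluster()).
cl <- makeCluster(2)

# Ask every worker for its operating-system process ID.
worker_pids <- unlist(clusterEvalQ(cl, Sys.getpid()))

stopCluster(cl)

# The workers are different processes from the main R session, so they
# do not share its open connections (Spark, database, file, or socket).
print(Sys.getpid())
print(worker_pids)
```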
This is just the setup:
```r
library(foreach)
library(doParallel)
library(tidyverse)
library(sparklyr)

unique_leader_test <- 1:10

# Connect to Snowflake
SparkR::sparkR.session()
sc <- spark_connect(method = "databricks")
httr::set_config(httr::config(http_version = 0))

sf_opts <- list(
  sfUrl       = "xxxx",
  sfUser      = "xxx",
  sfPassword  = "xxx",
  sfDatabase  = "xxx",
  sfSchema    = "xxx",
  sfWarehouse = "xxx",
  sfRole      = "xxx"
)

all_survey_sql <- spark_read_source(
  sc = sc,
  name = "trend_data",
  source = "snowflake",
  options = c(
    sf_opts,
    dbtable = "my_data"
  )
)
```
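Because the non-parallel version works, one workaround to consider is to query Snowflake once on the driver, `collect()` the rows for all leaders, and parallelize only over plain data frames, which serialize to workers without any connection attached. This assumes the filtered rows fit in the driver's memory. In the sketch, `local_surveys`, its `score` column, and the per-leader summary are hypothetical stand-ins, and it uses base R's `parallel::parLapply()` instead of `foreach`; with `foreach` the same loop would be `foreach(df = by_leader) %dopar% { ... }`.

```r
library(parallel)

unique_leader_test <- 1:10

# Hypothetical stand-in for the collected Snowflake rows. In the real code
# this would come from one Spark query run once on the driver, e.g.
#   local_surveys <- all_survey_sql %>%
#     filter(leader %in% unique_leader_test) %>%
#     collect()
local_surveys <- data.frame(
  leader = rep(1:12, each = 3),
  score  = seq_len(36)
)
local_surveys <- local_surveys[local_surveys$leader %in% unique_leader_test, ]

# One plain data frame per leader; these carry no connection objects.
by_leader <- split(local_surveys, local_surveys$leader)

cl <- makeCluster(2)
# Hypothetical per-leader work: row count and mean score.
results <- parLapply(cl, by_leader, function(df) {
  data.frame(leader = df$leader[1], n = nrow(df), mean_score = mean(df$score))
})
stopCluster(cl)

# Combine the per-leader results back in the main session.
summary_df <- do.call(rbind, results)
print(summary_df)
```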
This is where I'm stuck:
```r
# Set up parallel processing
num_cores <- 6  # hard-coded; parallel::detectCores() - 1 would use all cores minus one
cl <- makeCluster(num_cores)
registerDoParallel(cl)

# Export necessary variables and functions to the cluster
# clusterExport(cl, ls(.GlobalEnv))

# Make sure custom functions are available to workers
clusterEvalQ(cl, {
  # (empty in this snippet)
})

# Use foreach instead of a for loop
# Note: variables that accumulate across iterations (like the *_list_full variables)
# will be local to each parallel worker. Consider collecting the results if needed.
foreach(leader = unique_leader_test, .packages = c("sparklyr", "tidyverse")) %dopar% {
  if (custom == FALSE) {  # `custom` is not defined in this snippet
    leaders_id <- leader
    # Connect to Snowflake
    SparkR::sparkR.session()
    sc <- spark_connect(method = "databricks")
    httr::set_config(httr::config(http_version = 0))
    all_surveys <- all_survey_sql %>%
      filter(leader == leaders_id) %>%
      collect()
    print(leaders_id)
  }
}

stopCluster(cl)
```
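On the note in the code about accumulating variables: with a PSOCK cluster, assigning to a variable inside the loop body changes only that worker's copy, so results have to be returned from each iteration and combined in the main session. A minimal base R sketch (`total` and `per_leader` are hypothetical names):

```r
library(parallel)

cl <- makeCluster(2)

# A running total kept in the main session, then copied to each worker.
total <- 0
clusterExport(cl, "total", envir = environment())

# Updating `total` inside the workers only changes each worker's copy.
invisible(parLapply(cl, 1:4, function(i) {
  total <<- total + i
  NULL
}))

# Instead, return each iteration's result and combine in the main session.
per_leader <- parLapply(cl, 1:4, function(i) i * 10)
combined <- sum(unlist(per_leader))

stopCluster(cl)

print(total)     # still 0 in the main session
print(combined)
```

With `foreach`, the equivalent is to make each iteration's last expression its result and pass `.combine` (for example `.combine = rbind` for data frames).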
… `foreach` (i.e., where is `sc` later used)? Please post `str(all_survey_sql)`. Is it a data frame? Does it need `sc` as a pointer? Also, please post the verbatim error message. … `.export` argument of `foreach` to export only the objects you need.
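On exporting only the objects you need: a PSOCK worker sees a main-session object only after it has been exported, which `foreach` does through its `.export` argument (in addition to variables it detects automatically in the loop body) and base R does through `parallel::clusterExport()`. Exporting plain vectors and data frames this way is fine; exporting `sc` or `all_survey_sql` copies the objects but most likely not a working connection. A minimal sketch with the base R function and a hypothetical `leader_ids` vector:

```r
library(parallel)

cl <- makeCluster(2)

# A plain object in the main session (hypothetical name).
leader_ids <- 1:10

# Before exporting, the workers do not have it.
before <- unlist(clusterEvalQ(cl, exists("leader_ids")))

# Export only the objects the loop actually needs, by name.
clusterExport(cl, c("leader_ids"), envir = environment())
after <- unlist(clusterEvalQ(cl, exists("leader_ids")))

stopCluster(cl)

print(before)
print(after)
```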