I'm trying to parallelize R code that pulls data from a Snowflake table, but the parallel version fails with an error saying I have an invalid connection. The same code runs without error when it is NOT parallelized, so I suspect that the table in my environment (a Spark table) isn't making it into the foreach loop, even though I'm explicitly exporting my entire environment to it.

Here's a snippet of the code. If it makes any difference, I'm running this within Databricks. Thank you!

This is just the setup:

library(foreach)
library(doParallel)
library(tidyverse)
library(sparklyr)

unique_leader_test <- 1:10

#Connect to snowflake
SparkR::sparkR.session()
sc <- spark_connect(method="databricks")

httr::set_config(httr::config(http_version = 0))


sf_opts <- list(
  sfUrl       = "xxxx",
  sfUser      = "xxx",
  sfPassword  = "xxx",
  sfDatabase  = "xxx",
  sfSchema    = "xxx",
  sfWarehouse = "xxx",
  sfRole      = "xxx"
)


all_survey_sql <- spark_read_source(
  sc     = sc,
  name   = "trend_data",
  source = "snowflake",
  options = c(
    sf_opts,
    dbtable = "my_data"
  )
)

This is where I'm stuck

# Set up parallel processing
num_cores <- 6  # Fixed number of workers (not derived from the available cores)
cl <- makeCluster(num_cores)
registerDoParallel(cl)

# Export necessary variables and functions to the cluster
#=clusterExport(cl, ls(.GlobalEnv))
# Make sure custom functions are available to workers
clusterEvalQ(cl, {

})

# Use foreach instead of for loop
# Note: Variables that accumulate across iterations (like *_list_full variables) 
# will be local to each parallel worker. Consider collecting results if needed.
foreach(leader = unique_leader_test, .packages = c("sparklyr", "tidyverse")) %dopar% {
  if (custom == FALSE) {
    leaders_id <- leader

    # Connect to snowflake
    SparkR::sparkR.session()
    sc <- spark_connect(method = "databricks")

    httr::set_config(httr::config(http_version = 0))

    all_surveys <- all_survey_sql %>%
      filter(leader == leaders_id) %>%
      collect()

    print(leaders_id)
  }
}

stopCluster(cl)
  • Interesting post but I have many questions. Are these separate code blocks or part of one contiguous code? Why are you connecting inside foreach (i.e., where is sc later used)? Please post str(all_survey_sql)? Is it a data frame? Does it need sc as a pointer? Also, please post verbatim error message. Commented Aug 7 at 0:01
  • "I'm explicitly exporting my entire environment" No, you are not. It's commented out. It also would be bad practice. You should use the .export argument of foreach to export only the objects you need. Commented Aug 7 at 6:22
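To illustrate the `.export` suggestion in the comment above, here is a minimal sketch that uses plain R objects rather than Spark tables. The names `leader_ids`, `multiplier`, and `scale_id` are hypothetical stand-ins and do not come from the question. foreach usually detects variables that appear directly in the loop body, but an object referenced only inside a helper function may need to be listed in `.export` by hand.

```r
library(foreach)
library(doParallel)

# Hypothetical stand-ins for objects defined in the calling environment.
leader_ids <- 1:4
multiplier <- 10

# Helper that reads `multiplier` from its enclosing environment. Because
# `multiplier` never appears in the foreach body itself, it is listed in
# .export below instead of relying on automatic detection.
scale_id <- function(id) id * multiplier

cl <- makeCluster(2)
registerDoParallel(cl)

results <- foreach(
  leader   = leader_ids,
  .export  = "multiplier",  # export only what the workers need
  .combine = c              # collect the per-iteration results into a vector
) %dopar% {
  scale_id(leader)
}

stopCluster(cl)
```

Note that this only covers ordinary R objects. An object such as `all_survey_sql` refers to a live Spark connection in the parent session, so exporting it to a worker process may still produce an invalid-connection error there; that part is an assumption about the cause and has not been verified against the question's environment.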
