3

I have some R code that puts together demographic data from the Census for all of the states in the US into a list object. The block-level code can take a week to run as a sequential loop since there are ~11M blocks, so I am trying to parallelize the loop over states to make it faster. I have accomplished this goal with this:

states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
           "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
           "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
           "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
           "VT","VA","WA","WV","WI","WY","DC","PR")
library(future.apply)
plan(multiprocess)
ptm <- proc.time()
CensusObj_block_age_sex = list()

CensusObj_block_age_sex[states] <- future_lapply(states, function(s){
  county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
  tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  age = TRUE, sex = TRUE)
  block  <- census_geo_api(key = "XXX", state = s, geo = "block",  age = TRUE, sex = TRUE)
  censusObj[[s]] <- list(state = s, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
}
)

However, I need to make it more robust. Sometimes there are problems with the Census API, so I would like the CensusObj to be updated at each state iteration so that I don't lose my completed data if something goes wrong. That way I can restart the loop over the remaining states if something does go wrong (like if I spell "WY" as "WU").

Would it be possible to accomplish this somehow? I am open to other methods of parallelization.


The code above runs, but it seems to run into memory issues:

Error: Failed to retrieve the value of MultisessionFuture (future_lapply-3) from cluster RichSOCKnode #3 (PID 80363 on localhost ‘localhost’). The reason reported was ‘vector memory exhausted (limit reached?)’. Post-mortem diagnostic: A process with this PID exists, which suggests that the localhost worker is still alive.

I have R_MAX_VSIZE = 8Gb in my .Renviron, but I am not sure how that would get divided between the 8 cores on my machine. This all suggests that I need to store the results of each iteration rather than try to keep it all in memory, and then append the objects together at the end.

7
  • I'm not sure but maybe you can call save() in each iteration after CensusObj is updated. Then the latest version will be stored in the .RData object used in save(). Commented Aug 17, 2020 at 3:26
  • I would use saveRDS() to save each result individually somewhere. You can check that it already exists if you have to restart the loop. And combine the results later. Commented Aug 17, 2020 at 6:13
  • @Akshit In my understanding, the census object does not get updated until the very end of the loop, even if it is parallelized. Commented Aug 19, 2020 at 5:34
  • @F.Privé I tried your suggestion, but it seems to overwrite the results with the current state. Commented Aug 19, 2020 at 5:35
  • @F.Privé is correct - saving/caching intermediate results to one file per "iteration" (per s) is the way to go. Commented Aug 19, 2020 at 6:44

3 Answers

1
+50

Here is a solution that uses doParallel and foreach (shown with the options for UNIX systems, but you can also use it on Windows, see here). It stores the results for every state in a separate file, then reads the individual files back in and combines them into a list.

library(doParallel)
library(foreach)

path_results <- "my_path"
ncpus = 8L
registerDoParallel(cores = ncpus)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
            "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
            "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
            "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
            "VT","VA","WA","WV","WI","WY","DC","PR")
results <- foreach(state = states) %dopar% {
                     county <- census_geo_api(key = "XXX", state = state, geo = "county", age = TRUE, sex = TRUE)
                     tract  <- census_geo_api(key = "XXX", state = state, geo = "tract",  age = TRUE, sex = TRUE)
                     block  <- census_geo_api(key = "XXX", state = state, geo = "block",  age = TRUE, sex = TRUE)
                     results <- list(state = state, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
                     
                     # store the results as rds
                     saveRDS(results,
                             file = paste0(path_results, "/", state, ".Rds"))
                     
                     # remove the results
                     rm(county)
                     rm(tract)
                     rm(block)
                     rm(results)
                     gc()
                     
                     # just return a string
                     paste0("done with ", state)
}

library(purrr)
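# combine the results into a list
# build the file names from `states`: list.files() returns names in
# alphabetical order, which does not match the order of `states`
result_files <- paste0(states, ".Rds")
CensusObj_block_age_sex <- set_names(result_files, states) %>% 
  map(~ readRDS(file = file.path(path_results, .x)))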
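
To restart after a failure without downloading finished states again, one option is to skip every state that already has a result file, as suggested in the comments on the question. The following is a minimal sequential sketch of that idea, not part of the original answer. `fetch_state()` is a hypothetical stand-in for the three `census_geo_api()` calls, `rds_path()` is a hypothetical helper, and the output directory is an assumption chosen for illustration.

```r
# Hypothetical stand-in for the three census_geo_api() calls;
# replace the body with the real download
fetch_state <- function(s) {
  list(state = s, age = TRUE, sex = TRUE)
}

# Assumed output directory, created if it does not exist yet
path_results <- file.path(tempdir(), "census_results")
dir.create(path_results, showWarnings = FALSE, recursive = TRUE)

states <- c("AL", "AK", "AZ")  # shortened list for illustration

# Hypothetical helper: path of the result file for one or more states
rds_path <- function(s) file.path(path_results, paste0(s, ".Rds"))

# Keep only the states that do not have a result file on disk yet
todo <- states[!file.exists(rds_path(states))]

for (s in todo) {
  res <- fetch_state(s)
  # Write to a temporary file first and rename it afterwards, so that a
  # crash in the middle of a write does not leave a truncated .Rds behind
  tmp <- paste0(rds_path(s), ".tmp")
  saveRDS(res, file = tmp)
  file.rename(tmp, rds_path(s))
}
```

Running the script a second time leaves `todo` empty, so only states that failed or were never reached get downloaded. The same `todo` vector could be passed to `foreach` or `future_lapply` in place of `states`.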

7 Comments

This is great. The only thing I had to tweak to get this to work was foreach(i = states), since the Census API needs the two-character version.
Thanks for pointing this out; I forgot to use states in the loop; I've edited it so that it directly works now
I am having some trouble with the combining step at the very end. The error says Error: vector memory exhausted (limit reached?), but I have R_MAX_VSIZE = 14Gb in my .Renviron file and I have 16G of memory on my machine (and I am doing nothing else). There are 52 Rds files that are 654M according to du -sh. TX and CA are both less than 70M. I am not sure why I am having this problem. Is there any way around that?
So all Rds files together are 654MB? I'm not sure why R behaves so strangely. Are you by any chance using MacOS? There seems to be this problem: stackoverflow.com/questions/51295402/…
That's why this is so strange. I am using Rstudio on MacOS. I can watch the memory used by RStudio go up to 14GB as the files are added and then I get that error. Would I need to turn off the parallel processing before I merge? I wonder if R is trying to load the same data into each of the 8 cores that I was using before to download the data and that's how I run out of memory.
1

You could use a tryCatch inside future_lapply to try to relaunch the calculation in case of API error, for a maximum of maxtrials.
In the resulting list, you get for each calculation the number of trials and the final status, OK or Error:

    states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
                "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
                "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
                "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
                "VT","VA","WA","WV","WI","WY","DC","PR")
    library(future.apply)
    #> Loading required package: future
    plan(multiprocess)
    ptm <- proc.time()

    maxtrials <- 3

    census_geo_api <-
      function(key = "XXX",
               state = s,
               geo = "county",
               age = TRUE,
               sex = TRUE) {
        paste(state,'-', geo)
      }


    CensusObj_block_age_sex <- future_lapply(states, function(s) {
      ntrials <- 1
      while (ntrials <= maxtrials) {
        hasError <- tryCatch({
          # simulate a random API error (stop() is the base R function that signals an error)
          if (runif(1)>0.3) {stop("API failed")}
          county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
          tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  age = TRUE, sex = TRUE)
          block  <- census_geo_api(key = "XXX", state = s, geo = "block",  age = TRUE, sex = TRUE)
        },
        error = function(e)
          e)

        if (inherits(hasError, "error")) {
          ntrials <- ntrials + 1
        } else { break}
          
      }
      
      if (ntrials > maxtrials) {
        res <- list(state = s, status = 'Error', ntrials = ntrials-1, age = NA, sex = NA, block = NA, tract = NA, county = NA)
      } else  {
        res <- list(state = s, status = 'OK'    , ntrials = ntrials, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
      }
      res
    }
    )

    CensusObj_block_age_sex[[1]]
    #> $state
    #> [1] "AL"
    #> 
    #> $status
    #> [1] "OK"
    #> 
    #> $ntrials
    #> [1] 3
    #> 
    #> $age
    #> [1] TRUE
    #> 
    #> $sex
    #> [1] TRUE
    #> 
    #> $block
    #> [1] "AL - block"
    #> 
    #> $tract
    #> [1] "AL - tract"
    #> 
    #> $county
    #> [1] "AL - county"

<sup>Created on 2020-08-19 by the [reprex package](https://reprex.tidyverse.org) (v0.3.0)</sup>
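
The retry logic above can also be factored into a small reusable function. The following is a minimal sketch under stated assumptions, not the answer's own code: `with_retries()`, its `wait_seconds` pause between attempts, and `flaky_call()` are hypothetical names introduced for illustration, and `flaky_call()` fails deterministically twice so the behaviour is easy to follow.

```r
# Hypothetical helper: call f() up to max_trials times, pausing between
# failed attempts, and report the status and the number of trials used
with_retries <- function(f, max_trials = 3, wait_seconds = 0) {
  last_error <- NULL
  for (trial in seq_len(max_trials)) {
    result <- tryCatch(f(), error = function(e) e)
    if (!inherits(result, "error")) {
      return(list(status = "OK", ntrials = trial, value = result))
    }
    last_error <- conditionMessage(result)
    if (trial < max_trials) Sys.sleep(wait_seconds)
  }
  list(status = "Error", ntrials = max_trials, value = NULL,
       message = last_error)
}

# Simulated API call that fails on the first two attempts, then succeeds
attempts <- 0
flaky_call <- function() {
  attempts <<- attempts + 1
  if (attempts < 3) stop("API failed")
  "AL - county"
}

out <- with_retries(flaky_call, max_trials = 3)
```

Inside `future_lapply`, each `census_geo_api()` call could be wrapped as `with_retries(function() census_geo_api(...))`. A non-zero `wait_seconds` may help if API failures are transient, though that is an assumption about the API rather than something shown in the question.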


0

One possible solution is to log the value of censusObj to a text file, i.e. print censusObj in each iteration. The doSNOW package can be used for logging, for example:

library(doSNOW)
cl <- makeCluster(1, outfile="abc.out")
registerDoSNOW(cl)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
        "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
        "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
        "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
        "VT","VA","WA","WV","WI","WY","DC","PR")
foreach(i=1:length(states), .combine=rbind, .inorder = TRUE) %dopar% {
    county <- "A"
    tract  <- "B"
    block  <- "C"
    censusObj <- data.frame(state = states[i], age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
    # edit: print json objects to easily extract from the file
    cat(sprintf("%s\n",rjson::toJSON(censusObj)))
}
stopCluster(cl)

This logs the value of censusObj to abc.out. If the program crashes, the error is logged as well, and abc.out still contains the latest value of censusObj that was printed.

Here is the output of the last iteration from the log file:

Type: EXEC
{"state":"PR","age":true,"sex":true,"block":"C","tract":"B","county":"A"}
Type: DONE

Type: EXEC means that the iteration has started and Type: DONE means that execution is complete. The output of cat appears between these two lines for each iteration. The value of censusObj can then be extracted from the log file as shown below:

Lines = readLines("abc.out")
results = list()
for(i in Lines){
    # skip processing logs created by doSNOW
    if(!startsWith(i, "starting") && !startsWith(i, "Type:")){
        results = rlist::list.append(results, jsonlite::fromJSON(i))      
    }
}

results will contain all the values printed in abc.out.

> head(results, 1)
[[1]]
[[1]]$state
[1] "AL"

[[1]]$age
[1] TRUE

[[1]]$sex
[1] TRUE

[[1]]$block
[1] "C"

[[1]]$tract
[1] "B"

[[1]]$county
[1] "A"

It is not a very clean solution, but it works.

2 Comments

Could you give a pointer on the best way to extract from the log file and put it all back together at the end?
@DimitriyV.Masterov I have updated my answer demonstrating how you could get the values in R. Hope this helps you!
