3

I have a PostgreSQL database set up with a table and columns already defined. The primary key for the table is a combination of (Id, datetime) column. I need to periodically INSERT data for different Ids from R data.table into the database. However, if data for a particular (Id, datetime) combination already exists it should be UPDATED (overwritten). How can I do this using RPostgres or RPostgreSQL packages?

When I try to insert a data.table where some (Id, datetime) rows already exist I get an error saying the primary key constraint is violated:

dbWriteTable(con, table, dt, append = TRUE, row.names = FALSE)

Error in connection_copy_data(conn@ptr, sql, value) : 
  COPY returned error: ERROR:  duplicate key value violates unique constraint "interval_data_pkey"
DETAIL:  Key (id, dttm_utc)=(a0za000000CSdLoAAL, 2018-10-01 05:15:00+00) already exists.
CONTEXT:  COPY interval_data, line 1
5
  • 1
    Question is too broad. Please read up on tutorials/vignettes/books and make an earnest attempt. Then, come back with specific issues on your implementation. Commented Aug 12, 2019 at 14:50
  • 2
    Insert into a new (or reused but truncated) table and merge insert into the target table. Sorry but this is a pure SQL problem, not an R problem... Commented Aug 12, 2019 at 16:05
  • @RYoda Do you mean create a new temporary table and then merge that with the target table? Can you point me to a resource on how to do the "merge insert" operation you mention? Commented Aug 12, 2019 at 16:07
  • See the merge stmt doc for PostgreSQL: postgresql.org/message-id/attachment/23520/sql-merge.html Commented Aug 12, 2019 at 16:30
  • I get a syntax error when I try a MERGE statement and an answer here seems to indicate there is no MERGE statement in PostgreSQL? stackoverflow.com/questions/49368083/… Commented Aug 12, 2019 at 21:51

1 Answer 1

1

You can use my pg package that has upsert functionality, or just grab code for upsert from there: https://github.com/jangorecki/pg/blob/master/R/pg.R#L249 It is basically what others said in comments. Write data into temp table and then insert into destination table using on conflict clause.

pgSendUpsert = function(stage_name, name, conflict_by, on_conflict = "DO NOTHING", techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
    cols = pgListFields(stage_name)
    cols = setdiff(cols, c("run_id","r_timestamp")) # remove techstamp to have clean column list, as the fresh one will be used, if any
    # sql
    insert_into = sprintf("INSERT INTO %s.%s (%s)", name[1L], name[2L], paste(if(techstamp) c(cols, c("run_id","r_timestamp")) else cols, collapse=", "))
    select = sprintf("SELECT %s", paste(cols, collapse=", "))
    if(techstamp) select = sprintf("%s, %s::INTEGER run_id, '%s'::TIMESTAMPTZ r_timestamp", select, get_run_id(), format(Sys.time(), "%Y-%m-%d %H:%M:%OS"))
    from = sprintf("FROM %s.%s", stage_name[1L], stage_name[2L])
    if(!missing(conflict_by)) on_conflict = paste(paste0("(",paste(conflict_by, collapse=", "),")"), on_conflict)
    on_conflict = paste("ON CONFLICT",on_conflict)
    sql = paste0(paste(insert_into, select, from, on_conflict), ";")
    pgSendQuery(sql, conn = conn, .log = .log)
}

#' @rdname pg
pgUpsertTable = function(name, value, conflict_by, on_conflict = "DO NOTHING", stage_name, techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
    name = schema_table(name)
    if(!missing(stage_name)){
        stage_name = schema_table(stage_name)
        drop_stage = FALSE
    } else {
        stage_name = name
        stage_name[2L] = paste("tmp", stage_name[2L], sep="_")
        drop_stage = TRUE
    }
    if(pgExistsTable(stage_name)) pgTruncateTable(name = stage_name, conn = conn, .log = .log)
    pgWriteTable(name = stage_name, value = value, techstamp = techstamp, conn = conn, .log = .log)
    on.exit(if(drop_stage) pgDropTable(stage_name, conn = conn, .log = .log))
    pgSendUpsert(stage_name = stage_name, name = name, conflict_by = conflict_by, on_conflict = on_conflict, techstamp = techstamp, conn = conn, .log = .log)
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.