
I have a pipeline that drops and re-creates several Snowflake tables every day (effectively a full refresh using CREATE OR REPLACE TABLE). I want to capture daily deltas (inserts/updates/deletes) for downstream incremental consumers, without hand-rolling complex MERGE logic for hundreds of tables.

I know I can compute diffs with MINUS/EXCEPT, MERGE, etc., but I’m trying to find a Snowflake-native, lighter-touch approach that scales across many tables.

What I’ve considered (learned so far):

  • Streams (stop working once the underlying table is dropped and re-created)
  • Time Travel (still requires a MINUS comparison)
  • Table clones (still require a MINUS comparison)
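For context, the clone-plus-MINUS pattern ruled out above looks roughly like this; table names are placeholders:

```sql
-- Hypothetical sketch: diff today's rebuilt table against a snapshot
-- taken before the refresh (my_table / my_table_yesterday are placeholders).

-- Rows present today but not yesterday (inserts, plus the new side of updates):
SELECT * FROM my_table
MINUS
SELECT * FROM my_table_yesterday;

-- Rows present yesterday but not today (deletes, plus the old side of updates):
SELECT * FROM my_table_yesterday
MINUS
SELECT * FROM my_table;
```

Hand-rolling this per table (and then merging the two result sets into a delta) is exactly the overhead I'm trying to avoid across hundreds of tables.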

Appreciate your help, thank you!

  • Stop dropping the tables. Put streams on them. Write a script to dynamically generate the MERGE statements for all the tables you want to process. Commented Oct 21 at 8:18
  • This is going to be difficult to achieve in Snowflake if the tables are recreated every day. Your best option is to alter the pipeline so that it either updates the existing tables or creates a new table each day (e.g. mytable_20251029) from which you could then derive the deltas. Alternatively, you could clone the existing table each day before the pipeline runs (or get the pipeline to do this first) to give you a "yesterday" version of the table to compare against. As you've already noted, streams won't work on tables that get recreated, and neither will Time Travel. Commented Oct 28 at 21:43
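The clone-before-the-pipeline-runs idea from the comment above could be scripted roughly like this (a sketch; my_table is a placeholder):

```sql
-- Hypothetical sketch: take a zero-copy clone just before the daily rebuild,
-- so a "yesterday" snapshot survives the CREATE OR REPLACE.
CREATE OR REPLACE TABLE my_table_yesterday CLONE my_table;

-- ...pipeline then runs its usual CREATE OR REPLACE TABLE my_table AS ... step...

-- Deltas can afterwards be derived by comparing my_table with my_table_yesterday
-- (e.g. via MINUS, as in the question).
```

Cloning is metadata-only, so the snapshot itself is cheap; the per-table comparison is still the part that has to be generated or scripted.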

2 Answers


One option is to use a Snowflake dynamic table, which can help track history, i.e. change data capture.

To identify what has changed, you can create a stream on top of the dynamic table, or you can create an SCD type 1 dynamic table on top of an SCD type 2 dynamic table, as explained in this article.
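A minimal sketch of the stream-on-a-dynamic-table variant (warehouse, lag, and table names are assumptions, and whether the dynamic table tolerates its source being dropped and recreated each day would need testing):

```sql
-- Hypothetical sketch: a dynamic table refreshed from the rebuilt source,
-- with a stream on top to expose row-level changes.
CREATE OR REPLACE DYNAMIC TABLE orders_dt
  TARGET_LAG = '1 day'
  WAREHOUSE  = my_wh
  AS SELECT * FROM my_db.my_schema.orders;

-- Streams are supported on dynamic tables and survive their refreshes:
CREATE OR REPLACE STREAM orders_dt_stream ON DYNAMIC TABLE orders_dt;

-- Downstream consumers read inserts/updates/deletes from the stream:
SELECT *, METADATA$ACTION, METADATA$ISUPDATE
FROM orders_dt_stream;
```

The idea is that the dynamic table, not the base table, becomes the stable object that streams attach to, sidestepping the drop-and-recreate problem.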


You’re trying to use Snowflake’s native tools to track daily changes. Since you already know why streams, Time Travel, and clones don’t fit your “drop & recreate” pattern, here’s another approach.

Below is a stored procedure (which has worked in practice) that replaces your CREATE OR REPLACE TABLE step, computes daily deltas (INSERT/UPDATE/DELETE), and writes a summary to a global changelog table.

Important: to identify updates, the target table must have a primary key; the procedure below will fail without one.
If you don’t need update tracking, you can drop that requirement and approximate:
updated = (total_current_rows - total_previous_rows) - (inserted - deleted)
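Snowflake primary key constraints are informational (not enforced), but SHOW PRIMARY KEYS reads them from metadata, so declaring one is enough for this procedure. For example (table and column names are placeholders):

```sql
-- Hypothetical example: declare an informational primary key so the
-- procedure's SHOW PRIMARY KEYS lookup can find it.
ALTER TABLE my_db.my_schema.orders
  ADD CONSTRAINT pk_orders PRIMARY KEY (order_id);
```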


--  ## DEFINE SCHEMA OR USE FULL PATHS  ##
-- use schema <DATABASE_NAME>.<SCHEMA_NAME>;

CREATE OR REPLACE PROCEDURE replace_table(table_path STRING, sql_code STRING)
RETURNS  OBJECT
LANGUAGE SQL
COMMENT = 'Replaces a table and tracks the changes in a changelog, according to the primary key'
as $$
BEGIN
--     ##   GET THE PRIMARY KEY FROM THE TARGET TABLE   ##
      LET show_table string := 'SHOW PRIMARY KEYS IN TABLE '|| table_path;
      EXECUTE IMMEDIATE show_table;
      LET primary_key string :=
        (SELECT listagg(DISTINCT $5,',') WITHIN GROUP ( ORDER BY $5) AS PK
        FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
        );

--    ##    BUILD CREATE CODE FOR THE REPLACE, AND TEMPORARY TABLES (NEW AND OLD)  ##
      LET code_save_old_keys STRING := 'CREATE OR REPLACE TEMPORARY TABLE old_keys_table  AS SELECT ' || primary_key || ' FROM ' || :table_path;
      LET code_replace_new STRING := 'CREATE OR REPLACE TABLE ' || :table_path || ' AS ' || :sql_code;
      LET code_save_new_keys STRING := 'CREATE OR REPLACE TEMPORARY TABLE new_keys_table  AS SELECT ' || primary_key || ' FROM ' || :table_path;

--    ##    EXECUTE THE SQL CODE  ##
      EXECUTE IMMEDIATE code_save_old_keys;
      EXECUTE IMMEDIATE code_replace_new;
      EXECUTE IMMEDIATE code_save_new_keys;

--    ##    RESTORE THE PRIMARY KEY IN THE TARGET TABLE AFTER REPLACE   ##
      LET code_restore_primary_key string:= 'ALTER TABLE ' || :table_path ||' ADD CONSTRAINT pk_' || replace(:table_path,'.','_') || ' PRIMARY KEY (' || :primary_key ||')';
      EXECUTE IMMEDIATE code_restore_primary_key;

--    ##    COUNT THE CHANGES OF EACH KEY ITEM  ##
--    ##    NOTE: ASSUMES KEY VALUES ARE UNIQUE IN EACH SNAPSHOT. A KEY PRESENT IN  ##
--    ##    BOTH SNAPSHOTS IS COUNTED AS 'Updated' EVEN IF ITS NON-KEY COLUMNS DID  ##
--    ##    NOT CHANGE; COMPARE ROW HASHES IF YOU NEED TRUE UPDATE DETECTION.       ##
      CREATE OR REPLACE TEMPORARY TABLE raw_changes AS
          SELECT * EXCLUDE (change)
                ,CASE sum(change)
                    WHEN -1 THEN 'Deleted'
                    WHEN  0 THEN 'Updated'
                    WHEN  1 THEN 'Inserted'
                END as change_type
          FROM (select *,1 as change from new_keys_table
                UNION ALL
                select *,-1 as change from old_keys_table) U
          GROUP BY ALL;

--    ##    AGGREGATE THE CHANGES BY TYPE (INSERT, UPDATE, DELETE) AND STORE IN AN OBJECT VARIABLE  ##
      LET result_object object:=
          (SELECT object_agg(change_type, cnt)
          FROM (SELECT change_type, count(*) AS cnt
                FROM raw_changes
                GROUP BY change_type));

--    ##    IF NOT EXISTS, CREATE A GLOBAL CHANGELOG SUMMARY TABLE (NOT TEMPORARY) TO TRACK ALL CHANGES BY TABLE AND TIME  ##
      CREATE TABLE IF NOT EXISTS changelog (
          table_path string
          ,change_time timestamp
          ,inserted_rows int
          ,updated_rows int
          ,deleted_rows int
      );

--    ##    INSERT THE CHANGES OF THE CURRENT ACTION TO THE CHANGELOG TABLE ##
      let _insert int:= ifnull(result_object:Inserted::int, 0);   -- default to 0 when a change type is absent
      let _updated int:= ifnull(result_object:Updated::int, 0);
      let _deleted int:= ifnull(result_object:Deleted::int, 0);
      INSERT INTO changelog (table_path,change_time,inserted_rows,updated_rows,deleted_rows)
            VALUES  (:table_path,current_timestamp,:_insert,:_updated,:_deleted);

      return result_object;     -- RETURN THE OBJECT OF THE CURRENT CHANGES

END
$$;
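Usage would look something like this (the database, schema, and table names are hypothetical):

```sql
-- Replace the pipeline's existing "CREATE OR REPLACE TABLE ... AS SELECT ..."
-- step with a call to the procedure:
CALL replace_table(
    'MY_DB.MY_SCHEMA.ORDERS',
    'SELECT * FROM MY_DB.STAGING.ORDERS_RAW'
);

-- Downstream consumers can then read the per-run delta summary:
SELECT *
FROM changelog
WHERE table_path = 'MY_DB.MY_SCHEMA.ORDERS'
ORDER BY change_time DESC;
```

Because the procedure takes the table path and the SELECT as arguments, generating the calls for hundreds of tables is a matter of templating two strings per table.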

I hope you find it useful.

