2

I am trying to write a tool to load CSV files into multiple database. While trying to search online how to use COPY command with Snowflake, I couldn't find information how to do it in Java. This is how I do with PostgreSQL

public void loadData(Message message) throws Exception {
    try (Connection connection = DriverManager.getConnection(message.getJdbcUrl(),
            message.getUser(), message.password)) {
        loadDataWithMode(loadRequest, (BaseConnection) connection);
    } catch (Throwable error){
        throw error;   
    }
}


public void loadDataWithMode(Message message, BaseConnection connection) throws Exception {
    CopyManager copyManager = new CopyManager(connection);
    String fields = message.getFields();
    final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
        try (InputStream input = fileService.load(loadRequest.getFilePath())) {
            try (InputStreamReader reader = new InputStreamReader(input, "UTF-8")) {
                copyManager.copyIn("COPY " + targetTableWithFields + " from STDIN 
            }
        }
 }

I'm not familiar with Snowflake how to do it, any help will be appreciated.

2

1 Answer 1

4
public void loadDataWithMode(Message message, Connection connection) throws Exception {
        String fields = message.getFields();
        final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
            LOG.info("about to copy data into table: " + targetTableWithFields);
            try (Statement statement = connection.createStatement()) {

            final SnowflakeConnectionV1 snowflakeConnectionV1 = (SnowflakeConnectionV1) connection;
            final File tempFile = fileSystemService.asLocalFile(message.getFilePath());
            try (Statement stmt = snowflakeConnectionV1.createStatement(); InputStream inputStream = new FileInputStream(tempFile)) {
                final String createStage = buildCreateStageStatement();
                LOG.info("Executing sql:{}", createStage);
                stmt.execute(createStage);
                LOG.info("Create stage was successfully executed");
                snowflakeConnectionV1.uploadStream("COPYIN_STAGE", "", inputStream, tempFile.getName(), false);
                LOG.info("Upload stream was successfully executed");
                stmt.execute("USE WAREHOUSE "+ message.getExportConnectionDetails().getWarehouse());
                LOG.info("Warehouse was successfully set to: "+message.getExportConnectionDetails().getWarehouse());
                final boolean purgeData = !(message.getLoadMode() == LoadMode.INCREMENTAL);
                String sql = String.format("copy into %s(%s) from @COPYIN_STAGE/%s file_format = ( type='CSV', skip_header=1) purge=" + purgeData + "   ", message.getTableName(), fields, tempFile.getName());
                LOG.info("Executing sql:{}", sql);
                stmt.execute(sql);
            }
        connection.commit();
        LOG.info("data was successfully copied " + targetTableWithFields);
    }
}

private String buildCreateStageStatement() {
    return "CREATE OR REPLACE TEMPORARY STAGE COPYIN_STAGE " + "file_format = ( type ='CSV')";
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.