I created a streaming Dataflow pipeline in Python and want to confirm that the code below does what I expect. This is what I intend to do:

  1. Consume from Pub/Sub continuously
  2. Batch load into BigQuery every minute, instead of streaming, to bring down the cost

This is the code snippet in Python:

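# Imports used by this snippet; SUBNETWORK, REGION, TABLE, schema, etc. are defined elsewhere.
import json
from datetime import datetime

import apache_beam as beam
from apache_beam import Map
from apache_beam.io import BigQueryDisposition, ReadFromPubSub, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner

options = PipelineOptions(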
    subnetwork=SUBNETWORK,
    service_account_email=SERVICE_ACCOUNT_EMAIL,
    use_public_ips=False,
    streaming=True,
    project=project,
    region=REGION,
    staging_location=STAGING_LOCATION,
    temp_location=TEMP_LOCATION,
    job_name=f"pub-sub-to-big-query-xxx-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
)

p = beam.Pipeline(DataflowRunner(), options=options)


pubsub = (
        p
        | "Read Topic" >> ReadFromPubSub(topic=INPUT_TOPIC)
        | "To Dict" >> Map(json.loads)
        | "Write To BigQuery" >> WriteToBigQuery(table=TABLE, schema=schema, method='FILE_LOADS',
                                                 triggering_frequency=60, max_files_per_bundle=1,
                                                 create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                                 write_disposition=BigQueryDisposition.WRITE_APPEND))

Is the above code doing what I intend, that is, streaming from Pub/Sub and batch inserting into BigQuery every 60 seconds? I purposely set max_files_per_bundle to 1 to prevent more than one shard from being created, so that only one file is loaded every minute, but I am not sure I am doing it right. The Java version has a withNumFileShards option, but I could not find the equivalent in Python. I referred to the documentation below: https://beam.apache.org/releases/pydoc/2.31.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery

https://cloud.google.com/blog/products/data-analytics/how-to-efficiently-process-both-real-time-and-aggregate-data-with-dataflow

Should I use windowing to achieve what I intend to do?

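# Additional imports needed for the windowing variant
from apache_beam import window
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime

options = PipelineOptions(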
    subnetwork=SUBNETWORK,
    service_account_email=SERVICE_ACCOUNT_EMAIL,
    use_public_ips=False,
    streaming=True,
    project=project,
    region=REGION,
    staging_location=STAGING_LOCATION,
    temp_location=TEMP_LOCATION,
    job_name=f"pub-sub-to-big-query-xxx-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
)

p = beam.Pipeline(DataflowRunner(), options=options)

pubsub = (
        p
        | "Read Topic" >> ReadFromPubSub(topic=INPUT_TOPIC)
        | "To Dict" >> Map(json.loads)
        | 'Window' >> beam.WindowInto(window.FixedWindows(60), trigger=AfterProcessingTime(60),
                                      accumulation_mode=AccumulationMode.DISCARDING)
        | "Write To BigQuery" >> WriteToBigQuery(table=TABLE, schema=schema, method='FILE_LOADS',
                                                 triggering_frequency=60, max_files_per_bundle=1,
                                                 create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                                 write_disposition=BigQueryDisposition.WRITE_APPEND))

Is the first method good enough without the windowing in the second method? I am using the first method now, but I am not sure whether, every minute, it runs multiple loads from multiple files or actually merges all the Pub/Sub messages into one file and does a single bulk load.
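
To make the second variant concrete, here is a plain-Python sketch of how a 60-second fixed window assigns elements by timestamp. It is not Beam code and only illustrates window assignment; it says nothing about how many files or load jobs WriteToBigQuery produces. The names `window_start` and `group_by_window` and the sample events are hypothetical.

```python
from collections import defaultdict

WINDOW_SIZE_S = 60  # same size as FixedWindows(60) in the snippet above


def window_start(timestamp_s: int, size_s: int = WINDOW_SIZE_S) -> int:
    """Return the start of the fixed window that contains timestamp_s."""
    return timestamp_s - (timestamp_s % size_s)


def group_by_window(events):
    """Group (timestamp_s, payload) pairs by the start of their window."""
    windows = defaultdict(list)
    for timestamp_s, payload in events:
        windows[window_start(timestamp_s)].append(payload)
    return dict(windows)


# Hypothetical events: (timestamp in seconds, payload)
events = [(0, "a"), (59, "b"), (60, "c"), (125, "d")]
batches = group_by_window(events)
for start, payloads in sorted(batches.items()):
    print(f"[{start}, {start + WINDOW_SIZE_S}): {payloads}")
```

Elements with timestamps 0 and 59 share one window, while 60 and 125 each fall into later windows, so windowing groups elements by time but does not by itself decide how they are written out.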

Thank you!

  • Windows are used when you need to perform some aggregation on your data, as described here. In addition, if you are using a streaming pipeline you should stream data to BigQuery, because with triggering_frequency=60 you will exceed the maximum number of load jobs per table per day; see Quotas. If you want to use batch loads, you need to explain in more detail how your data is streamed to Pub/Sub. It is also worth mentioning that loading from GCS into BigQuery is free of charge. Commented Jul 29, 2021 at 13:15
  • Lastly, could you tell me if you ran your pipeline and checked the output in BigQuery? Commented Jul 29, 2021 at 13:15
  • Hello @AlexandreMoraes, the daily limit per table is 1500. I am loading every minute, so that is 60*24 = 1440 loads, which is still below 1500 per day. I do not actually need any aggregation; I just want to merge all the files every minute. My script sends JSON messages to Pub/Sub after it parses each file in GCS. In theory I could send my output to GCS, but it would be too slow to kick off a Dataflow job every minute or so, unless I create another script that sends a message to Pub/Sub every minute to trigger the batch load in the streaming Dataflow job. Commented Jul 30, 2021 at 1:10
  • When there are few Pub/Sub messages per minute, my pipeline works fine without the windowing method. But when there are a lot of Pub/Sub messages every minute, I keep encountering this error. My table is partitioned by a date column. Error Result: <ErrorProto message: 'Failed to copy Non partitioned table to Column partitioned table: not supported.' reason: 'invalid'> [while running 'Write To BigQuery/BigQueryBatchFileLoads/ParDo(TriggerCopyJobs)/ParDo(TriggerCopyJobs)-ptransform-46459'] passed through: ==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:644 Commented Jul 30, 2021 at 1:12
  • Based on my current implementation, I suppose the files are not actually being merged and loaded into BigQuery every minute. I am not sure whether that is because the Python version of Dataflow lacks the functionality or because I am missing something. I appreciate your advice and help! Commented Jul 30, 2021 at 1:13
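
The quota arithmetic in the comments above can be checked with a short sketch. It assumes exactly one load job per trigger firing, which is the very thing in question; the 1500 figure is the limit quoted in the comment and should be checked against the current BigQuery quota page. The names `load_jobs_per_day`, `within_quota`, and `jobs_per_trigger` are hypothetical and used only for illustration.

```python
SECONDS_PER_DAY = 24 * 60 * 60

# Daily per-table load-job limit as quoted in the comment thread (an assumption here).
LOAD_JOBS_PER_TABLE_PER_DAY = 1500


def load_jobs_per_day(triggering_frequency_s: int, jobs_per_trigger: int = 1) -> int:
    """Number of load jobs per day if every trigger firing starts jobs_per_trigger jobs."""
    firings = -(-SECONDS_PER_DAY // triggering_frequency_s)  # ceiling division
    return firings * jobs_per_trigger


def within_quota(triggering_frequency_s: int, jobs_per_trigger: int = 1) -> bool:
    """True if the daily job count stays at or below the quoted limit."""
    jobs = load_jobs_per_day(triggering_frequency_s, jobs_per_trigger)
    return jobs <= LOAD_JOBS_PER_TABLE_PER_DAY


print(load_jobs_per_day(60))   # one job per minute
print(within_quota(60))        # single job per firing
print(within_quota(60, 2))     # two jobs per firing
```

With one job per firing, a 60-second interval gives 1440 jobs per day, under the quoted limit. If each firing produced two load jobs, the same interval would need 2880 jobs per day, which is above it, so the margin depends on files really being merged into a single load.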

1 Answer

This is not a Python solution; in the end I resorted to the Java version:

public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() {
    return new JsonToTableRow();
}

private static class JsonToTableRow
        extends PTransform<PCollection<String>, PCollection<TableRow>> {

    @Override
    public PCollection<TableRow> expand(PCollection<String> stringPCollection) {
        return stringPCollection.apply("JsonToTableRow", MapElements.via(
                new SimpleFunction<String, TableRow>() {
                    @Override
                    public TableRow apply(String json) {
                        try {
                            InputStream inputStream =
                                    new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
                            return TableRowJsonCoder.of().decode(inputStream, Context.OUTER);
                        } catch (IOException e) {
                            throw new RuntimeException("Unable to parse input", e);
                        }
                    }
                }));
    }
}


public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);
    options.setDiskSizeGb(10);

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("Read from PubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
            .apply(jsonToTableRow())
            .apply("WriteToBigQuery", BigQueryIO.writeTableRows().to(options.getOutputTable())
                    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                    .withTriggeringFrequency(Duration.standardMinutes(1))
                    .withNumFileShards(1)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    pipeline.run();
}
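
For comparison, the JsonToTableRow step above plays the same role as the "To Dict" step in the question's Python pipeline. A minimal sketch with equivalent error handling, assuming each Pub/Sub payload is a UTF-8 encoded JSON object; the function name `parse_payload` is hypothetical:

```python
import json


def parse_payload(payload: bytes) -> dict:
    """Decode one Pub/Sub payload into a dict, failing loudly on bad input."""
    try:
        row = json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Mirrors the Java version, which rethrows parse failures.
        raise ValueError("Unable to parse input") from exc
    if not isinstance(row, dict):
        raise ValueError("Expected one JSON object per message")
    return row


# Hypothetical payload for illustration
print(parse_payload(b'{"id": 1, "name": "example"}'))
```

In the Python pipeline this function could stand in for `json.loads` in the `Map` step, for example `Map(parse_payload)`.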