I created a streaming Dataflow pipeline in Python and just want to confirm that the code below does what I expect. This is what I intend to do:
- Consume from Pub/Sub continuously
- Batch load into BigQuery every minute, instead of using streaming inserts, to bring down the cost
This is the code snippet in Python:

import json
from datetime import datetime

import apache_beam as beam
from apache_beam import Map
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner

options = PipelineOptions(
    subnetwork=SUBNETWORK,
    service_account_email=SERVICE_ACCOUNT_EMAIL,
    use_public_ips=False,
    streaming=True,
    project=project,
    region=REGION,
    staging_location=STAGING_LOCATION,
    temp_location=TEMP_LOCATION,
    job_name=f"pub-sub-to-big-query-xxx-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
)

p = beam.Pipeline(DataflowRunner(), options=options)

pubsub = (
    p
    | "Read Topic" >> ReadFromPubSub(topic=INPUT_TOPIC)
    | "To Dict" >> Map(json.loads)
    | "Write To BigQuery" >> WriteToBigQuery(
        table=TABLE,
        schema=schema,
        method='FILE_LOADS',
        triggering_frequency=60,
        max_files_per_bundle=1,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=BigQueryDisposition.WRITE_APPEND))
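(For completeness: the snippet above only builds the pipeline graph and does not show the submission step; the streaming job is actually launched with something like the following.)

result = p.run()
# result.wait_until_finish()  # usually left out for a long-running streaming job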
May I know if the above code does what I intend: stream from Pub/Sub and, every 60 seconds, batch load into BigQuery? I purposely set max_files_per_bundle to 1 to prevent more than one shard from being created, so that only one file is loaded every minute, but I am not sure whether that is the right way to do it. The Java SDK has a withNumFileShards option, but I could not find the equivalent in Python. I am referring to this documentation: https://beam.apache.org/releases/pydoc/2.31.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery
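For reference, this is how I currently read those two arguments from the linked pydoc (the comments below are my paraphrase of the documentation, so please correct me if I have misread it):

WriteToBigQuery(
    table=TABLE,
    schema=schema,
    method='FILE_LOADS',
    # every triggering_frequency seconds, a load job is triggered for all
    # the data written since the previous load job
    triggering_frequency=60,
    # caps how many files a single worker bundle writes concurrently;
    # as far as I can tell it does not cap the total number of files per
    # load job, since each parallel bundle can still write its own file
    max_files_per_bundle=1,
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND)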
I am also curious whether I should use windowing to achieve this instead, as in the second version below:
# additional imports needed for the windowed version (the rest is as above)
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime

options = PipelineOptions(
    subnetwork=SUBNETWORK,
    service_account_email=SERVICE_ACCOUNT_EMAIL,
    use_public_ips=False,
    streaming=True,
    project=project,
    region=REGION,
    staging_location=STAGING_LOCATION,
    temp_location=TEMP_LOCATION,
    job_name=f"pub-sub-to-big-query-xxx-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
)

p = beam.Pipeline(DataflowRunner(), options=options)

pubsub = (
    p
    | "Read Topic" >> ReadFromPubSub(topic=INPUT_TOPIC)
    | "To Dict" >> Map(json.loads)
    | "Window" >> beam.WindowInto(window.FixedWindows(60),
                                  trigger=AfterProcessingTime(60),
                                  accumulation_mode=AccumulationMode.DISCARDING)
    | "Write To BigQuery" >> WriteToBigQuery(
        table=TABLE,
        schema=schema,
        method='FILE_LOADS',
        triggering_frequency=60,
        max_files_per_bundle=1,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=BigQueryDisposition.WRITE_APPEND))
Is the first method good enough without the windowing used in the second one? I am using the first method now, but I am not sure whether, every minute, it runs multiple load jobs from multiple files, or whether it actually merges all the Pub/Sub messages into one file and does a single bulk load.
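To check this empirically on my side, I was thinking of listing the recent load jobs in the project with the BigQuery client and looking at how many input files each one picked up (rough sketch; PROJECT_ID is a placeholder):

from google.cloud import bigquery

client = bigquery.Client(project=PROJECT_ID)
for job in client.list_jobs(max_results=50):
    # only completed load jobs expose input file / output row statistics
    if job.job_type == "load" and job.state == "DONE":
        print(job.job_id, job.input_files, "input files,", job.output_rows, "rows")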
Thank you!
With triggering_frequency=60 you will surpass the maximum number of load jobs per table per day; see the BigQuery Quotas. If you want to use batch loads, you need to explain in more detail how your data is streamed to Pub/Sub. It is also worth mentioning that loading from GCS into BigQuery is free of charge.
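To make the arithmetic behind that quota concern explicit (the exact quota value should be checked against the current BigQuery documentation):

# back-of-the-envelope count of load jobs generated by the pipeline above
SECONDS_PER_DAY = 24 * 60 * 60                           # 86,400
TRIGGERING_FREQUENCY = 60                                # as set in the snippets
loads_per_day = SECONDS_PER_DAY // TRIGGERING_FREQUENCY
print(loads_per_day)                                     # 1440 load jobs per day, at minimum
# retries and writes to multiple tables or partitions add to this count,
# which is how the daily per-table load-job quota gets exceeded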