
I am trying to process data with a 60-minute session interval using the Apache Beam Python SDK, but the actual session durations were inaccurate, such as 3:00:00, 1:01:00, or 1:50:00, when I ran my application.

Would you help me find a solution to fix this issue and process data in 60-minute sessions?

I built my pipeline as shown below.

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
            | "Convert" >> ParDo(Convert())
            | "Add Timestamp" >> Map(lambda x: window.TimestampedValue(x, get_timestamp_from_element(x).timestamp()))
            | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
            | "Apply Session Window" >> WindowInto(window.Sessions(known_args.session_interval))
            | "Group" >> GroupByKey()
            | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
        )
        # The with-block runs the pipeline and waits for completion on exit,
        # so explicit pipeline.run() / wait_until_finish() calls are redundant here.
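(`get_timestamp_from_element` is not shown in the question; a hypothetical version, assuming each element carries an ISO-8601 string in a `timestamp` field, might look like this.)

```python
from datetime import datetime, timezone

def get_timestamp_from_element(element):
    # Hypothetical helper: parse an ISO-8601 string from the element's
    # "timestamp" field into a timezone-aware UTC datetime.
    return datetime.fromisoformat(element["timestamp"]).replace(tzinfo=timezone.utc)
```

The `.timestamp()` call in the pipeline then converts this datetime to Unix seconds for `TimestampedValue`.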

session_interval (60 minutes) is provided as below.

    parser.add_argument(
        "--session_interval",
        type=int,  # parse command-line values as integers, not strings
        help="Gap duration of each session, in seconds",
        default=60 * 60)  # 60 minutes

The WriteToCSV DoFn processes data per session. I logged each session's duration, but it was not accurate.

    class WriteToCSV(DoFn):
        def __init__(self, output_path):
            self.output_path = output_path

        def process(self, element, window=DoFn.WindowParam):
            # After GroupByKey, each element is (key, grouped records).
            user_id, click_records = element
            window_start = window.start.to_utc_datetime()
            window_end = window.end.to_utc_datetime()
            duration = window_end - window_start
            logging.info(">>> new %s record(s) in %s session (start %s end %s)",
                         len(click_records), duration, window_start, window_end)
            ....

Then I got these log messages when I ran the application locally with DirectRunner.

    new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19 05:00:00)
    new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19 03:03:00)
    new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19 05:00:00)

I also deployed the pipeline to Dataflow and got the same result.

    new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end 2018-10-19 13:00:00)
    new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end 2018-10-19 11:03:00)
    new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end 2018-10-19 13:00:00)

1 Answer

In your Beam pipeline, the variable `known_args.session_interval` passed to `window.Sessions` defines the gap duration, i.e. the duration after which, if no further events arrive for a given key, the window is closed. Each session can have a different start, end, and duration depending on the events the pipeline processes for that key, as the example below illustrates.

For example

Key 1 - 10:00 AM ----|
Key 1 - 10:45 AM     |
Key 1 - 11:30 AM     |====> One session window for Key 1 of duration 4 hours (10:00 AM to 2:00 PM, i.e. last event + 1-hour gap)
Key 1 - 12:15 PM     |
Key 1 - 01:00 PM ----|
Key 1 - 02:30 PM =========> Start of a new session window for Key 1

Key 2 - 10:00 AM-----|
Key 2 - 10:30 AM     |====> One session window for Key 2 of duration 2 hours (10:00 AM to 12:00 PM)
Key 2 - 11:00 AM-----|
Key 2 - 12:30 PM =========> Start of a new session window for Key 2
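To make the gap semantics concrete, here is a plain-Python sketch (not Beam code) of how sessions merge: events for one key stay in the same session while each arrives within the gap of the previous one, and the window's end is the last event's timestamp plus the gap. That is why the logged durations above are one gap longer than the span of the events themselves.

```python
def session_windows(timestamps, gap):
    """Group epoch timestamps (seconds) into session windows.

    Each window is (start, end) with end = last event + gap,
    mirroring how Beam's Sessions windows merge per key.
    """
    windows = []
    start = prev = None
    for ts in sorted(timestamps):
        if prev is not None and ts - prev >= gap:
            windows.append((start, prev + gap))  # gap exceeded: close the session
            start = ts
        elif prev is None:
            start = ts  # first event opens the first session
        prev = ts
    if prev is not None:
        windows.append((start, prev + gap))  # close the final session
    return windows
```

For the Key 1 events above (10:00, 10:45, 11:30, 12:15, 1:00, then 2:30), a 3600-second gap yields one window from 10:00 AM to 2:00 PM and a second one starting at 2:30 PM.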

If you are interested in grouping and processing events in fixed 60-minute intervals, you need to use `FixedWindows` instead.
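That would mean replacing the windowing step in your pipeline with, for example, `WindowInto(window.FixedWindows(60 * 60))`. Conceptually, fixed windowing assigns each event to an epoch-aligned 60-minute bucket regardless of the gaps between events; a plain-Python sketch (not Beam code) of that assignment:

```python
def fixed_window(ts, size):
    """Return (start, end) of the epoch-aligned fixed window containing ts.

    All events falling in the same size-second bucket share one window,
    so every window has exactly the same duration.
    """
    start = ts - (ts % size)
    return (start, start + size)
```

With this scheme, every window in your logs would show a duration of exactly 1:00:00, at the cost of possibly splitting one user's burst of activity across two adjacent windows.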
