
I am using Apache Beam with the Python SDK and would like to ask: what is the equivalent of the Java SDK's Wait.on() in the Python SDK?

Currently I am having a problem with the code snippet below:

    if len(output_pcoll) > 1:
        merged = (tuple(output_pcoll) |
                  'MergePCollections1' >> beam.Flatten())
    else:
        merged = output_pcoll[0]

    outlier_side_input = self.construct_outlier_side_input(merged)

    (merged |
     "RemoveOutlier" >>
     beam.ParDo(utils.Remove_Outliers(),
                beam.pvalue.AsDict(outlier_side_input)) |
     "WriteToCSV" >>
     beam.io.WriteToText('../../ML-DATA/{0}.{1}'.format(self.BUCKET,
                         self.OUTPUT), num_shards=1))

It seems Apache Beam does not wait until the code in self.construct_outlier_side_input has finished executing, which results in an empty side input when "RemoveOutlier" runs in the next step of the pipeline. In the Java SDK you can use Wait.on() to wait for construct_outlier_side_input to finish executing, but I could not find an equivalent method in the Python SDK.

--Edit-- What I am trying to achieve is almost the same as in this link: https://rmannibucau.metawerx.net/post/apache-beam-initialization-destruction-task

  • Welcome to Stack Overflow. I'm new to Python too; what I want you to make sure of is that the issue is mainly about timing and not anything else. What I understand is that for you to have multithreading in Python you have to implement it yourself; otherwise the code executes synchronously, one statement after the other. You should first test whether waiting x seconds before removing outliers makes your code behave as you expect; you can use time.sleep(x). You may check this link as well: blog.miguelgrinberg.com/post/how-to-make-python-wait Commented Nov 9, 2019 at 17:41
  • Can you explain your usecase ? Are you trying to remove outliers from the merged dataset? Commented Nov 10, 2019 at 5:42
  • @HasnaaIbraheem thanks, I tried that before posting it here, but still no success. @jjayadeep, yes, I am trying to remove outliers from the merged dataset, but the outliers I am trying to remove are local outliers. So from the merged dataset I construct a dictionary consisting of item name as key and a list of its values as the dictionary value, then I use that dictionary as a side input when removing outliers Commented Nov 10, 2019 at 11:11
  • I assume this is in a streaming pipeline? If yes what is the window you are applying to the SideInput data? Commented Nov 12, 2019 at 1:51
  • @RezaRokni no it's not, this is a batch pipeline Commented Nov 12, 2019 at 14:35

1 Answer


You can use the additional outputs feature of Beam to do this.

A sample code snippet is as follows:

    results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
                       .with_outputs('above_cutoff_lengths', 'marked strings',
                                     main='below_cutoff_strings'))
    below = results.below_cutoff_strings
    above = results.above_cutoff_lengths
    marked = results['marked strings']  # indexing works as well

Once you run the above snippet you get multiple PCollections, such as below, above, and marked. You can then use side inputs to further filter or join the results.

Hope that helps.

Update

Based on the comments, I would like to mention that Apache Beam supports stateful processing with the help of ValueState and BagState. If the requirement is to read through a PCollection and then make decisions based on whether a prior value is present, such requirements can be handled through BagState, as shown below:

    def process(self,
                element,
                timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam,
                buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
                watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):

        # Do your processing here
        key, value = element
        # Read all the data from buffer_1
        all_values_in_buffer_1 = [x for x in buffer_1.read()]

        if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
            # Clear the buffer data if the required conditions are met.
            buffer_1.clear()

        # Add the value to buffer_2
        buffer_2.add(value)

        if StatefulDoFn._all_condition_met():
            # Clear the timer if a certain condition is met and you don't
            # want to trigger the callback method.
            watermark_timer.clear()

        yield element

    @on_timer(WATERMARK_TIMER)
    def on_expiry_1(self,
                    timestamp=beam.DoFn.TimestampParam,
                    window=beam.DoFn.WindowParam,
                    key=beam.DoFn.KeyParam,
                    buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                    buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
        # The window and key parameters are really useful, especially for
        # debugging issues.
        yield 'expired1'

4 Comments

Unfortunately this solution doesn't work for me, since what is passed to my Remove_Outliers() will be every element in the PCollection, and to construct a dictionary like { item: [price1, price2, ..., priceN] } I will need to have some state saved, e.g. whether a particular item exists in the dict or not, and if it exists, append to its list. And saving state in Apache Beam is not supported.
@ruka - Beam provides Stateful processing semantics. I have updated my answer with the details that you can refer to. If it helped please do accept the answer.
Sorry, I forgot to mention my pipeline is a batch pipeline. Thank you for the detailed explanation though.
You can use the BagState and ValueState for the batch pipeline as well. You can find more details here - beam.apache.org/blog/2017/02/13/stateful-processing.html. If this answer was helpful please do accept my answer.
