Intro to Apache Beam with Google Cloud

Josh Yeo
4 min read · May 15, 2021


Reading JSON off GCP PubSub and writing to BigQuery via Apache Beam

1. What is the velocity of your data?

Data processing can be classified by how frequently the data needs to be updated:

  • Batch processing (few days to hourly)
  • Micro-batch (under an hour, to a few seconds)
  • Streaming (sub-second latency is crucial)

Some common use cases for data streaming are:

  • Grid computing & Numerical Simulations
  • Batch ETL & ML Pre-processing
  • Streaming ETL
  • IOT & Real Time Anomaly Detection
  • Real Time Mobile Game Session Personalisation

(Source: Iyer and Onofré, 2018.)

2. A first glance at Apache Beam

a) Batch + strEAM = BEAM

  • Treats batch as a special case of streaming.
  • Originated from the Google Cloud Dataflow SDKs, which Google contributed to the Apache Software Foundation.
  • An open source model that provides a portable, unified programming layer and SDKs to define and execute pipelines.
  • To get started, create a driver program using a Beam SDK to run your pipeline.

b) What is a “Pipeline”?

  • It represents a Directed Acyclic Graph (DAG) of steps. The graph can be a simple linear sequence, but it can also branch and merge, with multiple inputs and outputs.
  • Each step in a Pipeline consumes and produces data sets called PCollections.

c) What is a “PCollection”?

  • It represents a distributed data set that your Beam pipeline operates on.
  • Bounded dataset = fixed source, e.g. a text file
  • Unbounded dataset = continuously updating source, e.g. a messaging system such as Pub/Sub

d) What is a “PipelineRunner”?

  • It translates the data processing pipeline into the API compatible with the backend of the user’s choice.
  • When you run your Beam driver program, the Pipeline Runner that you designate constructs a workflow graph of your pipeline based on the PCollection objects and transforms. The graph is then executed, becoming an asynchronous “job” (or equivalent) on that back-end. For more details, see the capability matrix.
  • Distributed processing back-ends include: (Python, Go) Direct Runner, Google Cloud Dataflow, Apache Flink, Apache Spark, Apache Nemo, Apache Samza, Hazelcast Jet, Twister2, IBM Streams.

3. How do I input data to my Beam Pipeline?

  • I/O connectors: File based, filesystem, messaging, database (Full list here)
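  • Parsing JSON in Python from an input stream: Pub/Sub delivers each message as bytes, so decode it to a UTF-8 string before parsing.
import apache_beam as beam
# Assumption: inputs_pattern holds a Pub/Sub subscription path (the original read step is truncated).
with beam.Pipeline(options=pipeline_options) as p:
    pubsub_data = (
        p
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=inputs_pattern).with_output_types(bytes)
        | 'Decode bytes to string' >> beam.Map(lambda x: x.decode('utf-8'))
    )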
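
After decoding, each message still has to be parsed. The next section passes a function called to_json to beam.Map, but the article does not show its body, so the version below is an assumption: a minimal plain-Python sketch that runs without Beam. The sample payload is made up; stop_id and response_ts appear elsewhere in this article, while vehicle_id and delay are hypothetical field names.

```python
import json


def to_json(message):
    """Parse one decoded message (a JSON string) into a dict.

    Hypothetical body: the article names to_json but does not show it.
    """
    return json.loads(message)


# Bytes as they might arrive from Pub/Sub (made-up sample payload).
raw = b'{"vehicle_id": "tram-7", "stop_id": "42", "delay": 90, "response_ts": 1621036800}'

decoded = raw.decode('utf-8')  # the same decoding the lambda performs
record = to_json(decoded)      # the function a later beam.Map step would apply
print(record['stop_id'])
```

A production version would probably also decide what to do with malformed messages, for example by catching json.JSONDecodeError and routing the message elsewhere.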

4. How do I transform my data?

  • Map: Accepts a function that returns a single element for every input element in the PCollection.
| 'Convert string to json' >> beam.Map(to_json)
  • Filter: Accepts a predicate function and keeps only the elements for which it returns True. Useful when the function only decides whether to output an element or not.
| 'Filter Only Trains At Stops' >> beam.Filter(lambda x: x['stop_id'] != '0')
  • ParDo: Similar to the “Map” phase of a Map/Shuffle/Reduce-style algorithm. It considers each element in the input PCollection, performs some processing function on that element, and emits zero, one, or multiple elements to an output PCollection.
| 'Create tuples from vehicle and delay time' >> beam.ParDo(CollectVehicleDelays())
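
To make the difference between these transforms concrete, here is a plain-Python model of what Filter and ParDo do to a small list of records. It does not use Beam and is only a sketch: the body of CollectVehicleDelays is not shown in this article, so collect_vehicle_delays and the vehicle_id and delay fields are hypothetical.

```python
def collect_vehicle_delays(record):
    """Generator shaped like a DoFn's process method: yields 0..n outputs.

    Hypothetical logic; the article's CollectVehicleDelays is not shown.
    """
    if 'delay' in record:  # emit nothing for incomplete records
        yield (record['vehicle_id'], record['delay'])


records = [
    {'vehicle_id': 'tram-7', 'stop_id': '42', 'delay': 90},
    {'vehicle_id': 'tram-7', 'stop_id': '0', 'delay': 10},  # not at a stop
    {'vehicle_id': 'bus-3', 'stop_id': '17'},               # no delay field
]

# Filter: keep only elements for which the predicate is true.
at_stops = [r for r in records if r['stop_id'] != '0']

# ParDo: each input element may produce zero, one, or many outputs.
pairs = [out for r in at_stops for out in collect_vehicle_delays(r)]
print(pairs)
```

In a Beam pipeline, a generator like this could be handed to beam.FlatMap, or the same logic could live in the process method of a beam.DoFn subclass that is passed to beam.ParDo.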

5. How do I define windows where data will be processed?

  • A window is a defined period of time over which elements are grouped for processing.
  • beam.window.TimestampedValue: Attaches an event timestamp to each element.
| 'Identify Field For Use In Windowing' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['response_ts']))
  • WindowInto: Assigns each element to a Fixed (aka Tumbling), Sliding, or Session window.
  • Triggers: Control when the results for a specific window are emitted.
  • Accumulation: Since a trigger can fire multiple times, the accumulation mode determines whether the system accumulates the window panes as the trigger fires, or discards them.
  • Allowed lateness: How long after the watermark passes the end of the window late data is still accepted. Whether that late data produces new results depends on the trigger; data arriving after the allowed lateness is dropped.
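| beam.WindowInto(
    beam.window.FixedWindows(1 * 60),  # 60-second fixed windows
    trigger=beam.transforms.trigger.AfterProcessingTime(1 * 60),
    accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)  # assumption: a mode is needed with a custom trigger; DISCARDING is a guess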
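
To see what assigning elements to fixed windows means, the sketch below models the idea in plain Python. It is an illustration under simple assumptions (integer timestamps in seconds, made-up events), not Beam's implementation, and it ignores triggers and lateness entirely.

```python
from collections import defaultdict


def fixed_window(timestamp, size, offset=0):
    """Return the [start, end) bounds of the fixed window containing timestamp."""
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


# (event timestamp in seconds, value); made-up sample events.
events = [(5, 'a'), (59, 'b'), (60, 'c'), (125, 'd')]

# Bucket every element by the 60-second window its timestamp falls into.
windows = defaultdict(list)
for ts, value in events:
    windows[fixed_window(ts, 60)].append(value)

for bounds, values in sorted(windows.items()):
    print(bounds, values)
```

Each element lands in exactly one window, which is what distinguishes fixed windows from sliding windows, where one element can belong to several overlapping windows.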

6. How can I group the data in my windows?

  • GroupByKey: Takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key. In our example we use a ParDo to convert our elements to tuples first.
| 'Group by the tuple key' >> beam.GroupByKey()
  • CombineValues: Combines an iterable of values in a keyed collection of elements.
| beam.CombineValues(beam.combiners.MeanCombineFn())
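
The two steps can be modelled in plain Python to show the shape of the data at each stage. This is a sketch without Beam, using made-up (vehicle, delay) pairs; within a real pipeline the grouping would happen per window.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect all values that share a key, like GroupByKey does per window."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def mean(values):
    """Combine an iterable of numbers into their arithmetic mean."""
    values = list(values)
    return sum(values) / len(values)


# Made-up (vehicle, delay in seconds) tuples.
pairs = [('tram-7', 90), ('bus-3', 30), ('tram-7', 30)]

grouped = group_by_key(pairs)                       # key -> list of values
means = {k: mean(v) for k, v in grouped.items()}    # key -> combined value
print(grouped)
print(means)
```

Beam also offers beam.CombinePerKey, which performs the grouping and the combining in a single transform.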

7. How can I write my data to BigQuery?

  • BigQuery: The write step expects a PCollection of dictionaries, where each dictionary is one row keyed by column name.
| 'Convert output to dict' >> beam.Map(transform_to_json)
| 'Write to bigquery' >>
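
The article does not show transform_to_json, so the following is a hypothetical sketch of what such a function might look like, assuming the previous step emits (vehicle, mean delay) tuples. The column names vehicle_id and mean_delay are made up and would have to match the schema of the target table.

```python
def transform_to_json(element):
    """Turn a (vehicle_id, mean_delay) tuple into a row dictionary.

    Hypothetical body and column names; the article does not show them.
    """
    vehicle_id, mean_delay = element
    return {'vehicle_id': vehicle_id, 'mean_delay': mean_delay}


row = transform_to_json(('tram-7', 60.0))
print(row)
```

Keeping this conversion in its own function makes it easy to check the row shape locally before pointing the pipeline at a real table.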

8. Putting the pieces together

Try it out:

More details on how to get an example running are in the ReadMe.


