Intro to Apache Beam with Google Cloud

Josh Yeo
May 15, 2021


Reading JSON off GCP Pub/Sub and writing to BigQuery via Apache Beam

1. What is the velocity of your data?

Data processing can be classified by how frequently the data needs to be updated:

  • Batch processing (every few days to hourly)
  • Micro-batch (under an hour, to a few seconds)
  • Streaming (sub-second latency is crucial)

Some common use cases for data streaming are:

  • Grid computing & Numerical Simulations
  • Batch ETL & ML Pre-processing
  • Streaming ETL
  • IoT & Real-Time Anomaly Detection
  • Real-Time Mobile Game Session Personalisation

(Source: Iyer and Onofré, 2018.)

2. A first glance at Apache Beam

a) Batch + strEAM = BEAM

  • Treats batch as stream.
  • Evolved from the Google Cloud Dataflow SDKs, which Google contributed to the Apache Software Foundation.
  • An open-source, unified programming model with portable SDKs for defining and executing both batch and streaming pipelines.
  • To get started, create a driver program using a Beam SDK to run your pipeline.

b) What is a “Pipeline”?

  • It represents a Directed Acyclic Graph (DAG) of processing steps; it is not necessarily a linear sequence and can have multiple inputs and outputs.
  • A Pipeline creates its data sets as PCollections.

c) What is a “PCollection”?

  • It represents a distributed data set that your Beam pipeline operates on.
  • Bounded dataset = a fixed, finite source, e.g. a text file.
  • Unbounded dataset = a continuously updating source, e.g. a messaging subscription (see the sketch below).
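A minimal sketch contrasting the two (the Pub/Sub subscription path is a placeholder):

import apache_beam as beam

with beam.Pipeline() as p:
    # Bounded: a fixed, finite source such as an in-memory list or a text file.
    bounded = p | beam.Create(['a', 'b', 'c'])

    # Unbounded: a continuously updating source such as a Pub/Sub
    # subscription; it requires streaming mode, so it is shown here
    # as a comment only.
    # unbounded = p | beam.io.ReadFromPubSub(
    #     subscription='projects/my-project/subscriptions/my-sub')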

d) What is a “PipelineRunner”?

  • It translates the data processing pipeline into an API compatible with the back-end of the user’s choice.
  • When you run your Beam driver program, the Pipeline Runner you designate constructs a workflow graph of your pipeline based on the PCollection objects and transforms. The graph is then executed, becoming an asynchronous “job” (or equivalent) on that back-end. For more details, see the Beam capability matrix.
  • Distributed processing back-ends include: the Direct Runner (local execution, for testing), Google Cloud Dataflow, Apache Flink, Apache Spark, Apache Nemo, Apache Samza, Hazelcast Jet, Twister2, and IBM Streams. The runner is selected via pipeline options, as sketched below.
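A minimal sketch of selecting a runner through pipeline options; the project, region, and bucket values are placeholders:

from apache_beam.options.pipeline_options import PipelineOptions

# Local testing: the DirectRunner executes the pipeline in-process.
local_options = PipelineOptions(runner='DirectRunner')

# Managed execution: hand the same pipeline to Google Cloud Dataflow
# (project, region, and temp_location below are placeholder values).
dataflow_options = PipelineOptions(
    runner='DataflowRunner',
    project='my-gcp-project',
    region='us-central1',
    temp_location='gs://my-bucket/tmp')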

3. How do I input data to my Beam Pipeline?

  • I/O connectors: file-based, filesystem, messaging, and database (the full list is in the Beam I/O connector documentation).
  • Parsing JSON in Python from an input stream starts with decoding the incoming byte stream into UTF-8 strings:
with beam.Pipeline(options=pipeline_options) as p:
    pubsub_data = (
        p
        | beam.io.ReadFromPubSub(subscription=inputs_pattern).with_output_types(bytes)
        | beam.Map(lambda x: x.decode('utf-8'))
    )
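Pub/Sub is an unbounded source, so the pipeline also needs to run in streaming mode. A minimal options sketch:

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Enable streaming mode so the unbounded Pub/Sub read can run.
pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True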

4. How do I transform my data?

  • Map: Accepts a function that returns a single element for every input element in the PCollection (a sketch of the to_json helper appears after this list).
| 'Convert string to json' >> beam.Map(to_json)
  • Filter: Useful if function is just deciding to output an element or not.
| 'Filter Only Trains At Stops' >> beam.Filter(lambda x : (x['stop_id']!='0'))
  • ParDo: Similar to the “Map” phase of a Map/Shuffle/Reduce-style algorithm. It considers each element in the input PCollection, performs some processing function on that element, and emits zero, one, or multiple elements to an output PCollection.
| 'Create tuples from vehicle and delay time' >> beam.ParDo(CollectVehicleDelays())
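Neither to_json nor CollectVehicleDelays is shown in the article, so here is a hedged sketch of what they might look like; field names such as vehicle_id and delay are assumptions:

import json
import apache_beam as beam

def to_json(line):
    # Parse one UTF-8 JSON string into a Python dict.
    return json.loads(line)

class CollectVehicleDelays(beam.DoFn):
    # Emit (vehicle_id, delay) tuples. Records without a delay field
    # are skipped, illustrating that a ParDo may emit zero, one, or
    # many elements per input.
    def process(self, element):
        if 'delay' in element:
            yield (element['vehicle_id'], float(element['delay']))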

5. How do I define windows where data will be processed?

  • A defined period of time in which elements are captured
  • beam.window.TimestampedValue: Assigns an event-time timestamp to each element.
| 'Identify Field For Use In Windowing' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['response_ts']))
  • WindowInto: Fixed (aka Tumbling), Sliding, or Session
  • Triggers: Control for when elements of a specific window are output
  • Accumulation: Since a trigger can fire multiple times, the accumulation mode determines whether the system accumulates the window panes as the trigger fires, or discards them.
  • Allowed lateness: How long past the end of a window late data is still accepted; for late data that arrives after the watermark passes the end of the window, the trigger emits new results immediately.
| beam.WindowInto(
      beam.window.FixedWindows(1 * 60),
      trigger=beam.transforms.trigger.Repeatedly(
          beam.transforms.trigger.AfterAny(
              beam.transforms.trigger.AfterCount(5),
              beam.transforms.trigger.AfterProcessingTime(1 * 60))),
      accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING,
      allowed_lateness=60)
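A runnable miniature of the timestamp-and-window steps, assuming each record carries a Unix-seconds response_ts field as in the snippets above:

import time
import apache_beam as beam
from apache_beam import window

with beam.Pipeline() as p:
    windowed = (
        p
        | beam.Create([
            {'vehicle_id': 'v1', 'response_ts': time.time()},
            {'vehicle_id': 'v2', 'response_ts': time.time() + 90},
        ])
        | 'Attach event timestamps' >> beam.Map(
            lambda x: window.TimestampedValue(x, x['response_ts']))
        | 'One-minute fixed windows' >> beam.WindowInto(window.FixedWindows(60))
    )

Because the two timestamps are 90 seconds apart, the elements always land in different one-minute fixed windows.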

6. How can I group the data in my windows?

  • GroupByKey: Takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key. In our example we use a ParDo to convert our elements to tuples first.
| 'Group by the tuple key' >> beam.GroupByKey()
  • CombineValues: Combines an iterable of values in a keyed collection of elements.
| beam.CombineValues(beam.combiners.MeanCombineFn())
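A self-contained sketch of the group-then-combine flow, with made-up (vehicle, delay) tuples standing in for the output of CollectVehicleDelays:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.Create([('bus_12', 45.0), ('bus_12', 75.0), ('tram_3', 10.0)])
        | 'Group by the tuple key' >> beam.GroupByKey()  # ('bus_12', [45.0, 75.0]), ...
        | 'Mean delay per key' >> beam.CombineValues(beam.combiners.MeanCombineFn())
        | beam.Map(print)  # ('bus_12', 60.0), ('tram_3', 10.0)
    )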

7. How can I write my data to BigQuery?

  • WriteToBigQuery: Requires a PCollection of dictionaries as input (hypothetical sketches of the schema string and converter follow the snippet).
| 'Convert output to dict' >> beam.Map(transform_to_json)
| 'Write to bigquery' >> beam.io.WriteToBigQuery(
      outputs_prefix + table_name_veh_info,
      schema=table_veh_info_schema,
      custom_gcs_temp_location=temp_location,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
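The article does not show table_veh_info_schema or transform_to_json, so the following are hypothetical sketches, assuming the per-vehicle mean-delay tuples from the previous step:

# BigQuery schemas can be given as a comma-separated 'name:TYPE' string.
table_veh_info_schema = 'vehicle_id:STRING,mean_delay:FLOAT'

def transform_to_json(element):
    # Convert a (vehicle_id, mean_delay) tuple into the row dict
    # that WriteToBigQuery expects.
    vehicle_id, mean_delay = element
    return {'vehicle_id': vehicle_id, 'mean_delay': mean_delay}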

8. Putting the pieces together
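A condensed sketch stitching the snippets above together; the resource names are placeholders, and the JSON field names (stop_id, response_ts, vehicle_id, delay) are assumptions carried over from the earlier examples:

import json
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Placeholder resources; substitute your own project values.
inputs_pattern = 'projects/my-project/subscriptions/my-sub'
output_table = 'my-project:transit.vehicle_delays'
temp_location = 'gs://my-bucket/tmp'
table_veh_info_schema = 'vehicle_id:STRING,mean_delay:FLOAT'

pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | beam.io.ReadFromPubSub(subscription=inputs_pattern).with_output_types(bytes)
        | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
        | 'Convert string to json' >> beam.Map(json.loads)
        | 'Filter Only Trains At Stops' >> beam.Filter(lambda x: x['stop_id'] != '0')
        | 'Timestamp' >> beam.Map(
            lambda x: window.TimestampedValue(x, x['response_ts']))
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))
        | 'Key by vehicle' >> beam.Map(lambda x: (x['vehicle_id'], float(x['delay'])))
        | 'Group by the tuple key' >> beam.GroupByKey()
        | 'Mean delay per key' >> beam.CombineValues(beam.combiners.MeanCombineFn())
        | 'Convert output to dict' >> beam.Map(
            lambda kv: {'vehicle_id': kv[0], 'mean_delay': kv[1]})
        | 'Write to bigquery' >> beam.io.WriteToBigQuery(
            output_table,
            schema=table_veh_info_schema,
            custom_gcs_temp_location=temp_location,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )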

Try it out:

More details on how to get the full example running are in the README:

https://github.com/eqilabs/MetroBeamExample
