Reading JSON from GCP Pub/Sub and writing to BigQuery via Apache Beam
1. What is the velocity of your data?
Data processing approaches differ by how frequently the data needs to be updated:
- Batch processing (every few days down to hourly)
- Micro-batch (under an hour, down to a few seconds)
- Streaming (sub-second latency is crucial)
Some common use cases across this processing spectrum are:
- Grid computing & Numerical Simulations
- Batch ETL & ML Pre-processing
- Streaming ETL
- IoT & Real Time Anomaly Detection
- Real Time Mobile Game Session Personalisation
(Source: Iyer and Onofré, 2018.)
2. A first glance at Apache Beam
a) Batch + strEAM = BEAM
- Treats batch as a special case of streaming.
- Originated as a contribution of the Google Cloud Dataflow SDKs.
- An open-source, unified programming model that provides a portable programming layer and SDKs for defining and executing pipelines.
- To get started, create a driver program using a Beam SDK to define and run your pipeline (a minimal sketch follows).
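A minimal sketch of such a driver program; the file path and transform names are illustrative, not from the example project:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    (p
     | 'Read lines' >> beam.io.ReadFromText('gs://my-bucket/input.txt')  # assumed path
     | 'Count characters' >> beam.Map(len)
     | 'Print' >> beam.Map(print))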
b) What is a “Pipeline”?
- It represents a Directed Acyclic Graph (DAG) of processing steps, so it can have multiple inputs and outputs rather than just a linear sequence.
- A Pipeline creates its data sets as PCollections.
c) What is a “PCollection”?
- It represents a distributed data set that your Beam pipeline operates on.
- Bounded dataset = fixed source, e.g. a text file.
- Unbounded dataset = continuously updating source, e.g. a Pub/Sub subscription (see the sketch below).
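A small sketch contrasting the two; the subscription name is a placeholder, and streaming mode is assumed for the unbounded read:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # required for unbounded sources

with beam.Pipeline(options=options) as p:
    bounded = p | 'In-memory list' >> beam.Create(['a', 'b', 'c'])  # bounded PCollection
    unbounded = p | 'Pub/Sub messages' >> beam.io.ReadFromPubSub(
        subscription='projects/my-project/subscriptions/my-sub')   # unbounded PCollection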
d) What is a “PipelineRunner”?
- It translates the data processing pipeline into the API compatible with the backend of the user’s choice.
- When you run your Beam driver program, the Pipeline Runner that you designate constructs a workflow graph of your pipeline based on the PCollection objects and transforms. The graph is then executed, becoming an asynchronous “job” (or equivalent) on that back-end. For more details, see the capability matrix.
- Distributed processing back-ends include: the Direct Runner (Python, Go; for local testing), Google Cloud Dataflow, Apache Flink, Apache Spark, Apache Nemo, Apache Samza, Hazelcast Jet, Twister2, and IBM Streams. Runner selection is sketched below.
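A hedged sketch of selecting a runner through PipelineOptions; the project, region and bucket values are placeholders:

from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions(
    runner='DataflowRunner',              # or 'DirectRunner' for local testing
    project='my-gcp-project',             # placeholder
    region='europe-west1',                # placeholder
    temp_location='gs://my-bucket/temp',  # placeholder
    streaming=True)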
3. How do I input data to my Beam Pipeline?
- I/O connectors: file-based, filesystem, messaging, and database connectors (full list in the Beam I/O connector documentation).
- Parsing JSON in Python from an input stream: read the raw byte stream from Pub/Sub and decode it to UTF-8 strings before parsing, as below.
import apache_beam as beam

# pipeline_options and inputs_pattern are configuration values defined elsewhere in the project.
with beam.Pipeline(options=pipeline_options) as p:
    pubsub_data = (
        p
        | beam.io.ReadFromPubSub(subscription=inputs_pattern).with_output_types(bytes)
        | beam.Map(lambda x: x.decode('utf-8'))
    )
4. How do I transform my data?
- Map: Accepts a function that returns a single element for every input element in the PCollection.
| 'Convert string to json' >> beam.Map(to_json)
- Filter: Useful when the function only decides whether or not to output an element.
| 'Filter Only Trains At Stops' >> beam.Filter(lambda x: x['stop_id'] != '0')
- ParDo: Similar to the “Map” phase of a Map/Shuffle/Reduce-style algorithm. It considers each element in the input PCollection, performs some processing function on that element, and emits zero, one, or multiple elements to an output PCollection.
| 'Create tuples from vehicle and delay time' >> beam.ParDo(CollectVehicleDelays())
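The to_json helper and the CollectVehicleDelays DoFn referenced above are not shown in these notes; minimal sketches of what they might look like, with the 'vehicle_id' and 'delay' field names assumed:

import json
import apache_beam as beam

def to_json(line):
    # Parse one decoded Pub/Sub message into a Python dict.
    return json.loads(line)

class CollectVehicleDelays(beam.DoFn):
    def process(self, element):
        # Emit a (key, value) tuple per element; a DoFn may yield zero, one or many outputs.
        yield (element['vehicle_id'], element['delay'])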
5. How do I define windows where data will be processed?
- A defined period of time in which elements are captured
- beam.window.TimestampedValue: Set a timestamp value to each element.
| 'Identify Field For Use In Windowing' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['response_ts']))
- WindowInto: Fixed (aka Tumbling), Sliding, or Session windows (the example below uses a fixed window; sliding and session variants are sketched after it).
- Triggers: Control for when elements of a specific window are output
- Accumulation: Since a trigger can fire multiple times, the accumulation mode determines whether the system accumulates the window panes as the trigger fires, or discards them.
- Allowed lateness: How long after the watermark passes the end of the window late-arriving data is still accepted; when such late data arrives, the trigger emits updated results for that window.
| beam.WindowInto(
      beam.window.FixedWindows(1 * 60),
      trigger=beam.transforms.trigger.Repeatedly(
          beam.transforms.trigger.AfterAny(
              beam.transforms.trigger.AfterCount(5),
              beam.transforms.trigger.AfterProcessingTime(1 * 60))),
      accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING,
      allowed_lateness=60)
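For comparison, hedged sketches of the other window types mentioned above; the durations are arbitrary:

import apache_beam as beam

# Sliding windows: 60-second windows starting every 10 seconds.
sliding = beam.WindowInto(beam.window.SlidingWindows(60, 10))

# Session windows: a window closes after a 10-minute gap with no new elements.
sessions = beam.WindowInto(beam.window.Sessions(10 * 60))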
6. How can I group the data in my windows?
- GroupByKey: Takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key. In our example we use a ParDo to convert our elements to tuples first (the combined snippet after this list shows the full sequence).
| 'Group by the tuple key' >> beam.GroupByKey()
- CombineValues: Combines an iterable of values in a keyed collection of elements.
| beam.CombineValues(beam.combiners.MeanCombineFn())
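Putting section 6 together, a hedged sketch computing the mean delay per vehicle; it assumes the pubsub_data collection from section 3 and the CollectVehicleDelays DoFn sketched in section 4:

mean_delays = (
    pubsub_data
    | 'Create tuples from vehicle and delay time' >> beam.ParDo(CollectVehicleDelays())
    | 'Group by the tuple key' >> beam.GroupByKey()
    | 'Mean delay per vehicle' >> beam.CombineValues(beam.combiners.MeanCombineFn())
)

In a streaming pipeline, the windowing from section 5 must be applied before the GroupByKey.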
7. How can I write my data to BigQuery?
- WriteToBigQuery: Requires a PCollection of dictionaries whose keys match the table schema (a sketch of the schema string follows the snippet).
| 'Convert output to dict' >> beam.Map(transform_to_json)
| 'Write to bigquery' >> beam.io.WriteToBigQuery(
      outputs_prefix + table_name_veh_info,
      schema=table_veh_info_schema,
      custom_gcs_temp_location=temp_location,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
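The table_veh_info_schema variable is not shown in these notes; WriteToBigQuery accepts a comma-separated 'name:TYPE' string, so it might look like this (field names are assumptions):

table_veh_info_schema = 'vehicle_id:STRING, avg_delay:FLOAT, window_start:TIMESTAMP'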
8. Putting the pieces together
Try it out: a combined sketch of the full pipeline follows, and more details on how to get an example running are in the README.
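A hedged end-to-end sketch assembling the snippets above into one pipeline. The helpers (to_json, CollectVehicleDelays, transform_to_json) and the field names are assumptions; inputs_pattern, outputs_prefix, table_name_veh_info, table_veh_info_schema and temp_location are configuration values defined elsewhere in the project, as in the original fragments:

import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def to_json(line):
    return json.loads(line)

class CollectVehicleDelays(beam.DoFn):
    def process(self, element):
        yield (element['vehicle_id'], element['delay'])

def transform_to_json(element):
    vehicle_id, avg_delay = element
    return {'vehicle_id': vehicle_id, 'avg_delay': avg_delay}

def run():
    pipeline_options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | beam.io.ReadFromPubSub(subscription=inputs_pattern).with_output_types(bytes)
         | beam.Map(lambda x: x.decode('utf-8'))
         | 'Convert string to json' >> beam.Map(to_json)
         | 'Filter Only Trains At Stops' >> beam.Filter(lambda x: x['stop_id'] != '0')
         | 'Identify Field For Use In Windowing' >> beam.Map(
             lambda x: beam.window.TimestampedValue(x, x['response_ts']))
         | beam.WindowInto(
             beam.window.FixedWindows(1 * 60),
             trigger=beam.transforms.trigger.Repeatedly(
                 beam.transforms.trigger.AfterAny(
                     beam.transforms.trigger.AfterCount(5),
                     beam.transforms.trigger.AfterProcessingTime(1 * 60))),
             accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING,
             allowed_lateness=60)
         | 'Create tuples from vehicle and delay time' >> beam.ParDo(CollectVehicleDelays())
         | 'Group by the tuple key' >> beam.GroupByKey()
         | 'Mean delay per vehicle' >> beam.CombineValues(beam.combiners.MeanCombineFn())
         | 'Convert output to dict' >> beam.Map(transform_to_json)
         | 'Write to bigquery' >> beam.io.WriteToBigQuery(
             outputs_prefix + table_name_veh_info,
             schema=table_veh_info_schema,
             custom_gcs_temp_location=temp_location,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

if __name__ == '__main__':
    run()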