
An Apache Beam Hack — Streaming into Sharded BQ Tables

Ilan Uzan
4 min read · Jun 19, 2021


My previous posts on Apache Beam:

Apache Beam Batch Pipelines
Apache Beam Streaming Pipelines

While these two posts are written using the Python SDK for brevity, at my company we actually use the Java SDK for performance reasons: in the checks I’ve made, the Java SDK tends to be 2–3 times faster than the Python one for our use case.

The Initial Problem — Sharded BQ Tables

Recently I came across an interesting use case that arose in our Streaming Pipelines. Let’s describe the need here first:

  1. In order to improve query performance on our BigQuery tables (tables loaded by Apache Beam Streaming Pipelines with the Google Cloud Dataflow runner), we had to change their partitioning interval from Daily to Hourly.
  2. After the change, it would take only about 166 days to hit the BQ hard limit of 4000 partitions per table (4000 partitions / 24 hourly partitions per day ≈ 166 days).
  3. There were two ways to approach the problem: either have some process merge old hourly partitions into a historic, daily partitioned table, or shard our tables, say, into monthly sharded tables with hourly partitioning (for those of you who don’t know what BQ Sharding is, see here).
  4. The chosen solution was the BQ Sharding one, mainly because we didn’t want to add another component to maintain, and because it was less trivial to implement the first solution without incurring additional BQ costs.
  5. That required us to change our Streaming Pipelines to be able to load into a table with a dynamic name: since we want monthly sharding, on January 31st the pipeline should load data into the January table, and on February 1st into the February table.

That’s the problem so far. Now, I would have expected some kind of built-in solution in the Apache Beam framework for this issue, but apparently there is none. However, I found this solution (taken from here):
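The original snippet isn’t embedded here anymore, so below is a minimal sketch of the idea, with project, dataset and table names that are my own placeholders: the destination table is computed per element inside a SerializableFunction passed to BigQueryIO’s to(), appending the current month as the shard suffix.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;

// rows and schema are assumed to exist elsewhere in the pipeline code.
PCollection<TableRow> rows = /* the streaming rows to load */;

rows.apply("WriteToShardedTable",
    BigQueryIO.writeTableRows()
        .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
          @Override
          public TableDestination apply(ValueInSingleWindow<TableRow> value) {
            // The table prefix is a hard-coded constant here -- exactly the
            // limitation discussed in the next section.
            String shardSuffix = DateTimeFormat.forPattern("yyyyMM").print(Instant.now());
            return new TableDestination(
                "my-project:my_dataset.my_table_" + shardSuffix,
                "Monthly sharded, hourly partitioned table");
          }
        })
        .withSchema(schema) // assumed TableSchema defined elsewhere
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
```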

While that works, integrating that solution into our pipelines unfortunately wasn’t that trivial.

The Second Problem — Integrating with Runtime Options

You see, the problem with the piece of code above is that the desired sharded-table prefix is a constant, while I need to get it as a runtime option. We can deploy Apache Beam pipelines directly (as I demonstrated in previous posts), or we can build a template of a pipeline and deploy it multiple times with different runtime options. In our case, we have about a dozen pipelines built from the same template, each ingesting data for a different table.

Back to Runtime Options: when we want to provide runtime options to a pipeline, we have to define them with ValueProvider, as described here. If we try the naive solution of adding a table prefix runtime option and using its value in the SerializableFunction in the code above, we’d get an error saying “PipelineOptions is not serializable”; apparently, we can’t use runtime options inside serializable functions.
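For reference, this is roughly how such an option has to be declared (the interface and option names here are my own, not the ones from our actual pipeline):

```java
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

// A runtime option must be declared as a ValueProvider so a templated pipeline
// can resolve its value at execution time rather than at template-build time.
public interface ShardedTableOptions extends PipelineOptions {
  @Description("Prefix of the monthly sharded BigQuery table, e.g. my-project:my_dataset.my_table")
  ValueProvider<String> getTablePrefix();
  void setTablePrefix(ValueProvider<String> value);
}
```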

What can we do then? Apparently, we can use runtime options inside DoFns, so we came up with the following solution:

  1. We get the option value in a separate DoFn and pass that value along with the element (a combined sketch of both steps follows this list):
  • passing the runtime options to the DoFn;
  • sticking all of the runtime options into the TableRow object, so that later they can be used to specify the table creation definitions in the SerializableFunction.

2. In the SerializableFunction we read the value we pushed into the row, construct the TableDestination, and remove the helper value from the row actually being loaded into BigQuery with withFormatFunction:
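The original gists aren’t reproduced here, but a minimal combined sketch of both steps could look like the following (the class name AttachTablePrefixFn and the helper field __table_prefix are my own assumptions; options is an instance of the ShardedTableOptions interface sketched above, and schema is assumed to be defined elsewhere):

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;

// Step 1: a DoFn may read runtime options, so it copies the table prefix into
// each element before the write step.
static class AttachTablePrefixFn extends DoFn<TableRow, TableRow> {
  private final ValueProvider<String> tablePrefix;

  AttachTablePrefixFn(ValueProvider<String> tablePrefix) {
    this.tablePrefix = tablePrefix;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element().clone();
    row.set("__table_prefix", tablePrefix.get()); // allowed here, unlike in a SerializableFunction
    c.output(row);
  }
}

// Step 2: the SerializableFunction reads the prefix back from the row, and
// withFormatFunction drops the helper field before the row is loaded.
rows.apply("AttachTablePrefix", ParDo.of(new AttachTablePrefixFn(options.getTablePrefix())))
    .apply("WriteToShardedTable",
        BigQueryIO.writeTableRows()
            .to((ValueInSingleWindow<TableRow> value) -> {
              String prefix = (String) value.getValue().get("__table_prefix");
              String shardSuffix = DateTimeFormat.forPattern("yyyyMM").print(Instant.now());
              return new TableDestination(prefix + "_" + shardSuffix,
                  "Monthly sharded, hourly partitioned table");
            })
            .withFormatFunction((TableRow row) -> {
              TableRow out = row.clone();
              out.remove("__table_prefix"); // keep the helper field out of BigQuery
              return out;
            })
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
```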

I re-deployed the pipeline and it worked: the table for that month was created successfully. But then…

The Third Problem — Table does not Exist

The pipeline ran just fine for 2–3 weeks, until the month changed. We started getting an error saying that the table does not exist, which is weird considering that the pipeline specifies the CREATE_IF_NEEDED CreateDisposition.

After some digging around, we came across this bug — evidently, CREATE_IF_NEEDED doesn’t work very well with Dynamic Destinations or with Serializable Functions.

So what can we do? Well, when there’s a bug, someone has probably already implemented a workaround for it:

What’s being done here is simple: instead of relying on the create disposition, we create the table in a separate step. Now all that was left to do was integrating that workaround with our own pipeline, until I finally came up with the following, hopefully last, piece of code:

  1. The new DoFn handles the BQ table creation, setting the relevant schema, partitioning, clustering and table name for it (both steps are sketched after the next item):

2. The load-to-BigQuery step: by that point the table already exists, and all we need to know is its name. We extract it in the to function, and we remove it in the withFormatFunction:
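Again, the original code isn’t embedded here; what follows is a sketch of what such a table-creating DoFn might look like. The class name, helper field, options and schema are my own placeholders, not the production code:

```java
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.*;
import java.util.Arrays;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;

// Step 1: create the current month's shard (if missing) with the desired
// schema, hourly partitioning and clustering, then attach the full table name
// to the row for the write step.
static class EnsureShardedTableFn extends DoFn<TableRow, TableRow> {
  private final ValueProvider<String> project;
  private final ValueProvider<String> dataset;
  private final ValueProvider<String> tablePrefix;
  private transient BigQuery bigquery;

  EnsureShardedTableFn(ValueProvider<String> project, ValueProvider<String> dataset,
                       ValueProvider<String> tablePrefix) {
    this.project = project;
    this.dataset = dataset;
    this.tablePrefix = tablePrefix;
  }

  @Setup
  public void setup() {
    bigquery = BigQueryOptions.getDefaultInstance().getService();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    String shardSuffix = DateTimeFormat.forPattern("yyyyMM").print(Instant.now());
    TableId tableId =
        TableId.of(project.get(), dataset.get(), tablePrefix.get() + "_" + shardSuffix);

    if (bigquery.getTable(tableId) == null) {
      Schema schema = Schema.of( // placeholder schema, not the real one
          Field.of("event_time", StandardSQLTypeName.TIMESTAMP),
          Field.of("payload", StandardSQLTypeName.STRING));
      StandardTableDefinition definition = StandardTableDefinition.newBuilder()
          .setSchema(schema)
          .setTimePartitioning(TimePartitioning.newBuilder(TimePartitioning.Type.HOUR)
              .setField("event_time").build())
          .setClustering(Clustering.newBuilder().setFields(Arrays.asList("payload")).build())
          .build();
      try {
        bigquery.create(TableInfo.of(tableId, definition));
      } catch (BigQueryException e) {
        // Another worker may have created the shard concurrently; that's fine.
      }
    }

    TableRow row = c.element().clone();
    row.set("__table_name",
        tableId.getProject() + ":" + tableId.getDataset() + "." + tableId.getTable());
    c.output(row);
  }
}
```

And a sketch of the write step, reusing the imports from the earlier snippets and assuming getBqProject(), getBqDataset() and getTablePrefix() are ValueProvider options declared as shown before:

```java
rows.apply("EnsureShardedTable",
        ParDo.of(new EnsureShardedTableFn(
            options.getBqProject(), options.getBqDataset(), options.getTablePrefix())))
    .apply("WriteToShardedTable",
        BigQueryIO.writeTableRows()
            .to((ValueInSingleWindow<TableRow> value) ->
                new TableDestination(
                    (String) value.getValue().get("__table_name"),
                    "Monthly sharded, hourly partitioned table"))
            .withFormatFunction((TableRow row) -> {
              TableRow out = row.clone();
              out.remove("__table_name"); // keep the helper field out of BigQuery
              return out;
            })
            // The table is created up front by the DoFn, so the create
            // disposition no longer has to do the work (CREATE_NEVER is my
            // assumption, not necessarily what the original pipeline used).
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
```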

Conclusion

It’s important to get familiar with the less pleasant parts of the framework: missing features, bugs, and the hacks we need to get the job done. I hope you’ve enjoyed the post; leave a comment if you have any questions.

