BigQuery to Cloud Storage extract using Dataflow

Suds Kumar
2 min read · Sep 27, 2020

BigQuery provides an excellent CLI command, “bq extract”, as well as an API and client libraries in many languages for exporting table data. The exporting table data documentation also notes: “You can use a service such as Dataflow to read data from BigQuery instead of manually exporting it.” This post looks at the Dataflow way to extract data out of BigQuery. It is useful in situations where “bq extract” doesn’t meet your requirements and you need a programmatic way to extract the data and shape it into files.
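For reference, a plain “bq extract” export to GCS looks something like this (the bucket name is a placeholder). Note that “bq extract” exports whole tables, so to export the result of a query you would first have to materialize it into a table — one of the limitations that makes Dataflow attractive here:

bq extract --destination_format=CSV \
    'bigquery-public-data:chicago_taxi_trips.taxi_trips' \
    gs://my-bucket/taxi_trips_*.csv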

As an example, I have used the Chicago Taxi Trips public dataset and the following query:

SELECT trip_start_timestamp, trip_end_timestamp, trip_miles, fare
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
LIMIT 100
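The snippets that follow assume a Beam pipeline p, the query string, and a BUCKET variable are already defined. A minimal scaffold, with placeholder project, region, and bucket values, might look like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

PROJECT = 'my-project'   # placeholder: your GCP project ID
BUCKET = 'my-bucket'     # placeholder: your GCS bucket name

query = """
SELECT trip_start_timestamp, trip_end_timestamp, trip_miles, fare
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
LIMIT 100
"""

options = PipelineOptions(
    runner='DataflowRunner',   # or 'DirectRunner' to test locally
    project=PROJECT,
    region='us-central1',      # placeholder region
    temp_location='gs://{0}/tmp'.format(BUCKET))

p = beam.Pipeline(options=options)
# ... the transforms described below go here ...
p.run().wait_until_finish()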

We now read the query results with:

bq_data = p | 'read_bq_view' >> beam.io.Read(
    beam.io.BigQuerySource(query=query, use_standard_sql=True))

bq_data is now a PCollection of dictionaries, one per row, mapping column names to values:

{u'trip_start_timestamp': u'2013-04-03 22:30:00 UTC', u'trip_end_timestamp': u'2013-04-03 22:39:00 UTC', u'trip_miles': 3.8, u'fare': 10.45}
{u'trip_start_timestamp': u'2014-10-09 23:30:00 UTC', u'trip_end_timestamp': u'2014-10-09 23:45:00 UTC', u'trip_miles': 1.4, u'fare': 6.05}
{u'trip_start_timestamp': u'2014-11-23 20:30:00 UTC', u'trip_end_timestamp': u'2014-11-23 20:45:00 UTC', u'trip_miles': 4.2, u'fare': 12.25}
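Note that on recent Beam releases the native BigQuerySource is deprecated; if you are on a newer SDK, the equivalent read (producing the same dictionaries) would be:

bq_data = p | 'read_bq_view' >> beam.io.ReadFromBigQuery(
    query=query, use_standard_sql=True)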

We can apply a beam.Map function to keep only the values. Listing the fields explicitly guarantees the column order matches the header we write later (a plain dict.values() makes no ordering promise):

fields = ['trip_start_timestamp', 'trip_end_timestamp', 'trip_miles', 'fare']
bq_values = bq_data | 'read values' >> beam.Map(
    lambda row: [row[f] for f in fields])

Excerpt of bq_values:

[u'2013-04-03 22:30:00 UTC', u'2013-04-03 22:39:00 UTC', 3.8, 10.45]
[u'2014-10-09 23:30:00 UTC', u'2014-10-09 23:45:00 UTC', 1.4, 6.05]
[u'2014-11-23 20:30:00 UTC', u'2014-11-23 20:45:00 UTC', 4.2, 12.25]

And finally, we map again to join the column values with commas instead of emitting a list (keep in mind that you would need to escape double quotes if they can appear within a field):

bq_csv = bq_values | 'CSV format' >> beam.Map(
    lambda row: ', '.join(['"' + str(column) + '"' for column in row]))
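If your fields can contain embedded quotes or commas, hand-rolled joining gets fragile. A sketch using Python’s csv module for proper quoting (a swapped-in technique, not from the original post; it emits standard CSV without the space after each comma):

import csv
import io

def to_csv_line(row):
    # csv.writer handles quoting and escaping of embedded quotes/commas.
    buf = io.StringIO()  # on Python 2, use io.BytesIO() instead
    csv.writer(buf, quoting=csv.QUOTE_ALL).writerow(row)
    return buf.getvalue().rstrip('\r\n')

bq_csv = bq_values | 'CSV format' >> beam.Map(to_csv_line)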

Now we write the results to GCS, with a .csv file name suffix and a header row:

bq_csv | 'Write_to_GCS' >> beam.io.WriteToText(
    'gs://{0}/results/output'.format(BUCKET),
    file_name_suffix='.csv',
    header='trip_start_timestamp, trip_end_timestamp, trip_miles, fare')

Written results:

$ gsutil cat gs://$BUCKET/results/output-00000-of-00001.csv
trip_start_timestamp, trip_end_timestamp, trip_miles, fare
"2013-04-03 22:30:00 UTC", "2013-04-03 22:39:00 UTC", "3.8", "10.45"
"2014-10-09 23:30:00 UTC", "2014-10-09 23:45:00 UTC", "1.4", "6.05"
"2014-11-23 20:30:00 UTC", "2014-11-23 20:45:00 UTC", "4.2", "12.25"
