Automated insert of CSV data into BigQuery via GCS bucket + Python

i wanted to try out automated loading of CSV data into BigQuery, specifically using a Cloud Function that runs whenever a new CSV file is uploaded into a Google Cloud Storage bucket.

it worked like a champ. here’s what i did for the PoC:

  • generate a CSV file with 1000 lines of dummy data via https://www.mockaroo.com/ with their default schema, so it looks like:
    $ head -3 testdata.csv
    id,first_name,last_name,email,gender,ip_address
    1,Andres,Berthot,[email protected],Male,104.204.241.0
    2,Iris,Zwicker,[email protected],Female,61.87.224.4
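  • (aside) if you would rather not use mockaroo, a few lines of python can generate similar dummy data. a minimal sketch under that assumption -- the names and addresses here are made up on the fly, not the mockaroo values used below:
    # hypothetical stand-in for mockaroo: writes testdata.csv with the same columns
    import csv
    import random

    with open('testdata.csv', 'w', newline='') as f:
        w = csv.writer(f)
        w.writerow(['id', 'first_name', 'last_name', 'email', 'gender', 'ip_address'])
        for i in range(1, 1001):
            w.writerow([
                i,
                'First{}'.format(i),
                'Last{}'.format(i),
                'user{}@example.com'.format(i),
                random.choice(['Male', 'Female']),
                '.'.join(str(random.randint(0, 255)) for _ in range(4)),
            ])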
  • make a test Google Cloud Storage bucket:
     $ gsutil mb gs://csvtestbucket 
  • install the necessary python bits & pieces:
    $ pip3 install google-cloud-bigquery --upgrade
  • make a BigQuery dataset:
    $ bq mk --dataset rickts-dev-project:csvtestdataset
  • make a table within that dataset to match the CSV schema:
    $ bq mk -t csvtestdataset.csvtable \
    id:INTEGER,first_name:STRING,last_name:STRING,email:STRING,gender:STRING,ip_address:STRING
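  • (aside) the same table can also be created from python with the BigQuery client library instead of bq. a minimal sketch, assuming the csvtestdataset dataset from the previous step already exists and your local credentials point at the right project:
    from google.cloud import bigquery

    client = bigquery.Client()
    # same schema as the bq mk command above
    schema = [
        bigquery.SchemaField('id', 'INTEGER'),
        bigquery.SchemaField('first_name', 'STRING'),
        bigquery.SchemaField('last_name', 'STRING'),
        bigquery.SchemaField('email', 'STRING'),
        bigquery.SchemaField('gender', 'STRING'),
        bigquery.SchemaField('ip_address', 'STRING'),
    ]
    table_ref = client.dataset('csvtestdataset').table('csvtable')
    client.create_table(bigquery.Table(table_ref, schema=schema))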
  • eyeball the table in the BigQuery dataset (bq show csvtestdataset.csvtable will do the trick) and verify it is clean and fresh
  • now it’s time to write some python. here’s mine:
    import os
    from google.cloud import bigquery

    def csv_loader(data, context):
        client = bigquery.Client()
        dataset_id = os.environ['DATASET']
        dataset_ref = client.dataset(dataset_id)
        job_config = bigquery.LoadJobConfig()
        job_config.schema = [
            bigquery.SchemaField('id', 'INTEGER'),
            bigquery.SchemaField('first_name', 'STRING'),
            bigquery.SchemaField('last_name', 'STRING'),
            bigquery.SchemaField('email', 'STRING'),
            bigquery.SchemaField('gender', 'STRING'),
            bigquery.SchemaField('ip_address', 'STRING')
        ]
        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV

        # get the URI for uploaded CSV in GCS from 'data'
        uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']

        # lets do this
        load_job = client.load_table_from_uri(
            uri,
            dataset_ref.table(os.environ['TABLE']),
            job_config=job_config)

        print('Starting job {}'.format(load_job.job_id))
        print('Function=csv_loader, Version=' + os.environ['VERSION'])
        print('File: {}'.format(data['name']))

        load_job.result()  # wait for table load to complete.
        print('Job finished.')

        destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
        print('Loaded {} rows.'.format(destination_table.num_rows))
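  • a quick note on the data argument: for a storage-triggered function it carries the metadata of the GCS object that fired the event. csv_loader only reads data['name'], but the payload looks roughly like this (a trimmed sketch; the values are illustrative, not from my actual run):
    # example google.storage.object.finalize payload (trimmed, values illustrative)
    data = {
        'bucket': 'csvtestbucket',
        'name': 'testdata.csv',              # the only field csv_loader uses
        'contentType': 'text/csv',
        'size': '61813',                     # object size in bytes, as a string
        'timeCreated': '2018-10-22T20:48:00.000Z'
    }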
  • because you don’t want to hardcode things like bucket/table/dataset names in code, create a yaml file (env.yaml) that will store your deployment-specific configuration in environment variables:
    BUCKET: csvtestbucket
    DATASET: csvtestdataset
    TABLE: csvtable
    VERSION: v14
  • create a requirements.txt file for the necessary imports:
    google-cloud
    google-cloud-bigquery
  • create a .gcloudignore file so that your yaml or CSV files will not be deployed into GCP:
    *csv
    *yaml
  • at this point, your folder should look something like this:
    $ ls
    env.yaml  main.py  requirements.txt  testdata.csv
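  • (optional) at this point you can also poke the function locally by faking the event, without deploying anything. a minimal sketch, assuming your local credentials can reach BigQuery and that you have already copied testdata.csv into the bucket (the gsutil cp step further down); local_test.py is my name for it, not part of the PoC:
    # local_test.py -- hypothetical local harness, not deployed anywhere
    import os

    # mirror env.yaml so the os.environ lookups in main.py resolve
    os.environ.update({
        'BUCKET': 'csvtestbucket',
        'DATASET': 'csvtestdataset',
        'TABLE': 'csvtable',
        'VERSION': 'v14',
    })

    from main import csv_loader

    # fake the finalize event; csv_loader only reads data['name']
    csv_loader({'name': 'testdata.csv'}, None)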
  • now we are ready to deploy the cloud function. we will add a trigger on the storage bucket that fires every time a new file is added to the bucket. here we are creating a cloud function named “csv_loader”:
    $ gcloud beta functions deploy csv_loader \
      --runtime=python37 \
      --trigger-resource=gs://csvtestbucket \
      --trigger-event=google.storage.object.finalize \
      --entry-point=csv_loader \
      --env-vars-file=env.yaml
  • ok, the function is deployed, yay! copy your test data CSV into the test bucket:
    $ gsutil cp testdata.csv gs://csvtestbucket/
    Copying file://testdata.csv [Content-Type=text/csv]...
    - [1 files][ 60.4 KiB/ 60.4 KiB]
    Operation completed over 1 objects/60.4 KiB.
  • now that we have copied a CSV file into the bucket, the function should fire! check the cloud function logs:
    $ gcloud functions logs read
    [ ... snipped for brevity ... ]
    D      csv_loader  274732139359754  2018-10-22 20:48:27.852  Function execution started
    I      csv_loader  274732139359754  2018-10-22 20:48:28.492  Starting job 9ca2f39c-539f-454d-aa8e-3299bc9f7287
    I      csv_loader  274732139359754  2018-10-22 20:48:28.492  Function=csv_loader, Version=v14
    I      csv_loader  274732139359754  2018-10-22 20:48:28.492  File: testdata2.csv
    I      csv_loader  274732139359754  2018-10-22 20:48:31.022  Job finished.
    I      csv_loader  274732139359754  2018-10-22 20:48:31.136  Loaded 1000 rows.
    D      csv_loader  274732139359754  2018-10-22 20:48:31.139  Function execution took 3288 ms, finished with status: 'ok'

    looks like the function ran as expected!

  • let’s eyeball the BigQuery table again and see if the row count has changed:
    $ bq show csvtestdataset.csvtable
    Table rickts-dev-project:csvtestdataset.csvtable

       Last modified           Schema           Total Rows   Total Bytes   Expiration   Time Partitioning   Labels
      ----------------- ----------------------- ------------ ------------- ------------ ------------------- --------
       22 Oct 13:48:29   |- id: integer          1000         70950
                         |- first_name: string
                         |- last_name: string
                         |- email: string
                         |- gender: string
                         |- ip_address: string

    great! there are now 1000 rows. looking good.

  • as a final check, let’s compare the first 3 rows of the CSV
    $ egrep '^[1,2,3],' testdata.csv
    1,Andres,Berthot,[email protected],Male,104.204.241.0
    2,Iris,Zwicker,[email protected],Female,61.87.224.4
    3,Aime,Gladdis,[email protected],Female,29.55.250.191

    with the first 3 rows of the BigQuery table:

    $ bq query 'select * from csvtestdataset.csvtable \
      where id IN (1,2,3)'
    Waiting on bqjob_r6a3239576845ac4d_000001669d987208_1 ... (0s) Current status: DONE
    +----+------------+-----------+---------------------------+--------+---------------+
    | id | first_name | last_name |           email           | gender |  ip_address   |
    +----+------------+-----------+---------------------------+--------+---------------+
    |  1 | Andres     | Berthot   | [email protected]         | Male   | 104.204.241.0 |
    |  2 | Iris       | Zwicker   | [email protected]         | Female | 61.87.224.4   |
    |  3 | Aime       | Gladdis   | [email protected] | Female | 29.55.250.191 |
    +----+------------+-----------+---------------------------+--------+---------------+

    and whaddyaknow, they match! w00t!
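  • bonus: the same spot-check can be run from python with the client library instead of bq. a minimal sketch (not part of the original PoC; the SQL just mirrors the bq query above):
    from google.cloud import bigquery

    client = bigquery.Client()
    sql = 'SELECT * FROM csvtestdataset.csvtable WHERE id IN (1, 2, 3) ORDER BY id'
    for row in client.query(sql).result():
        print(row.id, row.first_name, row.last_name, row.email, row.gender, row.ip_address)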

    proof of concept: complete!

    conclusion: cloud functions are pretty great.