PoC: Automated insert of CSV data into BigQuery via GCS bucket + Python

i wanted to try out automatic loading of CSV data into BigQuery, specifically using a Cloud Function that runs whenever a new CSV file is uploaded to a Google Cloud Storage bucket.

it worked like a champ. here’s what i did for the PoC:

  1. generate a CSV file with 1000 lines of dummy data via https://www.mockaroo.com/ using their default schema, so it looks like this:
    $ head -3 testdata.csv
    id,first_name,last_name,email,gender,ip_address
    1,Andres,Berthot,aberthot0@pbs.org,Male,104.204.241.0
    2,Iris,Zwicker,izwicker1@icq.com,Female,61.87.224.4
  2. make a test Google Cloud Storage bucket:
     $ gsutil mb gs://csvtestbucket 
  3. install the necessary Python bits & pieces:
    $ pip3 install google-cloud-bigquery --upgrade
  4. make a BigQuery dataset:
    $ bq mk --dataset rickts-dev-project:csvtestdataset
  5. make a table within that dataset to match the CSV schema:
    $ bq mk -t csvtestdataset.csvtable \
    id:INTEGER,first_name:STRING,last_name:STRING,email:STRING,gender:STRING,ip_address:STRING
  6. eyeball the table in the BigQuery dataset and verify it is clean and fresh:
     $ bq show csvtestdataset.csvtable
    Table rickts-dev-project:csvtestdataset.csvtable
    
       Last modified           Schema           Total Rows   Total Bytes   Expiration   Time Partitioning   Labels
     ----------------- ----------------------- ------------ ------------- ------------ ------------------- --------
      22 Oct 13:42:15   |- id: integer          0            0
                        |- first_name: string
                        |- last_name: string
                        |- email: string
                        |- gender: string
                        |- ip_address: string
  7. now it's time to write some Python. here’s mine (there’s a sketch for poking at it locally after these steps):
    import os
    from google.cloud import bigquery
    
    def csv_loader(data, context):
        client = bigquery.Client()
        dataset_id = os.environ['DATASET']
        dataset_ref = client.dataset(dataset_id)

        # describe the load job: explicit schema, CSV source, skip the header row
        job_config = bigquery.LoadJobConfig()
        job_config.schema = [
            bigquery.SchemaField('id', 'INTEGER'),
            bigquery.SchemaField('first_name', 'STRING'),
            bigquery.SchemaField('last_name', 'STRING'),
            bigquery.SchemaField('email', 'STRING'),
            bigquery.SchemaField('gender', 'STRING'),
            bigquery.SchemaField('ip_address', 'STRING')
        ]
        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV

        # build the URI for the uploaded CSV from the GCS event payload in 'data'
        uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']

        # let's do this
        load_job = client.load_table_from_uri(
            uri,
            dataset_ref.table(os.environ['TABLE']),
            job_config=job_config)

        print('Starting job {}'.format(load_job.job_id))
        print('Function=csv_loader, Version=' + os.environ['VERSION'])
        print('File: {}'.format(data['name']))

        load_job.result()  # wait for the table load to complete
        print('Job finished.')

        # report how many rows the destination table now holds
        destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
        print('Loaded {} rows.'.format(destination_table.num_rows))
  8. because you don’t want to hardcode things like bucket/table/dataset names in code, create a yaml file (env.yaml in this example) that will store your deployment-specific configuration as environment variables:
     BUCKET: csvtestbucket
    DATASET: csvtestdataset
    TABLE: csvtable
    VERSION: v14
  9. create a requirements.txt file listing the packages the function needs:
    google-cloud
    google-cloud-bigquery
  10. create a .gcloudignore file so that your yaml and CSV files will not be deployed to GCP:
    *csv
    *yaml
  11. at this point, your folder should look something like this:
    $ ls
    env.yaml  main.py  requirements.txt  testdata.csv
  12. now we are ready to deploy the cloud function. we will add a trigger on the storage bucket that fires every time a new file is added to the bucket (any object, not just CSVs; there’s a sketch for guarding against that after these steps). here we are creating a cloud function named “csv_loader”:
    $ gcloud beta functions deploy csv_loader \
    --runtime=python37 \
    --trigger-resource=gs://csvtestbucket \
    --trigger-event=google.storage.object.finalize \
    --entry-point=csv_loader \
    --env-vars-file=env.yaml
  13. ok, the function is deployed, yay! copy your test data CSV into the test bucket:
    $ gsutil cp testdata.csv gs://csvtestbucket/
    Copying file://testdata.csv [Content-Type=text/csv]...
    - [1 files][ 60.4 KiB/ 60.4 KiB]
    Operation completed over 1 objects/60.4 KiB.  
  14. now that we have copied a CSV file into the bucket, the function should fire! check the cloud function logs:
    $ gcloud functions logs read
    [ ... snipped for brevity ... ]
    D      csv_loader  274732139359754  2018-10-22 20:48:27.852  Function execution started
    I      csv_loader  274732139359754  2018-10-22 20:48:28.492  Starting job 9ca2f39c-539f-454d-aa8e-3299bc9f7287
    I      csv_loader  274732139359754  2018-10-22 20:48:28.492  Function=csv_loader, Version=v14
    I      csv_loader  274732139359754  2018-10-22 20:48:28.492  File: testdata2.csv
    I      csv_loader  274732139359754  2018-10-22 20:48:31.022  Job finished.
    I      csv_loader  274732139359754  2018-10-22 20:48:31.136  Loaded 1000 rows.
    D      csv_loader  274732139359754  2018-10-22 20:48:31.139  Function execution took 3288 ms, finished with status: 'ok'

    looks like the function ran as expected!

  15. let's eyeball the BigQuery table again and see if the row count has changed:
    $ bq show csvtestdataset.csvtable
    Table rickts-dev-project:csvtestdataset.csvtable
    
       Last modified           Schema           Total Rows   Total Bytes   Expiration   Time Partitioning   Labels
     ----------------- ----------------------- ------------ ------------- ------------ ------------------- --------
      22 Oct 13:48:29   |- id: integer          1000         70950
                        |- first_name: string
                        |- last_name: string
                        |- email: string
                        |- gender: string
                        |- ip_address: string

    great! there are now 1000 rows. looking good.

  16. as a final check, let's compare the first 3 rows of the CSV
    $ egrep '^[1,2,3],' testdata.csv
    1,Andres,Berthot,aberthot0@pbs.org,Male,104.204.241.0
    2,Iris,Zwicker,izwicker1@icq.com,Female,61.87.224.4
    3,Aime,Gladdis,agladdis2@hugedomains.com,Female,29.55.250.191

    with the first 3 rows of the BigQuery table (there’s a sketch for doing the same check from Python after these steps)

    $ bq query 'select * from csvtestdataset.csvtable \
       where id IN (1,2,3)'
    Waiting on bqjob_r6a3239576845ac4d_000001669d987208_1 ... (0s) Current status: DONE
    +----+------------+-----------+---------------------------+--------+---------------+
    | id | first_name | last_name |           email           | gender |  ip_address   |
    +----+------------+-----------+---------------------------+--------+---------------+
    |  1 | Andres     | Berthot   | aberthot0@pbs.org         | Male   | 104.204.241.0 |
    |  2 | Iris       | Zwicker   | izwicker1@icq.com         | Female | 61.87.224.4   |
    |  3 | Aime       | Gladdis   | agladdis2@hugedomains.com | Female | 29.55.250.191 |
    +----+------------+-----------+---------------------------+--------+---------------+

    and whaddyaknow, they match! w00t!
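
before wrapping up, a few extra sketches. none of these are part of what i actually deployed; they're just rough ideas in the same vein.

first, the 'data' argument handed to csv_loader is the GCS event payload, and the only field the function reads from it is data['name'] (everything else comes from the environment variables). so you can poke at the function locally, without deploying, by feeding it a fake event. this assumes you run it from the function's folder, have BUCKET/DATASET/TABLE/VERSION exported in your shell, have application default credentials set up, and have already copied testdata.csv into the bucket. note that this runs a real load job, so it will append another 1000 rows to the table:

    import main  # the main.py from step 7

    # minimal stand-in for the "object finalize" event payload;
    # only 'name' is actually read by csv_loader
    fake_event = {
        'name': 'testdata.csv',
        'bucket': 'csvtestbucket',
    }

    # the context argument is unused by csv_loader, so None is fine here
    main.csv_loader(fake_event, None)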
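
second, the finalize trigger fires for every object that lands in the bucket, not just CSVs, and a bad file will make load_job.result() raise. here's a sketch of a wrapper entry point you could append to main.py and deploy with --entry-point=csv_loader_guarded (the function name is mine, pick whatever you like):

    # appended to main.py, below csv_loader
    def csv_loader_guarded(data, context):
        # skip anything that isn't a CSV, since the trigger
        # fires for every object uploaded to the bucket
        if not data['name'].lower().endswith('.csv'):
            print('Skipping non-CSV object: {}'.format(data['name']))
            return

        try:
            csv_loader(data, context)
        except Exception as e:
            # make load failures easy to spot in the function logs
            print('Load of {} failed: {}'.format(data['name'], e))
            raise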
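
third, the same spot check as step 16 can be done from Python with the same google-cloud-bigquery client the function uses. a sketch, assuming your credentials can query the dataset:

    from google.cloud import bigquery

    client = bigquery.Client()
    sql = """
        SELECT id, first_name, last_name, email, gender, ip_address
        FROM `rickts-dev-project.csvtestdataset.csvtable`
        WHERE id IN (1, 2, 3)
        ORDER BY id
    """

    # run the query (the Python client defaults to standard SQL) and print each row
    for row in client.query(sql).result():
        print(row.id, row.first_name, row.last_name,
              row.email, row.gender, row.ip_address)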

proof of concept: complete!

conclusion: cloud functions are pretty great.