Automated insert of CSV data into Bigquery via GCS bucket + Python

i wanted to try out the automatic loading of CSV data into Bigquery, specifically using a Cloud Function that would automatically run whenever a new CSV file was uploaded into a Google Cloud Storage bucket.

it worked like a champ. here’s what i did to PoC:

  1. generate a CSV file with 1000 lines of dummy data via https://www.mockaroo.com/ with their default schema so it looks like:
    $ head -3 testdata.csv
    id,first_name,last_name,email,gender,ip_address
    1,Andres,Berthot,[email protected],Male,104.204.241.0
    2,Iris,Zwicker,[email protected],Female,61.87.224.4
  • make a test Google Cloud Storage bucket:
     $ gsutil mb gs://csvtestbucket 
  • install the necessary python bits & pieces:
    $ pip3 install google-cloud-bigquery --upgrade
  • make a Bigquery dataset:
    $ bq mk --dataset rickts-dev-project:csvtestdataset
  • make a table within that dataset to match the CSV schema:
    $ bq mk -t csvtestdataset.csvtable \
      id:INTEGER,first_name:STRING,last_name:STRING,email:STRING,gender:STRING,ip_address:STRING
  • eyeball the table in the Bigquery dataset and verify it is clean and fresh:
  • now it’s time to write some python. here’s mine:
    import os
    from google.cloud import bigquery

    def csv_loader(data, context):
        client = bigquery.Client()
        dataset_id = os.environ['DATASET']
        dataset_ref = client.dataset(dataset_id)
        job_config = bigquery.LoadJobConfig()
        # schema must match the table created with bq mk
        job_config.schema = [
            bigquery.SchemaField('id', 'INTEGER'),
            bigquery.SchemaField('first_name', 'STRING'),
            bigquery.SchemaField('last_name', 'STRING'),
            bigquery.SchemaField('email', 'STRING'),
            bigquery.SchemaField('gender', 'STRING'),
            bigquery.SchemaField('ip_address', 'STRING')
        ]
        job_config.skip_leading_rows = 1  # skip the CSV header row
        job_config.source_format = bigquery.SourceFormat.CSV
        # get the URI for uploaded CSV in GCS from 'data'
        uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']
        # lets do this
        load_job = client.load_table_from_uri(
            uri,
            dataset_ref.table(os.environ['TABLE']),
            job_config=job_config)
        print('Starting job {}'.format(load_job.job_id))
        print('Function=csv_loader, Version=' + os.environ['VERSION'])
        print('File: {}'.format(data['name']))
        load_job.result()  # wait for table load to complete.
        print('Job finished.')
        destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
        print('Loaded {} rows.'.format(destination_table.num_rows))
  • because you don’t want to hardcode things like bucket/table/dataset names in code, create a yaml file (env.yaml here) that will store your deployment-specific configuration in environment variables:
    BUCKET: csvtestbucket
    DATASET: csvtestdataset
    TABLE: csvtable
    VERSION: v14
  • create a requirements.txt file for the necessary imports:
    google-cloud
    google-cloud-bigquery
  • create a .gcloudignore file so that your yaml or CSV files will not be deployed into GCP:
    *csv
    *yaml
  • at this point, your folder should look something like this:
    $ ls
    env.yaml  main.py  requirements.txt  testdata.csv
  • now we are ready to deploy the cloud function. we will add a trigger on the storage bucket that fires every time a new file is added to the bucket. here we are creating a cloud function named “csv_loader”:
    $ gcloud beta functions deploy csv_loader \
      --runtime=python37 \
      --trigger-resource=gs://csvtestbucket \
      --trigger-event=google.storage.object.finalize \
      --entry-point=csv_loader \
      --env-vars-file=env.yaml
  • ok, the function is deployed, yay! copy your test data CSV into the test bucket:
    $ gsutil cp testdata.csv gs://csvtestbucket/
    Copying file://testdata.csv [Content-Type=text/csv]...
    - [1 files][ 60.4 KiB/ 60.4 KiB]
    Operation completed over 1 objects/60.4 KiB.
  • now that we have copied a CSV file in the bucket, the function should fire! check the cloud function logs:
    $ gcloud functions logs read
    [ ... snipped for brevity ... ]
    D      csv_loader  274732139359754  2018-10-22 20:48:27.852  Function execution started
    I      csv_loader  274732139359754  2018-10-22 20:48:28.492  Starting job 9ca2f39c-539f-454d-aa8e-3299bc9f7287
    I      csv_loader  274732139359754  2018-10-22 20:48:28.492  Function=csv_loader, Version=v14
    I      csv_loader  274732139359754  2018-10-22 20:48:28.492  File: testdata2.csv
    I      csv_loader  274732139359754  2018-10-22 20:48:31.022  Job finished.
    I      csv_loader  274732139359754  2018-10-22 20:48:31.136  Loaded 1000 rows.
    D      csv_loader  274732139359754  2018-10-22 20:48:31.139  Function execution took 3288 ms, finished with status: 'ok'

    looks like the function ran as expected!

  • let’s eyeball the bigquery table again, and see if the row count has changed:
    $ bq show csvtestdataset.csvtable
    Table rickts-dev-project:csvtestdataset.csvtable

      Last modified           Schema           Total Rows   Total Bytes   Expiration   Time Partitioning   Labels
    ----------------- ----------------------- ------------ ------------- ------------ ------------------- --------
     22 Oct 13:48:29   |- id: integer          1000         70950
                       |- first_name: string
                       |- last_name: string
                       |- email: string
                       |- gender: string
                       |- ip_address: string

    great! there are now 1000 rows. looking good.

  • as a final check, let’s compare the first 3 rows of the CSV
    $ egrep '^[123],' testdata.csv
    1,Andres,Berthot,[email protected],Male,104.204.241.0
    2,Iris,Zwicker,[email protected],Female,61.87.224.4
    3,Aime,Gladdis,[email protected],Female,29.55.250.191

    with the first 3 rows of the bigquery table

    $ bq query 'select * from csvtestdataset.csvtable where id IN (1,2,3)'
    Waiting on bqjob_r6a3239576845ac4d_000001669d987208_1 ... (0s) Current status: DONE
    +----+------------+-----------+---------------------------+--------+---------------+
    | id | first_name | last_name |           email           | gender |  ip_address   |
    +----+------------+-----------+---------------------------+--------+---------------+
    |  1 | Andres     | Berthot   | [email protected]         | Male   | 104.204.241.0 |
    |  2 | Iris       | Zwicker   | [email protected]         | Female | 61.87.224.4   |
    |  3 | Aime       | Gladdis   | [email protected] | Female | 29.55.250.191 |
    +----+------------+-----------+---------------------------+--------+---------------+

    and whaddyaknow, they match! w00t!
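
    the same spot check could be scripted instead of eyeballed. here’s a minimal sketch, assuming the table rows have already been fetched as a list of dicts (for example from a query result); the helper name csv_rows_match and the sample data are made up for illustration:

```python
import csv
import io


def csv_rows_match(csv_text, table_rows, key="id"):
    """Return True if every table row equals the CSV row with the same key."""
    reader = csv.DictReader(io.StringIO(csv_text))
    by_key = {row[key]: row for row in reader}
    for table_row in table_rows:
        csv_row = by_key.get(str(table_row[key]))
        if csv_row is None:
            return False
        # CSV values are always strings, so compare string forms
        if any(str(table_row.get(col)) != csv_row[col] for col in csv_row):
            return False
    return True


# hypothetical sample data, shaped like the mockaroo CSV but with fewer columns
sample_csv = "id,first_name,last_name\n1,Andres,Berthot\n2,Iris,Zwicker\n"
sample_rows = [
    {"id": 1, "first_name": "Andres", "last_name": "Berthot"},
    {"id": 2, "first_name": "Iris", "last_name": "Zwicker"},
]
print(csv_rows_match(sample_csv, sample_rows))
```

    everything is compared as strings because that’s all a CSV gives you; a stricter check would cast each column to its schema type first.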

    proof of concept: complete!

    conclusion: cloud functions are pretty great.
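
    one possible tweak to csv_loader: it reads the bucket name from the BUCKET environment variable, but the storage event passed in as data should also carry a bucket field next to name. a variant could build the URI from the event itself and ignore anything that isn’t a CSV. this is a rough sketch under that assumption about the payload; uri_from_event is a hypothetical helper, not part of the function i deployed:

```python
def uri_from_event(data):
    """Build a gs:// URI from a storage event payload, or None for non-CSV objects."""
    name = data["name"]
    if not name.lower().endswith(".csv"):
        # not a CSV, so there is nothing to load
        return None
    # assumes the event payload includes the bucket name under 'bucket'
    return "gs://{}/{}".format(data["bucket"], name)


# sample event dict, shaped like the fields the function reads
event = {"bucket": "csvtestbucket", "name": "testdata.csv"}
print(uri_from_event(event))
```

    that would let one function serve more than one bucket, at the cost of trusting whatever the trigger sends.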

    PoC: stream nginx access logs into Bigquery

    let’s say you have some servers in a cluster serving vhost foo.com and you want to put all the access logs from all the webservers for that vhost into Bigquery so you can perform analyses, or you just want all the access logs in one place.

    in addition to having the raw weblog data, you also want to keep track of which webserver the hits were served by, and what the vhost (Host header) was.

    so, foreach() server, we will install fluentd, configure it to tail the nginx access log, and upload everything to Bigquery for us.

    it worked like a champ. here’s what i did to PoC:

    1. Install fluentd
      $ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-xenial-td-agent3.sh | sh
      
    2. Create a Bigquery dataset
       $ bq mk --dataset rickts-dev-project:nginxweblogs
      Dataset 'rickts-dev-project:nginxweblogs' successfully created.
    3. Create a JSON schema to handle the weblogs + server hostname + vhost name
      [
        {
          "name": "agent",
          "type": "STRING"
        },
        {
          "name": "code",
          "type": "STRING"
        },
        {
          "name": "host",
          "type": "STRING"
        },
        {
          "name": "method",
          "type": "STRING"
        },
        {
          "name": "path",
          "type": "STRING"
        },
        {
          "name": "referer",
          "type": "STRING"
        },
        {
          "name": "size",
          "type": "INTEGER"
        },
        {
          "name": "user",
          "type": "STRING"
        },
        {
          "name": "time",
          "type": "INTEGER"
        },
        {
          "name": "hostname",
          "type": "STRING"
        },
        {
          "name": "vhost",
          "type": "STRING"
        }
      ]
    4. Create a table in the Bigquery dataset to store the weblog data
      $ bq mk -t nginxweblogs.nginxweblogtable schema.json
      Table 'rickts-dev-project:nginxweblogs.nginxweblogtable' successfully created.
    5. Install the fluentd Google Bigquery plugins
      $ sudo /usr/sbin/td-agent-gem install fluent-plugin-bigquery --no-ri --no-rdoc -V
    6. Configure fluentd to read the nginx access log for this vhost and upload to Bigquery (while also adding the server hostname and vhost name) by creating an /etc/td-agent/td-agent.conf similar to this: https://gist.github.com/rickt/641e086d37ff7453b7ea202dc4266aa5 (unfortunately WordPress won’t render it properly, sorry)

      You’ll note we are using the record_transformer fluentd filter plugin to transform the access log entries with the webserver hostname and webserver virtualhost name before injection into Bigquery.

    7. After making sure that the user fluentd runs as (td-agent by default) has read access to your nginx access logs, start (or restart) fluentd
       $ sudo systemctl start td-agent.service
    8. Now make a call to your vhost (in my case, localhost)
       $ hostname
      hqvm
      $ curl http://localhost/index.html?text=helloworld
      you sent: "helloworld"
    9. Query Bigquery to look for that specific hit, first using the bq command line tool
       $ bq query 'SELECT * FROM nginxweblogs.nginxweblogtable WHERE path = "/index.html?text=helloworld"'
      +-------------+------+------+--------+-----------------------------+---------+------+------+------+----------+--------------------------+
      |    agent    | code | host | method |            path             | referer | size | user | time | hostname |          vhost           |
      +-------------+------+------+--------+-----------------------------+---------+------+------+------+----------+--------------------------+
      | curl/7.47.0 | 200  | ::1  | GET    | /index.html?text=helloworld | -       |   14 | -    | NULL | hqvm     | rickts-dev-box.fix8r.com |
      +-------------+------+------+--------+-----------------------------+---------+------+------+------+----------+--------------------------+
    10. Congratulations, you have just set up your web access logs to be injected into a Bigquery table!
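
    to make the enrichment step concrete, here’s a rough Python illustration of what the parse + record_transformer stage does conceptually: parse one access log line into fields, then tack on the server hostname and vhost. this is not fluentd’s implementation; the regex assumes nginx’s default “combined” log format, and the enrich helper, the sample line and its timestamp are all made up for illustration:

```python
import re
import socket

# simplified pattern for nginx's default "combined" log format (an assumption)
LOG_PATTERN = re.compile(
    r'(?P<host>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" (?P<code>\d{3}) (?P<size>\d+) '
    r'"(?P<referer>[^"]*)" "(?P<agent>[^"]*)"'
)


def enrich(line, vhost, hostname=None):
    """Parse one access log line and add hostname + vhost, or return None if it doesn't parse."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    record = match.groupdict()
    record["size"] = int(record["size"])  # size is INTEGER in the schema
    # the two extra fields added on top of the raw weblog data
    record["hostname"] = hostname or socket.gethostname()
    record["vhost"] = vhost
    return record


# made-up sample line; the time field is left as the raw string here
line = ('::1 - - [01/Jan/2019:00:00:00 +0000] '
        '"GET /index.html?text=helloworld HTTP/1.1" 200 14 "-" "curl/7.47.0"')
record = enrich(line, vhost="foo.com", hostname="hqvm")
print(record["path"], record["hostname"], record["vhost"])
```

    the resulting dict has the same field names as the JSON schema above, which is the shape each row needs before it lands in the table.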

    proof of concept: complete!

    conclusion: pushing your web access logs into Bigquery is extremely easy, not to mention, a smart thing to do.

    the benefits exponentially increase as your server + vhost count increases. try consolidating, compressing and analyzing logs from N+ servers using months of data in-house and you’ll see the benefits of Bigquery right away.

    enjoy!