

Moving data from a GCS bucket to BigQuery by applying transformations and checking data quality using Cloud Data Fusion

This blog post explains Cloud Data Fusion and walks through building a Data Fusion pipeline that ties together several services: a GCS bucket, Data Fusion, Cloud Dataproc, and BigQuery. Here I'm using the GCS bucket as a staging area, Data Fusion as the ETL service, and BigQuery as the data warehouse.

There are multiple services available in GCP for ETL processing, and one of the familiar ones is Cloud Dataflow, so this blog also aims to clarify when to use Cloud Dataflow and when to use Data Fusion. Use Data Fusion when you're dealing with lots of data that needs to be combined, i.e. joined or unioned/appended with other data. Use Cloud Dataflow when you need to perform a change that cannot be accomplished by just combining the data: for example, when data types need to be changed or adjusted, anomalies removed, or vague identifiers converted into more meaningful entries. Cloud Data Fusion is a service for efficiently building ETL/ELT data pipelines; it uses a Cloud Dataproc cluster to perform all the transformations in the pipeline.

Discussed Features:

• Implementing a Data Fusion pipeline to move data from a GCS bucket (the data source) to BigQuery (the data sink) using Cloud Data Fusion

• Applying transformations and checking data quality with Cloud Data Fusion, which uses a Dataproc cluster to perform all transformations in the pipeline

Implement Pipeline:

Step1: In the Cloud Console, click the Activate Cloud Shell button in the top right toolbar.

Once you are connected, you will be authenticated, and the project is set to your PROJECT_ID.

You can list the active account name and project ID with these commands:

  • gcloud auth list
  • gcloud config list project

Step2: You need to make sure that your project has the appropriate permissions within Identity and Access Management (IAM).

  1. On the Navigation menu, click IAM & Admin > IAM.
  2. Grant the Editor role to the default Compute Engine service account ({project-number}-compute@developer.gserviceaccount.com)

Step3: Enable the Cloud Data Fusion API, either from the Console UI or from the command-line shell
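From Cloud Shell, enabling the API is a one-liner. A minimal sketch, with a placeholder project ID; the command is echoed here so you can review it before pasting it into Cloud Shell:

```shell
# Placeholder project ID -- substitute your own.
PROJECT_ID="my-demo-project"

# The gcloud command that enables the Cloud Data Fusion API.
# Echoed for review; paste it into Cloud Shell to run it.
ENABLE_CMD="gcloud services enable datafusion.googleapis.com --project=${PROJECT_ID}"
echo "${ENABLE_CMD}"
```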

Step4: On the Navigation menu, select Data Fusion. To create a Cloud Data Fusion instance, click Create an Instance

Step5: Once the instance is created, grant the service account associated with the instance permissions on your project. Click the instance name to navigate to the instance detail page

  • Copy the service account & navigate to the IAM & Admin > IAM.
  • Within the IAM page, add the respective service account as a new member and grant it the Cloud Data Fusion API Service Agent role
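The same grant can be scripted with gcloud. A sketch under placeholder names: the Data Fusion service account normally has the form service-{project-number}@gcp-sa-datafusion.iam.gserviceaccount.com, but copy the exact value from your instance detail page. The command is echoed for review; run it in Cloud Shell:

```shell
# Placeholders -- substitute your own project ID and the service account
# copied from the Data Fusion instance detail page.
PROJECT_ID="my-demo-project"
DF_SA="service-123456789@gcp-sa-datafusion.iam.gserviceaccount.com"

# IAM binding command granting the Cloud Data Fusion API Service Agent role.
BIND_CMD="gcloud projects add-iam-policy-binding ${PROJECT_ID} --member=serviceAccount:${DF_SA} --role=roles/datafusion.serviceAgent"
echo "${BIND_CMD}"
```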

Step6: Loading the data. Create a GCS bucket to act as a staging area for the input file, and also create a backup storage bucket, which Data Fusion asks for later in the process
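Both buckets can be created from Cloud Shell with gsutil. A sketch with placeholder bucket names (bucket names must be globally unique); the commands are echoed for review:

```shell
# Placeholder names -- substitute your own; bucket names must be globally unique.
PROJECT_ID="my-demo-project"
STAGING_BUCKET="${PROJECT_ID}-taxi-staging"   # holds the input CSV file
BACKUP_BUCKET="${PROJECT_ID}-df-backup"       # requested by Data Fusion later

# Bucket-creation commands, echoed for review; run them in Cloud Shell.
for BUCKET in "${STAGING_BUCKET}" "${BACKUP_BUCKET}"; do
  echo "gsutil mb -p ${PROJECT_ID} gs://${BUCKET}/"
done
```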

Step7: Click the View Instance option on the created Cloud Data Fusion instance

Once you are in the Cloud Data Fusion UI, you can see all the services available in Data Fusion. In this blog, I'll be working with Wrangler, an interactive, visual tool that lets you perform transformations on a small subset of your data before dispatching large, parallel-processing jobs on the entire dataset.

Step8: Select the Wrangler feature; there you can see your existing buckets. Choose your bucket, make sure your input file has been uploaded to it, and select that CSV file

Step9: Cleaning the data

I'd like to perform some transformations to parse and clean the CSV data selected in the previous step.

  1. Select the body column and click the down arrow. Under the Parse option, select CSV, check Set first row as header, and click Apply. You can see the data split into multiple columns.
  2. Apply sample transformations with simple clicks. Click the down arrow next to the trip_distance column, select Change data type, and click Float.
  3. There are a few anomalies, such as negative values in some columns. You can filter out those negative values using Wrangler: click the down arrow next to the trip_distance column, choose Filter, proceed with Custom condition, enter >0.0, and click Apply.
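Behind the scenes, Wrangler records each click as a directive in a recipe. The three steps above correspond roughly to a recipe like the following; directive names and argument syntax can vary between CDAP/Wrangler versions, so treat this as an illustration rather than exact syntax:

```
parse-as-csv :body ',' true
set-type :trip_distance float
filter-rows-on condition-false trip_distance > 0.0
```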

Step10: Creating the pipeline

After performing basic cleansing and transformations on the input file, let's create the Data Fusion pipeline. This pipeline runs on an ephemeral Cloud Dataproc cluster in the backend.

  1. Click Create a Pipeline and select Batch pipeline.

Once you’re redirected to Data Pipelines UI, you can see a GCS File source node connected to a Wrangler node that contains all the transformations you applied.

Step11: As per the requirement, I'd like to integrate BigQuery into my pipeline to perform data warehousing on the cleansed data. Create a BigQuery dataset and a table containing the zone names that correspond to the location IDs in the input file
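If you prefer the command line, the dataset and zone-names table can be created with the bq tool. A sketch with placeholder project and dataset names; the zone_name column name is an assumption, while zone_id is the column the join uses later. The commands are echoed for review; run them in Cloud Shell, then load your zone-names CSV into the new table:

```shell
# Placeholders -- substitute your own project and dataset names.
PROJECT_ID="my-demo-project"
DATASET="trips"

# bq commands to create the dataset and the zone-names table.
MK_DATASET="bq mk --dataset ${PROJECT_ID}:${DATASET}"
MK_TABLE="bq mk --table ${PROJECT_ID}:${DATASET}.zone_names zone_id:INTEGER,zone_name:STRING"
echo "${MK_DATASET}"
echo "${MK_TABLE}"
```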

Step12: Joining two sources

Now join the two data sources—taxi trip data and zone names—to generate more informative output.

  1. In the Analytics section of the Plugin Palette, select Joiner. A Joiner node appears on the canvas.
  2. Connect both the Wrangler node and the BigQuery node to the Joiner node.
  3. Click Properties on the Joiner node. Keep the Join Type as Inner. Set the Join Condition to join the pickup_location_id column from the Wrangler node to the zone_id column from the BigQuery node.

Step13: Storing the output to BigQuery

After joining the two nodes, select the Sink section and choose the service where you want to store the output. I'd like to use BigQuery as the sink here, so I connect the Joiner node to a BigQuery node as the destination node.

Step14: Deploying and running the pipeline

Click on the Deploy button to deploy the created pipeline.

Once the pipeline is deployed, click Run to start processing the data. The run status shows Succeeded once the pipeline finishes.

Step15: Viewing the results

To view the results after the pipeline runs:

  1. Return to the tab where you have BigQuery open and run a query to inspect the values in the trips_pickup_name table.
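For example, a simple inspection query could look like the following; the project and dataset names are placeholders, and the command is echoed for review. You can run it with the bq CLI or paste just the SQL into the BigQuery console:

```shell
# Placeholders -- substitute your own project and dataset.
PROJECT_ID="my-demo-project"
DATASET="trips"

# Inspect a few rows of the joined output table.
QUERY="SELECT * FROM \`${PROJECT_ID}.${DATASET}.trips_pickup_name\` LIMIT 10"
echo "bq query --use_legacy_sql=false '${QUERY}'"
```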

Intended Audience: Everyone

Keywords used in the blog: Cloud Data Fusion, building pipelines, transformations, BigQuery


Contact for further details:

Pavan Kumar Rudhramahanti
Associate Trainee – DWH & Data Lakes Analytics
