VPC native GKE clusters – Container native LB

This blog is the last in a series on VPC-native GKE clusters. In this blog, I will cover Network endpoint groups (NEG) and Container-native load balancing. For the first part on GKE IP addressing, please refer here, and for the second part on VPC-native clusters, please refer here.

Container load balancing and Network endpoint groups (NEG)

The following diagram shows how default Container load balancing works (without NEG). This applies to both HTTP and network load balancers.

Traffic flows like this:
Load balancer -> VM -> iptables -> Pods

The load balancer distributes traffic to the instances in the instance group. iptables rules in each node further distribute the traffic among pods. If the target pod is not on the same node, packets get routed to another node. This causes a double-hop problem, as illustrated by the path below for a specific case:

Load balancer -> VM1(iptables) -> VM2(Pod3)

The double hop increases latency. To avoid it, Kubernetes provides a service setting, “onlyLocal” (externalTrafficPolicy: Local), which restricts the iptables rules in each node to distribute traffic only to the pods on that specific node. This is how the path would look for different flows:

Load balancer -> VM1(iptables) -> Pod1
Load balancer -> VM2(iptables) -> Pod3
Load balancer -> VM2(iptables) -> Pod4

The disadvantage with “onlyLocal” is that traffic distribution between pods will not be equal. In the example above, pod1 would get more traffic than pods 3 and 4.
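
To make the uneven split concrete, here is a small Python sketch (under the assumption, as in the diagram above, that the load balancer weights nodes equally and each node's iptables splits its share among only the local pods):

```python
# Expected per-pod traffic share under "onlyLocal": the load balancer
# splits traffic evenly across nodes, and each node then splits its
# share among only the pods running locally on it.
def only_local_shares(pods_per_node):
    nodes = len(pods_per_node)
    shares = {}
    for pods in pods_per_node:
        for pod in pods:
            shares[pod] = 1.0 / nodes / len(pods)
    return shares

# Layout from the diagram above: Pod1 on VM1; Pod3 and Pod4 on VM2.
shares = only_local_shares([["pod1"], ["pod3", "pod4"]])
print(shares)  # pod1 gets 50%; pod3 and pod4 get 25% each
```

With NEG, by contrast, the load balancer sees all pods directly, so each pod's expected share is simply 1/N.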

Network endpoint groups(NEG):

A NEG can be a backend for an HTTP load balancer, similar to managed instance groups. NEGs are zonal resources, and each endpoint in a NEG is a combination of an IP address and a port. The IP address can be the primary IP address of a VM or one of its alias IP addresses.

Container load balancing with NEG:

NEG allows containers to be represented as first-class citizens from the load balancer's perspective. Before NEG, load balancers were not aware of containers. With NEG, pod IP addresses and ports are exposed as endpoints to the load balancer, which provides the following advantages:

  • Compared to regular load balancing, this approach provides better performance because it avoids the double hop. Compared to the “onlyLocal” approach, it provides equal traffic distribution between the pods.
  • This approach provides better visibility and easier troubleshooting, since the iptables layer is removed from the load-balancing path.

The Container-native load balancing feature is in beta at the time of writing. It is supported only with the HTTP load balancer.

The following diagram illustrates Container-native load balancing:

Traffic flow would look like this:
Load balancer-> NEG -> Pod1, Pod 3, Pod 4

To compare the 3 load balancing approaches (native load balancing, native load balancing with onlyLocal, and Container-native load balancing with NEG), I created a Kubernetes cluster with 3 nodes and IP aliasing enabled. I used the sample application illustrated here and deployed it with all 3 load balancing approaches. The goal was to show that native load balancing and Container-native load balancing with NEG distribute traffic equally between pods, while the “onlyLocal” approach distributes it unevenly.

Container based load balancing:

As a first step, I created a GKE cluster with IP alias enabled.

gcloud container clusters create cluster-vpc --zone us-central1-b --enable-ip-alias 

The Service’s annotation, cloud.google.com/neg: '{"ingress": true}', enables container-native load balancing. 

Then create the deployment, service and ingress. Create 4 replicas so that 1 node has more than 1 pod while the other nodes have 1 pod each. This application prints the details of the node serving the traffic.

kubectl apply -f neg-demo-app.yaml
kubectl apply -f neg-demo-svc.yaml
kubectl apply -f neg-demo-ing.yaml
kubectl scale deployment neg-demo-app --replicas 4

The following output shows the pod distribution. As we can see, 2 pods are allocated on 1 node, and the other 2 are distributed between the 2 other nodes.

 neg-demo-app-7bbd69746c-bwcl4    1/1       Running   0          1d   gke-cluster-vpc-default-pool-db114d00-1l9m
 neg-demo-app-7bbd69746c-drvhf    1/1       Running   0          1d   gke-cluster-vpc-default-pool-db114d00-2mz8
 neg-demo-app-7bbd69746c-qv8qx    1/1       Running   0          1d   gke-cluster-vpc-default-pool-db114d00-j9km
 neg-demo-app-7bbd69746c-tv5ll    1/1       Running   0          1d   gke-cluster-vpc-default-pool-db114d00-1l9m 

The following diagram shows how the pods are allocated between nodes:

Native load balancing with “onlyLocal”:

For this case, we need to modify the service to remove the “neg” annotation and set “onlyLocal” as shown below.

Following is the service yaml for “onlyLocal” configuration:

 apiVersion: v1
 kind: Service
 metadata:
   name: neg-demo-svc # Name of Service
 spec: # Service's specification
   type: NodePort
   externalTrafficPolicy: Local
   selector:
     run: neg-demo-app # Selects Pods labelled run: neg-demo-app
   ports:
   - port: 80 # Service's port
     protocol: TCP
     targetPort: 9376 

Native load balancing:

For this case, we need to remove both the “neg” annotation and the “onlyLocal” setting.

Traffic test:

For each of the 3 approaches above, I sent 100 requests to the load balancer IP and measured the distribution of traffic between the pods.

 for ((i=1;i<=100;i++)); do curl ""; done 

What I observed is that with native load balancing and Container-native load balancing, the traffic distribution between pods is equal: each pod receives approximately 25% of the traffic. With the “onlyLocal” approach, only about 1/3 of the traffic reaches the node that has both pods scheduled. The traffic distribution looks like (pod1 33%, pod2 33%, pod3 17%, pod4 16%), with pod3 and pod4 scheduled on the same node.
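
One way to tally such a run is to count which pod served each response. The sketch below uses illustrative pod names and a synthetic response list standing in for the real curl output, just to show the bookkeeping:

```python
# Tally per-pod response counts from a list of serving-pod names,
# then convert to percentages. The list below is synthetic, mimicking
# the "onlyLocal" skew observed in the test (not real captured output).
from collections import Counter

responses = ["pod1"] * 33 + ["pod2"] * 33 + ["pod3"] * 17 + ["pod4"] * 17
counts = Counter(responses)
percent = {pod: 100 * n / len(responses) for pod, n in counts.items()}
print(percent)  # pods on the shared node get roughly half the others' share
```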

Among the 3 approaches mentioned above, Container-native load balancing with NEG provides the best performance, distribution and visibility.


VPC native GKE clusters – IP aliasing

This blog is the second in the series on VPC-native GKE clusters. In this blog, I will cover an overview of IP aliasing, IP alias creation options, and creating VPC-native clusters using alias IPs. For the first blog on GKE IP addressing, please refer here.

IP aliasing allows a single VM to have multiple internal IP addresses. This is not the same as having multiple interfaces, each with a different IP address. Each of the internal IP addresses can be allocated to a different service running in the VM. When the node is running containers, alias IPs can be allocated to pods. The GCP VPC network is aware of alias IPs, so routing is taken care of by the VPC. Alias IPs have significant advantages with GKE, since containers have pod and service IPs to manage in addition to the node IP, and IP aliasing makes these addresses native to the VPC, allowing tight integration with GCP services.

Alias IP Advantages

Alias IPs provide many advantages in the container space, as having separate, VPC-aware address ranges carved out for pods and services gives better flexibility.

  • Alias IP addresses are VPC-aware, and routing is taken care of by the VPC. Without IP aliasing, pod routing was the responsibility of the container orchestrator, which had to add these routes manually. With IP aliasing, pod routing is handled by the VPC itself, and there is no need to add manual routes to reach pod IPs.
  • Without IP aliases, anti-spoofing checks had to be disabled on nodes that are part of a GKE cluster. Traffic from the pods has the pod IP as its source IP, and since the VPC was not aware of pod IPs, anti-spoofing checks had to be disabled to let this traffic pass through. With IP aliases, the VPC is aware of pod IPs, so we can keep the anti-spoofing check enabled. This prevents traffic with arbitrary source IPs from originating from the node.
  • IP aliasing prevents IP address conflicts between node IP addresses and cluster IP addresses, since the VPC is aware of both.
  • IP aliases help in scenarios where the pod, but not the node, needs to be reachable from the external world. Since the VPC is aware of alias addresses, we can set up different BGP and firewall rules for node and pod IP addresses.

Creating Alias IP

There are 3 ways to create Alias IP.

  1. When creating the VM, specify the alias IP addresses that will be used by the VM.
  2. When creating the subnet, create a primary range and one or more secondary ranges. During VM creation, use these ranges to create alias IPs. Alias IPs can be created for subnets in either “auto” or “custom” networks.
  3. When a GKE cluster is created with IP aliasing enabled, 2 subnet secondary ranges are created implicitly, 1 for pod IPs and another for service IPs. Pods and services get IPs allocated from these ranges. This is the approach GKE uses.

Creating Alias IP – Approach 1:

In this approach, we will specify alias IP address when creating VM. The steps are mentioned below.

Create network:

gcloud compute networks create aliasnet \

Create instance with alias ip address from the subnet:

 gcloud compute instances create vm5     --zone us-central1-b     --network-interface "subnet=s1,aliases=" 

Instance network detail:

The output below shows the internal IP, external IP and alias IP.

Creating Alias IP – Approach 2:

In this approach, we will first create a primary and a secondary range in the subnet and specify alias IPs within those ranges when creating the VM.

Create network:

gcloud compute networks create aliasnet \

Create subnets with primary and secondary range:

gcloud compute networks subnets create s1 \
    --network  aliasnet \
    --region us-central1 \
    --range \
    --secondary-range range1=, range2=

Create an instance with IP addresses from the primary and secondary ranges:
For some reason, I could not get the CLI command to work, and I was able to do this only from the console. The following is the VM created with the 2 alias IPs. These alias IPs are within the primary and secondary ranges specified in the previous command.

Creating Alias IP – Approach 3:

In this approach, we will create alias IP for Containers.

Create cluster with IP aliasing enabled:

 gcloud container clusters create cluster-vpc --zone us-central1-b --enable-ip-alias 

Because we enabled IP aliasing and used the default network, the above command automatically creates 2 secondary ranges as shown below: “” is the first secondary range and “” is the second. These ranges are used for the cluster IP range and the service IP range.

We also have the option of manually specifying cluster IP range and service IP range like below:

 gcloud container clusters create cluster-vpc --zone us-central1-b --enable-ip-alias --cluster-ipv4-cidr=<clusteriprange> --services-ipv4-cidr=<serviceiprange>

The above command creates secondary ranges on the subnet using the specified addresses and uses them as alias IP ranges for cluster and service IPs.

The following command shows the IP ranges for the “default” network after creating a cluster with IP aliasing enabled. “” is the subnet range for internal IPs, “” is the cluster IP range, and “” is the service IP range.

 gcloud compute networks subnets describe default | grep -i ipcidrrange
 - ipCidrRange:
 - ipCidrRange: 

From the cluster, let’s check the cluster IP range and service IP range. This will match the default ranges.

 gcloud container clusters describe cluster-vpc | grep -e servicesIpv4Cidr -e clusterIpv4Cidr

Let’s deploy a sample application:

kubectl run web --image=gcr.io/google-samples/hello-app:1.0 --replicas=3 --port=8080
kubectl expose deployment web --target-port=8080 --type=NodePort

Let’s look at pod and service IP addresses:

 kubectl get pods -o wide
 NAME                   READY     STATUS    RESTARTS   AGE       IP          NODE
 web-6d695d4565-g7lbb   1/1       Running   0          1h   gke-cluster-vpc-default-pool-db114d00-2mz8
 web-6d695d4565-nzdnx   1/1       Running   0          1h   gke-cluster-vpc-default-pool-db114d00-1l9m
 web-6d695d4565-ttkr8   1/1       Running   0          1h   gke-cluster-vpc-default-pool-db114d00-2mz8 

 kubectl get services
 NAME         TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)          AGE
 kubernetes   ClusterIP     <none>        443/TCP          1h
 web          NodePort   <none>        8080:30017/TCP   1h 

As we can see from the above output, the pod IP addresses (10.64.0.x, 10.64.1.x) come out of the cluster IP range, and the service IP address comes out of the service IP range. If we look at the routing table, we do not see routes specific to pod IP addresses, as the VPC is already aware of them.

In the next blog, I will talk about Network endpoint group and Container native load balancing.


VPC native GKE clusters – IP address management

This blog was written by me after a long gap of close to 7 months. Many reasons including busy work schedule, some health issues in the middle and a little bit of laziness contributed to this. I hope to be a more active blogger going forward.

In this blog series, I will cover the following topics:

The first blog in this series will talk about GKE default IP address management.

The following are the Kubernetes abstractions that need IP addresses:

  • Node IP address – Assigned to individual nodes. The node IP address is assigned from the VPC subnet range.
  • Pod IP address – Assigned to individual pods. All containers within a single pod share the same IP address.
  • Service IP address – Assigned to individual services.

By default, a “/14” range gets allocated as the cluster IP range. Pod and service IP addresses come out of this pool. A “/24” range carved out of the cluster IP range gets assigned to each individual node and is used for pod IP allocation. A “/20” range out of the cluster IP range gets assigned for Kubernetes services. The user can choose the cluster IP range when creating the cluster.
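
The arithmetic behind these defaults is just powers of two; a quick sketch:

```python
# Address-count arithmetic for the default GKE ranges described above:
# a /14 cluster range, /24 per-node pod ranges, and a /20 service range.
def addresses(prefix_len):
    return 2 ** (32 - prefix_len)

print(addresses(14))  # 262144 addresses in the cluster range
print(addresses(24))  # 256 pod addresses per node
print(addresses(20))  # 4096 service addresses
print(addresses(14) // addresses(24))  # 1024 /24 node ranges fit in a /14
```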

To illustrate some of the above points, I created a 3-node Kubernetes cluster with IP aliasing disabled. By default, VPC-native clusters (IP aliasing enabled) are disabled and have to be enabled manually. In a future GKE release, VPC-native clusters will be the default mechanism.

Cluster output:

The following output shows the 3-node GKE cluster created using default IP options in the “default” subnet.

 $ kubectl get nodes -o wide
 NAME                                             STATUS    ROLES     AGE       VERSION          EXTERNAL-IP      OS-IMAGE                             KERNEL-VERSION   CONTAINER-RUNTIME
 gke-cluster-default-default-pool-0f6611b6-k68m   Ready     <none>    9d        v1.12.7-gke.10   Container-Optimized OS from Google   4.14.106+        docker://17.3.2
 gke-cluster-default-default-pool-0f6611b6-vrdc   Ready     <none>    9d        v1.12.7-gke.10   Container-Optimized OS from Google   4.14.106+        docker://17.3.2
 gke-cluster-default-default-pool-0f6611b6-xbql   Ready     <none>    9d        v1.12.7-gke.10   Container-Optimized OS from Google   4.14.106+        docker://17.3.2  

Cluster IP address:
The following output shows the allocated cluster IP range. Pod and service IP addresses are allocated out of this range. If we do not specify the cluster IP range, GKE automatically assigns a “/14” range. We can specify it manually using the “--cluster-ipv4-cidr” option when creating the cluster.

 $ gcloud container clusters describe cluster-default | grep Ip
 nodeIpv4CidrSize: 24
   podIpv4CidrSize: 24

Node IP address:
The following output shows each node's external and internal IP addresses and the pod IP range allocated to it. The node internal IP address is assigned from the VPC subnet. Each node is allocated a /24 pod IP range, which allows a maximum of 256 pod addresses per node. To allow for pods going up and down, we need a buffer of pod IPs; for this reason, each node is limited to 110 pods (instead of 256).
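
The containment of a node's pod range inside the cluster range can be checked with the standard library; the CIDRs below are illustrative stand-ins, not the actual values from this cluster:

```python
# Check that a per-node /24 pod range sits inside the /14 cluster range,
# as described above. Ranges are illustrative examples.
import ipaddress

cluster_range = ipaddress.ip_network("10.60.0.0/14")   # cluster IP range (example)
node_pod_range = ipaddress.ip_network("10.60.1.0/24")  # per-node pod range (example)

print(node_pod_range.subnet_of(cluster_range))  # True
print(node_pod_range.num_addresses)  # 256 addresses, a buffer well above the per-node pod limit
```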

 $ kubectl describe nodes | grep -i -e ip -e podcidr

Let’s deploy a sample application. The following commands will create 3 pods and create a “Nodeport” service on top.

kubectl run web --image=gcr.io/google-samples/hello-app:1.0 --replicas=3 --port=8080
kubectl expose deployment web --target-port=8080 --type=NodePort

Let’s look at the pod IP addresses allocated. 2 pods get allocated on a single node, so they get IP addresses out of the “10.60.2.x/24” range, and the other pod gets an IP out of the “10.60.1.x/24” range.

$ kubectl get pods -o wide
NAME                   READY     STATUS    RESTARTS   AGE       IP          NODE
web-6d695d4565-4cg6t   1/1       Running   0          1m   gke-cluster-default-default-pool-0f6611b6-xbql
web-6d695d4565-6dj79   1/1       Running   0          1m   gke-cluster-default-default-pool-0f6611b6-k68m
web-6d695d4565-dhfqn   1/1       Running   0          1m   gke-cluster-default-default-pool-0f6611b6-k68m

Let’s look at the service IP. The service IP is allocated out of the service range, which in turn comes out of the cluster IP range.

$ kubectl get services
NAME         TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)          AGE
kubernetes   ClusterIP     <none>        443/TCP          17m
web          NodePort   <none>        8080:32668/TCP   2m

For pods to talk across nodes, GKE needs to add explicit routes to the routing table as shown below. Since we do not have IP aliasing enabled, the GCP VPC is not aware of these pod and service IP addresses, which is why GKE explicitly adds these routes.

gke-cluster-default-3db044-19175121-779c-11e9-b601-42010a800159  default    us-central1-b/instances/gke-cluster-default-default-pool-0f6611b6-vrdc  1000
gke-cluster-default-3db044-192c50f1-779c-11e9-b601-42010a800159  default    us-central1-b/instances/gke-cluster-default-default-pool-0f6611b6-xbql  1000
gke-cluster-default-3db044-193c1da9-779c-11e9-b601-42010a800159  default    us-central1-b/instances/gke-cluster-default-default-pool-0f6611b6-k68m  1000

In the next 2 blogs, I will cover VPC native clusters using IP aliasing and Container native load balancing.

My Data engineer certification notes

This blog is more of quick reference notes than a real blog. I shared this with a few Google internal folks, and since they found it useful, I thought I would share these notes here as well.

Please note that the Data engineer exam format changed a little from March 29, 2019. I took this exam in January 2019, so some of the notes below might not apply. The 2 big changes are that there are no case studies in the new format, and certain new data and ML products like Composer, AutoML and Kubeflow have been introduced.

There are many references on preparation steps for the GCP Data engineer exam. Why am I writing 1 more? I found that reading through an individual's experience always gives me some new insight. I am hoping this write-up will help someone preparing for the exam.

I cleared the exam in the last week of January 2019. I come from an infrastructure background, so Data engineer was not my area of strength, and it needed around 8 weeks of preparation time in total. This was not dedicated time, but a few hours split over an 8-week period. I have been with the Google cloud team for 1+ years, and I found that practical experience is important for the exam.

The exam is multiple choice, with 50 questions in a 2-hour time frame. There are also questions on the 2 case studies. For my preparation, I used the Data engineer modules of the Coursera training and Linux Academy. It is good to review and break down the case studies beforehand, to avoid spending time reading the case study text during the exam. Qwiklabs and the labs associated with Coursera help quite a bit. I have added the links in the references.

The certification tests both theoretical and practical knowledge of GCP products and technologies. My suggestion is to take the exam after getting some practical experience with GCP. Another thing I noticed is that for some questions, more than 1 answer seems appropriate, and it takes thorough analysis to answer. 1 tip is to mark these questions for review and spend time on them after answering all the other questions.

Under each topic, I have put rough notes covering important points. The notes are not very structured, and my request is to read through the documentation/blogs associated with each topic to get more understanding.

Based on my understanding and reading through others' experiences, I have categorized the topics into the following priorities. This is very subjective, and I suggest using it at your own risk :)


High priority:

  • Bigquery
  • Dataflow
  • Machine learning


Medium priority:

  • pub/sub
  • Dataproc
  • Bigtable
  • Datastore
  • Datastudio


Lower priority:

  • Datalab
  • Dataprep
  • Spanner
  • Cloud sql
  • Open source products – Like Hadoop, CI/CD(Jenkins)
  • Storage
  • Infra

Important topics that span across modules:

  • Performance
  • Best practises
  • IAM
  • Stackdriver

I cleared the architect certification earlier. My take is that the architect exam is oriented towards the breadth of GCP infrastructure products, while data engineer is oriented towards the depth of GCP data products. I might be biased here, given my infrastructure background.


We can cover topics either by technology (serverless, streaming, etc.), the way the Coursera training does, or by individual products (Bigquery, Dataproc, etc.). For this document, I have organized by product to avoid repetition. For Machine learning, I have combined all topics into a single section. The topics are oriented towards the certification.


Bigquery

Legacy vs Standard SQL

Legacy SQL – square brackets [] are used to quote table names; UDFs are available in the web console. Tables use “:” as the separator

Standard SQL – backticks are used for quoting, and the separator is “.”. Does not support TABLE_DATE_RANGE and TABLE_QUERY; this can be overcome using wildcard tables and _TABLE_SUFFIX. Supports querying nested and repeated data.
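
As a sketch of the wildcard + _TABLE_SUFFIX replacement for TABLE_DATE_RANGE, here is a small Python helper that builds such a standard-SQL query string (the project/dataset/table names are hypothetical):

```python
# Build a standard-SQL wildcard query over date-sharded tables,
# replacing legacy TABLE_DATE_RANGE. All names are illustrative.
def daterange_query(project, dataset, prefix, start, end):
    return (
        f"SELECT * FROM `{project}.{dataset}.{prefix}*` "
        f"WHERE _TABLE_SUFFIX BETWEEN '{start}' AND '{end}'"
    )

sql = daterange_query("my-project", "logs", "events_", "20190101", "20190131")
print(sql)
```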

Standard sql advantages:

Best practises/Performance

  • Avoid self-joins, use window function instead
  • If data is skewed (e.g. some partitions are huge), filter early. Use approximate_top_count to determine skew
  • Avoid joins that produces more output rows than input
  • Avoid point specific dml. Batch the dml statements
  • Sub-queries are more efficient than joins
  • Use only columns that are needed
  • Filter using “WHERE” clause so that there are minimal rows
  • With joins, do bigger joins first. Left side of join must be the bigger table
  • Low cardinality “by groups” are faster. Low cardinality means that the column contains a lot of “repeats” in its data range
  • LIMIT doesn't affect cost, as it controls only the display
  • Built-in functions are faster than js udf
  • Exact functions are slower than approximate built-in function, use approximate built-in if possible.  For example, instead of using COUNT(DISTINCT), use APPROX_COUNT_DISTINCT()
  • Ordering on outermost query, not inner. Outer query is performed last, so put complex operations in the end when all filtering is done.
  • Wildcards – be more specific if possible
  • Performance – query time split between stages, can be seen using stackdriver as well.
  • Each stage – wait, read, write, compute
  • Tail skew – max time spent is significantly more than average. Some partitions are way bigger than other partitions. Tail skew can be found out using approximate aggregate function like APPROX_TOP_COUNT
  • Avoid tail skew – filter as early as possible
  • Batch load is free, streaming has a cost. Unless data is needed in real-time, use batch when possible.
  • Denormalize when possible. Still use structs and arrays.
  • External data sources are slow, use it only when needed.
  • Monitor query performance – using “details” page. Can find out if there is read, compute or write latency. Query plan shows different stages and shows breakup of time between different activities in a stage

Loading and exporting data

Can be done using UI, CLI or api.

Supported data formats:

  • CSV
  • JSON (newline delimited only)
  • Avro
  • Parquet
  • ORC

Export from BQ – to Cloud Storage (CSV, Avro, JSON). Each exported file is limited to 1 GB; can use wildcards to split into multiple files (“bq extract”)
Bigquery transfer service – imports data from other marketing apps: AdWords, DoubleClick, YouTube reports

External tables

Supported for Cloud Storage, Bigtable and Google Drive

Create a table definition file with the schema; the schema can also be autodetected.

Use either temporary or permanent tables. The data is not stored in these tables, just the schema. Permanent tables can be used for access control and sharing, since tables are access controlled. Temporary tables are for one-off use. Permanent tables are placed in a dataset.

Queries against external tables are not cached


Partitioned tables

2 options – based on ingestion time, or using an existing column. In the first scheme, _PARTITIONTIME is a pseudo-column that gets added.

_PARTITIONTIME and _PARTITIONDATE are available only in ingestion-time partitioned tables.

For manual partitions, we can use any date or timestamp column.

Another approach to partitioning is to shard the data into separate tables. This has more overhead: with multiple tables, we need to maintain access control and schema for each table separately.

Advanced queries

Analytical window functions:
Aggregate – sum, count
Navigation – lead, lag
Ranking, numbering – rank, cume_dist
“Partition by” is similar to “group by” but doesn't aggregate. This is different from BQ partitioning, which is about how data is stored.
Types – struct, array, timestamp, int64, float64, string
An inner table can be defined using WITH
ARRAY_AGG – creates an array. UNNEST – breaks an array apart.
STRUCT – creates a struct
User-defined functions – both SQL UDFs and JavaScript UDFs are possible
UDFs have constraints – the size of the UDF output is limited, and some native JavaScript features are not supported
UNNEST – takes an array and returns a table


Streaming

Query data while it is being streamed, before it is written to disk
100,000 rows/second insertion rate; use REST APIs to insert
Streaming data is available within seconds


Pricing

Storage cost is similar to Cloud Storage
Older unread data is charged less
Query cost is based on data processed. For ingested data, it is based on the streaming rate.
Pay based on usage. There is also a flat-rate plan, but it is rarely used
Cost is optimized by restricting the number of columns queried
Free – loading, exporting, cached queries, queries on metadata, queries with errors
Cached queries – save cost, per user. Typical cache lifetime is 24 hours, but cached results are best-effort and may be invalidated sooner
Billing – done on the project where the job runs, irrespective of where the dataset is from


Access control

Control at the project, dataset and view level. Views are virtual tables. There are no direct IAM roles for controlling view access; instead, views are put in a new dataset and IAM control is done on that dataset. This gives virtual view-level access control, called an authorized view.

Views can also be used to share rows based on a particular user. The allowed user is added as a column, and the view matches it against SESSION_USER to display rows only to those users

Roles – admin, data owner, editor, viewer, user (can run queries; more permissions than job user), job user. Of the primitive roles, project owner, editor and viewer are available. For datasets, owner, writer and reader are available.

Public dataset – accessible to all authenticated users



Dataflow

Open-source Apache Beam API. Can be executed on Flink and Spark as well.
Jobs can be read, filter, group, transform
Jobs are executed in parallel
Tasks are written in Java or Python using beam sdk
The same code is used for streaming and batch. For streaming, we bound the data by windowing, using time or number of records
Terms – source, sink, runner (pipeline execution), transform (each step in the pipeline); transforms are applied on PCollections
Pipeline is a directed graph of steps
Source/Sink can be filesystem, gcs, bigquery, pub/sub
Runner can be local laptop, dataflow(in cloud)
Output data written can be sharded or unsharded. For unsharded, use unsharded option.


Pipeline.apply – setup individual tasks
Pipeline.run – pipeline started here
Inputs and outputs are PCollections. A PCollection is not in-memory and can be unbounded.
Each transform – give a name
Read from source, write to sink

Pipeline in Java:

parDo – indicates to do in parallel
Use “java classpath” or “mvn compile”
“Mvn compile” with runner “dataflow” will run in dataflow

Pipeline in Python:

| – means apply
Use “python <program>” or “python <program> dataflowrunner”
Pipelines are localized to a region
Shutdown options – cancel, drain. Drain is graceful.

Side input

Can be static like constant
Can also be a list or map. If side input is a pcollection, we first convert to list or map and pass that as side input.
Call ParDo .withSideInputs with the map or list
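
The side-input idea can be sketched in plain Python (not the Beam API): the main per-element processing runs in parallel, while a small, precomputed map rides along with every call. The names and data below are illustrative:

```python
# Plain-Python sketch of a side input: "process" handles one element
# at a time, while "side" (a dict prepared in advance, like a
# PCollection converted to a map) is available to every call.
def par_do_with_side_input(elements, process, side):
    return [process(e, side) for e in elements]

rates = {"usd": 1.0, "eur": 1.5}  # side input converted to a map
out = par_do_with_side_input(
    [("usd", 10), ("eur", 20)],
    lambda e, side: e[1] * side[e[0]],
    rates,
)
print(out)  # [10.0, 30.0]
```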

Mapreduce in Dataflow

Map – operates in parallel; reduce – aggregates based on key
parDo acts on one item at a time, similar to the map operation in MapReduce, and should not keep state/history. Useful for filtering and mapping.
In Python, 1:1 mapping is done using Map, and non-1:1 using FlatMap. In Java, both are done using parDo
Examples of map – filtering, converting types, extracting parts of the input, calculating from different inputs
Example of flatMap – yielding a line only for lines that contain the searchTerm
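
The 1:1 vs non-1:1 distinction can be shown in plain Python (not the Beam API); the grep example mirrors the searchTerm case above:

```python
# map emits exactly one output per input (1:1); flat_map can emit
# zero or more outputs per input (here, only lines containing "beam").
def flat_map(fn, items):
    return [out for item in items for out in fn(item)]

lines = ["beam is fun", "hello", "beam sdk"]
grep = lambda line: [line] if "beam" in line else []

print(list(map(len, lines)))   # 1:1 -> [11, 5, 8]
print(flat_map(grep, lines))   # non-1:1 -> ['beam is fun', 'beam sdk']
```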

GroupBy does the aggregation, done using Combine and GroupByKey. Combine is faster than GroupByKey because it distributes across multiple workers; use GroupByKey for custom operations.
In Java, GroupByKey returns an iterable.
Combine examples – sum, average. GroupBy example – group by state and zipcode, where state is used for grouping. CombinePerKey example – total sales by person
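
A plain-Python sketch (not the Beam API) of the two grouping operations, using the total-sales-by-person example:

```python
# group_by_key collects values per key; combine_by_key folds them
# with a combiner (here sum, e.g. total sales by person).
from collections import defaultdict

def group_by_key(pairs):
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

def combine_by_key(pairs, combine=sum):
    return {k: combine(v) for k, v in group_by_key(pairs).items()}

sales = [("ann", 10), ("bob", 5), ("ann", 7)]
print(group_by_key(sales))    # {'ann': [10, 7], 'bob': [5]}
print(combine_by_key(sales))  # {'ann': 17, 'bob': 5}
```

In a real runner, the advantage of combine is that the fold can start on each worker before shuffling, which is why it beats a plain group-by for associative operations.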



Streaming – programming model for unbounded data


Use window approach to process in dataflow
For streaming data, pub/sub attaches a timestamp when data is inserted into pub/sub
For batch data, we can insert a timestamp when data is read, so that the dataflow pipeline can be similar between streaming and batch
In code, we set the streaming option to true.
Window types – fixed, sliding, session, global. A session window is per key (e.g. a particular user) and is dynamic.
Sliding window parameters – window duration and sliding period, e.g. a 2-minute window computed every 30 seconds
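
Window assignment for that example can be sketched with integer-second timestamps: a window of `size` sliding every `period` means each event lands in size/period overlapping windows.

```python
# Return the (start, end) of every sliding window containing
# timestamp ts, with window starts at multiples of the period.
def sliding_windows(ts, size, period):
    last_start = (ts // period) * period
    # window starts are the multiples of period in (ts - size, ts]
    return sorted((s, s + size) for s in range(last_start, ts - size, -period))

# 2-minute (120 s) window sliding every 30 s: each event is in 4 windows.
print(sliding_windows(130, 120, 30))  # [(30, 150), (60, 180), (90, 210), (120, 240)]
```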
Out-of-order messages from pub/sub – taken care of during aggregation
Duplicates from pub/sub – taken care of using the pub/sub msgid. If the sender itself sends duplicates, pub/sub won't be aware; in that case the sender can add an id, and dataflow can use that id (instead of the pub/sub id) to remove duplicates.

Watermark, triggers and accumulation:

Window – event arrival time window

Watermark – dataflow's tracking of how far processing time is behind event time. The watermark is dynamically calculated and decides when to close a window. By default, the watermark is based on message arrival time in pub/sub; we can change this using an option set when publishing the message to pub/sub.

Triggers – aggregations are calculated at the watermark. There is an option to recalculate the aggregate for each late data arrival, or to drop late data. We can also control when to trigger relative to the watermark; the triggering API also helps in providing early results.

Default – all late data is discarded, since the default allowed_lateness is 0.

Types of triggers:

Triggers handle late arrival of data. Types:
Time-based triggers
Data-driven triggers – triggered by number of events
Composite – a combination of time- and data-based triggers

PCollection<Double> avgSpeed = currentConditions
    .apply("TimeWindow", Window.into(SlidingWindows
        .of(Duration.standardMinutes(5))
        .every(Duration.standardSeconds(60))));

  • Above example, window is a time based trigger with sliding window type. Since lateness is not specified, allowed lateness default is 0 which means ignore late data.

PCollection<KV<String, Integer>> scores = input
    .apply(Window.into(FixedWindows.of(Minutes(2)))
        .triggering(AfterWatermark()
            .withEarlyFirings(AtPeriod(Minutes(1)))
            .withLateFirings(AtCount(1)))
        .withAllowedLateness(Minutes(30)));

  • Fires early results every 1 minute before the watermark, fires at the watermark, and then fires after each batch of N (here N=1) late elements, for up to 30 minutes of allowed lateness.

PCollection<KV<String, Integer>> scores = input
    .apply(Window.into(Sessions.withGapDuration(Minutes(2)))
        .triggering(AfterWatermark()
            .withEarlyFirings(AtPeriod(Minutes(1)))))
    .apply(Sum.integersPerKey());

  • Session window example, useful for irregularly distributed data. The gap duration specifies that elements arriving within 2 minutes of each other are grouped into the same window; a 2-minute idle gap closes the window.

Windowing and triggering scenarios, from simple to complex:

Classic batch(no windows)
Batch with fixed windows
Triggering at watermark
Complicated watermark and triggering
Session windows

Accumulation mode:

A trigger set to .accumulatingFiredPanes always outputs all data seen so far in a given window, including any elements previously triggered. A trigger set to .discardingFiredPanes outputs only the incremental changes since the last time the trigger fired.
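The difference can be sketched in plain Python; `arrivals` groups one window's elements by the trigger firing in which they arrived (the function is illustrative, not a Beam API):

```python
def fire_panes(arrivals, accumulating):
    """Return what each trigger firing outputs for one window.

    accumulating=True  -> behaves like .accumulatingFiredPanes
    accumulating=False -> behaves like .discardingFiredPanes
    """
    fired, buffer = [], []
    for batch in arrivals:
        buffer.extend(batch)
        fired.append(list(buffer))
        if not accumulating:
            buffer = []  # forget elements that were already output
    return fired
```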


Iam – only project-level access, no further division. Roles: Dataflow admin, developer, viewer, worker. Worker is specific to service accounts. Admin has access to the storage bucket.

Choosing between dataproc and dataflow

Dataflow vs dataproc – existing hadoop/spark -> dataproc; streaming -> dataflow; completely serverless -> dataflow

Machine learning



Supervised learning – learning from pre-classified data, i.e. examples that carry both input and label.

Unsupervised learning – learning from unclassified data, using clustering approaches.

Supervised learning types:

Linear regression – predicts a continuous number; MSE (mean squared error) is minimized in this approach. Least squares and gradient descent are 2 methods to solve a linear regression problem.

  • Finding the tip amount from data that has gender, hotel name, bill amount
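A least-squares sketch for the single-feature case (pure Python; the closed form that minimizes MSE for y = w*x + b):

```python
def least_squares(xs, ys):
    """Closed-form simple linear regression minimizing mean squared error."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    w = (sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
         / sum((x - mean_x) ** 2 for x in xs))
    b = mean_y - w * mean_x
    return w, b
```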

Classification – predicts a discrete output. Logistic regression is used, and the cross-entropy function is minimized.

  • Example: mail is spam or not

Compare linear regression and neural networks:

Regression is a method dealing with linear dependencies; neural networks can deal with nonlinearities. So if your data has nonlinear dependencies, neural networks should perform better than regression.

Gradient descent

Gradient descent is used to find the optimal weights; the coefficients and biases are the weights. Gradient descent does not necessarily reach the absolute minimum; it can settle in a local one.

Start with random weight and then keep tweaking the weights to get the minimal error. The same applies when there are multiple inputs as well.
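The loop above can be sketched in plain Python for the single-feature linear case (the learning rate and epoch count are arbitrary illustrative choices, and the start is fixed rather than random to keep the sketch deterministic):

```python
def gradient_descent(xs, ys, lr=0.05, epochs=1000):
    """Iteratively tweak weight and bias to reduce MSE."""
    w, b = 0.0, 0.0
    n = len(xs)
    for _ in range(epochs):
        # Gradients of MSE = (1/n) * sum((w*x + b - y)^2) w.r.t. w and b.
        dw = (2 / n) * sum((w * x + b - y) * x for x, y in zip(xs, ys))
        db = (2 / n) * sum((w * x + b - y) for x, y in zip(xs, ys))
        w, b = w - lr * dw, b - lr * db
    return w, b
```

On data following y = 2x this converges close to w=2, b=0.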

The model is evaluated on the entire dataset, either after a certain number of epochs or after a particular duration, by measuring rmse on the entire dataset. If the model is good enough, stop there.

Batch size – the number of samples over which we try out the weight changes; typically 30-100 samples

Epoch – 1 traversal through the entire dataset; such a traversal is called an evaluation

Error metric

For continuous values, mean squared error (MSE) is used as the error metric for optimizing a regression model: the squared errors are summed and divided by the count to get the average.

For classification problems, cross-entropy is used; it has a log-based formula. A confusion matrix is used to describe the performance of a classification model, while cross-entropy is still what is minimized.
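Both error metrics are easy to state directly (binary cross-entropy shown; preds are probabilities, labels are 0 or 1):

```python
import math

def mse(preds, labels):
    """Mean squared error: average of squared differences."""
    return sum((p - y) ** 2 for p, y in zip(preds, labels)) / len(preds)

def cross_entropy(preds, labels):
    """Binary cross-entropy (log loss) for classification."""
    return -sum(y * math.log(p) + (1 - y) * math.log(1 - p)
                for p, y in zip(preds, labels)) / len(preds)
```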

Confusion matrix

  • make note of true positives (TP), false negatives (FN), false positives (FP), true negatives (TN)
  • Accuracy – percentage of correct classification outcomes. Suitable for balanced datasets
  • Balanced dataset – covers positive and negative cases roughly equally. Unbalanced dataset – does not
  • For a balanced dataset, use accuracy as the metric. For an unbalanced dataset, use precision and recall
  • Precision – positive predictive value, TP/(TP+FP)
  • Recall – TP/(TP+FN)
  • Precision and recall by themselves do not tell the whole story; the decision threshold is important
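A small sketch tying the threshold to the confusion-matrix counts and to precision/recall (pure Python):

```python
def confusion(scores, labels, threshold):
    """Count TP/FP/FN/TN for predictions made at a given threshold."""
    tp = fp = fn = tn = 0
    for s, y in zip(scores, labels):
        pred = s >= threshold
        if pred and y:
            tp += 1
        elif pred and not y:
            fp += 1
        elif not pred and y:
            fn += 1
        else:
            tn += 1
    return tp, fp, fn, tn

def precision_recall(tp, fp, fn):
    """Precision = TP/(TP+FP); recall = TP/(TP+FN)."""
    return tp / (tp + fp), tp / (tp + fn)
```

Moving the threshold trades precision against recall, which is why neither number is meaningful without it.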

Dataset types

Split the data into training, validation and test sets
Training – data on which model training is done
Validation – used for hyperparameter tuning; stop increasing model complexity (e.g. adding more layers) when the validation data starts showing more error than the training data. Validation data should not be used to evaluate the final model.
Test – used for final model evaluation
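A positional split sketch (in practice the split is usually done on a hash of a stable field so it stays consistent as data grows; the fractions are illustrative):

```python
def split_dataset(rows, train_frac=0.8, valid_frac=0.1):
    """Partition rows into training, validation and test sets."""
    n_train = int(len(rows) * train_frac)
    n_valid = int(len(rows) * valid_frac)
    return (rows[:n_train],                   # fit the model
            rows[n_train:n_train + n_valid],  # tune hyperparameters
            rows[n_train + n_valid:])         # final evaluation only
```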

ML approaches
3 approaches – tensorflow, ml engine, prebuilt models



Tensorflow stack, from top to bottom:
estimator api
model abstraction layer (layers, error functions)
python low-level library
c++ low-level library
hardware (cpu, gpu, tpu)

There are build and run stages: the build stage builds the graph, and the run stage runs the tensors in a distributed manner.
Placeholders with feed_dict are a dynamic way of passing values
Data types are inferred implicitly
tf.eager avoids the separate build-then-run stages; used mainly for testing and experimentation

Model types:
Linear regressor
DNNRegressor – deep neural network
Linear classifier

Effective machine learning

Reading bigger data – pandas reads everything into memory. Instead, split the data into multiple files, queue them as multiple epochs, use a reader and let tensorflow pick from the queue.

Creating more features – using feature combinations(feature engineering)

Model training architecture – model itself is chosen dynamically using hyperparameter tuning

ML engine


Does both training and prediction
Supports tensorflow, xgboost, scikit learn, keras in beta

Scale out training
Hyper parameter tuning
Preprocessing and feature creation same between training and prediction

Package the python model code as python package and submit to ML engine.

Directory structure:

task.py – processes input arguments
model.py – main model code: the estimator api and input functions


If the model will be used only for prediction, we can export the model from tensorflow, store the files in cloud storage, create a model in gcp and link it to cloud storage.
Prediction types – online predictions, batch predictions.
Batch predictions – input and output are in gcs. Asynchronous, optimized to handle a high volume of instances and complex models
Online predictions – respond immediately; synchronous, json input

Feature requirements for feature engineering

  • Features should be reasonably related to the label
  • The feature value should be known at prediction time without latency
  • Numeric with a range of meaningful magnitude. This is difficult for categorical inputs, which need a vector/bitmask representation (one-hot encoding); for natural language, word2vec can be used. This is called a sparse column in tf.
  • Need enough examples of each feature value. At least 5 occurrences of each value is a good rule of thumb. For continuous numbers, use windows (buckets).
  • Feature crosses chosen using human insight can simplify the ml model.
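One-hot encoding of a categorical value, as mentioned above, is just a bitmask against a fixed vocabulary (plain-Python sketch):

```python
def one_hot(value, vocabulary):
    """Encode a categorical value as a one-hot vector over a fixed vocabulary.

    Unknown values map to the all-zeros vector; TF's sparse columns build
    the same representation without materializing the zeros.
    """
    return [1 if value == v else 0 for v in vocabulary]
```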

Model architecture

Linear models work well for sparse features (like employee id). A dnn does not work well here since we would need many neurons for each employee-id bucket. Employee id uses one-hot encoding, so only 1 element is set per row.

Neural network model works well for dense features(like price, color(rgb))

We can use a combination of the 2: linear models for the sparse inputs and a dnn for the dense ones. Dnnlinearcombinedclassifier can be used.

Regressor – linear, sparse features, specific, many features, memorization; e.g. employee id, nlp

Dnn – non-linear, dense features, generalization; e.g. color, price

Wide and deep – combination of both; recommendation, search and ranking


Hyperparameter tuning – changing model parameters to find the right set for a particular result. We evaluate rmse with each parameter set and keep tuning to lower the rmse. The evaluation metric (e.g. rmse) is configurable.

Provide arguments for parameters which needs to be tuned along with min and max for each

Provide evaluation metric – rmse

Tuning parameters – number of nodes, hidden layer count, batch size, learning rate, learning algorithm(dnn, regressor)
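The tuning job is described to ML Engine in a config file's trainingInput section; a hedged sketch below (the parameter names batch_size and learning_rate are hypothetical and must match arguments your trainer actually parses):

```yaml
trainingInput:
  hyperparameters:
    goal: MINIMIZE              # we want the lowest rmse
    hyperparameterMetricTag: rmse
    maxTrials: 20
    maxParallelTrials: 2
    params:
      - parameterName: batch_size
        type: INTEGER
        minValue: 30
        maxValue: 100
        scaleType: UNIT_LINEAR_SCALE
      - parameterName: learning_rate
        type: DOUBLE
        minValue: 0.001
        maxValue: 0.1
        scaleType: UNIT_LOG_SCALE
```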


Master, worker and parameter servers. Parameter servers coordinate shared model state; multiple workers need a parameter server.

Tiers – basic, standard, premium, custom. Basic has only 1 worker node. Standard, premium and custom have a combination of master, worker and parameter servers.


Iam – access control at the project and model level. There are also jobs and operations roles.

ML api

Vision api

Vision api types – label detection, ocr, safe search, landmark, logo, image properties (dominant colors), crop hints, web detection

How to pass images or videos to the ml apis – a gcs uri or base64-encoded binary

Label detection – returns mid (a machine-generated identifier from Google's knowledge source), confidence level and label

NL api

Sentiment analysis, entity

Sentiment – for document and sentence level. The score gives the overall sentiment; the magnitude indicates the amount of sentimental content

Entity analysis – for known entities like public figures, landmarks

Also does syntax analysis and content classification


Dataproc cluster configuration

single node(experimental)
standard – 1 master, multiple workers
HA – 3 masters, multiple workers
replication factor in hadoop – 3 copies; that means if we have 2 tb of worker storage
in the cluster, effective storage would be about 650gb.
better not to use hdfs because it's difficult to persist data when the cluster is destroyed; use gcs instead.
preemptible worker nodes do not have hdfs storage but otherwise have the same config as regular worker nodes. Workers can be preemptible, but at least one regular worker is needed. Dataproc manages preemptible node addition and deletion

Parameters – choose the dataproc version, cloud api access, initialization scripts. The dataproc version controls the hadoop, spark and pig versions.

Web ports – tcp port 8088 for Hadoop, 9870 for HDFS and 8080 for Datalab

Dataproc allows specifying custom machine types

Replacing hdfs with google storage

the goal is to separate the close tie-in of compute and storage.
Replacing nodes is complex with hdfs. Storage is replicated three times with hadoop/hdfs and needs to be pre-allocated.
hdfs does not allow for persistence when clusters are deleted; gcs does.
More preemptible nodes can be used with gcs, since persistence no longer depends on worker disks.
hdfs is still used for temporary data. The temporary-data part is not entirely clear to me, since hadoop relies on sharding and keeping data close to compute.


Move files to gcs
change hadoop/spark code from hdfs:// to gs://

Customizing clusters

Dataproc uses Apache Bigtop.
Bigtop is an Apache Foundation project for infrastructure engineers and data scientists
looking for comprehensive packaging, testing, and configuration of the leading open
source big data components.
Default install – Hadoop, Spark, PySpark, Pig
Additional installs – using scripts that are executed as part of cluster creation.
The script is common to master and workers; to do a master-specific install, use the metadata that identifies the master.
A git repository is available with scripts for common open source software.
Cluster properties can also be changed

Bigquery with dataproc

option 1:

do a sql query, pull the data into pandas and then use pyspark to process it. Option 1 is
useful for smaller results that can fit into pandas after some computation.

option 2:

use the bigquery connector, where data is staged to gcs as json and then processed with pyspark. Option 1 is not always possible since the pandas dataset has to fit into memory and a whole bigquery dataset will not fit. We need to read it as an RDD or spark dataframe; only RDD is supported now.

We cannot run a query directly against bigquery through the connector; only a whole table can be read into gcs. If a query is needed, run the query first, export the result into another bq table and then read that table.


  1. Set up the connector to read from bq
  2. Load the data via the BigQuery connector as an RDD
  3. Run the spark job
  4. Output to sharded files in GCS
  5. Load the sharded data into bq with bq load
  6. Clean up temporary files


Iam – access only at project level, applies to all clusters.

Roles: editor, viewer, worker (for service accounts). Roles can be granted at the dataproc level or the project level.



Pub/sub

Messages are saved for 7 days
At-least-once delivery of each message is guaranteed
Duplicate or out-of-order delivery is possible at the subscriber (dataflow takes care of this)
Global, multi-tenant, discoverable, durable, low latency, asynchronous
Client libraries and auto-generated grpc are also supported
Topics and subscriptions

Publisher and Subscriber

Create topic
Publish message

Create subscription to topic
2 mechanisms – push, pull

Push scheme – pub/sub calls the subscriber using an https webhook provided by the subscriber; faster. If the webhook is not available, pubsub does exponential backoff. The https response provides an implicit acknowledgement. (eg) App Engine standard and Cloud Functions subscribers

Pull scheme – the subscriber calls pub/sub; slower, but better with many subscribers. The subscriber needs to acknowledge that it has processed each message

A single subscription can have multiple subscribers. This is useful for load balancing

Messages are opaque and are not processed by pubsub. We can also send metadata (key/value pairs) along with a message

Messages can be published as batch, this reduces network cost

Project for the topic can be different from the project for subscription

The subscriber has to ack within the ack deadline, otherwise the message is redelivered
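The topic/subscription lifecycle above can be exercised end to end with gcloud (resource names are placeholders):

```shell
# Create a topic and attach a pull subscription with a 30s ack deadline.
gcloud pubsub topics create my-topic
gcloud pubsub subscriptions create my-sub --topic=my-topic --ack-deadline=30

# Publish a message with key/value attributes (metadata).
gcloud pubsub topics publish my-topic --message="hello" --attribute="origin=test"

# Pull as a subscriber; --auto-ack acknowledges within the deadline.
gcloud pubsub subscriptions pull my-sub --auto-ack
```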


Iam – control access at the project, topic and subscription level; nothing at the publisher level. Roles – publisher, subscriber, editor, owner



Bigtable

key value nosql, wide column
similar to hbase, cassandra, dynamodb
excels as a storage engine for batch MapReduce operations
Throughput scales linearly with the number of nodes
indexed by row key, column key, and timestamp
Data can be streamed into bigtable
use cases – time series, recommendation engines, logs, analytics data, graphs
Moving from hbase to bigtable is trivial
No joins; transactions are supported only within a single row
Append-only operations, not transactional


A Bigtable deployment is called an instance; 1 instance can have multiple clusters (up to 2). Tables belong to instances.

Development and production instance types: a production instance has 1 or 2 clusters, with each cluster having at least 3 nodes; a development instance is a single cluster with 1 node. The second cluster is used for replication and failover. Each cluster resides in a single zone; the second cluster is in a different zone in the same region.

Access – hbase shell or cbt. cbt is a Go-based tool and is simpler


1 row key per table and that is the only index. We can do field promotion so that multiple column values are included in the row key

Multiple column families can be present

Design goals

  • Row key needs to be chosen carefully so that contiguous rows are returned for queries. Row keys should also prevent hotspotting while writing.
  • Spread load across multiple nodes(prevent hotspotting)
  • Keep related rows close to each other
  • Use short and narrow tables for timeseries
  • Keep row sizes below 100mb. Keep cell sizes below 10mb.

Good practises

Reverse domain names – so that one domain's rows do not all pile onto the same node and cause hotspotting

For timestamps – reverse them so that the newest entries appear first. Don't put the timestamp first in the key, or all new rows land on one node and cause hotspotting

Don't use sequential ids as keys; salting of ids can be used instead.
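Both practices are simple string transforms; a plain-Python sketch (the key formats are illustrative, not a Bigtable API):

```python
def reversed_domain_key(domain, suffix):
    """Reverse a domain name so related subdomains sort together
    instead of one popular domain piling onto a single node."""
    return ".".join(reversed(domain.split("."))) + "/" + suffix

def descending_ts_key(prefix, epoch_seconds, max_ts=10**10):
    """Store a reversed (descending) timestamp so the newest rows sort
    first, while keeping it out of the leading key position."""
    return "%s#%010d" % (prefix, max_ts - epoch_seconds)
```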

Design choices

Wide tables – for dense data. There is a single key and a bunch of properties.

Narrow tables – for sparse data, e.g. users rating products: create a key combining user and product. This avoids a wide table where every product has to be listed for every user. Also used for time series.

Improve performance

Bigtable figures out the access patterns and improves itself by rebalancing.

Change schema to minimize data skew, allow bigtable to rebalance with real data, choose the right number of nodes, use ssd.

Key Visualizer – shows bigtable usage and performance patterns by row key

Ingest data into bigtable

From dataflow:

1. Get/create table
2. Convert object to write into Mutation(s)
3. Write mutations to Bigtable

Read from bigtable using hbase client or using bigquery connector


Iam – project-wide or instance level. Instance scope includes all clusters; project scope includes all instances.

Difference with Datastore

datastore – custom indexes; writes are slower
bigtable – for a lot more data that is also accumulating fast; analytics
datastore is built on top of bigtable
datastore provides more consistency and availability compared to bigtable
datastore is like a document nosql store; bigtable is like a column nosql store
datastore handles terabytes of data, bigtable petabytes of data
Bigtable scales UP well, Datastore scales DOWN well



Datastore

Nosql document database
use cases – user profiles, product catalogs, game state
Datastore is not good for near-zero latency, analytics, or complex joins
Availability – 99.95% for multi-region locations, and 99.9% for regional locations
Queries are done using GQL, somewhat similar to SQL

Terms and mapping to rdbms

Entity – single object (like row or document)
Kind – category of object (Like table name)
Property – individual data for an object(like column)
Key – unique ID for each entity


Indexes – created on every property that is not excluded
Secondary and composite indexes – built using multiple fields
Entity key – namespace, kind, key or identifier, ancestor (optional)

Composite index

By default, one index per property is created, and there are restrictions on combining multiple properties in a query. To query on multiple properties together, we need to create composite indexes.

Creating composite index:

Cannot be done from gcp console.

Create index.yaml:
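A hypothetical index.yaml for a composite index (the Task kind and its properties are made-up examples):

```yaml
indexes:
  - kind: Task
    properties:
      - name: done
      - name: priority
        direction: desc
```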

Create index:

gcloud datastore create-indexes index.yaml


Strong consistency – entity groups / ancestor queries (this slows writes to about 1 per second per group) and lookups by key

Eventual consistency – global queries, indexes

Hierarchies or entity groups:
A set of entities connected through ancestry to a common root element. For example, “user” is the root entity and all “orders” placed by a single user are child entities; together they form an entity group. The maximum write rate per entity group is 1/sec.

Query restrictions


  • Query time is based on the size of the result returned, not on the size of the dataset; this is why queries are restricted.
  • Cloud Datastore queries do not support substring matches, case-insensitive matches, or so-called full-text search. The NOT, OR, and != operators are not natively supported, but some client libraries may add support on top of Cloud Datastore.
  • Join operation is not supported, we need to denormalize table
  • Aggregation and “group by” are not supported
  • Inequality filters are limited to at most one property


Iam – primitive and pre-defined roles.
pre-defined examples: user, admin, viewer, index admin



Data studio

Dimensions for the x-axis and metrics for the y-axis
does not allow aggregations on metrics
Filter – filters some data in the report
Filter controls – Filter controls give viewers a way to focus on subsets of the data. Viewers can use the filter control to select one or more dimension values from a list by which to filter the report.
Caching types – query cache, pre-fetch cache. The query cache cannot be turned off; the pre-fetch cache can be. The pre-fetch cache is a smart cache and is only used when the data source uses the owner's credentials.
Calculated fields – we can create new fields using formulas or functions and add them to charts.


Iam – not applicable; uses G Suite permissions. Files are stored in Google Drive

Owner's credentials vs viewer's credentials are set at the data-source level. With owner's credentials, data access is done using the owner's credentials, so the viewer does not need access to the dataset. With viewer's credentials, data access is done using the viewer's credentials, so if a viewer does not have permission on the dataset, they cannot view the report.


Datalab

Built on Jupyter notebooks
Python, sql, javascript, bash
Datalab notebooks are stored in CSR (Cloud Source Repositories)
Sharing at the project level and source-repository level. Web access on port 8081.
Notebooks are per user
Creating team notebooks – either the team lead creates them or each user creates their own; notebook source is in CSR


Dataprep

Runs on dataflow under the hood
Recipes are built using the wrangler, or automatic suggestions can be used
Iam – dataprep user; dataprep service agent (permission to access gcp resources) is given to Trifacta
Pricing = 1.16 × the dataflow price
Flow – the workspace. Datasets – the data; recipes – applied to each dataset


Spanner

Managed, globally scalable relational database

Iam – project, instance or database level. Admin, database admin, database reader, viewer

Difference from regular sql:

Interleaved schema: group primary-key rows and their foreign-key rows together. This allows faster access through data locality and avoids two-phase commit. Cascade on delete can be done with interleaving. For example, with “customer” and “invoice” tables, rather than keeping them separate, nest the invoices under their customer.


Primary keys – don't use sequential keys; make sure writes are distributed
Secondary indexes – make queries efficient
Avoid hotspotting, as with bigtable
Use nested tables with shared primary keys (interleaving) for faster access. Prevent hotspotting: no sequential numbers or raw timestamps as leading key parts; if needed, store timestamps in descending order.
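A sketch of a hotspot-resistant key along those lines (pure Python; the sharding scheme is illustrative, not a Spanner API):

```python
import hashlib

def spanner_key(user_id, epoch_seconds, max_ts=10**10):
    """Compose a primary key that avoids hotspots: a short hash prefix
    spreads sequential ids across splits, and the timestamp component is
    stored descending so the newest rows come first."""
    shard = hashlib.md5(str(user_id).encode()).hexdigest()[:4]
    return (shard, user_id, max_ts - epoch_seconds)
```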

Cloud sql

Mysql, postgres

Read replica can only be in same region
Access – direct connection(needs whitelisting), proxy(whitelisting not needed), private access(this is new)
Instance size is tied to network throughput
Disk throughput and iops are tied to the size of the disk
Binary logging – needed for point-in-time restore and failover replicas
Ssl access to the sql instance is supported
The InnoDB storage engine is preferred; for second-generation instances it's the only choice

Import – can be a sql dump or csv, done through cloud storage. The sql dump cannot contain triggers, views or stored procedures.

Instance access control is different from project access control. Instance access is controlled using whitelisting, the proxy and database users; project access control uses iam, with editor, viewer and admin roles.

limitations from mysql in cloud sql:

limitations from postgres in cloud sql:

  • Point-in-time recovery (PITR)
  • Import/export in CSV format using GCP Console or the gcloud command-line tool.


Choosing Storage option

(Reference: https://cloud.google.com/storage-options/)

Case studies


Logistics and trucking company

Current technology:

Single datacenter
Sql – 2 clusters, 8 servers. User data, inventory data
Cassandra – 3 servers, metadata, tracking messages
Kafka servers – messaging
Application servers – 60 vms across 20 servers. Tomcat, nginx, batch servers
Storage appliances – iscsi for vm, nas for images, fc for sql
Hadoop – 10 servers, for analytics

Problems with current technology:

Kafka not able to scale



Use their proprietary technology to track loads
Analytics and prediction


Streaming and batching
Migrate hadoop
Elastically scalable


Lift and shift to cloud
Map hadoop -> dataproc
Kafka -> pubsub + dataflow + bigquery
Sql -> cloud sql, spanner if more than 10tb,
Cassandra -> bigtable
Storage -> gcs for datalake
Migrate from hdfs -> gcs

MJ Telco

Greenfield telecom company



Scale data processing pipelines
Multiple staging environments
Secure communication
Use scalable machine learning models


10000 to 100000 data providers
2 yrs of data with 100m records/day


pub/sub for ingestion
Ml engine – Machine learning
Separate environments – separate projects
Security control – iam control
Data lake – cloud storage, bigquery


Certification details

Coursera Data engineering
– This course has 5 modules and a bunch of labs.

Coursera – Preparing for data engineer exam
– This course is targeted as an overview of important topics to help prepare for the exam. There are also sample questions and test.

Qwiklabs Data engineer quest

Linux Academy – GCP data engineer
– This course gives a different perspective from Coursera and is oriented more towards exam preparation. Sample questions at the end of each chapter and practise exam is useful.

Data engineer practise exam

Collection of many Certification articles

Devops with Kubernetes

I did the following presentation “Devops with Kubernetes” in Kubernetes Sri Lanka inaugural meetup earlier this week. Kubernetes is one of the most popular open source projects in the IT industry currently. Kubernetes abstractions, design patterns, integrations and extensions make it very elegant for Devops. The slides delve little deep on these topics.

NEXT 100 Webinar – Top 3 reasons why you should run your enterprise workloads on GKE

I presented this webinar “Top 3 reasons why you should run your enterprise workloads on GKE” at NEXT100 CIO forum earlier this week. Businesses are increasingly moving to Containers and Kubernetes to simplify and speed up their application development and deployment. The slides and demo covers the top reasons why Google Kubernetes engine(GKE) is one of the best Container management platforms for enterprises to deploy their containerized workloads.

Following are the slides and recording:

Recording link



Container Conference Presentation

This week, I did a presentation in Container Conference, Bangalore. The conference was well conducted and it was attended by 400+ quality attendees. I enjoyed some of the sessions and also had fun talking to attendees. The topic I presented was “Deep dive into Kubernetes Networking”. Other than covering Kubernetes networking basics, I also touched on Network control policy, Istio service mesh, hybrid cloud and best practises.



Demo code and Instructions:

Github link

Recording of the Istio section of the demo (this recording was not made at the conference):

As always, feedback is welcome.

I was out of blogging action for last 9 months as I was settling into my new Job at Google and I also had to take care of some personal stuff. Things are getting little clear now and I am hoping to start my blogging soon…