This blog is the last in the series on VPC-native GKE clusters. In this blog, I will cover network endpoint groups (NEG) and container-native load balancing. For the first part on GKE IP addressing, please refer here, and for the second part on VPC-native clusters, please refer here.
Container load balancing and Network endpoint groups(NEG)
The following diagram shows how default container load balancing works (without NEG). This applies to both the HTTP and network load balancers.
Traffic flows like this: Load balancer -> VM -> iptables -> Pods
The load balancer distributes traffic to the instances in the instance group. iptables rules in each node further distribute the traffic among pods. If the target pod is not on the same node, the packet gets routed to another node. This causes a double-hop problem, as illustrated by the path below for a specific case:
Load balancer -> VM1(iptables) -> VM2(Pod3)
The double hop increases latency. To avoid it, Kubernetes provides the “onlyLocal” service annotation (externalTrafficPolicy: Local in current Kubernetes versions), which restricts the iptables rules in each node to distribute traffic only to the pods on that node. This is how the path looks for different flows:
The disadvantage of “onlyLocal” is that traffic is not distributed equally between pods. In the example above, pod1 would get more traffic than pods 3 and 4.
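The uneven split can be sketched numerically. Assuming the placement from the diagram (pod1 alone on VM1; pod3 and pod4 together on VM2 — an assumption for illustration):

```python
# "onlyLocal" traffic shares: the load balancer splits traffic evenly
# across nodes, and each node's iptables then splits its share only
# among the pods local to that node.
nodes = {"vm1": ["pod1"], "vm2": ["pod3", "pod4"]}

share = {}
for pods in nodes.values():
    for pod in pods:
        share[pod] = (1 / len(nodes)) / len(pods)

print(share)  # pod1: 0.5, pod3: 0.25, pod4: 0.25
```

So pod1 serves half the traffic while pod3 and pod4 serve a quarter each, even though all three run the same workload.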
Network endpoint groups(NEG):
A NEG can be a backend for the HTTP load balancer, similar to a managed instance group. NEGs are zonal resources, and each endpoint in a NEG is a combination of an IP address and a port. The IP address can be the primary IP address of the VM or one of its alias IP addresses.
Container load balancing with NEG:
NEG allows containers to be represented as first-class citizens from the load balancer's perspective. Before NEG, load balancers were not aware of containers. With NEG, the pod IP address and port are exposed as endpoints to the load balancer, which provides the following advantages:
Compared to regular load balancing, this approach performs better because it avoids the double hop. Compared to the “onlyLocal” approach, it distributes traffic equally between the pods.
This approach also provides better visibility and easier troubleshooting, since the iptables layer is removed from the load-balancing path.
The container-native load balancing feature is currently in beta and is supported only with the HTTP load balancer.
The following block diagram illustrates container-native load balancing:
Traffic flow would look like this: Load balancer-> NEG -> Pod1, Pod 3, Pod 4
To compare the 3 load balancing approaches (native load balancing, native load balancing with onlyLocal, and container-based load balancing with NEG), I created a Kubernetes cluster with 3 nodes and IP aliasing enabled. I used the sample application illustrated here and deployed it with all 3 load balancing approaches. The goal was to highlight that native load balancing and container-based load balancing with NEG distribute traffic equally between pods, while the “onlyLocal” approach distributes it unevenly.
Container based load balancing:
As a first step, I created a GKE cluster with IP alias enabled.
Following diagram shows how the pods are allocated between nodes:
Native load balancing with “onlyLocal”:
For this case, we modify the service to remove the “neg” annotation and add the “onlyLocal” annotation, as shown below.
Following is the service yaml for the “onlyLocal” configuration:

apiVersion: v1
kind: Service
metadata:
  name: neg-demo-svc # Name of Service
spec: # Service's specification
  type: LoadBalancer
  externalTrafficPolicy: Local # current form of the "onlyLocal" annotation
  selector:
    run: neg-demo-app # Selects Pods labelled run: neg-demo-app
  ports:
  - port: 80 # Service's port
Native load balancing:
For this case, we remove both the “neg” annotation and the “onlyLocal” annotation.
For all 3 approaches above, I sent a request to load balancer IP 100 times and measured the distribution of traffic between the pods.
for ((i=1;i<=100;i++)); do curl "<load-balancer-ip>"; done
What I observed is that with native load balancing and container-based load balancing, traffic distribution between pods is equal: each pod receives approximately 25% of the traffic. With the “onlyLocal” approach, only 1/3 of the traffic reaches the node that has two pods scheduled on it, so those pods split that share. The traffic distribution looks like (pod1: 33%, pod2: 33%, pod3: 17%, pod4: 16%), with pod3 and pod4 scheduled on the same node.
Of the 3 approaches mentioned above, container load balancing with NEG provides the best performance, traffic distribution, and visibility.
This blog is the second in the series on VPC-native GKE clusters. In this blog, I will cover an overview of IP aliasing, IP alias creation options, and creating VPC-native clusters using alias IPs. For the first blog on GKE IP addressing, please refer here.
Overview
IP aliasing allows a single VM to have multiple internal IP addresses. This is not the same as having multiple interfaces, each with a different IP address. Each of the additional internal IP addresses can be allocated to a different service running in the VM. When the node runs containers, alias IPs can be allocated to container pods. The GCP VPC network is aware of alias IPs, so routing is taken care of by the VPC. Alias IPs have significant advantages for GKE containers: containers have pod and service IPs to manage in addition to the node IP, and IP aliasing makes these addresses native to the VPC, allowing tight integration with GCP services.
Alias IP Advantages
Alias IPs provide many advantages in the container space, as having separate VPC-aware address ranges carved out for pods and services gives better flexibility.
Alias IP addresses are VPC-aware, and routing is taken care of by the VPC. Without IP aliasing, pod routing was the responsibility of the container orchestrator, which had to add these routes manually. With IP aliasing, pod routing is handled by the VPC itself, and there is no need to add manual routes to reach pod IPs.
Without IP aliases, anti-spoofing checks had to be disabled on nodes that are part of a GKE cluster. Traffic from the pods has the pod IP as its source IP, and since the VPC is not aware of pod IPs, anti-spoofing checks need to be disabled to allow this traffic to pass through. With IP aliases, the VPC is aware of pod IPs, so we can enable the anti-spoofing check. This prevents traffic with arbitrary source IPs from originating from the node.
IP aliasing prevents IP address conflicts between node IP address and cluster IP address since VPC is aware of both these addresses.
IP aliasing helps in scenarios where the pod, but not the node, needs to be reachable from the external world. Since the VPC is aware of the alias addresses, we can set up different BGP and firewall rules for node and pod IP addresses.
Creating Alias IP
There are 3 ways to create Alias IP.
When creating VM, specify alias IP address that will be used by VM.
When creating the subnet, create a primary range and one or more secondary ranges. During VM creation, use these ranges to create the alias IPs. Alias IPs can be created for subnets in either “auto” or “custom” mode networks.
When a GKE cluster is created with IP aliasing enabled, 2 subnet secondary ranges are created implicitly: 1 for pod IPs and another for service IPs. Pods and services get IPs allocated from these ranges. This approach is used by GKE.
Creating Alias IP – Approach 1:
In this approach, we will specify alias IP address when creating VM. The steps are mentioned below.
Create an instance with IP addresses from the primary and secondary ranges: for some reason, I could not get the CLI command to work and was able to do this only from the console. Following is the VM created with the 2 alias IP ranges (172.16.1.0/24, 10.0.0.1/32). These alias IPs are within the primary and secondary ranges specified earlier.
Creating Alias IP – Approach 3:
In this approach, we will create alias IP for Containers.
Because we enabled IP aliasing and used the default network, the above command automatically creates 2 secondary ranges as shown below: “10.64.0.0/14” is the first secondary range and “10.0.0.0/20” is the second. These are used as the cluster IP range and the service IP range.
We also have the option of manually specifying cluster IP range and service IP range like below:
The above command will create secondary ranges on the subnet using the addresses specified and use them as alias IP address for cluster and service IPs.
The following command shows the IP ranges for the “default” network after creating a cluster with IP aliasing enabled: “10.128.0.0/20” is the subnet range for internal IPs, “10.64.0.0/14” is the cluster IP range, and “10.0.0.0/20” is the service IP range.
kubectl run web --image=gcr.io/google-samples/hello-app:1.0 --replicas=3 --port=8080
kubectl expose deployment web --target-port=8080 --type=NodePort
Let’s look at pod and service IP addresses:
kubectl get pods -o wide
NAME READY STATUS RESTARTS AGE IP NODE
web-6d695d4565-g7lbb 1/1 Running 0 1h 10.64.1.6 gke-cluster-vpc-default-pool-db114d00-2mz8
web-6d695d4565-nzdnx 1/1 Running 0 1h 10.64.0.4 gke-cluster-vpc-default-pool-db114d00-1l9m
web-6d695d4565-ttkr8 1/1 Running 0 1h 10.64.1.5 gke-cluster-vpc-default-pool-db114d00-2mz8
kubectl get services
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.0.0.1 <none> 443/TCP 1h
web NodePort 10.0.0.253 <none> 8080:30017/TCP 1h
As we can see from the above output, the pod IP addresses (10.64.0.x, 10.64.1.x) come out of the cluster IP range, and the service IP address (10.0.0.253) comes out of the service IP range. If we look at the routing table, we do not see routes specific to pod IP addresses, as the VPC is already aware of them.
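These range memberships can be checked with Python's ipaddress module, using the ranges and addresses from the outputs above:

```python
import ipaddress

subnet = ipaddress.ip_network("10.128.0.0/20")        # node internal IPs
cluster_range = ipaddress.ip_network("10.64.0.0/14")  # pod IP range
service_range = ipaddress.ip_network("10.0.0.0/20")   # service IP range

pod_ip = ipaddress.ip_address("10.64.1.6")       # from `kubectl get pods`
service_ip = ipaddress.ip_address("10.0.0.253")  # from `kubectl get services`

print(pod_ip in cluster_range)      # True
print(service_ip in service_range)  # True
print(pod_ip in subnet)             # False: pods use a separate secondary range
```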
This blog was written by me after a long gap of close to 7 months. Many reasons including busy work schedule, some health issues in the middle and a little bit of laziness contributed to this. I hope to be a more active blogger going forward.
In this blog series, I will cover the following topics:
GKE default IP address management(without IP aliasing)
The first blog in this series will talk about GKE default IP address management.
Following are the Kubernetes abstractions that need IP addresses:
Node IP address – Assigned to individual nodes. The node ip address is assigned from the VPC subnet range.
Pod IP address – Assigned to individual pods. All containers within a single pod share same IP address.
Service IP address- Assigned to individual service
By default, a “/14” range gets allocated as the cluster IP range. Pod and service IP addresses come out of this pool. A “/24” range carved out of the cluster IP range gets assigned to each individual node and is used for pod IP allocation. A “/20” range carved out of the cluster IP range gets assigned to Kubernetes services. The user can choose the cluster IP range when creating the cluster.
To illustrate some of the above points, I created a 3-node Kubernetes cluster with IP aliasing disabled. By default, VPC-native clusters (IP aliasing enabled) are disabled and have to be enabled manually. In a future GKE release, VPC-native clusters will be the default mechanism.
Following output shows the 3 node GKE cluster created using default IP options in the “default” subnet.
$ kubectl get nodes -o wide
NAME STATUS ROLES AGE VERSION EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
gke-cluster-default-default-pool-0f6611b6-k68m Ready <none> 9d v1.12.7-gke.10 188.8.131.52 Container-Optimized OS from Google 4.14.106+ docker://17.3.2
gke-cluster-default-default-pool-0f6611b6-vrdc Ready <none> 9d v1.12.7-gke.10 184.108.40.206 Container-Optimized OS from Google 4.14.106+ docker://17.3.2
gke-cluster-default-default-pool-0f6611b6-xbql Ready <none> 9d v1.12.7-gke.10 220.127.116.11 Container-Optimized OS from Google 4.14.106+ docker://17.3.2
Cluster IP address: The following output shows the allocated cluster IP range (10.60.0.0/14). Pod and service IP addresses are allocated out of this range. If we do not specify the cluster IP range, GKE automatically assigns a “/14” range. We can specify it manually using the “--cluster-ipv4-cidr” option when creating the cluster.
Node IP address: The following output shows the external and internal node IP addresses and the pod IP range allocated to each node. The node internal IP address (e.g. 10.128.0.40) is assigned from the VPC subnet (10.128.0.0/20). Each node is allocated a /24 pod IP range, which allows a maximum of 256 pod addresses per node; 10.60.2.0/24 is the pod IP range allocated for node 1. To allow for pods going up and down, we need a buffer of pod IPs, which is why only around 100 pods (instead of 256) are allowed on each node.
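The /24-per-node math can be checked with a short Python sketch, using the ranges from the outputs above:

```python
import ipaddress

cluster_range = ipaddress.ip_network("10.60.0.0/14")    # cluster IP range
node1_pod_range = ipaddress.ip_network("10.60.2.0/24")  # node 1's pod range

# A /24 holds 256 addresses, which is why 256 is the per-node ceiling.
addresses_per_node = node1_pod_range.num_addresses

# A /14 can be carved into 2^(24-14) = 1024 per-node /24 ranges.
max_node_ranges = 2 ** (24 - 14)

print(addresses_per_node, max_node_ranges)       # 256 1024
print(node1_pod_range.subnet_of(cluster_range))  # True
```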
Let’s deploy a sample application. The following commands will create 3 pods and create a “Nodeport” service on top.
kubectl run web --image=gcr.io/google-samples/hello-app:1.0 --replicas=3 --port=8080
kubectl expose deployment web --target-port=8080 --type=NodePort
Let’s look at the pod IP addresses allocated. 2 pods land on a single node, so they get IP addresses out of the “10.60.2.x/24” range, and the other pod gets an IP out of the “10.60.1.x/24” range.
$ kubectl get pods -o wide
NAME READY STATUS RESTARTS AGE IP NODE
web-6d695d4565-4cg6t 1/1 Running 0 1m 10.60.1.3 gke-cluster-default-default-pool-0f6611b6-xbql
web-6d695d4565-6dj79 1/1 Running 0 1m 10.60.2.4 gke-cluster-default-default-pool-0f6611b6-k68m
web-6d695d4565-dhfqn 1/1 Running 0 1m 10.60.2.5 gke-cluster-default-default-pool-0f6611b6-k68m
Let’s look at service IP. Service IP(10.63.243.144) is allocated out of the range(10.63.240.0/20) which in turn comes out of the cluster IP range.
$ kubectl get services
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.63.240.1 <none> 443/TCP 17m
web NodePort 10.63.243.144 <none> 8080:32668/TCP 2m
For pods to talk across nodes, GKE needs to add explicit routes to the routing table, as shown below. Since IP aliasing is not enabled, the GCP VPC is not aware of these pod and service IP addresses, which is why GKE explicitly adds these routes to the routing table.
This blog is more of a quick reference notes rather than a real blog. I shared this with few Google internal folks and since they found it useful, I thought I will share these notes in the blog as well.
Please note that the Data Engineer exam format changed a little from March 29, 2019. I took the exam in January 2019, so some of the notes below might not apply. The 2 big changes are that there are no case studies in the new format, and certain new data and ML products like Composer, AutoML, and Kubeflow have been introduced.
There are many references on preparation steps for the GCP Data Engineer exam. Why am I writing one more? Reading through each individual's experience always gives me some new insight. I am hoping this write-up will help someone new preparing for the exam.
I cleared the exam in the last week of January 2019. I come from an infrastructure background, and since data engineering was not my area of strength, this needed around 8 weeks of preparation time for me — not dedicated time, but a few hours split over an 8-week period. I have been with the Google Cloud team for 1+ years, and I found that practical experience is important for the exam.
The exam is multiple choice, with 50 questions to be completed in a 2-hour time frame. There are also questions on the 2 case studies. For my preparation, I used the data engineer modules of the Coursera training and Linux Academy. It is good to review and break down the case studies beforehand to avoid spending time reading the case study text during the exam. Qwiklabs and the labs associated with Coursera help quite a bit. I have added the links in the references.
The certification tests both theoretical and practical knowledge of GCP products and technologies. My suggestion is to take the exam after getting some practical experience with GCP. Another thing I noticed is that for some questions, more than 1 answer seems appropriate, and it takes thorough analysis to answer. One tip is to mark these questions for review and return to them after answering all the others.
Under each topic, I have put rough notes below covering important points. The notes are not very structured and my request would be to read through the documentation/blogs associated with each topic to get more understanding.
Based on my understanding and reading through experiences of others, I have categorized the topics into following priorities. This is very subjective and I will suggest to use this at your own risk:)
Open source products – Like Hadoop, CI/CD(Jenkins)
Important topics that spans across modules:
I have cleared architect certification earlier. My take is that architect exam is oriented towards breadth of GCP infrastructure products and data engineer is oriented towards depth of GCP data products. I might be biased here considering my infrastructure background.
We can cover either by technology(Serverless, Streaming etc) like the way Coursera training does or by looking at individual products(Bigquery, Dataproc etc). For this document, I have covered by products to avoid replication. For Machine learning, I have combined all the topics into a single section. The topics are oriented towards certification.
Legacy vs Standard Sql
Legacy SQL – UDFs available in the web console. Tables use “:” as the separator.
Standard SQL – backticks are used, and the separator is “.”. Does not support TABLE_DATE_RANGE and TABLE_QUERY; this can be overcome using wildcards and _TABLE_SUFFIX. Supports querying nested and repeated data.
COUNT(DISTINCT <expr>) is exact and scalable, providing the accuracy of EXACT_COUNT_DISTINCT without its limitations
Automatic predicate push-down through JOINs
Complex JOIN predicates, including arbitrary expressions
Table wildcards, table_suffix
Stricter timestamp checking
Avoid self-joins, use window function instead
If data is skewed like some partitions are huge, filter early. Use approximate_top_count to determine skew
Avoid joins that produces more output rows than input
Avoid point specific dml. Batch the dml statements
Sub-queries are more efficient than joins
Use only columns that are needed
Filter using “WHERE” clause so that there are minimal rows
With joins, do bigger joins first. Left side of join must be the bigger table
Low cardinality “by groups” are faster. Low cardinality means that the column contains a lot of “repeats” in its data range
LIMIT doesn't affect cost, as it controls only what is displayed
Built-in functions are faster than js udf
Exact functions are slower than approximate built-in function, use approximate built-in if possible. For example, instead of using COUNT(DISTINCT), use APPROX_COUNT_DISTINCT()
Ordering on outermost query, not inner. Outer query is performed last, so put complex operations in the end when all filtering is done.
Wildcards – be more specific if possible
Performance – query time split between stages, can be seen using stackdriver as well.
Each stage – wait, read, write, compute
Tail skew – max time spent is significantly more than average. Some partitions are way bigger than other partitions. Tail skew can be found out using approximate aggregate function like APPROX_TOP_COUNT
Avoid tail skew – filter as early as possible
Batch load is free, streaming has a cost. Unless data is needed in real-time, use batch when possible.
Denormalize when possible. Still use structs and arrays.
External data sources are slow, use it only when needed.
Monitor query performance – using “details” page. Can find out if there is read, compute or write latency. Query plan shows different stages and shows breakup of time between different activities in a stage
Loading and exporting data
Can be done using UI, CLI or api.
Supported data formats:
JSON (newline delimited only)
Export from BQ – to Cloud Storage (CSV, Avro, JSON). Export is limited to 1 GB per file; wildcards can be used to split into multiple files (“bq extract”).
BigQuery Transfer Service – imports data from other marketing apps: AdWords, DoubleClick, YouTube reports.
Supported for cloud storage, bigtable, google drive
Create table definition file with schema, schema can also be autodetected.
Use either temporary or permanent tables. The data is not stored in these tables, just the schema. Permanent tables can be used for access control and sharing, since tables are access-controlled. Temporary tables are for one-off use. Permanent tables are placed in a dataset.
Queries are not cached in this case
2 options – based on ingestion time or using a particular column already existing. In first scheme, _PARTITIONTIME is a pseudocolumn that gets added.
_PARTITIONTIME and _PARTITIONDATE are available only in ingestion-time partitioned tables.
For manual partitions, we can use any date or timestamp column.
Another approach to partitioning is to shard the data and put it into separate tables. This has more overhead because multiple tables are there, we need to maintain access control and schema for each table separately.
Data can be queried while it is being streamed, before it is written to disk. Up to 100,000 rows/second insertion rate; use the REST API to insert. Streaming data is available for query within seconds.
Storage cost is similar to Cloud Storage; older unread data is charged less.
Query cost is based on data processed. For ingested data, cost is based on the streaming rate. Pay based on usage; there is also a flat-rate plan, but it is rarely used.
Cost can be optimized by restricting the number of columns queried.
Free – loading, exporting, cached queries, queries on metadata, queries that return an error.
Cached queries – save cost, per user. Typical cache lifetime is 24 hours, but cached results are best-effort and may be invalidated sooner.
Billing – done on the project where the job is running, irrespective of where the dataset is from.
Access control at project, dataset, and view level. Views are virtual tables. There are no direct IAM roles for controlling view access; instead, views are put in a separate dataset and IAM control is done on that dataset. This gives virtual view access control, called an authorized view.
Views can also be used to share rows with particular users: the allowed user is added as a column, and the view matches it against SESSION_USER() to display rows only for those users.
Roles – admin, data owner, editor, viewer, user(can run queries, more permissions than job user), job user. From primitive roles project owner, editor, viewer are available. For datasets, owner, writer and reader are available.
Public dataset – accessible to all authenticated users
Open source Apache Beam API; can be executed on Flink and Spark as well.
Jobs can read, filter, group, and transform data; jobs are executed in parallel.
Tasks are written in Java or Python using the Beam SDK.
The same code is used for streaming and batch. For streaming, we bound the data by windowing (by timeline or number of records).
Terms – source, sink, runner (pipeline execution), transform (each step in the pipeline); transforms are applied on PCollections.
A pipeline is a directed graph of steps.
Source/sink can be a filesystem, GCS, BigQuery, or Pub/Sub.
Runner can be a local laptop or Dataflow (in the cloud).
Output data can be written sharded or unsharded; for unsharded, use the unsharded option.
Pipeline.create; Pipeline.apply – set up individual tasks; Pipeline.run – the pipeline starts here.
Inputs and outputs are PCollections. A PCollection is not in-memory and can be unbounded.
Give each transform a name. Read from a source, write to a sink.
Pipeline in Java:
ParDo – indicates processing done in parallel. Run with “java classpath” or “mvn compile”; “mvn compile” with the runner set to “dataflow” will run it on Dataflow.
Pipeline in Python:
“|” means apply. Use “python <program>” locally or “python <program> dataflowrunner” to run on Dataflow. Pipelines are localized to a region. Shutdown options – cancel, drain; drain is graceful.
A side input can be static, like a constant, or a list or map. If the side input is a PCollection, we first convert it to a list or map and pass that as the side input. Call ParDo.withSideInputs with the map or list.
Mapreduce in Dataflow
Map operates in parallel; reduce aggregates based on key.
ParDo acts on one item at a time, similar to the map operation in MapReduce, and should not keep state/history. Useful for filtering and mapping.
In Python, mapping is done using map for 1:1 and flatMap for non-1:1. In Java, both are done using ParDo.
Examples of map – filtering, converting types, extracting parts of the input, calculating from different inputs.
Example of flatMap – yielding a line only for lines that contain the searchTerm.
GroupBy: GroupBy does the aggregation, done using Combine and GroupByKey. Combine is faster than GroupBy because it distributes across multiple workers; use GroupBy for custom operations. In Java, GroupBy returns an iterable. Combine examples – sum, average. GroupBy example – group by state and zip code, with state used for grouping. CombinePerKey example – total sales by person.
Size, scalability, fault tolerance, programming model, unbounded data.
Use the window approach to process streams in Dataflow.
For streaming data, Pub/Sub adds a timestamp when data is inserted into Pub/Sub. For batch data, we can insert a timestamp when data is read, so that the Dataflow pipeline can be similar between streaming and batch.
In code, we set the streaming option to true.
Window types – fixed, sliding, session, global. A session window is based on a particular user and is dynamic. Sliding window parameters – window duration and sliding period, e.g. a 2-minute window computed every 30 seconds.
Out-of-order data from Pub/Sub is taken care of during aggregation. Duplicates from Pub/Sub are taken care of using the Pub/Sub message ID. If the sender itself sends duplicates, Pub/Sub won't be aware; in that case, the sender can add an ID and Dataflow can use that ID instead of the Pub/Sub ID to remove duplicates.
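The fixed and sliding window behavior can be sketched in plain Python — this is just the window math, not the Beam API, using the 2-minute-window-every-30-seconds example:

```python
# Fixed windows: each event timestamp falls in exactly one window.
def fixed_window(ts, size):
    start = ts - (ts % size)
    return (start, start + size)

# Sliding windows: a 120s window starting every 30s means each event
# falls into size/period = 4 overlapping windows.
def sliding_windows(ts, size, period):
    first = (ts // period) * period - size + period
    return [(s, s + size)
            for s in range(max(0, first), ts + 1, period)
            if s <= ts < s + size]

print(fixed_window(130, 120))              # (120, 240)
print(len(sliding_windows(130, 120, 30)))  # 4
```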
Watermark, triggers and accumulation:
Window – event arrival time window
Watermark – dataflow tracking how far processing time is behind event time. Watermark is dynamically calculated. It decides when to close the window. By default, watermark is based on message arrival in pub/sub. We can change this using option that can be set when pushing message to pub/sub.
Triggers – aggregation calculated at watermark. There is an option to calculate aggregate for each late data arrival or we can drop late data. We can also control when to trigger relative to watermark. Triggering api also helps in providing early results.
Default – all late arrival data will be discarded since default allowed_lateness is 0.
Types of triggers:
Triggers handle late arrival of data:
Time-based triggers
Data-driven triggers – based on the number of events
Composite – a combination of time- and data-based triggers
Session window example – useful for irregularly distributed data. A gap duration of 2 minutes specifies that data arriving with gaps of less than 2 minutes is grouped into the same window.
With early firing scenarios:
Classic batch (no windows), batch with fixed windows, triggering at watermark, complicated watermark and triggering, session windows.
A trigger set to .accumulatingFiredPanes always outputs all data in a given window, including any elements previously triggered. A trigger set to .discardingFiredPanes outputs incremental changes since the last time the trigger fired.
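The difference between the two pane modes can be sketched with plain lists (hypothetical elements arriving between three trigger firings, not the Beam API):

```python
panes = [[1, 2], [3], [4, 5]]  # elements arriving between firings

accumulating, discarding, seen = [], [], []
for pane in panes:
    seen += pane
    accumulating.append(list(seen))  # all data in the window so far
    discarding.append(list(pane))    # only the increment since last firing

print(accumulating)  # [[1, 2], [1, 2, 3], [1, 2, 3, 4, 5]]
print(discarding)    # [[1, 2], [3], [4, 5]]
```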
Only project-level access; no further division. Roles – Dataflow admin, developer, viewer, worker. Worker is specific to the service account. Admin has access to the storage bucket.
Choosing between dataproc and dataflow
Dataflow vs Dataproc – existing Hadoop/Spark workloads -> Dataproc; streaming -> Dataflow; fully serverless -> Dataflow.
Supervised learning – learning based on pre-classified data. Pre-classified data is example with input and label.
Unsupervised learning – learning based on unclassified data using clustering approach.
Supervised learning types:
Linear Regression – predicts continuous number, MSE(mean square error) is reduced in this approach. Least squares and gradient descent are 2 methods to solve linear regression problem.
Finding the tip amount from data that has gender, hotel name, bill amount
Classification – predicts discrete output. Logistic regression is used. Cross-entropy function is used to minimize the error
Example: mail is spam or not
Compare linear regression and neural networks:
Regression is a method for linear dependencies; neural networks can deal with nonlinearities. If your data has nonlinear dependencies, neural networks should perform better than regression.
Gradient descent is used to find the optimal weights; coefficients and biases are the weights. Gradient descent is not guaranteed to reach the absolute minimum.
Start with random weight and then keep tweaking the weights to get the minimal error. The same applies when there are multiple inputs as well.
The model is evaluated on the entire dataset, either after a certain number of epochs or after a particular duration, by measuring RMSE. If the model is good enough, stop there.
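The weight-tweaking loop can be shown with a toy sketch — a hypothetical single-weight model y = w * x with an assumed learning rate:

```python
xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]  # generated with true weight 2

w, lr = 0.0, 0.01  # start from an arbitrary weight
for epoch in range(200):
    # dMSE/dw = mean of 2 * (w*x - y) * x
    grad = sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)
    w -= lr * grad  # tweak the weight against the gradient

print(round(w, 3))  # converges close to 2.0
```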
Batch size – number of points over which we try out the weight changes. 30-100 samples
Epoch – 1 traversal through the entire dataset; a traversal is called an evaluation
For continuous values, mean square error(MSE) is used as error rate for optimizing regression model. Square of the errors divided by count to get average.
For classification problems, Cross-entropy is used. Cross-entropy has a log based formula. Confusion matrix is used for classification problem to describe the performance of model. Cross-entropy is still used to minimize the error.
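Both error measures are easy to state directly (hypothetical predictions and labels for illustration):

```python
import math

# MSE for a regression model (continuous output)
preds, actuals = [2.5, 0.0, 2.1], [3.0, -0.5, 2.0]
mse = sum((p - a) ** 2 for p, a in zip(preds, actuals)) / len(preds)

# Binary cross-entropy for a classifier (the log-based formula)
probs, labels = [0.9, 0.2, 0.8], [1, 0, 1]
cross_entropy = -sum(y * math.log(p) + (1 - y) * math.log(1 - p)
                     for p, y in zip(probs, labels)) / len(probs)

print(round(mse, 2), round(cross_entropy, 4))
```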
make note of true positive(TP), false negative(FN), false positive(FP), true negative(TN)
Accuracy – percentage of correct classification outcomes. Can be used for balanced dataset
Balanced dataset – covers both positive and negative cases. Unbalanced dataset – does not cover positive and negative cases equally.
For balanced dataset, we will use accuracy as metric. For unbalanced dataset, we will use precision and recall as metric
Precision = positive prediction value. TP/(TP+FP)
Recall – TP/(TP+FN)
Precision and recall by themselves do not tell the whole story; the classification threshold is important
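With hypothetical counts from a confusion matrix, the metrics look like this — note how accuracy stays high on an unbalanced dataset even though recall is lower:

```python
TP, FN, FP, TN = 80, 20, 10, 890  # unbalanced: only 100 positives in 1000

accuracy = (TP + TN) / (TP + TN + FP + FN)  # fraction classified correctly
precision = TP / (TP + FP)                  # positive predictive value
recall = TP / (TP + FN)                     # fraction of positives found

print(accuracy, precision, recall)  # 0.97, ~0.889, 0.8
```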
Split the data into training, validation, and test sets.
Training – data on which model training is done.
Validation – stop increasing model complexity when the validation data starts showing more error than the training data. Validation data should not be used to evaluate the final model; the validation set is used for hyperparameter tuning. Model complexity means more layers.
Test – used for final model evaluation.
ML approaches – 3 options: TensorFlow, ML Engine, prebuilt models
Layers (top to bottom): Estimator API; model abstraction layer (layers, error function); Python low-level library; C++ low-level library; hardware (CPU, GPU, TPU).
Execution – there is a build stage and a run stage. The build stage builds the graph; the run stage runs the tensors in a distributed manner. Placeholder and feed_dict are a dynamic way of passing values. Data types are inferred implicitly. tf.eager allows you to avoid the build-then-run stages; used mainly for testing purposes.
Model types: LinearRegressor, DNNRegressor (deep neural network), LinearClassifier, DNNClassifier.
Effective machine learning
Reading bigger data – pandas only reads into memory. Split into multiple files, queue them as multiple epochs, use a reader, and let TensorFlow pick from the queue.
Creating more features – using feature combinations(feature engineering)
Model training architecture – model itself is chosen dynamically using hyperparameter tuning
Does both training and prediction. Supports TensorFlow; XGBoost, scikit-learn, and Keras are in beta.
Advantages: scale-out training; hyperparameter tuning; preprocessing and feature creation stay the same between training and prediction.
Training: Package the python model code as python package and submit to ML engine.
__init__.py; task.py – processes input variables; model.py – main model code (Estimator API, input function).
If the model will be used only for prediction, we can export the model from TensorFlow, store the files in Cloud Storage, create a model in GCP, and link it to Cloud Storage.
Prediction types – online and batch.
Batch predictions – input and output are in GCS; asynchronous; optimized to handle a high volume of instances and complex models.
Online predictions – respond immediately; synchronous; JSON input.
Feature requirements for feature engineering
Features should be reasonably related to the objective.
Feature values should be known at the time of prediction without any latency.
Features should be numeric with a range of meaningful magnitude. This is difficult for categorical inputs – we need a vector/bitmask representation (one-hot encoding); for natural language, we can use word2vec. This is called a sparse column in TF.
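A minimal sketch of one-hot encoding (the color vocabulary is made up for illustration):

```python
def one_hot(value, vocabulary):
    """Encode a categorical value as a sparse 0/1 vector over a fixed vocabulary."""
    return [1 if v == value else 0 for v in vocabulary]

colors = ["red", "green", "blue"]
print(one_hot("green", colors))  # [0, 1, 0]
```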
We need enough examples of each feature value – at least 5 occurrences of each value is a good rule of thumb. For continuous numbers, use windows (buckets).
Feature crosses can be created using human insight; this simplifies the ML model.
Linear models work well for sparse features (like employee ID). A DNN does not work well here, since we would need many neurons for each employee-ID bucket. Employee ID uses one-hot encoding, so only 1 bit is set per row.
Neural network models work well for dense features (like price or color (RGB)).
We can use a combination of the two: linear models for sparse inputs and a DNN for dense inputs. DNNLinearCombinedClassifier can be used for this.
Wide and deep – a combination of both; used for recommendation, search and ranking.
Hyperparameter tuning means changing model parameters to find the right set for a particular result. We evaluate RMSE with the chosen parameters and keep tuning to lower it. We can define which metric to use for evaluation, such as RMSE.
Provide arguments for the parameters that need to be tuned, along with a min and max for each.
Provide the evaluation metric – e.g. RMSE.
Tunable parameters – number of nodes, hidden layer count, batch size, learning rate, learning algorithm (DNN, regressor).
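As a sketch, an ML Engine hyperparameter tuning spec along these lines might look like the YAML below (the parameter names, ranges and trial counts are illustrative assumptions, not from the course):

```yaml
trainingInput:
  hyperparameters:
    goal: MINIMIZE
    hyperparameterMetricTag: rmse   # the evaluation metric we report
    maxTrials: 20
    maxParallelTrials: 2
    params:
      - parameterName: learning_rate
        type: DOUBLE
        minValue: 0.001
        maxValue: 0.1
        scaleType: UNIT_LOG_SCALE
      - parameterName: hidden_units
        type: INTEGER
        minValue: 32
        maxValue: 256
        scaleType: UNIT_LINEAR_SCALE
```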
Master, worker and parameter servers. Parameter servers coordinate model state; multiple workers need a parameter server.
Tiers – basic, standard, premium, custom. Basic has only 1 worker node; standard, premium and custom have a combination of master, worker and parameter servers.
Access control is at the project and model level. There are also job and operation roles.
Vision API features – label detection, OCR, safe search, landmark detection, logo detection, image properties (dominant colors), crop hints, web detection.
How to pass images and videos to the ML APIs – as a GCS URI or in base64 binary format.
Label detection returns – mid (a machine-generated identifier for the label from Google's knowledge base), confidence level, and the label itself.
Natural Language API – sentiment analysis, entity analysis.
Sentiment – available per document and per sentence. The score gives the overall sentiment; the magnitude indicates the amount of sentimental content.
Entity analysis – for known entities like public figures and landmarks.
Also does syntax analysis and content classification.
Dataproc cluster modes: single node (experimental); standard – 1 master, multiple workers; HA – 3 masters, multiple workers. The replication factor in Hadoop is 3 copies, which means that with 2 TB of worker storage in the cluster, effective storage would be around 650 GB. It is better not to use HDFS because it is difficult to persist data when the cluster is destroyed – use GCS instead. Preemptible worker nodes do not have HDFS storage; otherwise preemptibles have the same configuration as regular worker nodes. Workers can be preemptible, but at least one regular worker is needed. Dataproc manages preemptible node addition and deletion.
Parameters – choose the Dataproc version, cloud API access, and initialization scripts. The Dataproc version controls the Hadoop, Spark and Pig versions.
Web ports – TCP port 8088 is Hadoop (YARN), 9870 is HDFS and 8080 is Datalab.
Dataproc allows specifying custom machine types
Replacing HDFS with Google Cloud Storage:
The goal is to break the close tie-in between compute and storage. Replacing nodes is complex with HDFS; storage is replicated three times with Hadoop/HDFS; storage needs to be pre-allocated in HDFS; and HDFS does not allow persistence when clusters are deleted. Having GCS for persistence means more preemptible nodes can be used. HDFS is still used for temporary data. (I am not clear on the temporary-data part, since Hadoop relies on sharding and keeping data close to compute.)
Steps: move the files to GCS, then change Hadoop/Spark code from hdfs:// to gs://.
Dataproc uses Apache Bigtop. Bigtop is an Apache Foundation project for infrastructure engineers and data scientists looking for comprehensive packaging, testing, and configuration of the leading open source big data components. Default install – Hadoop, Spark, PySpark, Pig. Additional software can be installed using scripts executed as part of cluster creation; the script is common to master and workers, and to do a role-specific install we can use metadata that specifies the master. A Git repository of initialization actions for common open source software is available (https://github.com/GoogleCloudPlatform/dataproc-initialization-actions). Cluster properties can also be changed.
BigQuery with Dataproc
Option 1 – run a SQL query, put the data into pandas and then use PySpark to process it. This option is useful for smaller results that can fit into pandas after some computation.
Option 2 – use the BigQuery connector, where data is staged in GCS as JSON and then processed with PySpark. Option 1 is not always possible, since the pandas dataset has to fit into memory and the whole BigQuery dataset will not. We need to read it as an RDD or Spark dataframe; only RDD is supported now.
We cannot run a query directly against BigQuery this way; only a whole table can be read into GCS. If we need a query, run the query first, export the result into another BigQuery table and then read that table.
Set up the connector to read from BigQuery
Load data using the BigQuery connector as a RDD
Run spark job
Output to sharded files in GCS
Load the sharded data back into BigQuery
Cleanup temporary files
IAM access is only at the project level and applies to all clusters.
Roles – editor, viewer, worker (for service accounts). Roles can be at the Dataproc level or the project level.
Pub/Sub: messages are saved for 7 days. At-least-once delivery of each message is guaranteed; it is possible to get duplicate or out-of-order delivery at the subscriber (Dataflow takes care of this). Global, multi-tenanted, discoverable, durable, low latency, asynchronous. Client libraries and autogenerated gRPC are also supported. Core concepts: topics and subscriptions.
Publisher and Subscriber
Publisher: create a topic, publish messages.
Subscriber: create a subscription to a topic. There are 2 delivery mechanisms – push and pull.
Push scheme – Pub/Sub calls the subscriber using an HTTPS webhook provided by the subscriber; faster. If the webhook is not available, Pub/Sub retries with exponential backoff. The HTTPS response provides an implicit acknowledgement. (e.g. App Engine Standard and Cloud Functions subscribers)
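Pub/Sub performs this backoff on the server side, but the exponential backoff pattern itself can be sketched as follows (the base, cap and retry count are illustrative):

```python
import random

def backoff_delays(base=0.5, cap=60.0, max_retries=6):
    """Yield retry delays that grow exponentially (with jitter), capped at `cap`."""
    for attempt in range(max_retries):
        bound = min(cap, base * (2 ** attempt))
        # Full jitter: pick a random delay anywhere up to the exponential bound.
        yield random.uniform(0, bound)

for i, delay in enumerate(backoff_delays()):
    print(f"retry {i}: waiting {delay:.2f}s")
```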
Pull scheme – the subscriber calls Pub/Sub; slower, but better with many subscribers. The subscriber needs to acknowledge that it has processed the message.
A single subscription can have multiple subscribers. This is useful for load balancing
Messages are opaque and are not processed by Pub/Sub. We can also send metadata (key-value pairs) along with messages.
Messages can be published in batches; this reduces network cost.
Project for the topic can be different from the project for subscription
The subscriber has to ack within the timeout period.
Access control is at the project, topic and subscriber level; there is nothing at the publisher level. Roles – publisher, subscriber, editor, owner.
Bigtable: key-value NoSQL, wide-column, similar to HBase, Cassandra and DynamoDB. Excels as a storage engine for batch MapReduce operations. Throughput scales linearly with the number of nodes. Indexed by row key, column key and timestamp. Data can be streamed into Bigtable. Use cases – time series, recommendation engines, logs, analytics data, graphs. Moving from HBase to Bigtable is trivial. No joins; transactions are supported only within a single row. Operations are append-only, not transactional.
The Bigtable equivalent of a project is called an instance; 1 instance can have multiple clusters (up to 2). Tables belong to instances.
Instance types: development and production. A production instance can have 1 or 2 clusters, with each cluster having at least 3 nodes. A development instance is a single cluster with 1 node. The second cluster is used for replication and failover. Each cluster resides in a single zone; the second cluster is in a different zone in the same region.
Access tools – HBase client, cbt. cbt is a Go-based tool and is simpler.
There is 1 row key per table, and that is what is indexed. We can do field promotion so that multiple columns are included in the row key.
Multiple column families can be present
The row key needs to be chosen carefully so that contiguous rows are returned for queries. Row keys should also prevent hotspotting while writing.
Spread load across multiple nodes (prevent hotspotting):
Reverse domain names – so that 1 domain name is not concentrated on the same node, causing hotspotting.
For timestamps – reverse them so that new entries appear first. Don't put the timestamp first in the key, or all new writes land on 1 node and cause hotspotting.
Don't use sequential IDs as keys; salting of IDs can be used instead.
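These key-design tips can be sketched as small helpers (the names, constants and bucket count are illustrative):

```python
def reversed_domain_key(domain: str) -> str:
    """Reverse domain labels: 'mail.example.com' -> 'com.example.mail'."""
    return ".".join(reversed(domain.split(".")))

def reversed_timestamp(ts: int, max_ts: int = 10**10) -> str:
    """Subtract from a large constant so newer timestamps sort first lexically."""
    return str(max_ts - ts).zfill(10)

def salted_key(seq_id: int, buckets: int = 10) -> str:
    """Prefix a sequential ID with a salt bucket to spread writes across nodes."""
    return f"{seq_id % buckets}#{seq_id}"

print(reversed_domain_key("mail.example.com"))  # com.example.mail
print(salted_key(1002))                         # 2#1002
```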
Wide tables – for dense data. There is a single key and a bunch of properties in this case.
Narrow tables – for sparse data, e.g. users rating products. Create a key with user and product together in the same key. This avoids wide tables where every product has to be listed for every user. Also used for time series.
Bigtable figures out the access patterns and improves itself by rebalancing.
Change the schema to minimize data skew, allow Bigtable to rebalance with real data, choose the right number of nodes, and use SSD.
Key Visualizer – gives insight into Bigtable usage and performance.
Ingesting data into Bigtable:
1. Get/create table 2. Convert object to write into Mutation(s) 3. Write mutations to Bigtable
Read from Bigtable using the HBase client or the BigQuery connector.
IAM is project-wide or at the instance level. Instance scope includes all clusters; project scope includes all instances.
Differences from Datastore:
Datastore – custom indexes; writes are slower. Bigtable – for a lot more data that is also accumulating fast; analytics. Datastore is built on top of Bigtable. Datastore provides more consistency and availability compared to Bigtable. Datastore is like a document NoSQL database; Bigtable is like a column NoSQL database. Datastore handles terabytes of data, Bigtable petabytes. Bigtable scales UP well, Datastore scales DOWN well.
Datastore: NoSQL document database. Use cases – user profiles, product catalogs, game state. Datastore is not good for near-zero latency, analytics, or complex joins. Serverless. Availability – 99.95% for multi-region locations and 99.9% for regional locations. Queries are done using GQL, which is somewhat similar to SQL.
Terms and mapping to RDBMS:
Entity – single object (like row or document) Kind – category of object (Like table name) Property – individual data for an object(like column) Key – unique ID for each entity
Indexes – created on every property that is not null. Secondary and composite indexes – built using multiple fields. Entity key – namespace, kind, key or identifier, ancestor (optional).
By default, 1 index per property is created, and there are restrictions on combining multiple properties in a query. To use multiple properties as an index, we need to create composite indexes.
Creating composite index:
This cannot be done from the GCP console.
gcloud datastore create-indexes index.yaml
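As a sketch, an index.yaml defining a composite index might look like this (the kind and property names are hypothetical):

```yaml
indexes:
  - kind: Order
    properties:
      - name: customer
        direction: asc
      - name: created
        direction: desc
```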
Strong consistency – entity groups (this slows writes to 1 per second per group), and regular reads and writes by key.
Eventual consistency – global entities, indexes
Hierarchies or entity groups: a set of entities connected through ancestry to a common root element. For example, “user” is the root entity, and all “orders” placed by a single user are child entities; together these form an entity group. The maximum write rate per entity group is 1/sec.
There are restrictions on queries. Query time is based on the size of the result returned, not on the size of the dataset, and the restrictions exist to keep that true.
Cloud Datastore queries do not support substring matches, case-insensitive matches, or so-called full-text search. The NOT, OR, and != operators are not natively supported, but some client libraries may add support on top of Cloud Datastore.
Join operations are not supported; we need to denormalize tables.
Aggregation and “group by” are not supported
Inequality filters are limited to at most one property
IAM roles are primitive or pre-defined. Pre-defined examples: user, admin, viewer, index admin.
Data Studio: dimensions go on the x-axis and metrics on the y-axis; aggregations on metrics are not allowed. Filter – filters some data in the report. Filter controls – give viewers a way to focus on subsets of the data; viewers can use a filter control to select one or more dimension values from a list by which to filter the report. Caching types – query cache and pre-fetch cache. The query cache cannot be turned off; the pre-fetch cache can. The pre-fetch cache is a smart cache and is valid only when owner’s credentials are used for data access. Calculated fields – we can create new fields using formulas or functions and add them to charts.
IAM is not applicable; Data Studio uses G Suite permissions. Files are stored in Google Drive.
Owner’s credentials vs viewer’s credentials are set at the data source level. With owner’s credentials, data access uses the owner’s credentials, so the viewer does not need access to the dataset. With viewer’s credentials, data access uses the viewer’s credentials, so if the viewer does not have permission on the dataset, they cannot view the report.
Dataprep: execution is done using Dataflow. Recipes are built with the wrangler, or automatic suggestions can be used. IAM – dataprep user, dataprep service agent (permission to access GCP resources, given to Trifacta). Pricing = 1.16 × the Dataflow price. Flow – the workspace. Datasets – the data; a recipe is applied to each dataset.
Spanner: a managed, globally scaling database.
IAM – at the project, instance or database level. Roles: admin, database admin, database reader, viewer.
Differences from regular SQL:
Interleaved schema: group primary keys and foreign keys together. This allows faster access to data through data locality and avoids two-phase commit. Cascade on delete can be done with interleaving. For example, if we have a “customer” and an “invoice” table, rather than keeping them separate, combine them into 1 with nesting.
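A sketch of interleaved DDL for the customer/invoice example (the table and column names are illustrative):

```sql
CREATE TABLE Customers (
  CustomerId INT64 NOT NULL,
  Name       STRING(MAX),
) PRIMARY KEY (CustomerId);

-- Invoice rows are stored physically under their parent Customers row,
-- giving data locality and cascade on delete.
CREATE TABLE Invoices (
  CustomerId INT64 NOT NULL,
  InvoiceId  INT64 NOT NULL,
  Amount     FLOAT64,
) PRIMARY KEY (CustomerId, InvoiceId),
  INTERLEAVE IN PARENT Customers ON DELETE CASCADE;
```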
Primary keys – don't use sequential keys; make sure the keys are distributed. Secondary indexes – make queries efficient. Avoid hotspotting, as with Bigtable. Use nested tables with shared primary keys (interleaving) for faster access. To prevent hotspotting, avoid sequential numbers and timestamps as keys; if timestamps are needed, store them in descending order.
Cloud SQL: a read replica can only be in the same region. Access – direct connection (needs whitelisting), proxy (whitelisting not needed), private access (this is new). Instance size is tied to network throughput. Disk throughput and IOPS are tied to the size of the disk. Binary logging – needed for point-in-time restore and failover replicas. SSL access to the SQL instance is supported. The InnoDB storage engine is preferred; for second-generation instances it is the only choice.
Import – can be a SQL dump or CSV. The SQL dump cannot contain triggers, views or stored procedures. Import is done through Cloud Storage.
Instance access control is different from project access control. Instance access control uses whitelisting, the proxy and database users. Project access control uses IAM, with editor, viewer and admin roles.
Linux Academy – GCP Data Engineer: this course gives a different perspective from Coursera and is oriented more towards exam preparation. The sample questions at the end of each chapter and the practice exam are useful.
I did the following presentation, “Devops with Kubernetes”, at the Kubernetes Sri Lanka inaugural meetup earlier this week. Kubernetes is one of the most popular open source projects in the IT industry currently. Kubernetes abstractions, design patterns, integrations and extensions make it very elegant for Devops. The slides delve a little deeper into these topics.
I presented this webinar, “Top 3 reasons why you should run your enterprise workloads on GKE”, at the NEXT100 CIO forum earlier this week. Businesses are increasingly moving to containers and Kubernetes to simplify and speed up their application development and deployment. The slides and demo cover the top reasons why Google Kubernetes Engine (GKE) is one of the best container management platforms for enterprises to deploy their containerized workloads.
This week, I did a presentation at Container Conference, Bangalore. The conference was well conducted and was attended by 400+ quality attendees. I enjoyed some of the sessions and also had fun talking to attendees. The topic I presented was “Deep dive into Kubernetes Networking”. Other than covering Kubernetes networking basics, I also touched on network control policy, Istio service mesh, hybrid cloud and best practices.
Recording of the Istio section of the demo (this recording was not made at the conference):
As always, feedback is welcome.
I was out of blogging action for the last 9 months as I was settling into my new job at Google and also had to take care of some personal stuff. Things are getting a little clearer now and I am hoping to start blogging again soon…