TechTutorial

Stay Informed with Kubernetes Informers

By May 1, 2019 May 17th, 2019 No Comments

FireHydrant.io has a changelog feature with a Kubernetes integration. Building this integration was challenging because the knowledge about creating an event oriented system with the Kubernetes client-go project was not as easy to understand as I would have hoped.

To get started, I explored using the simple watcher interface that is baked into the Go package. One of the problems with this interface, however, is you end up rolling your own event type routing and indexing. This is where the Informer types in the cache package come in. So let's build a simple tool that informs us Kubernetes resource updates!

The Cache Package

The client-go package comes with a subpackage that makes getting events easy: k8s.io/client-go/tools/cache. It allows us to easily add functions that will be called when certain events come in. It also allows us to store all of the objects in memory easily which is called an Store.

While the cache package provides the tools we need, initializing and using it can be cumbersome when it comes to receiving simple updates from the Kubernetes API. For reasons I can’t fully understand either, almost every blog post I found uses this package as is.

However, there’s another package that ties the concepts the cache package provides into one: The k8s.io/client-go/informers package. It comes with a simple factory for all Kubernetes resources that nearly mirrors the kubernetes.Interface type. Let's get to some code:

package main

import (
    "os"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    kubeconfig := os.Getenv("KUBECONFIG")
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        panic(err.Error())
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }
}

In this example, we’re setting up a Kubernetes client using our KUBECONFIG environment variable. This will allow us to easily create an informers factory which requires a kubernetes.Interface type. From here, let's initialize a informer factory:

factory := informers.NewSharedInformerFactory(clientset, 0)

This package returns a type called SharedInformerFactory. You can find the documentation for this type here. The first argument in this example is our client that connects and interacts with the Kubernetes API. The second argument is how often this informer should perform a resync. What this means is it will list all resources and rehydrate the informer's store. The reason this useful is it creates a higher guarantee that your informer's store has a perfect picture of the resources it is watching. There are situations where events can be missed entirely and resyncing every so often solves this. Setting to 0 disables resync.

This is great because now we can very easily start receiving events for resources we care about. Using the appropriate API Group method, let’s start receiving Add events for Pods.

factory := informers.NewSharedInformerFactory(clientset, 0)
informer := factory.Core().V1().Pods().Informer()
stopper := make(chan struct{})
defer close(stopper)`

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        // "k8s.io/apimachinery/pkg/apis/meta/v1" provides an Object
        // interface that allows us to get metadata easily
        mObj := obj.(v1.Object)
        log.Printf("New Pod Added to Store: %s", mObj.GetName())
    },
})

informer.Run(stopper)

Let’s break this code down. There’s a few things that are missing from this being “production” ready but this code will already display all pods in our cluster and print any new ones that are created in all namespaces.

informer := factory.Core().V1().Pods().Informer()
stopper := make(chan struct{})
defer close(stopper)

The first line initializes an informer that is geared for Pods in the Core/v1 API Group. The second creates a simple channel that we can pass to our Informer when we tell it to run. This channel is used to stop the Informer’s run loop that is watching / listing the Pod resource.

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        // "k8s.io/apimachinery/pkg/apis/meta/v1" provides an Object
        // interface that allows us to get metadata easily
        mObj := obj.(v1.Object)
        log.Printf("New Pod Added to Store: %s", mObj.GetName())
    },
})

On this line, we’re adding handler functions (just the AddFunc one for now). This function is called every time an object is added to our Informer's underlying Store. The single parameter in our case will always be a Pod, but we need to convert it.

Most of the time your AddFunc will just be pushing events to a work queue (more on this later), so you're not too concerned with the actual type. But, I want to log the name of the Pod that was just added to our store.

I like using the k8s.io/apimachinery/pkg/apis/meta/v1 package for this because it provides an Object interface that makes retrieving Metadata on any Kubernetes object easy. So methods such as GetName() and GetNamespace() work for everything this function receives now.

informer.Run(stopper)

Lastly, we tell our Informer to run. This starts the run loop and waits for our stopper channel to be closed. We’re not closing this channel since this is the end of our main.go file so far.

The entirety of the file looks like this now:

# main.go
package main

import (
    "log"
    "os"

    "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    kubeconfig := os.Getenv("KUBECONFIG")
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        panic(err.Error())
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    factory := informers.NewSharedInformerFactory(clientset, 0)
    informer := factory.Core().V1().Pods().Informer()
    stopper := make(chan struct{})
    defer close(stopper)
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            // "k8s.io/apimachinery/pkg/apis/meta/v1" provides an Object
            // interface that allows us to get metadata easily
            mObj := obj.(v1.Object)
            log.Printf("New Pod Added to Store: %s", mObj.GetName())
        },
    })

    informer.Run(stopper)
}

So far our project connects to a Kubernetes cluster, sets up an informer for Pods in all namespaces, and then starts the Informer run loop. Our program will print all pods that are added to the cluster (and the initial warmup of pods when the Store syncs).

$ go run main.go
2018/08/13 19:23:19 New Pod Added to Store: firehydrant-k8s-changelog-7846b88785-68tqs
2018/08/13 19:23:19 New Pod Added to Store: kube-dns-86f4d74b45-2fxw2
2018/08/13 19:23:19 New Pod Added to Store: kube-scheduler-minikube
2018/08/13 19:23:19 New Pod Added to Store: storage-provisioner

What’s Next?

In the next blog post we’ll start hardening this implementation (this is not production ready) by adding a work queue and better control flow. I hope this intro was helpful! Stay tuned for the follow up.

Final result is here: https://github.com/firehydrant-io/blog-stay-informed

FireHydrant takes you from oops to ops

Manage deploys, incidents, and post mortems like it's no big deal.

Learn More
Bobby Tables

Bobby Tables

My name is Robert but people call me Bobby, Bobby Tables. I'm a long time software tinkerer and love building tools for other engineers and writing about it!