End To End testing for Prometheus


When you get started building out a monitoring platform with Prometheus, one of the challenges is understanding how metrics flow through all of the components. The Prometheus configuration provides a few ways to manipulate incoming and outgoing metrics, such as relabel_configs and metric_relabel_configs. There are tools available to help construct appropriate rules, such as Relabeler by PromLabs.

So what do you do when you think it’s all set up? How do you verify that you are not only correctly scraping metrics, but also correctly remote writing the data you intend to?

From my experience, this turns into a manual and very slow trial and error process.

Is there a better way?

Well I’m glad you asked, of course!

Here’s an approach we started using.

The goal is a repeatable way to validate the entire metrics flow, from scraping metrics to writing them to a remote write target.

What we’ll do is make a pod that runs a few containers: the Prometheus server with our desired configuration, one or more fake cAdvisor containers, and a remote write destination. We’ll make use of the e2e-framework for programmatic ease of use and repeatability. This allows us to use an existing Kubernetes cluster or optionally create a new KinD cluster for our testing.

I’ve copied code blocks here, but all of the code can be found on my GitHub.

We’ll get started with using cAdvisor metrics. So our first order of business is to get a static copy of the cAdvisor data. We can pick on an existing cluster to get some good samples. The following script can be used to capture this data from the cAdvisor built into kubelet.

#!/usr/bin/env bash

mkdir -p testdata

for node in $(kubectl get nodes -o go-template --template '{{ range .items }}{{ printf "%s\n" .metadata.name }}{{ end }}')
do
   # Save the raw cAdvisor metrics for this node
   kubectl get --raw "/api/v1/nodes/${node}/proxy/metrics/cadvisor" > "testdata/${node}.cadvisor"
done

This gives us some solid sample data to work with.

Next, we need a simple web server to host this data and expose it via a /metrics endpoint. The web server consumes a base64-encoded, gzipped file, which allows us to dump it into a configmap (spoiler alert). Here is the source for our fake cAdvisor; we don’t need to do anything with it yet.

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"flag"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
)

func main() {
	port := flag.Int("port", 8080, "Port to listen on")
	cadvisorFile := flag.String("cadvisorFile", "cadvisor.prom", "Path to cadvisor prometheus metrics file")
	flag.Parse()

	metrics, err := os.ReadFile(*cadvisorFile)
	if err != nil {
		log.Fatal(err)
	}

	metricData := bytes.NewReader(metrics)

	b64Decoder := base64.NewDecoder(base64.StdEncoding, metricData)
	gzReader, err := gzip.NewReader(b64Decoder)
	if err != nil {
		log.Fatal(err)
	}

	var buf bytes.Buffer
	if _, err := io.Copy(&buf, gzReader); err != nil {
		log.Fatal(err)
	}

	if err := gzReader.Close(); err != nil {
		log.Fatal(err)
	}

	data := buf.Bytes()

	http.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "text/plain")

		_, _ = w.Write(data)
	})

	log.Fatal(http.ListenAndServe(fmt.Sprintf("localhost:%d", *port), nil))
}
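The capture script above stores the raw metrics text, while the fake cAdvisor expects gzipped, base64-encoded input. The helper that bridges the two (gzipCadvisorConfigmaps, used later) is not shown in this post, so the following is only a minimal sketch of what that encoding step might look like. The function names encodeMetrics and decodeMetrics are hypothetical. Compressing first helps keep each node's data under the 1 MiB ConfigMap size limit.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"fmt"
	"io"
	"log"
	"strings"
)

// encodeMetrics (hypothetical name) gzips raw metrics text and base64-encodes
// the result so it can be stored as a string value in a ConfigMap.
func encodeMetrics(raw []byte) (string, error) {
	var out bytes.Buffer
	b64 := base64.NewEncoder(base64.StdEncoding, &out)
	gz := gzip.NewWriter(b64)

	if _, err := gz.Write(raw); err != nil {
		return "", err
	}
	// Close order matters: flush the gzip stream first, then the base64 padding.
	if err := gz.Close(); err != nil {
		return "", err
	}
	if err := b64.Close(); err != nil {
		return "", err
	}

	return out.String(), nil
}

// decodeMetrics reverses encodeMetrics, mirroring the decode chain the fake
// cAdvisor runs at startup (base64 decoder wrapped by a gzip reader).
func decodeMetrics(encoded string) ([]byte, error) {
	b64 := base64.NewDecoder(base64.StdEncoding, strings.NewReader(encoded))
	gz, err := gzip.NewReader(b64)
	if err != nil {
		return nil, err
	}
	defer gz.Close()

	return io.ReadAll(gz)
}

func main() {
	sample := "container_cpu_usage_seconds_total{id=\"/\"} 1.5\n"

	encoded, err := encodeMetrics([]byte(sample))
	if err != nil {
		log.Fatal(err)
	}

	decoded, err := decodeMetrics(encoded)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("round trip ok: %t\n", string(decoded) == sample)
}
```

A round trip like this is a cheap way to confirm that whatever lands in the configmap is readable by the fake cAdvisor before any cluster is involved.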

The second half of our flow is to validate the metrics that have been sent to the remote write destination. We’ll accomplish this by setting up a remote write endpoint that exposes the received metrics as metrics. This will come in handy later on when we go to check the received metrics via the prometheus testutil package. Here’s a sample program that accepts remote write metrics via /write endpoint and exposes the received metrics via /metrics.

package main

import (
	"flag"
	"fmt"
	"log"
	"net/http"
	"strings"
	"sync"

	"github.com/prometheus/common/expfmt"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/storage/remote"
)

func main() {
	port := flag.Int("port", 8080, "Port to listen on")
	flag.Parse()

	server := Server{remoteWriteMetrics: make(map[model.Fingerprint]model.Sample)}

	mux := http.NewServeMux()
	// TODO might be worth switching this out for remote.NewWriteHandler
	// but that may introduce more complexity than it's worth
	mux.Handle("/write", server.remoteWriteReceiver())
	mux.Handle("/metrics", server.metrics())

	log.Fatal(http.ListenAndServe(fmt.Sprintf("localhost:%d", *port), mux))
}

type Server struct {
	sync.Mutex
	remoteWriteMetrics map[model.Fingerprint]model.Sample
}

func (s *Server) remoteWriteReceiver() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		s.Lock()
		defer s.Unlock()

		req, err := remote.DecodeWriteRequest(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		for _, ts := range req.Timeseries {
			m := make(model.Metric, len(ts.Labels))
			for _, l := range ts.Labels {
				m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
			}

			if len(ts.Samples) != 1 {
				log.Println("expected 1 sample, got", len(ts.Samples))
				continue
			}

			s.remoteWriteMetrics[m.Fingerprint()] = model.Sample{
				Metric:    m,
				Value:     model.SampleValue(ts.Samples[0].Value),
				Timestamp: model.Time(ts.Samples[0].Timestamp),
			}
		}
	}
}

func (s *Server) metrics() http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
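		// Render while holding the lock. Assigning the map to a local variable
		// only copies the reference, so iterating after Unlock would race with
		// the remote write handler.
		var sb strings.Builder

		s.Lock()
		for _, sample := range s.remoteWriteMetrics {
			sb.WriteString(sample.Metric.String())
			sb.WriteString(" ")
			sb.WriteString(fmt.Sprintf("%f\n", sample.Value))
		}
		s.Unlock()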

		var p expfmt.TextParser
		metricFamilies, err := p.TextToMetricFamilies(strings.NewReader(sb.String()))
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprintf(w, "error parsing metrics: %s", err)
			return
		}

		enc := expfmt.NewEncoder(w, expfmt.FmtText)
		for _, mf := range metricFamilies {
			if err := enc.Encode(mf); err != nil {
				w.WriteHeader(http.StatusInternalServerError)
				fmt.Fprintf(w, "error encoding metrics: %s", err)
				return
			}
		}
	}
}
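The receiver keeps one sample per series, so a later write for the same label set replaces the earlier one, and HTTP handlers run concurrently, so reads and writes need to share a lock. As a rough illustration of that storage pattern only, here is a standard-library sketch. The seriesStore type and its methods are hypothetical, and a sorted label string stands in for model.Fingerprint.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// sample is a simplified stand-in for model.Sample.
type sample struct {
	labels map[string]string
	value  float64
}

// seriesStore (hypothetical) keeps the most recent sample per series.
type seriesStore struct {
	mu     sync.Mutex
	series map[string]sample
}

func newSeriesStore() *seriesStore {
	return &seriesStore{series: make(map[string]sample)}
}

// seriesKey builds a stable key from the sorted label pairs, playing the
// role that the metric fingerprint plays in the program above.
func seriesKey(labels map[string]string) string {
	names := make([]string, 0, len(labels))
	for name := range labels {
		names = append(names, name)
	}
	sort.Strings(names)

	var sb strings.Builder
	for _, name := range names {
		fmt.Fprintf(&sb, "%s=%q,", name, labels[name])
	}

	return sb.String()
}

// record stores a sample, overwriting any earlier sample for the same series.
func (s *seriesStore) record(labels map[string]string, value float64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.series[seriesKey(labels)] = sample{labels: labels, value: value}
}

// snapshot copies the samples out while holding the lock, so callers can
// iterate the result without racing against concurrent record calls.
func (s *seriesStore) snapshot() []sample {
	s.mu.Lock()
	defer s.mu.Unlock()

	out := make([]sample, 0, len(s.series))
	for _, v := range s.series {
		out = append(out, v)
	}

	return out
}

func main() {
	store := newSeriesStore()
	up := map[string]string{"__name__": "up", "instance": "localhost:8100"}

	store.record(up, 0)
	store.record(up, 1) // same series, so this replaces the first sample

	for _, s := range store.snapshot() {
		fmt.Println(seriesKey(s.labels), s.value)
	}
}
```

Keeping only the latest sample is what makes the /metrics output comparable against a static expected file: the number of lines depends on the number of series, not on how many times Prometheus has flushed.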

So now that we’ve got our helper programs, we can start to glue it together with e2e-framework. We’ll start off defining our main_test.go. This will be responsible for creating a new kind cluster, a test namespace, and running our tests.

func TestMain(m *testing.M) {
	testenv = env.New()

	kindClusterName := envconf.RandomName("my-cluster", 16)

	namespace := envconf.RandomName("sample-ns", 16)

	testenv.Setup(
		envfuncs.CreateKindCluster(kindClusterName),
		envfuncs.CreateNamespace(namespace),
	)

	testenv.Finish(
		envfuncs.DeleteNamespace(namespace),
		envfuncs.DestroyKindCluster(kindClusterName),
	)

	os.Exit(testenv.Run(m))
}

Our test itself will be composed of the e2e-framework environment features. This means we will run through the following steps:

  • Setup
  • Assess
  • Teardown

For our setup step, we will run through composing our test pod and related configmaps.

The first thing we need to do is prepare the Prometheus configmap. We’ll load in our sample configuration and then update it to reflect our static environment. This means setting the scrape target URLs and the remote write URL. The remote write target will point at the remote write service presented above, and the scrape targets will be statically configured to use the fake cAdvisor endpoints. Because we’re using a single pod with multiple containers, we can cheat a bit when generating the Prometheus configuration: we can skip any URL or service lookups and just use localhost with varying ports.

// prometheusConfig reads in a base prometheus config and modifies the config
// to add in a scrape config for each cAdvisor instance as well as a remote write
// destination.
func prometheusConfig(numcAdvisors int) ([]byte, error) {

	// Read in our base config
	data, err := os.ReadFile("testdata/prometheus.yml")
	if err != nil {
		return nil, err
	}

	promConfig := config.DefaultConfig

	if err = yaml.Unmarshal(data, &promConfig); err != nil {
		return nil, err
	}

	// Configure the remote write endpoint
	u, err := url.Parse("http://localhost:8099/write")
	if err != nil {
		return nil, err
	}

	promConfig.RemoteWriteConfigs = []*config.RemoteWriteConfig{
		{
			URL: &common.URL{URL: u},
			HTTPClientConfig: common.HTTPClientConfig{
				EnableHTTP2: true,
			},
		},
	}

	// Add in a static scrape config for each cAdvisor instance
	var cadvisorScrapeConfig *config.ScrapeConfig

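	for _, scrapeConfig := range promConfig.ScrapeConfigs {
		if scrapeConfig.JobName == "kubernetes-cadvisor" {
			cadvisorScrapeConfig = scrapeConfig
		}
	}

	// Fail early instead of dereferencing a nil pointer below
	if cadvisorScrapeConfig == nil {
		return nil, fmt.Errorf("scrape job %q not found in base config", "kubernetes-cadvisor")
	}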

	target := &targetgroup.Group{Targets: []model.LabelSet{}}

	for i := 0; i < numcAdvisors; i++ {
		target.Targets = append(target.Targets, model.LabelSet{
			model.AddressLabel: model.LabelValue(fmt.Sprintf("localhost:81%02d", i)),
		})
	}

	staticConfig := discovery.StaticConfig{target}

	// Reset service discovery settings so we can explicitly set static targets
	cadvisorScrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{staticConfig}

	// Clear out some existing settings
	cadvisorScrapeConfig.RelabelConfigs = nil
	cadvisorScrapeConfig.Scheme = "http"
	cadvisorScrapeConfig.HonorTimestamps = false

	promConfig.ScrapeConfigs = []*config.ScrapeConfig{cadvisorScrapeConfig}

	return yaml.Marshal(promConfig)
}
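The scrape targets and the fake cAdvisor containers have to agree on ports, and the "81%02d" format only yields valid ports for indexes 0 through 99 (index 100 would format as 81100, which is not a valid port). A small helper could keep both sides in sync and reject out-of-range indexes. The names cadvisorPort and cadvisorTarget below are hypothetical, and this is a sketch rather than code from the repository.

```go
package main

import "fmt"

const (
	cadvisorBasePort = 8100 // first fake cAdvisor listens here
	maxCadvisors     = 100  // ports 8100 through 8199
)

// cadvisorPort returns the port for the i-th fake cAdvisor. For indexes
// 0 through 99 this matches the "81%02d" format used in the config above.
func cadvisorPort(i int) (int, error) {
	if i < 0 || i >= maxCadvisors {
		return 0, fmt.Errorf("cadvisor index %d out of range [0, %d)", i, maxCadvisors)
	}

	return cadvisorBasePort + i, nil
}

// cadvisorTarget returns the static scrape target address for the i-th
// fake cAdvisor. Every container shares the pod's network namespace, so
// localhost is enough.
func cadvisorTarget(i int) (string, error) {
	port, err := cadvisorPort(i)
	if err != nil {
		return "", err
	}

	return fmt.Sprintf("localhost:%d", port), nil
}

func main() {
	for _, i := range []int{0, 7, 100} {
		target, err := cadvisorTarget(i)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(target)
	}
}
```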

This is where we start to get a bit clever. I didn’t want to deal with image management, so we build all of our programs at runtime. This involves loading the fake-cadvisor and remote-write program source files into configmaps. We then mount the configmaps into a Go builder image and run them. The main nuance is that configmap mounts are read-only, so we need to copy the data out of them to have a writable directory to work with.

Here we set up our fake-cadvisor configurations:

func setupcAdvisors() ([]*corev1.ConfigMap, *corev1.ConfigMap, error) {
	cadvisorConfigs, err := gzipCadvisorConfigmaps()
	if err != nil {
		return nil, nil, err
	}

	cadvisorGoMod, err := os.ReadFile("./cmd/fake-cadvisor/go.mod")
	if err != nil {
		return nil, nil, err
	}

	cadvisorGoMain, err := os.ReadFile("./cmd/fake-cadvisor/main.go")
	if err != nil {
		return nil, nil, err
	}

	cadvisorProgram := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Name: "cadvisor-go"},
		Data: map[string]string{
			"go.mod":  string(cadvisorGoMod),
			"main.go": string(cadvisorGoMain),
		},
	}

	return cadvisorConfigs, cadvisorProgram, nil
}

And here is where we add in the fake-cadvisor containers to our prometheus deployment:

	for i := 0; i < numcAdvisors; i++ {
		deployment.Spec.Template.Spec.Containers = append(deployment.Spec.Template.Spec.Containers, corev1.Container{
			Name:  fmt.Sprintf("cadvisor-%d", i),
			Image: "golang:latest",
			Command: []string{
				"/bin/bash",
			},
			Args: []string{
				"-c",
				"cd /go/cadvisor && go run . --cadvisorFile=/etc/cadvisor/cadvisor.prom --port=81" + fmt.Sprintf("%02d", i),
			},
			VolumeMounts: []corev1.VolumeMount{
				{
					Name:      fmt.Sprintf("cadvisor-%d", i),
					MountPath: "/etc/cadvisor",
				},
				{
					Name:      "cadvisor-go",
					MountPath: "/go/cadvisor",
				},
			},
		})

		deployment.Spec.Template.Spec.Volumes = append(deployment.Spec.Template.Spec.Volumes, corev1.Volume{

Here we set up our remote-write configurations:

func setupRemoteWrite() (*corev1.ConfigMap, error) {
	remoteWriteGoMod, err := os.ReadFile("./cmd/remote-write/go.mod")
	if err != nil {
		return nil, err
	}

	remoteWriteGoMain, err := os.ReadFile("./cmd/remote-write/main.go")
	if err != nil {
		return nil, err
	}

	remoteWriteProgram := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Name: "remote-write-go"},
		Data: map[string]string{
			"go.mod":  string(remoteWriteGoMod),
			"main.go": string(remoteWriteGoMain),
		},
	}

	return remoteWriteProgram, nil
}

And this is where we add in the remote-write container to our prometheus deployment:

						{
							Name:  "remote-write",
							Image: "golang:latest",
							Command: []string{
								"/bin/bash",
							},
							Args: []string{
								"-c",
								// Need to do copy here so we can have a writable directory
								"cp -r /go/remote-write /go/remote-write-copy && cd /go/remote-write-copy && go mod tidy && go run . --port=8099",
							},
							VolumeMounts: []corev1.VolumeMount{
								{
									Name:      "remote-write-go",
									MountPath: "/go/remote-write",
								},
							},
						},

And lastly, we deploy our magnificently functional pod:

		objects := []k8s.Object{
			cadvisorProgram,
			remoteWriteProgram,
			promConfigMap,
			promDeployment,
		}

		for _, cm := range cadvisorConfigs {
			objects = append(objects, cm)
		}

		for _, object := range objects {
			object.SetNamespace(cfg.Namespace())

			if err := cfg.Client().Resources().Create(ctx, object); err != nil {
				t.Fatal(err)
			}
		}

		if err := waitForPrometheus(ctx, t, cfg, promDeployment, numcAdvisors); err != nil {
			t.Fatal(err)
		}

		return ctx

After deploying the pod, we wait a bit. This check first waits for the prometheus pod to be up and running. Once the pod is running, we bring in the prometheus testutil.ScrapeAndCompare function. We exec into the pod, poll the remote-write endpoint, and check for the up metrics to show up. If we see an up series for each scrape target at the remote-write endpoint, that should mean that all of the scrape jobs are running and that remote write is working. This should be enough to allow us to move on to our next step: assess.

func waitForPrometheus(ctx context.Context, t *testing.T, cfg *envconf.Config, promDeployment *appsv1.Deployment, numcAdvisors int) error {
	// Handy function to wait for pod to be running
	err := wait.For(
		conditions.New(cfg.Client().Resources()).
			DeploymentConditionMatch(promDeployment, appsv1.DeploymentAvailable, corev1.ConditionTrue),
		wait.WithTimeout(time.Minute*1),
		wait.WithInterval(100*time.Millisecond),
	)
	if err != nil {
		return err
	}

	// Get prometheus pod
	podList := &corev1.PodList{}
	err = cfg.Client().Resources().WithNamespace(cfg.Namespace()).List(ctx, podList, resources.WithLabelSelector("app=prometheus"))
	if err != nil {
		return err
	}

	if len(podList.Items) != 1 {
		return fmt.Errorf("expected 1 prometheus pod but got %d", len(podList.Items))
	}

	expectedUpDog := generateUpDog(numcAdvisors)

	// Wait for all of the cAdvisor scrapes to complete
	// This should be enough to signify that remote write is complete as well
	err = wait.For(
		func() (bool, error) {
			// Going to ignore the errors here because we want to retry
			done, _ := execAndGetMetrics(ctx, cfg, podList.Items[0].Name, strings.NewReader(expectedUpDog), []string{"up"})
			return done, nil
		},
		wait.WithInterval(1*time.Second),
		wait.WithImmediate(),
	)
	if err != nil {
		return err
	}

	return nil
}

At long last, we can finally assess this setup! As a reminder, we’re interested in making sure that we are properly filtering down the scraped metrics. Since we’ve confirmed that our remote write pipeline is working, we should be good to go. We’ll run through the same check we did for the up metrics: exec into the pod, grab the metrics from the remote-write container, and run our ScrapeAndCompare against the output.

		// Load the metrics we expect to find at the remote write endpoint
		expectedCadvisorMetrics, err := os.Open("./testdata/expected.cadvisor")
		if err != nil {
			t.Fatal(err)
		}

		defer expectedCadvisorMetrics.Close()

		// Compare the metrics
		_, err = execAndGetMetrics(ctx, cfg, podList.Items[0].Name, expectedCadvisorMetrics, []string{"container_cpu_usage_seconds_total", "container_memory_working_set_bytes"})
		if err != nil {
			t.Fatal(err)
		}

		return ctx
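The comparison is restricted to a list of metric names, so anything else that reaches the remote write endpoint is ignored by this particular check. To make that idea concrete, here is a deliberately simplified, standard-library sketch that filters exposition text by metric name and compares the remaining sample lines. The filterSamples and sameSamples names are hypothetical, and this is a stand-in for illustration: the real testutil.ScrapeAndCompare parses both sides into metric families rather than comparing lines.

```go
package main

import (
	"bufio"
	"fmt"
	"sort"
	"strings"
)

// filterSamples returns the sorted sample lines whose metric name is in names.
// Comment lines (# HELP, # TYPE) and blank lines are skipped.
func filterSamples(exposition string, names ...string) []string {
	wanted := make(map[string]bool, len(names))
	for _, n := range names {
		wanted[n] = true
	}

	var out []string
	sc := bufio.NewScanner(strings.NewReader(exposition))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}

		// The metric name ends at the first '{' (labels) or space (value)
		name := line
		if i := strings.IndexAny(line, "{ "); i >= 0 {
			name = line[:i]
		}

		if wanted[name] {
			out = append(out, line)
		}
	}

	sort.Strings(out)
	return out
}

// sameSamples reports whether expected and actual contain the same sample
// lines for the given metric names, ignoring order.
func sameSamples(expected, actual string, names ...string) bool {
	e := filterSamples(expected, names...)
	a := filterSamples(actual, names...)
	if len(e) != len(a) {
		return false
	}
	for i := range e {
		if e[i] != a[i] {
			return false
		}
	}
	return true
}

func main() {
	expected := "up{instance=\"localhost:8100\"} 1\n"
	actual := "# TYPE up untyped\n" +
		"up{instance=\"localhost:8100\"} 1\n" +
		"container_fs_reads_total{id=\"/\"} 9\n"

	// Only "up" is compared, so the extra series does not cause a mismatch
	fmt.Println(sameSamples(expected, actual, "up"))
}
```

The flip side of a name-scoped comparison is that a metric that should have been dropped will go unnoticed unless its name is included in the list, so it can be worth adding a few names that are expected to be absent.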

Lastly, we’ll clean up afterwards. Since we’re tearing down the cluster afterwards, this step isn’t strictly necessary, but I thought it was worthwhile to show off. In the teardown step, we’ll delete the prometheus pod and wait for it to go away.

func teardownPrometheus() features.Func {
	// teardownKubeObjects returns a features.Func, so hand it back directly.
	// Calling it and discarding the result would delete nothing.
	return teardownKubeObjects(
		[]k8s.Object{&appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "prometheus"}}},
		map[string]string{"app": "prometheus"},
	)
}

func teardownKubeObjects(objects []k8s.Object, labelSelectors map[string]string) features.Func {
	return func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
		r, err := resources.New(cfg.Client().RESTConfig())
		if err != nil {
			t.Fatal(err)
		}

		for _, object := range objects {
			object.SetNamespace(cfg.Namespace())

			if err = r.Delete(ctx, object); err != nil {
				t.Fatal(err)
			}
		}

		if labelSelectors != nil {
			err = wait.For(
				conditions.New(r).ResourceListN(
					&corev1.PodList{},
					0,
					resources.WithLabelSelector(labels.FormatLabels(labelSelectors)),
				),
				wait.WithInterval(200*time.Millisecond),
			)
			if err != nil {
				t.Fatal(err)
			}
		}

		return ctx
	}
}

We got to have fun exploring how we can build on the e2e-framework and use it as a base for testing. We explored how we can use prometheus testutil.ScrapeAndCompare to check for expected metrics. And lastly we put together a testing pipeline to make sure our prometheus config is working as expected.

Again, if you want to see the test in full, the code is on my GitHub.


Categories: Go, Kubernetes, Prometheus, Testing