A Kubernetes controller is a program that continuously monitors object states in a cluster and makes changes based on predefined logic. Kubernetes has several built-in controllers, but developers often write custom ones to expand the capabilities of Kubernetes.
In this article, you will discover an example of a custom controller and learn how to write integration tests for it. Rather than building a controller from scratch, we'll use an existing one written in .NET and focus on writing the tests.
kubernetes-reflector
is a custom Kubernetes Controller written in .NET that replicates secrets
, configmaps
, and certificates
. It monitors changes (creation, updating, or deletion) to configmaps
and secrets
, then reflects these changes to mirror resources in designated namespaces.
Since this project currently lacks integration tests, we will implement them.
To write a proper test for kubernetes-reflector
, you'll need testcontainers-dotnet and a Docker-compatible Kubernetes distribution like k3s or kind. Since testcontainers
offers a k3s module, we'll use that for simplicity.
I have already a fork of kubernetes-reflector on my GitHub account. Make sure to fork or clone the repo before continuing, then create a xUnit project and add the required NuGet packages.
git clone https://github.com/emberstack/kubernetes-reflector.git
cd kubernetes-reflector
mkdir tests
# Create the test project
dotnet new xunit -n ES.Kubernetes.Reflector.Tests -o tests\ES.Kubernetes.ReflectorTests
# Add the project to the solution
dotnet sln add .\tests\ES.Kubernetes.Reflector.Tests
# Add packages
dotnet add .\tests\ES.Kubernetes.Reflector.Tests\ES.Kubernetes.Reflector.Tests.csproj package Testcontainers.K3s
dotnet add .\tests\ES.Kubernetes.Reflector.Tests\ES.Kubernetes.Reflector.Tests.csproj package Microsoft.AspNetCore.Mvc.Testing
dotnet add .\tests\ES.Kubernetes.Reflector.Tests\ES.Kubernetes.Reflector.Tests.csproj package Shouldly
Next, expose the implicitly defined Program
class to make it accessible to the test project by making the Program
class public using a partial class declaration:
using ES.FX.Hosting.Lifetime;
// ... omitted for brevity
return await ProgramEntry.CreateBuilder(args).UseSerilog().Build().RunAsync(async _ =>
{
var builder = WebApplication.CreateBuilder(args);
// ... omitted for brevity
builder.IgniteKubernetesClient();
builder.Services.AddMediatR(config =>
config.RegisterServicesFromAssembly(typeof(Program).Assembly));
// ... omitted for brevity
builder.Services.AddHostedService<NamespaceWatcher>();
builder.Services.AddHostedService<SecretWatcher>();
builder.Services.AddHostedService<ConfigMapWatcher>();
// ... omitted for brevity
var app = builder.Build();
app.Ignite();
await app.RunAsync();
return 0;
});
// 👇
public partial class Program { }
WebApplicationFactory
allows to create the test server for the integration tests. We will customize it by replacing the kubernetes client registered in the DI container of kubernetes-reflector by a new one created using a kubeconfig file from testcontainers.
Create a class called CustomWebApplicationFactory
that inherits from WebApplicationFactory<TEntryPoint>
, where TEntryPoint represents the Program class.
using ES.Kubernetes.Reflector.Core.Configuration;
using k8s;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc.Testing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Testcontainers.K3s;
namespace ES.Kubernetes.Reflector.Tests;
public class CustomWebApplicationFactory : WebApplicationFactory<Program>, IAsyncLifetime
{
// 👇 Add the image tag that aligns with your controller's supported Kubernetes version
private readonly K3sContainer _container = new K3sBuilder()
.WithImage("rancher/k3s:v1.26.2-k3s1")
.Build();
protected override void ConfigureWebHost(IWebHostBuilder builder)
{
// 👇 Get the Kubeconfig file from K3sContainer
var kubeConfigContent = _container.GetKubeconfigAsync().GetAwaiter().GetResult();
if (string.IsNullOrWhiteSpace(kubeConfigContent))
{
throw new InvalidOperationException("Kubeconfig content is empty");
}
builder.ConfigureServices(services =>
{
// 👇 Remove the existing KubernetesClientConfiguration and IKubernetes registrations
var kubernetesClientConfiguration = services.SingleOrDefault(
d => d.ServiceType == typeof(KubernetesClientConfiguration));
if (kubernetesClientConfiguration is not null)
{
services.Remove(kubernetesClientConfiguration);
}
// 👇 Add a new KubernetesClientConfiguration to the DI container
services.AddSingleton(s =>
{
var reflectorOptions = s.GetRequiredService<IOptions<ReflectorOptions>>();
// 👇 Store the Kubeconfig file at a temporary location
var tempFile = Path.GetTempFileName();
File.WriteAllText(tempFile, kubeConfigContent);
var config = KubernetesClientConfiguration.BuildConfigFromConfigFile(tempFile);
config.HttpClientTimeout = TimeSpan.FromMinutes(30);
return config;
});
services.AddSingleton<IKubernetes>(s =>
new k8s.Kubernetes(s.GetRequiredService<KubernetesClientConfiguration>()));
});
}
public Task InitializeAsync()
{
return _container.StartAsync();
}
public new Task DisposeAsync()
{
return _container.DisposeAsync().AsTask();
}
}
💡 The implementation of IAsyncLifetime
lets us start the k8s container asynchronously without dealing with await/async in the constructor.
Now let's create a base class for all our test classes that encapsulate some reusable logic and make our test methods shorter.
When testing a custom controller, it's important to note that the Kubernetes API Server sometimes take time to send events to the controller, which then will bring the cluster to the desired state. As a result, we must implement a retry mechanism in our assertions to avoid false negatives.
For our base test class, we'll need the following elements:
kubernetes-reflector'
s replication featureLet's first add the Poll.Core
package to our test project
dotnet add .\tests\ES.Kubernetes.Reflector.Tests\ES.Kubernetes.Reflector.Tests.csproj package Polly.Core
using k8s;
using k8s.Models;
using Microsoft.Extensions.DependencyInjection;
using Polly;
using Polly.Retry;
using k8s.Autorest;
namespace ES.Kubernetes.Reflector.Tests;
public abstract class BaseIntegrationTest : IClassFixture<CustomWebApplicationFactory>
{
protected readonly HttpClient Client;
protected readonly IKubernetes K8SClient;
protected readonly ResiliencePipeline<bool> Pipeline;
protected BaseIntegrationTest(CustomWebApplicationFactory factory)
{
Client = factory.CreateClient();
var scope = factory.Services.CreateScope();
K8SClient = scope.ServiceProvider.GetRequiredService<IKubernetes>();
Pipeline = new ResiliencePipelineBuilder<bool>()
.AddRetry(new RetryStrategyOptions<bool>
{
ShouldHandle = new PredicateBuilder<bool>()
.Handle<HttpOperationException>(ex =>
ex.Response.StatusCode == System.Net.HttpStatusCode.NotFound)
.HandleResult(false),
MaxRetryAttempts = 5,
Delay = TimeSpan.FromSeconds(2),
})
.AddTimeout(TimeSpan.FromSeconds(30))
.Build();
}
}
When executing via the pipeline via pipeline.ExecuteAsync()
:
configmaps
or secrets
not found) or false result:TimeoutRejectedException
The liveness endpoint is available at /health/live
. Let's use the HTTP Client from BaseIntegrationTest
to test it.
public class HealthCheckTests(CustomWebApplicationFactory factory) :
BaseIntegrationTest(factory)
{
[Fact]
public async Task LivenessHealthCheck_Should_Return_Healthy()
{
var response = await Client.GetAsync("/health/live");
response.StatusCode.ShouldBe(HttpStatusCode.OK);
response.Content.Headers.ContentType?.MediaType.ShouldBe("text/plain");
var content = await response.Content.ReadAsStringAsync();
content.ShouldBe("Healthy");
}
}
kubernetes-reflector
replicates ConfigMaps and Secrets if specific annotations are added to the source object.
For instance, when you have a ConfigMap named conf001
in the dev
namespace and want to replicate it to the qa1-ns
and qa2-ns
namespaces, you should add the following annotations to conf001
.
apiVersion: v1
kind: ConfigMap
metadata:
name: conf001
namespace: dev
annotations:
reflector.v1.k8s.emberstack.com/reflection-allowed: "true"
reflector.v1.k8s.emberstack.com/reflection-allowed-namespaces: "qa1,qa2" # empty or * will allow all namespaces
reflector.v1.k8s.emberstack.com/reflection-auto-enabled: "true"
data:
app.log.level: "INFO"
app.cache.enabled: "true"
app.timeout.seconds: "30"
🔗 Checkout https://github.com/emberstack/kubernetes-reflector for more information.
Now let’s write the tests
public sealed class ConfigMapMirrorTests(CustomWebApplicationFactory factory)
: BaseIntegrationTest(factory)
{
[Fact]
public async Task Create_configMap_With_ReflectionEnabled_Should_Replicated_To_Allowed_Namespaces()
{
// Arrange - Creating the source namesapce
const string sourceNamespace = "dev";
const string destinationNamespace = "qa-ns";
string sourceConfigMap = "conf001";
var configMapData = new Dictionary<string, string>
{
{ "app.log.level", "INFO" },
{ "app.cache.enabled", "true" },
{ "app.timeout.seconds", "30"}
};
var createdSourceNs = await CreateNamespaceAsync(sourceNamespace); // 👈 Resusable method
createdSourceNs.ShouldBeCreated(sourceNamespace); // 👈 Custom extension method
// Act - Create the destination namespace and the source configmap
var createdDestinationNs = await CreateNamespaceAsync(destinationNamespace);
createdDestinationNs.ShouldBeCreated(destinationNamespace);
var reflectorAnnotations = new ReflectorAnnotations() // 👈 Return a dictionnary of annotations
.WithReflectionAllowed(true)
.WithAllowedNamespaces(destinationNamespace)
.WithAutoEnabled(true);
var createdConfigMap = await CreateConfigMapAsync(
sourceConfigMap,
configMapData,
sourceNamespace,
reflectorAnnotations);
createdConfigMap.ShouldBeCreated(sourceConfigMap);
// Assert
// 👇 Custom extension method that uses K8Client for the assertion
await K8SClient.ShouldFindReplicatedResourceAsync(createdConfigMap, destinationNamespace, Pipeline);
}
}
The test follows these steps:
Here are the code snippets for the remaining methods used in the test.
// protected methods in BaseIntegrationTest
protected async Task<V1Namespace?> CreateNamespaceAsync(string name)
{
var ns = new V1Namespace
{
ApiVersion = V1Namespace.KubeApiVersion,
Kind = V1Namespace.KubeKind,
Metadata = new V1ObjectMeta
{
Name = name
}
};
return await K8SClient.CoreV1.CreateNamespaceAsync(ns);
}
protected async Task<V1ConfigMap?> CreateConfigMapAsync(
string configMapName,
IDictionary<string, string> data,
string destinationNamespace,
ReflectorAnnotations reflectionAnnotations)
{
var configMap = new V1ConfigMap
{
ApiVersion = V1ConfigMap.KubeApiVersion,
Kind = V1ConfigMap.KubeKind,
Metadata = new V1ObjectMeta
{
Name = configMapName,
NamespaceProperty = destinationNamespace,
Annotations = reflectionAnnotations.Build()
},
Data = data
};
return await K8SClient.CoreV1.CreateNamespacedConfigMapAsync(configMap, destinationNamespace);
}
And finally the assertion extension class
public static class K8SResourceAssertionExtensions
{
public static void ShouldBeCreated<T>([NotNull] this T? resource, string metadataName)
where T : class, IKubernetesObject<V1ObjectMeta>
{
resource.ShouldNotBeNull();
resource.Metadata.ShouldNotBeNull();
resource.Metadata.Name.ShouldBe(metadataName);
}
public static void ShouldBeDeleted<T>([NotNull] this T resource, V1Status deletionStatus)
where T : class, IKubernetesObject<V1ObjectMeta>
{
resource.ShouldNotBeNull();
deletionStatus.ShouldNotBeNull();
deletionStatus.Status.ShouldBe("Success");
}
// Finds a resource in the specified namespace and compares it with another resource
public static async Task ShouldFindReplicatedResourceAsync<T>(this IKubernetes client,
T resource,
string namespaceName,
ResiliencePipeline<bool> pipeline,
CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
var result = await pipeline.ExecuteAsync(async token =>
{
IKubernetesObject<V1ObjectMeta>? retrievedResource = resource switch
{
V1ConfigMap => await client.CoreV1.ReadNamespacedConfigMapAsync(
resource.Metadata.Name,
namespaceName,
cancellationToken: token),
V1Secret => await client.CoreV1.ReadNamespacedSecretAsync(
resource.Metadata.Name,
namespaceName,
cancellationToken: token),
_ => throw new NotSupportedException($"Resource type {typeof(T).Name} is not supported")
};
if (retrievedResource is null)
return false;
return (resource, retrievedResource) switch
{
(V1ConfigMap sourceConfigMap, V1ConfigMap replicatedConfigMap) =>
sourceConfigMap.Data.IsEqualTo(replicatedConfigMap.Data),
(V1Secret sourceSecret, V1Secret replicatedSecret) =>
sourceSecret.Data.IsEqualTo(replicatedSecret.Data),
_ => false
};
}, cancellationToken);
result.ShouldBeTrue();
}
private static bool IsEqualTo<TKey, TValue>(this IDictionary<TKey, TValue>? dict1, IDictionary<TKey, TValue>? dict2)
where TKey : notnull
{
if (dict1 == null && dict2 == null)
return true;
if (dict1 == null || dict2 == null)
return false;
if (dict1.Count != dict2.Count)
return false;
return dict1.All(kvp => dict2.TryGetValue(kvp.Key, out var value) &&
AreValuesEqual(kvp.Value, value));
}
private static bool AreValuesEqual<TValue>(TValue value1, TValue value2)
{
if (value1 is byte[] bytes1 && value2 is byte[] bytes2)
return bytes1.SequenceEqual(bytes2);
return EqualityComparer<TValue>.Default.Equals(value1, value2);
}
}
You can find the complete source code in the Pull Request I submitted to the kubernetes-reflector
repository here.
If you found this blog post helpful, please share it on your favorite social media. Feel free to follow me on GitHub and Twitter. To get in touch, you can use the contact form or DM me on Twitter.
Quick Links