Controller Patterns¶
This guide explains how to implement controllers in AIM Engine using our standardized reconciliation pattern.
Quick Start¶
TL;DR: Implement these methods and you're done:
```go
// 1. Fetch resources and implement GetComponentHealth
func (r *Reconciler) FetchRemoteState(
    ctx context.Context,
    c client.Client,
    reconcileCtx controllerutils.ReconcileContext[*aimv1.MyResource],
) MyFetchResult {
    pvc := &corev1.PersistentVolumeClaim{}
    return MyFetchResult{
        object: reconcileCtx.Object,
        // The runtime config is fetched by the pipeline automatically
        mergedRuntimeConfig: reconcileCtx.MergedRuntimeConfig,
        // Use the utility Fetch function to fetch a Kubernetes resource, automatically wrapping any errors
        pvc: controllerutils.Fetch(
            ctx, c,
            client.ObjectKey{Name: getPvcName(reconcileCtx.Object), Namespace: reconcileCtx.Object.Namespace},
            pvc,
        ),
    }
}

// Implement GetComponentHealth directly on the fetch result.
// For standard components you can use an existing health fetcher, or create a new one yourself.
func (result MyFetchResult) GetComponentHealth(ctx context.Context, clientset kubernetes.Interface) []controllerutils.ComponentHealth {
    return []controllerutils.ComponentHealth{
        result.mergedRuntimeConfig.ToComponentHealth("RuntimeConfig", aimruntimeconfig.GetRuntimeConfigHealth),
        result.pvc.ToComponentHealth("Storage", controllerutils.GetPvcHealth),
    }
}

// 2. ComposeState is a thin passthrough (might be removed in the future)
type MyObservation struct {
    MyFetchResult
}

func (r *Reconciler) ComposeState(
    ctx context.Context,
    reconcileCtx controllerutils.ReconcileContext[*aimv1.MyResource],
    fetch MyFetchResult,
) MyObservation {
    return MyObservation{MyFetchResult: fetch}
}

// 3. Plan desired state
func (r *Reconciler) PlanResources(
    ctx context.Context,
    reconcileCtx controllerutils.ReconcileContext[*aimv1.MyResource],
    obs MyObservation,
) controllerutils.PlanResult {
    result := controllerutils.PlanResult{}
    // Add a resource to the list of resources to apply
    result.Apply(buildInferenceService(reconcileCtx.Object))
    return result
}
```
That's it. The state engine handles the rest automatically.
Note on ComposeState/Observation: ComposeState currently just wraps the fetch result in an observation struct. This is a thin passthrough that keeps the door open for more complex observation logic in the future, but it may be removed if this structure proves sufficient. The real work happens in FetchRemoteState and GetComponentHealth.
What You Get Automatically¶
When you implement GetComponentHealth(), the state engine automatically:
- ✅ Creates component conditions (ModelReady, TemplateReady, etc.)
- ✅ Creates parent conditions when needed (AuthValid, ConfigValid, DependenciesReachable) based on any errors encountered
- ✅ Sets Ready condition and status field
- ✅ Categorizes errors and decides requeue behavior
- ✅ Applies 10-second grace period for transient errors
- ✅ Emits events and logs when conditions change (and recurring ones for errors)
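To make the condition roll-up concrete, here is a minimal, hypothetical sketch of the aggregation idea: the worst component state wins, and Ready is only reached when every component is healthy. The `ComponentHealth` shape and state names below are simplified stand-ins for the real `controllerutils` types, not the actual implementation.

```go
package main

import "fmt"

// ComponentHealth is a simplified stand-in for the controllerutils type.
type ComponentHealth struct {
    Name  string
    State string // "Ready", "Progressing", or "Failed"
}

// rollUp mirrors the idea behind the automatic Ready condition:
// any Failed component fails the parent, any Progressing component
// keeps it Progressing, and Ready requires all components healthy.
func rollUp(components []ComponentHealth) string {
    overall := "Ready"
    for _, c := range components {
        switch c.State {
        case "Failed":
            return "Failed"
        case "Progressing":
            overall = "Progressing"
        }
    }
    return overall
}

func main() {
    fmt.Println(rollUp([]ComponentHealth{
        {Name: "RuntimeConfig", State: "Ready"},
        {Name: "Storage", State: "Progressing"},
    }))
}
```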
Three Approaches¶
Choose based on how much control you need:
1. Fully Automatic (Recommended)¶
Just implement GetComponentHealth().
Use when: Standard component tracking is all you need.
2. Automatic + Decorator¶
Implement GetComponentHealth() + DecorateStatus() to add custom fields.
Use when: You need automatic state management but want to add domain-specific status fields like ResolvedTemplate.
```go
func (r *Reconciler) DecorateStatus(status *MyStatus, cm *ConditionManager, obs MyObservation) {
    // State engine already set ModelReady, Ready, status.Status
    // Just add your custom fields
    if obs.template != nil {
        status.ResolvedTemplate = &ResolvedReference{Name: obs.template.Name}
    }
}
```
3. Fully Manual¶
Implement SetStatus() to control everything yourself.
Use when: Your status logic doesn't fit the component health model.
```go
func (r *Reconciler) SetStatus(status *MyStatus, cm *ConditionManager, obs MyObservation) {
    // You set ALL conditions and status.Status yourself
    h := controllerutils.NewStatusHelper(status, cm)
    h.Ready("AllGood", "Everything is working")
    cm.MarkTrue("CustomCondition", "Reason", "Message", controllerutils.AsInfo())
}
```
The Four Phases¶
Every reconciliation follows this pattern:
1. FetchRemoteState¶
Fetch resources from Kubernetes. Use FetchResult[T] wrapper:
```go
type MyFetch struct {
    Model    controllerutils.FetchResult[*aimv1.AIMModel]
    Template controllerutils.FetchResult[*aimv1.AIMServiceTemplate]
    Pods     controllerutils.FetchResult[*corev1.PodList]
}

func (r *Reconciler) FetchRemoteState(
    ctx context.Context,
    c client.Client,
    reconcileCtx controllerutils.ReconcileContext[*aimv1.MyResource],
) MyFetch {
    return MyFetch{
        Model:    controllerutils.Fetch(ctx, c, modelKey, &aimv1.AIMModel{}),
        Template: controllerutils.Fetch(ctx, c, templateKey, &aimv1.AIMServiceTemplate{}),
        Pods:     controllerutils.FetchList(ctx, c, &corev1.PodList{}, client.InNamespace(reconcileCtx.Object.Namespace)),
    }
}
```
Key points:
- Use FetchResult[T] wrapper for convenient error handling and component health conversion
- All remote API calls should happen here (no client calls in ComposeState or PlanResources)
- The reconcileCtx parameter provides access to the object and merged runtime config
- This separation enables easy mocking for testing
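For intuition, a generic fetch wrapper along these lines could look as follows. The field and method names (`Value`, `Err`, `OK`, `IsNotFound`) match how `FetchResult` is used throughout this guide, but this is an illustrative, self-contained sketch using a sentinel error, not the actual controllerutils implementation (which wraps the Kubernetes NotFound API error).

```go
package main

import (
    "errors"
    "fmt"
)

// errNotFound stands in for the Kubernetes NotFound API error.
var errNotFound = errors.New("not found")

// FetchResult pairs a fetched value with the error from the fetch,
// so callers can branch on OK()/IsNotFound() instead of threading
// (value, error) tuples through the pipeline.
type FetchResult[T any] struct {
    Value T
    Err   error
}

func (r FetchResult[T]) OK() bool         { return r.Err == nil }
func (r FetchResult[T]) IsNotFound() bool { return errors.Is(r.Err, errNotFound) }

func main() {
    ok := FetchResult[string]{Value: "pvc-1"}
    missing := FetchResult[string]{Err: errNotFound}
    fmt.Println(ok.OK(), missing.IsNotFound())
}
```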
2. ComposeState¶
Current pattern: This is a thin passthrough that wraps the fetch result. This keeps the door open for more complex observation logic in the future, but may be removed if this structure proves sufficient.
```go
type MyObservation struct {
    MyFetchResult // Embed the fetch result
}

func (r *Reconciler) ComposeState(
    ctx context.Context,
    reconcileCtx controllerutils.ReconcileContext[*aimv1.MyResource],
    fetch MyFetchResult,
) MyObservation {
    return MyObservation{MyFetchResult: fetch}
}
```
The real work happens in GetComponentHealth, which you implement on the fetch result:
```go
func (result MyFetchResult) GetComponentHealth(ctx context.Context, clientset kubernetes.Interface) []controllerutils.ComponentHealth {
    health := []controllerutils.ComponentHealth{
        result.mergedRuntimeConfig.ToComponentHealth("RuntimeConfig", aimruntimeconfig.GetRuntimeConfigHealth),
        result.pvc.ToComponentHealth("Storage", controllerutils.GetPvcHealth),
    }

    // Conditional health checking based on status
    if result.object.Status.Status != constants.AIMStatusReady {
        if result.downloadJob != nil {
            health = append(health, result.downloadJob.ToComponentHealth("DownloadJob", controllerutils.GetJobHealth))
        }
        if result.downloadJobPods != nil {
            health = append(health, result.downloadJobPods.ToComponentHealthWithContext(ctx, clientset, "Pods", controllerutils.GetPodsHealth))
        }
    }

    return health
}
```
Health Inspection Utilities:
The framework provides ready-to-use health inspectors for common Kubernetes resources:
```go
// For Jobs
result.Job.ToComponentHealth("DownloadJob", controllerutils.GetJobHealth)

// For PVCs
result.PVC.ToComponentHealth("Storage", controllerutils.GetPvcHealth)

// For Pods (requires context and clientset for log inspection)
result.Pods.ToComponentHealthWithContext(ctx, clientset, "Workload", controllerutils.GetPodsHealth)
```
These utilities automatically categorize errors (auth failures, storage exhaustion, etc.) and set appropriate states.
Custom inspection logic:
```go
result.Model.ToComponentHealth("Model", func(m *aimv1.AIMModel) controllerutils.ComponentHealth {
    if m.Status.Status == constants.AIMStatusReady {
        return controllerutils.ComponentHealth{
            State:   constants.AIMStatusReady,
            Reason:  "ModelReady",
            Message: "Model is ready",
        }
    }
    return controllerutils.ComponentHealth{
        State:   constants.AIMStatusProgressing,
        Reason:  "ModelNotReady",
        Message: "Waiting for model",
    }
})
```
3. PlanResources¶
Decide what to create, update, or delete. This is a pure function: no client calls, just deriving the desired state from the observations.
```go
func (r *Reconciler) PlanResources(
    ctx context.Context,
    reconcileCtx controllerutils.ReconcileContext[*aimv1.MyResource],
    obs MyObservation,
) controllerutils.PlanResult {
    obj := reconcileCtx.Object
    runtimeConfig := reconcileCtx.MergedRuntimeConfig.Value
    result := controllerutils.PlanResult{}

    // Only create resources when dependencies are ready
    if obs.Model.OK() && obs.Model.Value.Status.Status == constants.AIMStatusReady {
        inferenceService := buildInferenceService(obj, runtimeConfig)
        result.Apply(inferenceService) // Creates/updates with owner reference
    }

    // Conditional resource creation
    if obj.Status.Status != constants.AIMStatusReady && obs.Job.IsNotFound() {
        job := buildDownloadJob(obj, runtimeConfig)
        result.Apply(job)
    }

    // Shared resources (no owner reference)
    configMap := buildSharedConfig(obj)
    result.ApplyWithoutOwnerRef(configMap)

    // Cleanup
    if shouldCleanup(obs) {
        oldResource := getOldResource(obj)
        result.Delete(oldResource)
    }

    return result
}
```
Key methods:
- result.Apply(obj) - Creates/updates with owner reference (garbage collected when owner deleted)
- result.ApplyWithoutOwnerRef(obj) - Creates/updates without owner reference (survives owner deletion)
- result.Delete(obj) - Deletes the resource
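Conceptually, the plan is just an ordered list of intended mutations that the state engine executes after planning. A minimal, hypothetical sketch of such an accumulator (the real PlanResult operates on typed Kubernetes objects and carries owner-reference metadata; plain strings are used here only to keep the example self-contained):

```go
package main

import "fmt"

// op records one planned mutation. A real plan would hold typed
// Kubernetes objects rather than plain resource names.
type op struct {
    action string // "apply", "apply-no-owner", or "delete"
    name   string
}

// PlanResult accumulates intended mutations; the executor applies
// them after planning finishes, keeping planning itself side-effect free.
type PlanResult struct{ ops []op }

func (p *PlanResult) Apply(name string)                { p.ops = append(p.ops, op{"apply", name}) }
func (p *PlanResult) ApplyWithoutOwnerRef(name string) { p.ops = append(p.ops, op{"apply-no-owner", name}) }
func (p *PlanResult) Delete(name string)               { p.ops = append(p.ops, op{"delete", name}) }

func main() {
    var plan PlanResult
    plan.Apply("inference-service")
    plan.ApplyWithoutOwnerRef("shared-config")
    plan.Delete("stale-job")
    for _, o := range plan.ops {
        fmt.Println(o.action, o.name)
    }
}
```

Because the plan is plain data, tests can assert on the intended operations without touching a live cluster.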
4. (Optional) DecorateStatus¶
Add custom status fields:
```go
func (r *Reconciler) DecorateStatus(status *MyStatus, cm *ConditionManager, obs MyObservation) {
    // Add domain-specific fields here
}
```
Context-Aware Health Inspection¶
For advanced health checks that need to inspect logs or additional resources, use the context-aware pattern:
```go
// Implement GetComponentHealth with context and clientset parameters
func (fetch MyFetch) GetComponentHealth(ctx context.Context, clientset kubernetes.Interface) []controllerutils.ComponentHealth {
    health := []controllerutils.ComponentHealth{
        fetch.RuntimeConfig.ToComponentHealth("RuntimeConfig", getRuntimeConfigHealth),
        fetch.PVC.ToComponentHealth("Storage", controllerutils.GetPvcHealth),
    }

    // Conditionally check job/pods based on status
    if fetch.Object.Status.Status != constants.AIMStatusReady {
        health = append(health,
            fetch.Job.ToComponentHealth("DownloadJob", controllerutils.GetJobHealth),
            fetch.Pods.ToComponentHealthWithContext(ctx, clientset, "Pods", controllerutils.GetPodsHealth),
        )
    }

    return health
}
```
The pipeline will detect and call this signature automatically, passing context and clientset from the Pipeline configuration.
Pod Health Inspection: The GetPodsHealth utility inspects pod logs to categorize failures:
- Auth errors (S3 credentials, HuggingFace tokens)
- Storage exhaustion (disk full, quota exceeded)
- Resource not found (404, missing models)
- OOM kills and other termination reasons
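As an illustration of this kind of log-based categorization, a naive substring matcher might look like the following. The patterns and category names here are invented for the example; the real GetPodsHealth inspects termination state as well as logs and is considerably more thorough.

```go
package main

import (
    "fmt"
    "strings"
)

// categorizeLogLine maps a pod log line to a coarse failure category.
// The patterns are illustrative only; order matters, since the first
// matching rule wins.
func categorizeLogLine(line string) string {
    lower := strings.ToLower(line)
    switch {
    case strings.Contains(lower, "403") || strings.Contains(lower, "invalid credentials"):
        return "Auth"
    case strings.Contains(lower, "no space left on device") || strings.Contains(lower, "quota exceeded"):
        return "ResourceExhaustion"
    case strings.Contains(lower, "404") || strings.Contains(lower, "not found"):
        return "NotFound"
    case strings.Contains(lower, "oomkilled"):
        return "OOM"
    default:
        return "Unknown"
    }
}

func main() {
    fmt.Println(categorizeLogLine("download failed: 403 Forbidden"))
}
```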
Error Handling¶
The state engine automatically categorizes errors:
| Error Type | Behavior | Sets Condition | Example |
|---|---|---|---|
| Infrastructure | Requeue with backoff, 10s grace period | DependenciesReachable=False | Network timeout, API server down |
| Auth | Stop apply, fail resource | AuthValid=False | Forbidden (403), invalid credentials |
| InvalidSpec | Stop apply, fail resource | ConfigValid=False | Invalid configuration, conflicts |
| MissingUpstreamDependency | Stop apply, fail resource | ConfigValid=False | User-referenced config/secret not found |
| MissingDownstreamDependency | Mark Progressing, continue | Component condition | Internal resource not ready yet |
| ResourceExhaustion | Stop apply, fail resource | Component condition | OOM, disk full, quota exceeded |
Just return errors in ComponentHealth.Errors - categorization is automatic.
Distinction:
- MissingDownstreamDependency: Resources the controller is creating (pods starting, jobs running). Transient and expected to self-heal.
- MissingUpstreamDependency: User-referenced resources (configs, secrets). Requires user intervention.
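The error-handling table above can be read as a lookup from error category to handling policy. A hypothetical sketch of that mapping (the category and condition names come from this guide, but the `policy` struct and `requeue` flag are an illustrative simplification of the engine's actual behavior):

```go
package main

import "fmt"

// policy describes, in simplified form, how one error category is handled.
type policy struct {
    condition string // parent or component condition the engine sets
    requeue   bool   // true: requeue with backoff; false: stop apply / wait
}

// policies mirrors the error-handling table row by row.
var policies = map[string]policy{
    "Infrastructure":              {condition: "DependenciesReachable", requeue: true},
    "Auth":                        {condition: "AuthValid", requeue: false},
    "InvalidSpec":                 {condition: "ConfigValid", requeue: false},
    "MissingUpstreamDependency":   {condition: "ConfigValid", requeue: false},
    "MissingDownstreamDependency": {condition: "ComponentCondition", requeue: true},
    "ResourceExhaustion":          {condition: "ComponentCondition", requeue: false},
}

func main() {
    p := policies["Infrastructure"]
    fmt.Println(p.condition, p.requeue)
}
```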
Observability¶
If you need to report custom conditions in the status update methods, you can use the ConditionManager helper.
Control event and log emission per condition:
```go
// Info log + Normal event on transition (default)
cm.MarkTrue(condType, reason, msg)

// Silent
cm.MarkTrue(condType, reason, msg, controllerutils.Silent())

// Error log + Warning event on transition
cm.MarkFalse(condType, reason, msg, controllerutils.AsWarning())

// Error log + Warning event EVERY reconcile (for critical errors)
cm.MarkFalse(condType, reason, msg, controllerutils.AsError())
```
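These modifiers follow Go's functional-options pattern: each option is a function that mutates a settings struct before the condition is recorded. A self-contained sketch of how such options can be built (names are borrowed from this guide, but the `markOptions` struct and `markTrue` helper are hypothetical):

```go
package main

import "fmt"

// markOptions controls emission behavior for a condition transition.
type markOptions struct {
    silent     bool
    severity   string // "info", "warning", or "error"
    everyCycle bool   // emit on every reconcile, not just on transition
}

// MarkOption mutates the options struct; callers pass zero or more.
type MarkOption func(*markOptions)

func Silent() MarkOption    { return func(o *markOptions) { o.silent = true } }
func AsWarning() MarkOption { return func(o *markOptions) { o.severity = "warning" } }
func AsError() MarkOption {
    return func(o *markOptions) { o.severity = "error"; o.everyCycle = true }
}

// markTrue applies the options over the defaults before marking.
func markTrue(condType string, opts ...MarkOption) markOptions {
    o := markOptions{severity: "info"} // default: info log + Normal event
    for _, opt := range opts {
        opt(&o)
    }
    return o
}

func main() {
    fmt.Println(markTrue("Ready", AsWarning()).severity)
}
```

The advantage of this pattern is that the default call site stays clean while rarely-used behaviors remain discoverable as named options.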
Quick Reference¶
Interfaces to implement:
```go
// Required - always implement
type DomainReconciler[T, S, F, Obs] interface {
    FetchRemoteState(ctx, client, obj) F
    ComposeState(ctx, obj, fetched) Obs
    PlanResources(ctx, obj, obs) PlanResult
}

// Automatic mode - implement in observation type
type ComponentHealthProvider interface {
    GetComponentHealth() []ComponentHealth
}

// Optional - extend automatic status
type StatusDecorator[T, S, Obs] interface {
    DecorateStatus(status, cm, obs)
}

// Manual mode - full control
type ManualStatusController[T, S, Obs] interface {
    SetStatus(status, cm, obs)
}
```
When to use what:
| Need | Use |
|---|---|
| Standard component tracking | Approach 1: GetComponentHealth() |
| Custom status fields | Approach 2: GetComponentHealth() + DecorateStatus() |
| Full control | Approach 3: SetStatus() |
Resolved Reference Pattern¶
When a controller depends on upstream resources (e.g., AIMService → AIMModel → AIMServiceTemplate → AIMTemplateCache), use the resolved reference pattern to optimize fetching while maintaining health visibility.
Core Principles¶
- Only "lock in" references when Ready: Store resolved references in status only when the upstream resource is in Ready state. This prevents committing to a resource that may never become usable.
- Re-search if not Ready or deleted: If a resolved reference exists but the resource is not Ready (or was deleted), fall through to normal resolution logic to find a better alternative.
- Always return something for health visibility: Even if no Ready resource is found, return the best available match so component health can reflect the current state.
- Gate fetching on downstream existence: For resources only needed during initial creation (e.g., Model/Template for InferenceService), skip fetching if the downstream resource already exists.
Implementation Pattern¶
```go
// In fetch function:
func fetchUpstreamResource(ctx context.Context, c client.Client, service *MyService) FetchResult[*UpstreamType] {
    logger := log.FromContext(ctx)

    // 1. Check if we have a resolved reference AND it's Ready
    if service.Status.ResolvedUpstream != nil {
        ref := service.Status.ResolvedUpstream
        result := controllerutils.Fetch(ctx, c, ref.NamespacedName(), &UpstreamType{})
        if result.OK() && result.Value.Status.Status == constants.AIMStatusReady {
            logger.V(1).Info("using resolved upstream", "name", ref.Name)
            return result
        }

        // Not Ready or deleted - log and fall through to search
        if result.OK() {
            logger.V(1).Info("resolved upstream not ready, searching for alternatives",
                "name", ref.Name, "status", result.Value.Status.Status)
        } else if result.IsNotFound() {
            logger.V(1).Info("resolved upstream deleted, searching for alternatives", "name", ref.Name)
        } else {
            return result // Real error
        }
    }

    // 2. Normal resolution logic (search, list, select best, etc.)
    // ...

    // 3. Return best match for health visibility (even if not Ready)
    return bestMatch
}
```
```go
// In DecorateStatus:
func (r *Reconciler) DecorateStatus(status *MyStatus, cm *ConditionManager, obs MyObservation) {
    // Only set resolved reference if Ready
    if obs.upstream.Value != nil && obs.upstream.Value.Status.Status == constants.AIMStatusReady {
        status.ResolvedUpstream = &AIMResolvedReference{
            Name: obs.upstream.Value.Name,
            UID:  obs.upstream.Value.UID,
            // ...
        }
    }
    // If not Ready, don't update the reference - let fetch re-search next time
}
```
When to Apply This Pattern¶
| Scenario | Behavior |
|---|---|
| Multiple caches exist for same template | Select the healthiest (Ready) one |
| Resolved cache becomes Failed | Re-search for a Ready alternative |
| Resolved template deleted | Fall through to re-select |
| ISVC exists, template deleted | Don't fetch template (config baked into ISVC) |
| ISVC deleted, need to recreate | Fetch and resolve dependencies again |
Gating Fetch on Downstream Existence¶
For upstream resources only needed during creation:
```go
func (r *Reconciler) FetchRemoteState(ctx, c, reconcileCtx) FetchResult {
    result := FetchResult{...}

    // Always fetch downstream resources (we own these)
    result.inferenceService = fetchInferenceService(ctx, c, service)
    result.httpRoute = fetchHTTPRoute(ctx, c, service)

    // Always fetch resources that cascade health (e.g., cache chain)
    result.templateCache = fetchTemplateCache(ctx, c, service)

    // Only fetch upstream if downstream needs (re)creation
    if !result.inferenceService.OK() {
        result.model = fetchModel(ctx, c, service)
        result.template = fetchTemplate(ctx, c, service, result.model)
    }

    return result
}
```
This optimization:
- Avoids false errors when upstream resources are deleted after successful creation
- Reduces API calls for running services
- Still catches cache health issues proactively (TemplateCache → Artifact → PVC chain)