Engineering · June 15, 2024 · 13 min read

Building Custom Kubernetes Operators

How we built operators to manage auction-based node provisioning and automatic bid optimization.

Tom Huang
Staff Engineer, Platform

Kubernetes operators extend the platform by encoding domain-specific knowledge into custom controllers. At KubeBid, we've built several operators to manage our unique auction-based infrastructure. This post walks through how we built our node provisioning operator.

What is an Operator?

An operator is a pattern for extending Kubernetes. It consists of:

  1. Custom Resource Definition (CRD): A new API type that represents your domain concept
  2. Controller: A control loop that watches for changes and reconciles actual state to desired state

The operator pattern lets you manage complex systems declaratively—define what you want, and the operator figures out how to make it happen.

The BidNode Operator

Our primary operator manages "BidNodes"—compute capacity acquired through our auction system. Users declare what they want:

apiVersion: kubebid.io/v1
kind: BidNode
metadata:
  name: ml-training-node
  namespace: team-ml
spec:
  instanceType: a100-4x
  region: us-west-2
  maxBid: 4.50
  minNodes: 1
  maxNodes: 10
  bidStrategy: cost-optimized
  nodeSelector:
    workload: ml-training
  taints:
    - key: dedicated
      value: ml-training
      effect: NoSchedule

The operator handles bidding, node provisioning, health monitoring, and automatic replacement.

Designing the CRD

A good CRD captures the user's intent without exposing implementation details. Here's our full spec:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: bidnodes.kubebid.io
spec:
  group: kubebid.io
  versions:
  - name: v1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            required:
              - instanceType
              - maxBid
            properties:
              instanceType:
                type: string
                enum: [h100-8x, h100-4x, a100-8x, a100-4x, l40s-4x]
              region:
                type: string
              maxBid:
                type: number
                minimum: 0.01
              minNodes:
                type: integer
                minimum: 0
                default: 1
              maxNodes:
                type: integer
                minimum: 1
                default: 10
              bidStrategy:
                type: string
                enum: [cost-optimized, balanced, availability-optimized]
                default: balanced
              nodeSelector:
                type: object
                additionalProperties:
                  type: string
              taints:
                type: array
                items:
                  type: object
                  properties:
                    key:
                      type: string
                    value:
                      type: string
                    effect:
                      type: string
                      enum: [NoSchedule, PreferNoSchedule, NoExecute]
          status:
            type: object
            properties:
              phase:
                type: string
              activeNodes:
                type: integer
              pendingBids:
                type: integer
              averagePrice:
                type: number
              lastBidTime:
                type: string
                format: date-time
              conditions:
                type: array
                items:
                  type: object
                  properties:
                    type:
                      type: string
                    status:
                      type: string
                    reason:
                      type: string
                    message:
                      type: string
                    lastTransitionTime:
                      type: string
                      format: date-time
    subresources:
      status: {}
    additionalPrinterColumns:
    - name: Instance
      type: string
      jsonPath: .spec.instanceType
    - name: Nodes
      type: integer
      jsonPath: .status.activeNodes
    - name: Price
      type: number
      jsonPath: .status.averagePrice
    - name: Phase
      type: string
      jsonPath: .status.phase
  scope: Namespaced
  names:
    plural: bidnodes
    singular: bidnode
    kind: BidNode
    shortNames:
    - bn

The Controller

We built the controller using kubebuilder, the standard framework for writing operators. The core reconciliation loop:

package controllers

import (
    "context"
    "time"

    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/client-go/tools/record"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"

    kubebidv1 "kubebid.io/api/v1"
    "kubebid.io/pkg/auction"
    "kubebid.io/pkg/provisioner"
)

type BidNodeReconciler struct {
    client.Client
    Scheme      *runtime.Scheme
    Recorder    record.EventRecorder // emits events, e.g. on pre-emption
    Auction     *auction.Client
    Provisioner *provisioner.Client
}

func (r *BidNodeReconciler) Reconcile(ctx context.Context,
    req ctrl.Request) (ctrl.Result, error) {

    log := ctrl.LoggerFrom(ctx)

    // Fetch the BidNode
    var bidNode kubebidv1.BidNode
    if err := r.Get(ctx, req.NamespacedName, &bidNode); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // Calculate desired state
    desired := r.calculateDesiredNodes(&bidNode)
    actual := r.countActiveNodes(ctx, &bidNode)

    log.Info("Reconciling BidNode",
        "desired", desired,
        "actual", actual,
    )

    // Scale up if needed
    if actual < desired {
        if err := r.scaleUp(ctx, &bidNode, desired-actual); err != nil {
            return ctrl.Result{RequeueAfter: 30 * time.Second}, err
        }
    }

    // Scale down if needed
    if actual > desired {
        if err := r.scaleDown(ctx, &bidNode, actual-desired); err != nil {
            return ctrl.Result{RequeueAfter: 30 * time.Second}, err
        }
    }

    // Update status
    if err := r.updateStatus(ctx, &bidNode); err != nil {
        return ctrl.Result{}, err
    }

    // Requeue to handle price changes
    return ctrl.Result{RequeueAfter: 60 * time.Second}, nil
}

func (r *BidNodeReconciler) scaleUp(ctx context.Context,
    bn *kubebidv1.BidNode, count int) error {

    log := ctrl.LoggerFrom(ctx)

    // Determine bid price based on strategy
    bidPrice := r.calculateBidPrice(bn)

    // Place bid with auction engine
    bid, err := r.Auction.PlaceBid(ctx, &auction.BidRequest{
        InstanceType: bn.Spec.InstanceType,
        Region:       bn.Spec.Region,
        MaxPrice:     bidPrice,
        Quantity:     count,
        Labels: map[string]string{
            "kubebid.io/bidnode":   bn.Name,
            "kubebid.io/namespace": bn.Namespace,
        },
    })
    if err != nil {
        return err
    }

    log.Info("Placed bid", "bidID", bid.ID, "price", bidPrice)

    // Wait for match (async—we'll handle this in next reconcile)
    return nil
}

func (r *BidNodeReconciler) calculateBidPrice(
    bn *kubebidv1.BidNode) float64 {

    // Get current market price
    marketPrice := r.Auction.GetMarketPrice(
        bn.Spec.InstanceType,
        bn.Spec.Region,
    )

    switch bn.Spec.BidStrategy {
    case "cost-optimized":
        return min(marketPrice*0.65, bn.Spec.MaxBid)
    case "availability-optimized":
        return min(marketPrice*1.2, bn.Spec.MaxBid)
    default: // balanced
        return min(marketPrice*0.9, bn.Spec.MaxBid)
    }
}

func (r *BidNodeReconciler) SetupWithManager(
    mgr ctrl.Manager) error {

    return ctrl.NewControllerManagedBy(mgr).
        For(&kubebidv1.BidNode{}).
        Complete(r)
}

Handling Async Operations

Node provisioning is inherently async—placing a bid doesn't immediately give you a node. We handle this with status conditions:

func (r *BidNodeReconciler) updateStatus(ctx context.Context,
    bn *kubebidv1.BidNode) error {

    // Get pending bids
    pendingBids, err := r.Auction.GetPendingBids(ctx, bn.Name)
    if err != nil {
        return err
    }

    // Get active nodes
    activeNodes := r.countActiveNodes(ctx, bn)

    // Calculate average price
    avgPrice := r.calculateAveragePrice(ctx, bn)

    // Determine phase
    var phase string
    switch {
    case activeNodes >= bn.Spec.MinNodes:
        phase = "Running"
    case len(pendingBids) > 0:
        phase = "Bidding"
    default:
        phase = "Pending"
    }

    // Update status
    bn.Status.Phase = phase
    bn.Status.ActiveNodes = activeNodes
    bn.Status.PendingBids = len(pendingBids)
    bn.Status.AveragePrice = avgPrice
    bn.Status.LastBidTime = &metav1.Time{Time: time.Now()}

    // Set conditions
    bn.Status.Conditions = []kubebidv1.BidNodeCondition{
        {
            Type:               "Ready",
            Status:             conditionStatus(activeNodes >= bn.Spec.MinNodes),
            Reason:             "NodesAvailable",
            Message:            fmt.Sprintf("%d/%d nodes active", activeNodes, bn.Spec.MinNodes),
            LastTransitionTime: metav1.Now(),
        },
    }

    return r.Status().Update(ctx, bn)
}

Pre-emption Handling

Auction-provisioned nodes can be pre-empted when prices rise. The operator needs to handle this gracefully:

func (r *BidNodeReconciler) handlePreemption(ctx context.Context,
    bn *kubebidv1.BidNode, preemptedNode string) error {

    log := ctrl.LoggerFrom(ctx)
    log.Info("Handling pre-emption", "node", preemptedNode)

    // Cordon the node to prevent new pods
    if err := r.cordonNode(ctx, preemptedNode); err != nil {
        return err
    }

    // Emit event for visibility
    r.Recorder.Event(bn, "Warning", "NodePreempted",
        fmt.Sprintf("Node %s pre-empted due to price increase", preemptedNode))

    // Trigger replacement bid
    return r.scaleUp(ctx, bn, 1)
}

Testing the Operator

We use envtest for integration tests:

var _ = Describe("BidNode Controller", func() {
    Context("When creating a BidNode", func() {
        It("Should place bids for minimum nodes", func() {
            bn := &kubebidv1.BidNode{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      "test-bidnode",
                    Namespace: "default",
                },
                Spec: kubebidv1.BidNodeSpec{
                    InstanceType: "a100-4x",
                    Region:       "us-west-2",
                    MaxBid:       5.00,
                    MinNodes:     2,
                    MaxNodes:     5,
                },
            }

            Expect(k8sClient.Create(ctx, bn)).Should(Succeed())

            // Wait for reconciliation
            Eventually(func() int {
                var updated kubebidv1.BidNode
                k8sClient.Get(ctx, client.ObjectKeyFromObject(bn), &updated)
                return updated.Status.PendingBids
            }, timeout, interval).Should(BeNumerically(">=", 2))
        })
    })
})

Production Lessons

After running this operator in production for over a year, here's what we've learned:

  1. Idempotency is critical. Your reconcile function might run multiple times for the same event. Every operation must be safe to repeat.
  2. Status updates should be cheap. We reconcile every 60 seconds. Status updates that require external API calls add latency and can cause rate limiting.
  3. Use finalizers for cleanup. If your CRD creates external resources (like nodes), add a finalizer to ensure they're cleaned up on deletion.
  4. Observability from day one. Add metrics for reconcile duration, error rates, and resource counts. You'll need them.
// Metrics we track, registered with controller-runtime's metrics
// registry so they're exposed on the manager's /metrics endpoint.
import (
    "github.com/prometheus/client_golang/prometheus"

    "sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
    reconcileTotal = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "bidnode_reconcile_total",
            Help: "Total number of reconciles",
        },
        []string{"result"},
    )

    reconcileDuration = prometheus.NewHistogram(
        prometheus.HistogramOpts{
            Name:    "bidnode_reconcile_duration_seconds",
            Help:    "Time spent in reconcile",
            Buckets: prometheus.ExponentialBuckets(0.01, 2, 10),
        },
    )

    activeNodes = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "bidnode_active_nodes",
            Help: "Number of active nodes per BidNode",
        },
        []string{"name", "namespace", "instance_type"},
    )
)

func init() {
    metrics.Registry.MustRegister(reconcileTotal, reconcileDuration, activeNodes)
}

Getting Started

If you want to build your own operator:

  1. Install kubebuilder: brew install kubebuilder
  2. Initialize project: kubebuilder init --domain your.domain
  3. Create API: kubebuilder create api --group apps --version v1 --kind YourResource
  4. Implement controller logic in controllers/
  5. Write tests and deploy

The kubebuilder book is an excellent resource. And if you want to work on operators at scale, we're hiring.

Tom Huang is a Staff Engineer on the Platform team at KubeBid, where he builds the operators and controllers that power our infrastructure.
