zoobzio December 31, 2023 Edit this page

Common Patterns

Production-ready patterns for time-dependent operations.

Retry with Exponential Backoff

// RetryWithBackoff runs operation up to maxAttempts times, sleeping on the
// injected clock between failures with exponential backoff starting at
// 100ms, doubling each attempt, and capped at 30s.
//
// Returns nil on the first success. After exhausting all attempts it
// returns an error that wraps the operation's last failure, so callers can
// inspect the root cause with errors.Is / errors.As.
func RetryWithBackoff(clock clockz.Clock, operation func() error) error {
    const maxAttempts = 5

    backoff := 100 * time.Millisecond
    maxBackoff := 30 * time.Second

    var lastErr error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if lastErr = operation(); lastErr == nil {
            return nil
        }

        if attempt < maxAttempts-1 { // Don't sleep after the last attempt
            clock.Sleep(backoff)
            backoff *= 2
            if backoff > maxBackoff {
                backoff = maxBackoff
            }
        }
    }

    return fmt.Errorf("operation failed after %d attempts: %w", maxAttempts, lastErr)
}

Testing:

// TestRetryWithBackoff verifies that RetryWithBackoff returns nil once the
// operation succeeds (third call here) after stepping a fake clock through
// the first two backoff delays.
//
// NOTE(review): there is no synchronization guaranteeing the retry goroutine
// has reached clock.Sleep before Advance is called — presumably clockz's
// fake clock delivers advances to waiters registered afterward as well;
// confirm, or use a block-until-waiters mechanism if clockz provides one,
// to rule out a flaky hang.
func TestRetryWithBackoff(t *testing.T) {
    clock := clockz.NewFakeClockAt(time.Now())

    // Operation fails on the first two calls, succeeds on the third.
    attempts := 0
    operation := func() error {
        attempts++
        if attempts < 3 {
            return errors.New("temporary failure")
        }
        return nil
    }

    // Run the retry loop in a goroutine so the test can drive the clock.
    done := make(chan error)
    go func() {
        done <- RetryWithBackoff(clock, operation)
    }()

    // Advance through backoff delays: 100ms, 200ms
    clock.Advance(100 * time.Millisecond)
    clock.Advance(200 * time.Millisecond)

    if err := <-done; err != nil {
        t.Fatalf("unexpected error: %v", err)
    }

    if attempts != 3 {
        t.Errorf("expected 3 attempts, got %d", attempts)
    }
}

Rate Limiter

Token bucket rate limiter with clock injection:

// RateLimiter is a token-bucket rate limiter. Tokens are refilled by a
// ticker driven by the injected clock, making the limiter deterministic
// under a fake clock in tests.
type RateLimiter struct {
    clock  clockz.Clock
    ticker clockz.Ticker
    tokens chan struct{} // bucket: one buffered element per available token
    done   chan struct{} // 1-buffered; Stop signals the refill goroutine to exit
}

// NewRateLimiter returns a limiter that allows rps requests per second.
// The bucket starts full (rps tokens) and one token is refilled every
// second/rps. Call Stop to release the ticker and the refill goroutine.
func NewRateLimiter(clock clockz.Clock, rps int) *RateLimiter {
    rl := &RateLimiter{
        clock:  clock,
        ticker: clock.NewTicker(time.Second / time.Duration(rps)),
        tokens: make(chan struct{}, rps),
        done:   make(chan struct{}, 1),
    }

    // Fill initial tokens so the first rps requests proceed without waiting.
    for i := 0; i < rps; i++ {
        rl.tokens <- struct{}{}
    }

    // Refill tokens periodically. Selecting on done alongside the ticker
    // matters: Ticker.Stop does not close the tick channel, so a plain
    // `for range` over it would leak this goroutine after Stop.
    go func() {
        for {
            select {
            case <-rl.done:
                return
            case <-rl.ticker.C():
                select {
                case rl.tokens <- struct{}{}:
                default: // Bucket full
                }
            }
        }
    }()

    return rl
}

// Wait blocks until a token is available and consumes it.
func (rl *RateLimiter) Wait() {
    <-rl.tokens
}

// Stop halts token refills and terminates the refill goroutine.
// It is safe to call Stop more than once.
func (rl *RateLimiter) Stop() {
    rl.ticker.Stop()
    // Non-blocking send into the 1-buffered done channel keeps Stop
    // idempotent without needing sync.Once.
    select {
    case rl.done <- struct{}{}:
    default:
    }
}

Testing:

// TestRateLimiter drains the initial token bucket, then checks that a
// blocked Wait completes after the fake clock advances far enough for one
// refill tick (10 rps => one token per 100ms).
//
// NOTE(review): clock.Advance may run before the Wait goroutine is blocked
// and before the refill goroutine is receiving on the ticker — presumably
// clockz's fake clock handles advances for waiters registered later;
// confirm to rule out flakiness. The real-time time.After below is a
// watchdog only, not part of the simulated timeline.
func TestRateLimiter(t *testing.T) {
    clock := clockz.NewFakeClockAt(time.Now())
    limiter := NewRateLimiter(clock, 10) // 10 requests per second
    defer limiter.Stop()

    // Consume all initial tokens
    for i := 0; i < 10; i++ {
        limiter.Wait()
    }

    // Next request should block until token refills
    done := make(chan bool)
    go func() {
        limiter.Wait()
        done <- true
    }()

    // Advance to refill one token
    clock.Advance(100 * time.Millisecond)

    select {
    case <-done:
        // Success
    case <-time.After(100 * time.Millisecond):
        t.Fatal("Wait should have completed after token refill")
    }
}

Deadline Management

Process items with a deadline, abandoning work if time runs out:

// ProcessBatch processes items in order until all succeed or the deadline
// passes. Each item runs under a context whose timeout is the time left
// before the deadline. On failure it reports the failing item's index; if
// the deadline is reached first, it reports how many items completed.
func ProcessBatch(clock clockz.Clock, items []Item, deadline time.Time) error {
    for idx, current := range items {
        // Bail out before starting an item if time has already run out.
        if clock.Now().After(deadline) {
            return fmt.Errorf("deadline exceeded, processed %d/%d items", idx, len(items))
        }

        // Give this item only the time remaining until the deadline.
        budget := deadline.Sub(clock.Now())
        ctx, cancel := clock.WithTimeout(context.Background(), budget)
        err := processItem(ctx, current)
        cancel() // release the timeout's resources each iteration

        if err != nil {
            return fmt.Errorf("failed to process item %d: %w", idx, err)
        }
    }

    return nil
}

Scheduled Tasks

Run a function at a specific time:

// Scheduler runs functions at specific times or on fixed intervals using an
// injected clock, so schedules can be driven deterministically in tests.
type Scheduler struct {
    clock clockz.Clock
}

// RunAt blocks until the target time (as observed on the injected clock)
// and then invokes fn. If target is already in the past, fn runs
// immediately.
func (s *Scheduler) RunAt(target time.Time, fn func()) {
    wait := target.Sub(s.clock.Now())
    if wait > 0 {
        // Block on a clock-driven timer until the target instant.
        t := s.clock.NewTimer(wait)
        <-t.C()
    }
    fn()
}

// RunEvery invokes fn every interval on a goroutine until the returned stop
// function is called. stop is safe to call more than once.
//
// The worker selects on a done signal as well as the ticker: Ticker.Stop
// does not close the tick channel, so a plain `for range` over it would
// leak the goroutine after stop.
func (s *Scheduler) RunEvery(interval time.Duration, fn func()) (stop func()) {
    ticker := s.clock.NewTicker(interval)
    done := make(chan struct{}, 1)

    go func() {
        for {
            select {
            case <-done:
                return
            case <-ticker.C():
                fn()
            }
        }
    }()

    return func() {
        ticker.Stop()
        // Non-blocking send into the 1-buffered channel keeps stop
        // idempotent without sync.Once.
        select {
        case done <- struct{}{}:
        default:
        }
    }
}

Measuring Elapsed Time

// MeasureOperation reports how long operation takes to run, as measured by
// the injected clock (deterministic under a fake clock).
func MeasureOperation(clock clockz.Clock, operation func()) time.Duration {
    began := clock.Now()
    operation()
    elapsed := clock.Since(began)
    return elapsed
}

Migration from time Package

time.After → clock.After

// Before
select {
case <-time.After(5 * time.Second):
    handleTimeout()
}

// After
select {
case <-clock.After(5 * time.Second):
    handleTimeout()
}

time.NewTimer → clock.NewTimer

// Before
timer := time.NewTimer(duration)
<-timer.C

// After
timer := clock.NewTimer(duration)
<-timer.C() // Note: C is a method, not a field

time.NewTicker → clock.NewTicker

// Before
ticker := time.NewTicker(interval)
defer ticker.Stop()
<-ticker.C

// After
ticker := clock.NewTicker(interval)
defer ticker.Stop()
<-ticker.C() // Note: C is a method, not a field

context.WithTimeout → clock.WithTimeout

// Before
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)

// After
ctx, cancel := clock.WithTimeout(ctx, 30*time.Second)

time.Sleep → clock.Sleep

// Before
time.Sleep(duration)

// After
clock.Sleep(duration)

time.Now → clock.Now

// Before
now := time.Now()

// After
now := clock.Now()

time.Since → clock.Since

// Before
elapsed := time.Since(start)

// After
elapsed := clock.Since(start)