Real-Time Video Analysis and Edge Processing with Go

Summary

  • Edge Processing: Processing data locally instead of sending it to a central server
  • Go + Video Pipeline: High-performance video processing built on goroutines and channels
  • Production Ready: Motion detection, object detection, event publishing, monitoring
  • Cost Savings: 95%+ savings compared to cloud processing

Note: This article shares the core components of a video analysis system used in production. Code examples and architectural decisions are based on real project experiences.

1. Introduction: Video Data and the Need for Edge Processing

Today, video constitutes the largest portion of generated data. CCTV systems, live streaming infrastructure, smart cities, industrial cameras, and IoT devices make real-time video analysis an inevitable need.

At this point, two fundamental problems arise:

  1. Latency - Sending data to a central server takes time
  2. Bandwidth and cost - Transmitting raw video streams is expensive

One of the most effective ways to solve these problems is the Edge Processing approach.

Project Story

This article grew out of my experience rewriting, in Go (Golang), a video analysis system I had originally developed in C. The C version delivered excellent performance and ran successfully in production. During maintenance and further development, however, I realized how valuable some of the advantages Go offers are for a project like this.

During the rewriting process with Go, I was able to achieve performance very close to what I had in C, with a practically acceptable difference. In addition, I discovered how suitable the goroutine and channel structure is for video pipelines. Go’s concurrency model offers a cleaner and safer developer experience compared to C’s thread management and memory management approaches. Also, Go’s cross-platform compilation capability made it very easy for the system to run on different edge devices (Raspberry Pi, x86, ARM). While Go’s GC and runtime do introduce some overhead in theory, the gains in development speed and maintainability more than compensated for this in practice.

As a result of this experience, I saw that Go-based edge video processing systems are a strong alternative both in terms of performance and developer experience. C’s performance advantages are undeniable, but the modern language features and developer-friendly approach that Go offers can be an important reason for preference in such systems. This article has been prepared to guide developers who want to develop similar systems based on these experiences.

2. What is Edge Processing?

Edge processing means processing data near the point where it is generated (camera, gateway, edge node) instead of sending it to a central server.

2.1 Advantages

  • Low latency - Results are obtained in milliseconds since data is processed locally
  • Less bandwidth usage - Only analysis results (events) are sent, raw video is not sent
  • Better data privacy - Sensitive images are not sent to the center
  • Scalable architecture - Each edge node works independently, no single point of failure in the central system
  • Cost effectiveness - Cloud compute costs decrease

2.2 Use Cases

  • Smart security systems - Anomaly detection, face recognition
  • Industrial quality control - Fast decision making on production lines
  • Traffic management - Vehicle counting, traffic analysis
  • Retail analytics - Customer behavior analysis, heat map creation

3. Why Go (Golang)?

Go is a very strong candidate for edge and real-time systems:

  • High performance - Native binaries, low GC overhead, near-C speed
  • Concurrency - Concurrent video pipelines are easily managed with goroutine & channel structure
  • Low memory usage - Ideal for edge devices’ limited resources
  • Cross-platform compilation - Single source code for Linux ARM (Raspberry Pi), x86, Windows
  • Easy containerization - Works well with Docker and Kubernetes, including edge distributions such as K3s
  • Static binary - All dependencies in binary, easy distribution
  • Rich standard library - net, context, sync packages are perfect for video processing
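
For pure-Go builds (no CGO), cross-compilation really is a one-liner; the target list below is illustrative:

# 64-bit ARM (e.g. Raspberry Pi 4 with a 64-bit OS)
GOOS=linux GOARCH=arm64 go build -o video-processor-arm64 ./cmd/main.go

# 32-bit ARM (e.g. older Raspberry Pi models)
GOOS=linux GOARCH=arm GOARM=7 go build -o video-processor-armv7 ./cmd/main.go

# x86_64
GOOS=linux GOARCH=amd64 go build -o video-processor-amd64 ./cmd/main.go

As section 5.3.1 explains, this simple form no longer applies once CGO dependencies such as GoCV are involved.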

4. General Architecture
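
A simplified sketch of the architecture described in the following sections:

Camera (RTSP / USB / file)
        │
        ▼
VideoSource (FFmpeg process → raw RGB frames)
        │  chan *Frame
        ▼
MotionDetector → ObjectDetector → EdgeDecisionLayer
                                        │  events only
                                        ▼
                               MQTT / Webhook → cloud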

5. Quick Start Guide

Step-by-step guide to set up and run the system:

5.1 Requirements

# Go 1.21+ must be installed
go version

# FFmpeg must be installed
ffmpeg -version

# (Optional) For OpenCV and GoCV
# Linux: apt-get install libopencv-dev
# macOS: brew install opencv

5.2 Project Structure

video-processor/
├── cmd/
│   └── main.go
├── internal/
│   ├── pipeline/
│   ├── detector/
│   ├── source/
│   └── publisher/
├── config/
│   └── config.yaml
├── go.mod
└── Dockerfile

5.3 Basic Setup

# Create the project
mkdir video-processor && cd video-processor
go mod init video-processor

# Add dependencies
go get github.com/eclipse/paho.mqtt.golang
go get gopkg.in/yaml.v3
go get github.com/sirupsen/logrus

# (Optional) GoCV
go get gocv.io/x/gocv

5.3.1 GoCV Installation Details and Cross-Platform Challenges

GoCV installation varies by platform:

Linux (Ubuntu/Debian)

# Install OpenCV dependencies
sudo apt-get update
sudo apt-get install -y \
    libopencv-dev \
    pkg-config \
    libavcodec-dev \
    libavformat-dev \
    libavutil-dev \
    libswscale-dev

# Install GoCV
go get gocv.io/x/gocv

macOS

# Install OpenCV with Homebrew
brew install opencv pkg-config

# Install GoCV
go get gocv.io/x/gocv

Windows

# Install OpenCV with Chocolatey (or manually)
choco install opencv

# Set environment variables
set CGO_CPPFLAGS=-IC:\opencv\build\include
set CGO_LDFLAGS=-LC:\opencv\build\x64\vc15\lib

# Install GoCV
go get gocv.io/x/gocv

Portability with Docker

Managing OpenCV dependencies with Docker is the easiest method:

# Build stage
FROM golang:1.23 AS builder

# Install OpenCV dependencies
RUN apt-get update && apt-get install -y \
    libopencv-dev \
    pkg-config \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=1 go build -o video-processor ./cmd/main.go

# Note: a runtime stage must also ship the OpenCV shared libraries,
# since GoCV links against them dynamically.

Common Issues and Solutions:

  1. CGO error: Cannot build without CGO_ENABLED=1
  2. pkg-config not found: Ensure the pkg-config package is installed
  3. OpenCV version incompatibility: GoCV works with specific OpenCV versions, check version compatibility
  4. Cross-compilation difficulty: Build natively for ARM or use Docker

Important Cross-Compilation Note (ARM / Edge Devices):

  • Because GoCV uses CGO, simply running GOOS=linux GOARCH=arm64 go build ... from an x86 machine to target Raspberry Pi / Jetson and other ARM devices will often not work out of the box.
  • During compilation you must target the correct architecture both for Go and for the native OpenCV libraries, which is non-trivial in practice.
  • Recommended approaches:
    • Prefer native builds on the ARM device itself (e.g. run go build directly on the Raspberry Pi).
    • Or use Docker buildx + QEMU to build multi-arch Docker images (building arm64 images from an amd64 host), as in the sketch after this list.
    • In production scenarios, defining separate build pipelines per architecture (amd64, armv7, arm64) is usually the most reliable solution.
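
A minimal sketch of the buildx approach (the image name and platform list are illustrative):

# One-time setup: register QEMU emulators and create a builder
docker run --privileged --rm tonistiigi/binfmt --install all
docker buildx create --name edge-builder --use

# Build (and optionally push) a multi-arch image
docker buildx build --platform linux/amd64,linux/arm64 \
    -t example.com/video-processor:latest --push .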

5.4 First Test

// cmd/main.go
package main

import (
    "context"
    "log"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // Test video source (from file)
    source := NewVideoSource("file://test.mp4", 640, 480)
    
    if err := source.Start(ctx); err != nil {
        log.Fatal(err)
    }
    
    // Read first 10 frames
    frameCount := 0
    for frame := range source.Frames() {
        log.Printf("Frame %d received: %dx%d", frameCount, frame.Width, frame.Height)
        frameCount++
        if frameCount >= 10 {
            break
        }
    }
    
    source.Stop()
    log.Println("Test completed!")
}

5.5 Production Readiness Checklist

Before moving to production, check the following checklist:

  • Error handling and retry mechanisms added
  • Logging and monitoring set up
  • Health check endpoints working
  • Configuration files ready
  • Docker image built and tested
  • Load test performed
  • Memory leak test performed
  • Graceful shutdown working
  • Alerting mechanism set up
  • Backup and recovery plan ready

6. Video Codec and Format Selection

Choosing the right codec and format for edge processing is critical. Advantages and disadvantages of different codecs:

6.1 Codec Comparison

| Codec      | Compression Ratio | CPU Usage   | Edge Suitability | Recommended Usage                      |
|------------|-------------------|-------------|------------------|----------------------------------------|
| H.264      | 1.0x (baseline)   | Low         | ⭐⭐⭐⭐⭐       | General use, most common               |
| H.265/HEVC | 1.5-2.0x          | Medium-High | ⭐⭐⭐⭐         | High quality, limited bandwidth        |
| AV1        | 2.0-2.5x          | Very High   | ⭐⭐             | Future use, high-performance devices   |
| MJPEG      | 0.3-0.5x          | Very Low    | ⭐⭐⭐           | Low latency, frame-by-frame processing |

6.2 Format Selection Strategy

Recommended format for edge processing: RGB24 or YUV420

RGB24:

  • ✅ 3 bytes per pixel (R, G, B)
  • ✅ Suitable for direct image processing
  • ✅ Easy integration with OpenCV/GoCV
  • ❌ Larger memory usage

YUV420:

  • ✅ Smaller memory usage (50% less)
  • ✅ Native format for video codecs
  • ❌ May require conversion to RGB

// Format selection in FFmpeg
// For RGB24:
args := []string{"-pix_fmt", "rgb24"}

// For YUV420 (smaller):
args = []string{"-pix_fmt", "yuv420p"}
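
If you pick yuv420p, frames must be converted before RGB-based processing. Below is a minimal sketch of a planar YUV420 → RGB24 conversion using fixed-point BT.601 math (it assumes even frame dimensions); in practice you would usually let FFmpeg or libswscale do this conversion for you:

// yuv420pToRGB24 converts one planar YUV420 frame to packed RGB24.
func yuv420pToRGB24(src []byte, width, height int) []byte {
    ySize := width * height
    cSize := ySize / 4
    yPlane := src[:ySize]
    uPlane := src[ySize : ySize+cSize]
    vPlane := src[ySize+cSize : ySize+2*cSize]

    dst := make([]byte, ySize*3)
    for row := 0; row < height; row++ {
        for col := 0; col < width; col++ {
            y := int(yPlane[row*width+col])
            // Chroma planes are subsampled 2x2
            ci := (row/2)*(width/2) + col/2
            u := int(uPlane[ci]) - 128
            v := int(vPlane[ci]) - 128

            // Full-range BT.601, 16.16 fixed point
            r := y + (91881*v)>>16
            g := y - (22554*u+46802*v)>>16
            b := y + (116130*u)>>16

            o := (row*width + col) * 3
            dst[o] = clampByte(r)
            dst[o+1] = clampByte(g)
            dst[o+2] = clampByte(b)
        }
    }
    return dst
}

func clampByte(v int) byte {
    if v < 0 {
        return 0
    }
    if v > 255 {
        return 255
    }
    return byte(v)
}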

7. Getting Frames from Video Source

To get frames from a video source, we’ll run FFmpeg as a process within our Go application. FFmpeg can read video from RTSP, USB cameras, files, and many other sources.

7.1 Different Video Sources

In production environments, you may encounter different types of video sources:

RTSP Stream (IP Camera)

The most common use case. IP cameras typically provide video over the RTSP protocol.

USB Camera

For USB cameras, we can use FFmpeg’s video4linux2 (v4l2) support:

ffmpeg -f v4l2 -i /dev/video0 -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1

Video File

Reading from video files for testing and development:

ffmpeg -i test_video.mp4 -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1

HTTP/HTTPS Stream

Some modern cameras provide MJPEG or HLS streams over HTTP.
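
For example, reading an MJPEG stream over HTTP looks much the same (the URL is illustrative):

ffmpeg -i http://camera-ip:8080/video.mjpg -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1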

7.2 FFmpeg Command

ffmpeg -rtsp_transport tcp -i rtsp://username:password@camera-ip:554/stream \
       -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1

This command:

  • -rtsp_transport tcp: Uses TCP for RTSP connection (reliable)
  • -f rawvideo: Raw video format output
  • -pix_fmt rgb24: RGB24 pixel format (3 bytes per pixel)
  • -s 640x480: Resolution setting (can be changed as needed)
  • pipe:1: Writes to stdout (we’ll read it in Go)

7.3 FFmpeg Integration with Go

package main

import (
    "bufio"
    "context"
    "fmt"
    "io"
    "log"
    "os/exec"
    "strings"
    "sync"
    "time"
)

type Frame struct {
    Data      []byte
    Width     int
    Height    int
    Timestamp time.Time
}

type VideoSource struct {
    sourceURL string
    width     int
    height    int
    cmd       *exec.Cmd
    stdout    io.ReadCloser
    frames    chan *Frame
    errors    chan error
    wg        sync.WaitGroup
}

func NewVideoSource(sourceURL string, width, height int) *VideoSource {
    return &VideoSource{
        sourceURL: sourceURL,
        width:     width,
        height:    height,
        frames:    make(chan *Frame, 10), // Buffer 10 frames
        errors:    make(chan error, 1),
    }
}

func (vs *VideoSource) Start(ctx context.Context) error {
    // Build the FFmpeg command (-rtsp_transport only applies to RTSP sources)
    args := []string{}
    if strings.HasPrefix(vs.sourceURL, "rtsp://") {
        args = append(args, "-rtsp_transport", "tcp")
    }
    args = append(args,
        "-i", vs.sourceURL,
        "-f", "rawvideo",
        "-pix_fmt", "rgb24",
        "-s", fmt.Sprintf("%dx%d", vs.width, vs.height),
        "pipe:1",
    )
    
    vs.cmd = exec.CommandContext(ctx, "ffmpeg", args...)
    
    var err error
    vs.stdout, err = vs.cmd.StdoutPipe()
    if err != nil {
        return fmt.Errorf("stdout pipe: %w", err)
    }

    stderr, err := vs.cmd.StderrPipe()
    if err != nil {
        return fmt.Errorf("stderr pipe: %w", err)
    }

    if err := vs.cmd.Start(); err != nil {
        return fmt.Errorf("start ffmpeg: %w", err)
    }

    // Log FFmpeg stderr output for debugging
    go func() {
        scanner := bufio.NewScanner(stderr)
        for scanner.Scan() {
            log.Printf("FFmpeg: %s", scanner.Text())
        }
    }()
    
    // Frame reading goroutine
    vs.wg.Add(1)
    go vs.readFrames(ctx)
    
    return nil
}

func (vs *VideoSource) readFrames(ctx context.Context) {
    defer vs.wg.Done()
    
    frameSize := vs.width * vs.height * 3 // RGB24: 3 bytes per pixel
    
    frameBuffer := make([]byte, frameSize)
    
    for {
        select {
        case <-ctx.Done():
            // When context is cancelled, also terminate the FFmpeg process
            if vs.cmd != nil && vs.cmd.Process != nil {
                _ = vs.cmd.Process.Kill()
            }
            return
        default:
            // Read a complete frame
            n, err := io.ReadFull(vs.stdout, frameBuffer)
            if err != nil {
                if err != io.EOF && err != io.ErrUnexpectedEOF {
                    select {
                    case vs.errors <- fmt.Errorf("read frame: %w", err):
                    case <-ctx.Done():
                    }
                }
                return
            }
            
            if n != frameSize {
                continue // Incomplete frame, skip
            }
            
            // Copy new frame buffer (goroutine safe)
            frameData := make([]byte, frameSize)
            copy(frameData, frameBuffer)
            
            frame := &Frame{
                Data:      frameData,
                Width:     vs.width,
                Height:    vs.height,
                Timestamp: time.Now(),
            }
            
            select {
            case vs.frames <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (vs *VideoSource) Frames() <-chan *Frame {
    return vs.frames
}

func (vs *VideoSource) Errors() <-chan error {
    return vs.errors
}

func (vs *VideoSource) Stop() error {
    if vs.cmd != nil && vs.cmd.Process != nil {
        _ = vs.cmd.Process.Kill()
        _ = vs.cmd.Wait() // reap the FFmpeg process
    }
    vs.wg.Wait()
    close(vs.frames)
    close(vs.errors)
    return nil
}

8. Video Pipeline with Go

Go’s goroutine and channel structure is perfect for video processing pipelines. Each stage runs as a goroutine and frames are passed through channels.

8.1 Pipeline Architecture

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Pipeline struct {
    source     *VideoSource
    processors []FrameProcessor
    out        <-chan *Frame
    wg         sync.WaitGroup
}

type FrameProcessor interface {
    Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame)
    Name() string
}

func NewPipeline(source *VideoSource) *Pipeline {
    return &Pipeline{
        source:     source,
        processors: make([]FrameProcessor, 0),
    }
}

func (p *Pipeline) AddProcessor(processor FrameProcessor) {
    p.processors = append(p.processors, processor)
}

func (p *Pipeline) Run(ctx context.Context) error {
    // Start video source
    if err := p.source.Start(ctx); err != nil {
        return err
    }
    
    // Create channels; the last one is the pipeline output
    channels := make([]chan *Frame, len(p.processors)+1)
    for i := range channels {
        channels[i] = make(chan *Frame, 10)
    }
    p.out = channels[len(p.processors)]
    
    // Copy from source to first channel
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        defer close(channels[0])
        for frame := range p.source.Frames() {
            select {
            case channels[0] <- frame:
            case <-ctx.Done():
                return
            }
        }
    }()
    
    // Run each processor
    for i, proc := range p.processors {
        p.wg.Add(1)
        go func(idx int, processor FrameProcessor) {
            defer p.wg.Done()
            defer func() {
                if r := recover(); r != nil {
                    fmt.Printf("Processor panic: %v\n", r)
                }
                close(channels[idx+1])
            }()
            processor.Process(ctx, channels[idx], channels[idx+1])
        }(i, proc)
    }
    
    // Error handling
    go func() {
        for err := range p.source.Errors() {
            // Log error (example)
            fmt.Printf("Video source error: %v\n", err)
        }
    }()
    
    return nil
}

func (p *Pipeline) Output() <-chan *Frame {
    // Frames that have passed every processor; callers must drain this
    // channel, otherwise the pipeline eventually blocks.
    return p.out
}

func (p *Pipeline) Stop() error {
    err := p.source.Stop()
    p.wg.Wait()
    return err
}
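
Wiring it all together looks like this (a sketch; it assumes the Output accessor above, the constructors from the other sections, and that Frame.Metadata is initialized as noted in section 8.2):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

source := NewVideoSource("rtsp://camera-ip:554/stream", 640, 480)
pipeline := NewPipeline(source)
pipeline.AddProcessor(NewMotionDetector(0.05, 640, 480))

if err := pipeline.Run(ctx); err != nil {
    log.Fatal(err)
}

// Drain the pipeline output so processors never block
for frame := range pipeline.Output() {
    if frame.Metadata["motion_detected"] == true {
        log.Printf("motion at %s", frame.Timestamp)
    }
}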

8.2 Frame Struct Enhancement

Metadata carries analysis results from stage to stage. Note that every place a Frame is constructed (including readFrames above) must initialize Metadata; otherwise writes such as frame.Metadata["motion_detected"] = ... panic on a nil map.

type Frame struct {
    Data      []byte
    Width     int
    Height    int
    Timestamp time.Time
    Metadata  map[string]interface{} // For analysis results
}

func NewFrame(data []byte, width, height int) *Frame {
    return &Frame{
        Data:      data,
        Width:     width,
        Height:    height,
        Timestamp: time.Now(),
        Metadata:  make(map[string]interface{}),
    }
}

9. Real-Time Video Analysis

9.0 ROI (Region of Interest) Detection

Instead of processing the entire frame, we can significantly reduce CPU usage by processing only relevant regions (ROI):

import (
    "image"
    "sync"
)

type ROIDetector struct {
    regions []image.Rectangle
    mutex   sync.RWMutex
}

func NewROIDetector() *ROIDetector {
    return &ROIDetector{
        regions: make([]image.Rectangle, 0),
    }
}

// Determine ROI based on motion detection results
func (rd *ROIDetector) UpdateROI(motionMap []bool, width, height int) {
    rd.mutex.Lock()
    defer rd.mutex.Unlock()
    
    // Detect motion regions and create bounding box
    // Simple implementation: bounding box of motion pixels
    minX, minY := width, height
    maxX, maxY := 0, 0
    
    for y := 0; y < height; y++ {
        for x := 0; x < width; x++ {
            idx := y*width + x
            if idx < len(motionMap) && motionMap[idx] {
                if x < minX {
                    minX = x
                }
                if x > maxX {
                    maxX = x
                }
                if y < minY {
                    minY = y
                }
                if y > maxY {
                    maxY = y
                }
            }
        }
    }
    
    if maxX > minX && maxY > minY {
        // Add padding
        padding := 20
        roi := image.Rect(
            maxInt(0, minX-padding),
            maxInt(0, minY-padding),
            minInt(width, maxX+padding),
            minInt(height, maxY+padding),
        )
        rd.regions = []image.Rectangle{roi}
    }
}

func maxInt(a, b int) int {
    if a > b {
        return a
    }
    return b
}

func minInt(a, b int) int {
    if a < b {
        return a
    }
    return b
}

// Extract only the ROI region from frame
func (rd *ROIDetector) ExtractROI(frame *Frame) []*Frame {
    rd.mutex.RLock()
    defer rd.mutex.RUnlock()
    
    roiFrames := make([]*Frame, 0, len(rd.regions))
    
    for _, roi := range rd.regions {
        roiData := extractRegion(frame.Data, frame.Width, frame.Height, roi)
        roiFrame := &Frame{
            Data:      roiData,
            Width:     roi.Dx(),
            Height:    roi.Dy(),
            Timestamp: frame.Timestamp,
            Metadata: map[string]interface{}{
                "roi":        roi,
                "original_width":  frame.Width,
                "original_height": frame.Height,
            },
        }
        roiFrames = append(roiFrames, roiFrame)
    }
    
    return roiFrames
}

func extractRegion(data []byte, width, height int, roi image.Rectangle) []byte {
    roiWidth := roi.Dx()
    roiHeight := roi.Dy()

    // Bounds check once (make sure the ROI lies within the frame bounds)
    if roi.Min.X < 0 || roi.Min.Y < 0 || roi.Max.X > width || roi.Max.Y > height {
        return nil
    }

    roiData := make([]byte, roiWidth*roiHeight*3)

    for y := 0; y < roiHeight; y++ {
        srcY := roi.Min.Y + y
        srcOffset := (srcY*width + roi.Min.X) * 3
        dstOffset := (y * roiWidth) * 3

        // Copy whole row at once (faster, fewer bounds checks)
        copy(roiData[dstOffset:dstOffset+roiWidth*3],
            data[srcOffset:srcOffset+roiWidth*3])
    }

    return roiData
}

Advantages of ROI Usage:

  • 60-80% CPU savings - Process only relevant regions
  • Faster object detection - Inference on smaller ROIs
  • Less memory usage - Store only ROI frames

9.1 Motion Detection

Motion detection detects movement by calculating differences between frames. It’s a lightweight and fast method for edge systems.

Frame Differencing Algorithm

package main

import (
    "context"
    "sync"
)

type MotionDetector struct {
    threshold     float64
    previousFrame []byte
    width, height int
    mutex         sync.RWMutex // thread-safety for previousFrame
}

func NewMotionDetector(threshold float64, width, height int) *MotionDetector {
    return &MotionDetector{
        threshold: threshold,
        width:     width,
        height:    height,
    }
}

func (md *MotionDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    for frame := range in {
        select {
        case <-ctx.Done():
            return
        default:
            hasMotion := md.detectMotion(frame)
            frame.Metadata["motion_detected"] = hasMotion
            
            if hasMotion {
                frame.Metadata["motion_score"] = md.calculateMotionScore(frame)
            }
            
            select {
            case out <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (md *MotionDetector) Name() string {
    return "MotionDetector"
}

func (md *MotionDetector) detectMotion(frame *Frame) bool {
    md.mutex.Lock()
    defer md.mutex.Unlock()
    
    if md.previousFrame == nil {
        // First frame, save and no motion
        md.previousFrame = make([]byte, len(frame.Data))
        copy(md.previousFrame, frame.Data)
        return false
    }
    
    diff := md.calculateDifference(frame.Data, md.previousFrame)
    motionRatio := float64(diff) / float64(len(frame.Data))
    
    // Reuse the previous-frame buffer (reallocate only if the size changed)
    if len(md.previousFrame) != len(frame.Data) {
        md.previousFrame = make([]byte, len(frame.Data))
    }
    copy(md.previousFrame, frame.Data)
    
    return motionRatio > md.threshold
}

func (md *MotionDetector) calculateDifference(current, previous []byte) int {
    diff := 0
    for i := 0; i < len(current) && i < len(previous); i++ {
        if current[i] > previous[i] {
            diff += int(current[i] - previous[i])
        } else {
            diff += int(previous[i] - current[i])
        }
    }
    return diff
}

func (md *MotionDetector) calculateMotionScore(frame *Frame) float64 {
    md.mutex.RLock()
    defer md.mutex.RUnlock()
    
    if md.previousFrame == nil {
        return 0.0
    }
    
    diff := md.calculateDifference(frame.Data, md.previousFrame)
    maxDiff := len(frame.Data) * 255
    return float64(diff) / float64(maxDiff)
}

9.2 Object Detection

For object detection, GoCV (OpenCV binding) or external inference services can be used.

Simple Object Detection with GoCV

package main

import (
    "context"
    "fmt"
    "image"
    "os"
    "strings"
    "gocv.io/x/gocv"
)

type ObjectDetector struct {
    net     gocv.Net
    classes []string
}

func NewObjectDetector(modelPath, configPath, classesPath string) (*ObjectDetector, error) {
    net := gocv.ReadNet(modelPath, configPath)
    if net.Empty() {
        return nil, fmt.Errorf("failed to load model")
    }
    
    // Use GPU if available (optional)
    net.SetPreferableBackend(gocv.NetBackendOpenVINO)
    net.SetPreferableTarget(gocv.NetTargetCPU) // or NetTargetVPU
    
    // Read classes file
    classes, err := loadClasses(classesPath)
    if err != nil {
        return nil, fmt.Errorf("failed to load classes: %w", err)
    }
    
    return &ObjectDetector{
        net:     net,
        classes: classes,
    }, nil
}

func (od *ObjectDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    for frame := range in {
        select {
        case <-ctx.Done():
            return
        default:
            detections := od.detect(frame)
            frame.Metadata["detections"] = detections
            
            select {
            case out <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (od *ObjectDetector) Name() string {
    return "ObjectDetector"
}

func (od *ObjectDetector) detect(frame *Frame) []Detection {
    // Convert frame data to gocv.Mat
    img, err := gocv.NewMatFromBytes(
        frame.Height,
        frame.Width,
        gocv.MatTypeCV8UC3,
        frame.Data,
    )
    if err != nil {
        return nil
    }
    defer img.Close()
    
    // Create blob (model input format)
    blob := gocv.BlobFromImage(img, 1.0/255.0, image.Pt(416, 416), gocv.NewScalar(0, 0, 0, 0), true, false)
    defer blob.Close()
    
    // Inference
    od.net.SetInput(blob, "")
    prob := od.net.Forward("")
    defer prob.Close()
    
    // Parse results
    detections := parseDetections(prob, frame.Width, frame.Height)
    
    return detections
}

type Detection struct {
    Class      string
    Confidence float64
    BBox       image.Rectangle
}

func parseDetections(prob gocv.Mat, imgWidth, imgHeight int) []Detection {
    // Parse according to model output
    // This example is simplified, real implementation depends on model
    return []Detection{}
}

func loadClasses(path string) ([]string, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, fmt.Errorf("failed to read classes file: %w", err)
    }
    
    lines := strings.Split(string(data), "\n")
    classes := make([]string, 0, len(lines))
    for _, line := range lines {
        line = strings.TrimSpace(line)
        if line != "" {
            classes = append(classes, line)
        }
    }
    
    return classes, nil
}

External Inference Service (HTTP/REST)

If the model is very large or there’s specialized hardware (NVIDIA Jetson, Intel Neural Compute Stick), we can move inference to a separate service:

package main

import (
    "bytes"
    "context"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type ExternalDetector struct {
    client  *http.Client
    apiURL  string
    timeout time.Duration
}

func (ed *ExternalDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    for frame := range in {
        select {
        case <-ctx.Done():
            return
        default:
            detections, err := ed.callInferenceService(frame)
            if err == nil {
                frame.Metadata["detections"] = detections
            } else {
                frame.Metadata["detection_error"] = err.Error()
            }
            
            select {
            case out <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (ed *ExternalDetector) callInferenceService(frame *Frame) ([]Detection, error) {
    // Base64 encode frame or send binary
    reqBody, err := json.Marshal(map[string]interface{}{
        "image": base64.StdEncoding.EncodeToString(frame.Data),
        "width": frame.Width,
        "height": frame.Height,
    })
    if err != nil {
        return nil, err
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), ed.timeout)
    defer cancel()
    
    req, err := http.NewRequestWithContext(ctx, "POST", ed.apiURL, bytes.NewReader(reqBody))
    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/json")
    
    resp, err := ed.client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    
    var result struct {
        Detections []Detection `json:"detections"`
    }
    
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return nil, err
    }
    
    return result.Detections, nil
}

10. Edge Decision Layer

The Edge Decision Layer makes decisions based on analysis results and sends only important events to the cloud. This layer:

  • Threshold filtering - Filters low confidence detections
  • Event deduplication - Doesn’t send the same event repeatedly
  • Rate limiting - Prevents sending too many events (see the sketch after this list)
  • Event aggregation - Combines multiple events
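
The cooldown map in the implementation below handles deduplication per event type; for a global cap, a token bucket works well. A sketch using golang.org/x/time/rate (an assumed extra dependency; EventPublisher and Event are defined in the next section):

import (
    "context"
    "time"

    "golang.org/x/time/rate"
)

// Allow at most 10 events per second, with bursts of up to 20
var eventLimiter = rate.NewLimiter(rate.Limit(10), 20)

func publishRateLimited(ctx context.Context, pub EventPublisher, evt *Event) error {
    if !eventLimiter.Allow() {
        return nil // drop the event instead of flooding the broker
    }
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()
    return pub.PublishWithContext(ctx, evt)
}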

10.1 Event Publisher Implementations

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type EdgeDecisionLayer struct {
    minConfidence  float64
    eventPublisher EventPublisher
    lastEvents     map[string]time.Time
    mutex          sync.RWMutex
    cooldown       time.Duration
}

type EventPublisher interface {
    PublishWithContext(ctx context.Context, event *Event) error
}

type Event struct {
    Type       string                 `json:"type"`
    Timestamp  time.Time              `json:"timestamp"`
    Confidence float64                `json:"confidence"`
    Detections []Detection            `json:"detections,omitempty"`
    Metadata   map[string]interface{} `json:"metadata"`
}

func NewEdgeDecisionLayer(minConfidence float64, publisher EventPublisher, cooldown time.Duration) *EdgeDecisionLayer {
    return &EdgeDecisionLayer{
        minConfidence:  minConfidence,
        eventPublisher: publisher,
        lastEvents:     make(map[string]time.Time),
        cooldown:       cooldown,
    }
}

func (edl *EdgeDecisionLayer) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    for frame := range in {
        select {
        case <-ctx.Done():
            return
        default:
            edl.processFrame(frame)
            
            // Continue frame in pipeline
            select {
            case out <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

func (edl *EdgeDecisionLayer) Name() string {
    return "EdgeDecisionLayer"
}

func (edl *EdgeDecisionLayer) processFrame(frame *Frame) {
    // Check if detections exist
    detections, ok := frame.Metadata["detections"].([]Detection)
    if !ok {
        return
    }
    
    // Filter high confidence detections
    significantDetections := edl.filterByConfidence(detections)
    if len(significantDetections) == 0 {
        return
    }
    
    // Determine event type
    eventType := edl.determineEventType(significantDetections)
    
    // Cooldown check (don't send same event too frequently)
    if !edl.shouldSendEvent(eventType) {
        return
    }
    
    // Create event
    event := &Event{
        Type:       eventType,
        Timestamp:  frame.Timestamp,
        Confidence: edl.calculateMaxConfidence(significantDetections),
        Detections: significantDetections,
        Metadata: map[string]interface{}{
            "frame_width":  frame.Width,
            "frame_height": frame.Height,
        },
    }

    // Publish asynchronously with timeout to avoid blocking the pipeline
    go func(evt *Event, evtType string) {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        if err := edl.eventPublisher.PublishWithContext(ctx, evt); err != nil {
            fmt.Printf("Failed to publish event: %v\n", err)
            return
        }

        // Update cooldown on successful publish
        edl.mutex.Lock()
        edl.lastEvents[evtType] = time.Now()
        edl.mutex.Unlock()
    }(event, eventType)
}

func (edl *EdgeDecisionLayer) filterByConfidence(detections []Detection) []Detection {
    filtered := make([]Detection, 0)
    for _, det := range detections {
        if det.Confidence >= edl.minConfidence {
            filtered = append(filtered, det)
        }
    }
    return filtered
}

func (edl *EdgeDecisionLayer) determineEventType(detections []Detection) string {
    // Simple logic: use first detection's class
    // Real application can have more complex logic
    if len(detections) > 0 {
        return detections[0].Class + "_detected"
    }
    return "unknown"
}

func (edl *EdgeDecisionLayer) shouldSendEvent(eventType string) bool {
    edl.mutex.RLock()
    lastTime, exists := edl.lastEvents[eventType]
    edl.mutex.RUnlock()
    
    if !exists {
        return true
    }
    
    return time.Since(lastTime) > edl.cooldown
}

func (edl *EdgeDecisionLayer) calculateMaxConfidence(detections []Detection) float64 {
    max := 0.0
    for _, det := range detections {
        if det.Confidence > max {
            max = det.Confidence
        }
    }
    return max
}

MQTT Publisher

package main

import (
    "context"
    "encoding/json"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

type MQTTPublisher struct {
    client mqtt.Client
    topic  string
}

func NewMQTTPublisher(brokerURL, topic, clientID string) (*MQTTPublisher, error) {
    opts := mqtt.NewClientOptions()
    opts.AddBroker(brokerURL)
    opts.SetClientID(clientID)
    
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return nil, token.Error()
    }
    
    return &MQTTPublisher{
        client: client,
        topic:  topic,
    }, nil
}

func (mp *MQTTPublisher) PublishWithContext(ctx context.Context, event *Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }
    
    token := mp.client.Publish(mp.topic, 1, false, data)

    done := make(chan struct{})
    go func() {
        token.Wait()
        close(done)
    }()

    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-done:
        return token.Error()
    }
}

func (mp *MQTTPublisher) Close() error {
    if mp.client != nil && mp.client.IsConnected() {
        // 250ms timeout for pending operations
        mp.client.Disconnect(250)
    }
    return nil
}

HTTP Webhook Publisher

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net/http"
)

type WebhookPublisher struct {
    client     *http.Client
    webhookURL string
}

func (wp *WebhookPublisher) PublishWithContext(ctx context.Context, event *Event) error {
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }
    
    req, err := http.NewRequestWithContext(ctx, "POST", wp.webhookURL, bytes.NewReader(data))
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/json")
    
    resp, err := wp.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode >= 400 {
        return fmt.Errorf("webhook returned status %d", resp.StatusCode)
    }
    
    return nil
}

11. Multi-Camera Support

In production environments, you typically need to manage multiple cameras. We can do parallel processing by creating separate pipelines for each camera:

import (
    "context"
    "fmt"
    "log"
    "sync"
)

type MultiCameraManager struct {
    cameras map[string]*CameraPipeline
    mutex   sync.RWMutex
    ctx     context.Context
    cancel  context.CancelFunc
}

type CameraPipeline struct {
    ID       string
    Source   *VideoSource
    Pipeline *Pipeline
    Config   CameraConfig
}

type CameraConfig struct {
    URL           string
    Width         int
    Height        int
    MotionEnabled bool
    ObjectEnabled bool
}

func NewMultiCameraManager() *MultiCameraManager {
    ctx, cancel := context.WithCancel(context.Background())
    return &MultiCameraManager{
        cameras: make(map[string]*CameraPipeline),
        ctx:     ctx,
        cancel:  cancel,
    }
}

func (mcm *MultiCameraManager) AddCamera(id string, config CameraConfig) error {
    mcm.mutex.Lock()
    defer mcm.mutex.Unlock()
    
    source := NewVideoSource(config.URL, config.Width, config.Height)
    pipeline := NewPipeline(source)
    
    // Add motion detector
    if config.MotionEnabled {
        motionDetector := NewMotionDetector(0.05, config.Width, config.Height)
        pipeline.AddProcessor(motionDetector)
    }
    
    // Add object detector
    if config.ObjectEnabled {
        objectDetector, err := NewObjectDetector("model.onnx", "config.yaml", "classes.txt")
        if err != nil {
            return fmt.Errorf("object detector for camera %s: %w", id, err)
        }
        pipeline.AddProcessor(objectDetector)
    }
    
    cameraPipeline := &CameraPipeline{
        ID:       id,
        Source:   source,
        Pipeline: pipeline,
        Config:   config,
    }
    
    mcm.cameras[id] = cameraPipeline
    
    // Start pipeline
    go func() {
        if err := pipeline.Run(mcm.ctx); err != nil {
            log.Printf("Camera %s pipeline error: %v", id, err)
        }
    }()
    
    return nil
}

func (mcm *MultiCameraManager) RemoveCamera(id string) error {
    mcm.mutex.Lock()
    defer mcm.mutex.Unlock()
    
    if camera, exists := mcm.cameras[id]; exists {
        camera.Pipeline.Stop()
        delete(mcm.cameras, id)
    }
    
    return nil
}

func (mcm *MultiCameraManager) GetCameraStatus(id string) (map[string]interface{}, error) {
    mcm.mutex.RLock()
    defer mcm.mutex.RUnlock()
    
    camera, exists := mcm.cameras[id]
    if !exists {
        return nil, fmt.Errorf("camera %s not found", id)
    }
    
    return map[string]interface{}{
        "id":       camera.ID,
        "url":      camera.Config.URL,
        "status":   "running",
        "width":    camera.Config.Width,
        "height":   camera.Config.Height,
    }, nil
}
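
Usage is straightforward (the camera URL is illustrative):

manager := NewMultiCameraManager()

err := manager.AddCamera("entrance", CameraConfig{
    URL:           "rtsp://192.168.1.10:554/stream",
    Width:         640,
    Height:        480,
    MotionEnabled: true,
})
if err != nil {
    log.Fatal(err)
}

status, _ := manager.GetCameraStatus("entrance")
log.Printf("camera status: %v", status)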

12. Performance and Optimization

12.1 Memory Pooling

Video processing requires a lot of memory allocation. We can reduce memory allocation overhead using sync.Pool:

var framePool = sync.Pool{
    New: func() interface{} {
        // Pool for frame buffers (sized for 640x480 RGB24;
        // adapt this if the source resolution differs)
        return make([]byte, 640*480*3)
    },
}

// Get from pool when reading frames (correct usage example)
func (vs *VideoSource) readFrames(ctx context.Context) {
    frameSize := vs.width * vs.height * 3

    for {
        frameBuffer := framePool.Get().([]byte)

        n, err := io.ReadFull(vs.stdout, frameBuffer)
        if err != nil {
            // Immediately return buffer to pool on error
            framePool.Put(frameBuffer)

            if err == io.EOF || err == io.ErrUnexpectedEOF {
                return
            }

            select {
            case vs.errors <- fmt.Errorf("read frame: %w", err):
            case <-ctx.Done():
            }
            return
        }

        if n != frameSize {
            framePool.Put(frameBuffer)
            continue
        }

        // Copy frame data so it is independent from the pool buffer
        frameData := make([]byte, frameSize)
        copy(frameData, frameBuffer)

        // Return buffer to pool
        framePool.Put(frameBuffer)

        frame := &Frame{
            Data:      frameData,
            Width:     vs.width,
            Height:    vs.height,
            Timestamp: time.Now(),
            Metadata:  make(map[string]interface{}),
        }

        select {
        case vs.frames <- frame:
        case <-ctx.Done():
            return
        }
    }
}

12.2 Frame Rate Control

In some cases, we don’t need to process all frames. We can reduce CPU usage with frame skipping:

type FrameRateController struct {
    targetFPS    int
    frameCounter int
    skipRatio    int
}

func NewFrameRateController(targetFPS int) *FrameRateController {
    // Example: with a 30 FPS source (assumed here), reducing to 10 FPS
    // means processing 1 out of every 3 frames
    if targetFPS <= 0 {
        targetFPS = 30
    }
    skipRatio := 30 / targetFPS
    if skipRatio < 1 {
        skipRatio = 1 // never skip below the source rate
    }
    return &FrameRateController{
        targetFPS: targetFPS,
        skipRatio: skipRatio,
    }
}

func (frc *FrameRateController) ShouldProcess() bool {
    frc.frameCounter++
    return frc.frameCounter%frc.skipRatio == 0
}

12.3 Context and Cancellation

Use context cancellation in long-running operations to properly clean up resources:

import (
    "os"
    "os/signal"
    "syscall"
)

func (p *Pipeline) Run(ctx context.Context) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    
    // Signal handling for graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    
    go func() {
        <-sigChan
        cancel()
    }()
    
    // Run pipeline
    // ...
    
    return nil
}

12.4 Benchmarking

To collect performance metrics:

type PerformanceMetrics struct {
    FramesProcessed int64
    ProcessingTime  time.Duration
    FrameRate       float64
    startTime       time.Time
    mutex           sync.RWMutex
}

func NewPerformanceMetrics() *PerformanceMetrics {
    return &PerformanceMetrics{
        startTime: time.Now(),
    }
}

func (pm *PerformanceMetrics) RecordFrame(processingTime time.Duration) {
    pm.mutex.Lock()
    defer pm.mutex.Unlock()
    pm.FramesProcessed++
    pm.ProcessingTime += processingTime
    elapsed := time.Since(pm.startTime).Seconds()
    if elapsed > 0 {
        pm.FrameRate = float64(pm.FramesProcessed) / elapsed
    }
}

12.5 Performance Metrics (Real Test Results)

Some real performance values we obtained in the test environment:

  • Hardware: Raspberry Pi 4 (4GB RAM), 640x480 resolution
  • Motion Detection: ~25-30 FPS (CPU: 15-20%)
  • Object Detection (CPU): ~2-3 FPS (CPU: 80-90%)
  • Object Detection (GPU/OpenVINO): ~8-10 FPS (CPU: 40-50%)
  • Memory Usage: ~150-200 MB (with memory pool)
  • Network Latency (MQTT): ~10-50ms (local broker)

Important Note: CPU usage for object detection is very high. In production, using GPU or dedicated AI accelerator (Jetson Nano, Neural Compute Stick) is recommended.

12.6 Error Handling and Resilience

Retry Mechanism

Video source connections can be interrupted. Let’s add a retry mechanism:

func (vs *VideoSource) StartWithRetry(ctx context.Context, maxRetries int) error {
    var lastErr error
    for i := 0; i < maxRetries; i++ {
        if err := vs.Start(ctx); err == nil {
            return nil
        }
        lastErr = err
        
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(time.Second * time.Duration(1<<i)):
            // Exponential backoff: 1s, 2s, 4s, ...
        }
    }
    return fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr)
}

Graceful Degradation

If the object detection service is not working, we can continue with only motion detection:

func (od *ObjectDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    for frame := range in {
        select {
        case <-ctx.Done():
            return
        default:
            // assumes detect has been extended to return an error
            // (the version in section 9.2 returns only []Detection)
            detections, err := od.detect(frame)
            if err != nil {
                // Log error but don't stop pipeline
                frame.Metadata["detection_error"] = err.Error()
            } else {
                frame.Metadata["detections"] = detections
            }
            
            select {
            case out <- frame:
            case <-ctx.Done():
                return
            }
        }
    }
}

12.7 Monitoring and Logging

Structured Logging

import "github.com/sirupsen/logrus"

var logger = logrus.New()

func init() {
    logger.SetFormatter(&logrus.JSONFormatter{})
    logger.SetLevel(logrus.InfoLevel)
}

// Usage
logger.WithFields(logrus.Fields{
    "frame_id":    frameID,
    "fps":         fps,
    "detections":  len(detections),
    "processing_time_ms": processingTime.Milliseconds(),
}).Info("Frame processed")

Metrics Export (Prometheus)

import "github.com/prometheus/client_golang/prometheus"

var (
    framesProcessed = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "video_frames_processed_total",
            Help: "Total number of frames processed",
        },
        []string{"source"},
    )
    
    processingDuration = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "video_frame_processing_seconds",
            Help:    "Frame processing duration",
            Buckets: prometheus.DefBuckets,
        },
        []string{"stage"},
    )
)

func init() {
    prometheus.MustRegister(framesProcessed)
    prometheus.MustRegister(processingDuration)
}
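
Instrumenting the pipeline then looks like this (label values are illustrative):

start := time.Now()
hasMotion := md.detectMotion(frame)
processingDuration.WithLabelValues("motion").Observe(time.Since(start).Seconds())
framesProcessed.WithLabelValues("camera-1").Inc()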

Metrics HTTP Endpoint

Let’s add an HTTP endpoint to expose Prometheus metrics:

import (
    "net/http"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

func startMetricsServer(port string) error {
    http.Handle("/metrics", promhttp.Handler())
    return http.ListenAndServe(":"+port, nil)
}

// In main.go
go func() {
    if err := startMetricsServer("2112"); err != nil {
        log.Printf("Metrics server error: %v", err)
    }
}()

12.8 Profiling and Performance Analysis

We can perform performance analysis with Go’s built-in profiling tools.

CPU Profiling with pprof

import (
    _ "net/http/pprof"
    "net/http"
)

func main() {
    // Add profiling endpoints
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
    
    // ... your application code
}

// To collect profile:
// go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
// go tool pprof http://localhost:6060/debug/pprof/heap

Memory Profiling

import (
    "runtime"
    "runtime/pprof"
    "os"
)

func captureMemoryProfile(filename string) error {
    f, err := os.Create(filename)
    if err != nil {
        return err
    }
    defer f.Close()
    
    runtime.GC() // Force GC
    return pprof.WriteHeapProfile(f)
}

// Usage: Periodically capture memory profile
go func() {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    for range ticker.C {
        captureMemoryProfile("memprofile.prof")
    }
}()

Goroutine Profiling

import (
    "runtime/pprof"
    "os"
)

func captureGoroutineProfile(filename string) error {
    f, err := os.Create(filename)
    if err != nil {
        return err
    }
    defer f.Close()
    
    return pprof.Lookup("goroutine").WriteTo(f, 0)
}

13. Testing

For writing tests in video processing systems, we should use mocks and test utilities. A comprehensive test strategy is critical for production systems.

13.1 Unit Test Example

package main

import (
    "testing"
    "time"
)

func TestMotionDetector(t *testing.T) {
    detector := NewMotionDetector(0.05, 640, 480)
    
    // First frame - should not have motion
    frame1 := &Frame{
        Data:      make([]byte, 640*480*3),
        Width:     640,
        Height:    480,
        Timestamp: time.Now(),
        Metadata:  make(map[string]interface{}),
    }
    
    hasMotion := detector.detectMotion(frame1)
    if hasMotion {
        t.Error("First frame should not have motion")
    }
    
    // Second frame - make changes
    frame2 := &Frame{
        Data:      make([]byte, 640*480*3),
        Width:     640,
        Height:    480,
        Timestamp: time.Now(),
        Metadata:  make(map[string]interface{}),
    }
    
    // Make changes in frame2 (change pixels)
    for i := 0; i < len(frame2.Data); i += 100 {
        frame2.Data[i] = 255
    }
    
    hasMotion = detector.detectMotion(frame2)
    if !hasMotion {
        t.Error("Frame with changes should have motion")
    }
}

13.2 Integration Test

Integration tests exercise real components working together. (NewMockObjectDetector, the two-argument NewEdgeDecisionLayer constructor and the Events() accessor used below are assumed test helpers.)

func TestVideoPipelineIntegration(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    
    // Test with real FFmpeg process (use test video file)
    source := NewVideoSource("test_video.mp4", 640, 480)
    
    pipeline := NewPipeline(source)
    motionDetector := NewMotionDetector(0.05, 640, 480)
    objectDetector := NewMockObjectDetector()
    edgeDecision := NewEdgeDecisionLayer(0.7, 5*time.Second)
    
    pipeline.AddProcessor(motionDetector)
    pipeline.AddProcessor(objectDetector)
    pipeline.AddProcessor(edgeDecision)
    
    // Collect events
    events := make([]Event, 0)
    go func() {
        for event := range edgeDecision.Events() {
            events = append(events, event)
        }
    }()
    
    // Run pipeline
    if err := pipeline.Run(ctx); err != nil {
        t.Fatalf("Failed to start pipeline: %v", err)
    }
    
    // Wait 10 seconds
    time.Sleep(10 * time.Second)
    pipeline.Stop()
    
    // At least one event expected
    if len(events) == 0 {
        t.Error("No events generated")
    }
    
    // Check that events are in correct format
    for _, event := range events {
        if event.Timestamp.IsZero() {
            t.Error("Event timestamp is empty")
        }
        if event.Type == "" {
            t.Error("Event type is empty")
        }
    }
}

13.3 End-to-End (E2E) Test

E2E tests exercise the entire system working together. (setupTestEnvironment, startTestMQTTBroker and the other helpers below are assumed to exist in the test suite.)

func TestE2EVideoProcessing(t *testing.T) {
    // Start test environment
    testEnv := setupTestEnvironment(t)
    defer testEnv.Cleanup()
    
    // Start MQTT broker (for testing)
    mqttBroker := startTestMQTTBroker(t)
    defer mqttBroker.Stop()
    
    // Start video processing service
    config := &Config{
        VideoSource: "rtsp://test-camera:554/stream",
        MQTTBroker:  "localhost:1883",
    }
    
    service := NewVideoProcessingService(config)
    if err := service.Start(); err != nil {
        t.Fatalf("Failed to start service: %v", err)
    }
    defer service.Stop()
    
    // Send test video stream
    go sendTestVideoStream(t, "rtsp://test-camera:554/stream")
    
    // Listen to events from MQTT
    events := subscribeToMQTT(t, "video/events/+")
    
    // Event expected within 30 seconds
    timeout := time.After(30 * time.Second)
    select {
    case event := <-events:
        // Validate event
        if err := validateEvent(event); err != nil {
            t.Errorf("Invalid event: %v", err)
        }
        t.Logf("Event received: %+v", event)
    case <-timeout:
        t.Error("Event timeout - no events received")
    }
}

13.4 Load Testing

Load tests measure system performance under sustained load:

func TestLoadMultipleCameras(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping load test in short mode")
    }
    
    numCameras := 10
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()
    
    // Create multi-camera manager
    manager := NewMultiCameraManager()
    
    // Add 10 cameras
    for i := 0; i < numCameras; i++ {
        cameraID := fmt.Sprintf("camera-%d", i)
        config := CameraConfig{
            URL:           fmt.Sprintf("rtsp://test-camera-%d:554/stream", i),
            Width:         640,
            Height:        480,
            MotionEnabled: true,
        }
        if err := manager.AddCamera(cameraID, config); err != nil {
            t.Fatalf("Failed to add camera: %v", err)
        }
    }
    
    // Start all cameras (StartAll, GetMetrics and StopAll below are assumed
    // helpers; the manager in section 11 starts each pipeline inside AddCamera)
    if err := manager.StartAll(ctx); err != nil {
        t.Fatalf("Failed to start cameras: %v", err)
    }
    
    // Run for 2 minutes
    time.Sleep(2 * time.Minute)
    
    // Collect metrics
    metrics := manager.GetMetrics()
    
    // Performance check
    if metrics.AverageFPS < 20 {
        t.Errorf("Low FPS: %f (expected: >= 20)", metrics.AverageFPS)
    }
    
    if metrics.CPUUsage > 80 {
        t.Errorf("High CPU usage: %f%% (expected: < 80%%)", metrics.CPUUsage)
    }
    
    if metrics.MemoryUsage > 500*1024*1024 { // 500MB
        t.Errorf("High memory usage: %d bytes", metrics.MemoryUsage)
    }
    
    manager.StopAll()
}

13.5 Stress Testing

Stress tests push the system to its limits. (NewMockVideoSourceWithFPS is an assumed helper that generates synthetic frames at a fixed rate.)

func TestStressHighFrameRate(t *testing.T) {
    if testing.Short() {
        t.Skip("Skipping stress test in short mode")
    }
    
    // Test with very high frame rate
    highFPSSource := NewMockVideoSourceWithFPS(60) // 60 FPS
    
    pipeline := NewPipeline(highFPSSource)
    // Add all processors
    
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
    defer cancel()
    
    // Monitor frame drop count
    var frameDrops atomic.Int64
    
    go func() {
        for frame := range pipeline.Frames() {
            if frame.Metadata["dropped"] == true {
                frameDrops.Add(1)
            }
        }
    }()
    
    if err := pipeline.Run(ctx); err != nil {
        t.Fatalf("Failed to start pipeline: %v", err)
    }
    
    time.Sleep(30 * time.Second)
    pipeline.Stop()
    
    dropRate := float64(frameDrops.Load()) / 1800.0 // 30 seconds * 60 FPS
    if dropRate > 0.1 { // More than 10% drop
        t.Logf("High frame drop rate: %.2f%%", dropRate*100)
        // This means backpressure mechanism is working
    }
}

13.6 Pipeline Integration Test

func TestPipeline(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    // Create mock video source
    source := &MockVideoSource{
        frames: make(chan *Frame, 10),
    }
    
    // Add test frames
    for i := 0; i < 5; i++ {
        source.frames <- &Frame{
            Data:      make([]byte, 640*480*3),
            Width:     640,
            Height:    480,
            Timestamp: time.Now(),
            Metadata:  make(map[string]interface{}),
        }
    }
    close(source.frames)
    
    // Create pipeline
    pipeline := NewPipeline(source)
    motionDetector := NewMotionDetector(0.05, 640, 480)
    pipeline.AddProcessor(motionDetector)
    
    // Run pipeline
    if err := pipeline.Run(ctx); err != nil {
        t.Fatalf("Failed to start pipeline: %v", err)
    }
    
    // Check results
    time.Sleep(100 * time.Millisecond)
    pipeline.Stop()
}

13.7 Mock Video Source

type MockVideoSource struct {
    frames   chan *Frame
    errors   chan error
    stopOnce sync.Once
}

func (m *MockVideoSource) Start(ctx context.Context) error {
    return nil
}

func (m *MockVideoSource) Frames() <-chan *Frame {
    return m.frames
}

func (m *MockVideoSource) Errors() <-chan error {
    return m.errors
}

func (m *MockVideoSource) Stop() error {
    // Guard against double close: tests like TestPipeline close
    // the frames channel manually before Stop is called
    m.stopOnce.Do(func() {
        close(m.frames)
        close(m.errors)
    })
    return nil
}

13.8 Benchmark Test

func BenchmarkMotionDetection(b *testing.B) {
    detector := NewMotionDetector(0.05, 640, 480)
    frame := &Frame{
        Data:      make([]byte, 640*480*3),
        Width:     640,
        Height:    480,
        Timestamp: time.Now(),
        Metadata:  make(map[string]interface{}),
    }
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        detector.detectMotion(frame)
        // Change frame
        frame.Data[i%len(frame.Data)]++
    }
}

// To run: go test -bench=BenchmarkMotionDetection -benchmem

13.9 Rate Limiting Test

To test that the rate limiting mechanism works correctly:

import (
    "sync"
    "testing"
    "time"
    
    "golang.org/x/time/rate"
)

func TestRateLimiting(t *testing.T) {
    publisher := &MockPublisher{
        published: make([]*Event, 0),
    }
    
    // Create rate limiter with 10 events/second limit
    limiter := rate.NewLimiter(10, 1) // 10 per second, burst 1
    rateLimited := &RateLimitedPublisher{
        publisher: publisher,
        limiter:   limiter,
    }
    
    // Send 20 events
    start := time.Now()
    for i := 0; i < 20; i++ {
        event := &Event{
            Type:      "test",
            Timestamp: time.Now(),
            Data:      map[string]interface{}{"id": i},
        }
        if err := rateLimited.Publish(event); err != nil {
            t.Errorf("Publish error: %v", err)
        }
    }
    elapsed := time.Since(start)
    
    // With a 10 events/sec limit and burst 1, 20 events take roughly 1.9s:
    // the first passes immediately, the remaining 19 wait ~100ms each.
    // Allow generous tolerance and require at least 0.9s
    if elapsed < 900*time.Millisecond {
        t.Errorf("Rate limiting not working: %v (expected: >= 900ms)", elapsed)
    }
    
    // Check that all events were published
    if len(publisher.published) != 20 {
        t.Errorf("Expected 20 events, got: %d", len(publisher.published))
    }
}

type MockPublisher struct {
    published []*Event
    mutex     sync.Mutex
}

func (mp *MockPublisher) Publish(event *Event) error {
    mp.mutex.Lock()
    defer mp.mutex.Unlock()
    mp.published = append(mp.published, event)
    return nil
}

14. Configuration Management

We can use YAML or TOML for configuration management.

YAML Configuration

import (
    "os"
    "gopkg.in/yaml.v3"
)

type Config struct {
    VideoSource struct {
        URL    string `yaml:"url"`
        Width  int    `yaml:"width"`
        Height int    `yaml:"height"`
    } `yaml:"video_source"`
    
    Processing struct {
        MotionThreshold float64 `yaml:"motion_threshold"`
        MinConfidence   float64 `yaml:"min_confidence"`
        TargetFPS       int     `yaml:"target_fps"`
    } `yaml:"processing"`
    
    MQTT struct {
        BrokerURL string `yaml:"broker_url"`
        Topic     string `yaml:"topic"`
        ClientID  string `yaml:"client_id"`
    } `yaml:"mqtt"`
    
    Server struct {
        MetricsPort string `yaml:"metrics_port"`
        HealthPort  string `yaml:"health_port"`
    } `yaml:"server"`
}

func LoadConfig(path string) (*Config, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    
    var config Config
    if err := yaml.Unmarshal(data, &config); err != nil {
        return nil, err
    }
    
    return &config, nil
}
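
A quick usage sketch at startup (the log calls are illustrative):

cfg, err := LoadConfig("config.yaml")
if err != nil {
    log.Fatalf("Failed to load config: %v", err)
}
log.Printf("Video source: %s", cfg.VideoSource.URL)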

config.yaml Example

video_source:
  url: "rtsp://camera:554/stream"
  width: 640
  height: 480

processing:
  motion_threshold: 0.05
  min_confidence: 0.7
  target_fps: 10

mqtt:
  broker_url: "tcp://mqtt:1883"
  topic: "video/events"
  client_id: "video-processor"

server:
  metrics_port: "2112"
  health_port: "8080"

15. Health Checks and Observability

Health check endpoints are critical in production environments.

Health Check Endpoint

import (
    "encoding/json"
    "net/http"
    "sync/atomic"
    "time"
)

type HealthStatus struct {
    Status    string            `json:"status"`
    Version   string            `json:"version"`
    Uptime    string            `json:"uptime"`
    Frames    int64             `json:"frames_processed"`
    Errors    int64             `json:"errors"`
    Details   map[string]string `json:"details,omitempty"`
}

type HealthChecker struct {
    startTime time.Time
    frames    int64
    errors    int64
    version   string
}

func NewHealthChecker(version string) *HealthChecker {
    return &HealthChecker{
        startTime: time.Now(),
        version:   version,
    }
}

func (hc *HealthChecker) RecordFrame() {
    atomic.AddInt64(&hc.frames, 1)
}

func (hc *HealthChecker) RecordError() {
    atomic.AddInt64(&hc.errors, 1)
}

func (hc *HealthChecker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    status := "healthy"
    
    // Unhealthy if no frames have been processed yet
    // (a production check would track the time of the last processed frame)
    frames := atomic.LoadInt64(&hc.frames)
    if frames == 0 {
        status = "unhealthy"
    }
    
    uptime := time.Since(hc.startTime).String()
    
    health := HealthStatus{
        Status:  status,
        Version: hc.version,
        Uptime:  uptime,
        Frames:  frames,
        Errors:  atomic.LoadInt64(&hc.errors),
    }
    
    w.Header().Set("Content-Type", "application/json")
    if status == "healthy" {
        w.WriteHeader(http.StatusOK)
    } else {
        w.WriteHeader(http.StatusServiceUnavailable)
    }
    
    json.NewEncoder(w).Encode(health)
}

// Usage
healthChecker := NewHealthChecker("1.0.0")
http.Handle("/health", healthChecker)
go http.ListenAndServe(":8080", nil)

Readiness and Liveness Probes (Kubernetes)

livenessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5
  failureThreshold: 3

readinessProbe:
  httpGet:
    path: /health
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5
  timeoutSeconds: 3
  failureThreshold: 2

16. Backpressure Handling

When frame rate is high in video pipelines, channel buffers can fill up. Let’s add a backpressure mechanism.

Adaptive Frame Dropping

import (
    "math"
    "math/rand"
    "sync"
    "sync/atomic"
)

type FrameDropper struct {
    dropRatio    float64
    bufferFull   bool
    droppedCount int64
    mutex        sync.RWMutex
}

func NewFrameDropper() *FrameDropper {
    return &FrameDropper{
        dropRatio: 0.0,
    }
}

func (fd *FrameDropper) ShouldDrop(bufferUsage float64) bool {
    fd.mutex.Lock()
    defer fd.mutex.Unlock()
    
    // Start frame drop if buffer is more than 80% full
    if bufferUsage > 0.8 {
        fd.bufferFull = true
        fd.dropRatio = math.Min(0.5, (bufferUsage-0.8)*2.5) // Max 50% drop
        return true
    }
    
    if fd.bufferFull && bufferUsage > 0.5 {
        // Buffer still high, continue dropping
        return rand.Float64() < fd.dropRatio
    }
    
    // Buffer returned to normal
    fd.bufferFull = false
    fd.dropRatio = 0.0
    return false
}

func (fd *FrameDropper) RecordDrop() {
    atomic.AddInt64(&fd.droppedCount, 1)
}

Channel Buffer Monitoring

func monitorChannelBuffer(ch <-chan *Frame, bufferSize int) float64 {
    currentLen := len(ch)
    return float64(currentLen) / float64(bufferSize)
}

// Usage in pipeline
select {
case out <- frame:
    // Frame sent
case <-time.After(10 * time.Millisecond):
    // Timeout - buffer may be full
    bufferUsage := monitorChannelBuffer(out, cap(out))
    if frameDropper.ShouldDrop(bufferUsage) {
        frameDropper.RecordDrop()
        // Skip frame
        continue
    }
    // Retry
    select {
    case out <- frame:
    case <-ctx.Done():
        return
    }
}

17. Network Optimization and Video Streaming Protocols

Network optimization is critical in edge processing systems. Video streams require high bandwidth and network delays directly affect system performance.

17.1 Video Streaming Protocols

RTSP (Real-Time Streaming Protocol)

RTSP is the most common protocol for IP cameras:

// RTSP URL format
rtspURL := "rtsp://username:password@192.168.1.100:554/stream1"

// RTSP connection with FFmpeg
args := []string{
    "-rtsp_transport", "tcp",  // Use TCP (instead of UDP)
    "-i", rtspURL,
    "-vf", "scale=640:480",
    "-r", "15",  // Frame rate limit
    "-pix_fmt", "rgb24",
    "-f", "rawvideo",
    "pipe:1",
}

RTSP Optimizations:

  • Use TCP transport (prevents UDP packet loss)
  • Set timeout settings (fast recovery on connection drops)
  • Add reconnection logic (automatic reconnection)

WebRTC (Web Real-Time Communication)

WebRTC is ideal for browser-based real-time streaming. The sketch below approximates it by pushing JPEG frames over WebSocket, which is simpler to operate than a full WebRTC stack:

import (
    "bytes"
    "image"
    "image/color"
    "image/jpeg"
    "sync"

    "github.com/gorilla/websocket"
)

// Convert frame to JPEG (helper function)
func frameToJPEG(frame *Frame, quality int) ([]byte, error) {
    img := image.NewRGBA(image.Rect(0, 0, frame.Width, frame.Height))
    
    for y := 0; y < frame.Height; y++ {
        for x := 0; x < frame.Width; x++ {
            idx := (y*frame.Width + x) * 3
            if idx+2 >= len(frame.Data) {
                continue
            }
            img.Set(x, y, color.RGBA{
                R: frame.Data[idx],
                G: frame.Data[idx+1],
                B: frame.Data[idx+2],
                A: 255,
            })
        }
    }
    
    var buf bytes.Buffer
    if err := jpeg.Encode(&buf, img, &jpeg.Options{Quality: quality}); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}

// Send frames over WebSocket for WebRTC
type WebRTCStreamer struct {
    clients map[string]*websocket.Conn
    mutex   sync.RWMutex
}

func (ws *WebRTCStreamer) BroadcastFrame(frame *Frame) {
    // Convert frame to JPEG once, outside the lock
    jpegData, err := frameToJPEG(frame, 80)
    if err != nil {
        return
    }
    
    // Send to all clients; collect failed IDs instead of mutating
    // the map while ranging over it under a read lock
    ws.mutex.RLock()
    var failed []string
    for id, conn := range ws.clients {
        if err := conn.WriteMessage(websocket.BinaryMessage, jpegData); err != nil {
            failed = append(failed, id)
        }
    }
    ws.mutex.RUnlock()
    
    // Remove faulty connections under the write lock
    if len(failed) > 0 {
        ws.mutex.Lock()
        for _, id := range failed {
            delete(ws.clients, id)
        }
        ws.mutex.Unlock()
    }
}

17.2 Bandwidth Optimization

Adaptive Bitrate Streaming

Dynamically adjusting quality based on network conditions:

import (
    "fmt"
    "sync"
)

type AdaptiveBitrate struct {
    currentQuality string
    bandwidth      float64  // Mbps
    frameRate      int
    resolution     string
    mutex          sync.RWMutex
}

func (ab *AdaptiveBitrate) AdjustQuality(measuredBandwidth float64) {
    ab.mutex.Lock()
    defer ab.mutex.Unlock()
    
    // Determine quality level based on bandwidth
    if measuredBandwidth < 1.0 {
        ab.resolution = "320x240"
        ab.frameRate = 10
    } else if measuredBandwidth < 3.0 {
        ab.resolution = "640x480"
        ab.frameRate = 15
    } else {
        ab.resolution = "1280x720"
        ab.frameRate = 30
    }
    
    ab.bandwidth = measuredBandwidth
}

func (ab *AdaptiveBitrate) GetFFmpegArgs() []string {
    ab.mutex.RLock()
    defer ab.mutex.RUnlock()
    
    width, height := parseResolution(ab.resolution)
    
    return []string{
        "-vf", fmt.Sprintf("scale=%d:%d", width, height),
        "-r", fmt.Sprintf("%d", ab.frameRate),
    }
}

func parseResolution(resolution string) (int, int) {
    var width, height int
    fmt.Sscanf(resolution, "%dx%d", &width, &height)
    return width, height
}

Frame Throttling

Throttling frames to reduce network load:

type FrameThrottler struct {
    maxFPS      int
    lastFrame   time.Time
    minInterval time.Duration
    mutex       sync.Mutex
}

func NewFrameThrottler(maxFPS int) *FrameThrottler {
    return &FrameThrottler{
        maxFPS:      maxFPS,
        minInterval: time.Second / time.Duration(maxFPS),
    }
}

func (ft *FrameThrottler) ShouldProcess() bool {
    ft.mutex.Lock()
    defer ft.mutex.Unlock()
    
    now := time.Now()
    if now.Sub(ft.lastFrame) >= ft.minInterval {
        ft.lastFrame = now
        return true
    }
    return false
}
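
A sketch of how the throttler might sit in a pipeline stage (process is a placeholder for the actual frame handler):

throttler := NewFrameThrottler(15) // cap processing at 15 FPS

for frame := range frames {
    if !throttler.ShouldProcess() {
        continue // drop the frame to respect the FPS cap
    }
    process(frame)
}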

17.3 Network Resilience

Connection Retry and Backoff

type NetworkRetry struct {
    maxRetries int
    backoff    time.Duration
    mutex      sync.Mutex
}

func (nr *NetworkRetry) ConnectWithRetry(connectFn func() error) error {
    nr.mutex.Lock()
    defer nr.mutex.Unlock()
    
    backoff := nr.backoff
    for i := 0; i < nr.maxRetries; i++ {
        if err := connectFn(); err == nil {
            return nil
        }
        
        if i < nr.maxRetries-1 {
            time.Sleep(backoff)
            backoff *= 2  // Exponential backoff
        }
    }
    
    return fmt.Errorf("connection failed after %d retries", nr.maxRetries)
}

Network Quality Monitoring

type NetworkMonitor struct {
    latency     []time.Duration
    packetLoss  float64
    bandwidth   float64
    mutex       sync.RWMutex
}

func (nm *NetworkMonitor) MeasureLatency() time.Duration {
    start := time.Now()
    // Send ping or test packet
    // ...
    latency := time.Since(start)
    
    nm.mutex.Lock()
    nm.latency = append(nm.latency, latency)
    if len(nm.latency) > 100 {
        nm.latency = nm.latency[1:]
    }
    nm.mutex.Unlock()
    
    return latency
}

func (nm *NetworkMonitor) GetAverageLatency() time.Duration {
    nm.mutex.RLock()
    defer nm.mutex.RUnlock()
    
    if len(nm.latency) == 0 {
        return 0
    }
    
    var sum time.Duration
    for _, l := range nm.latency {
        sum += l
    }
    return sum / time.Duration(len(nm.latency))
}
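
The MeasureLatency body above is deliberately abstract. One concrete option, assumed here rather than taken from the original system, is to use the TCP connect time to the camera or broker as a latency proxy:

import (
    "net"
    "time"
)

// measureTCPLatency is a hypothetical probe: it treats the time needed to
// open a TCP connection to addr as an approximation of network latency
func measureTCPLatency(addr string) (time.Duration, error) {
    start := time.Now()
    conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
    if err != nil {
        return 0, err
    }
    conn.Close()
    return time.Since(start), nil
}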

17.4 Video Compression Strategies

Compressing frames in events sent from edge to cloud:

// Compression based on JPEG quality setting.
// Note: image.RGBA stores 4 bytes per pixel, so the raw RGB frame data
// must be expanded rather than wrapped directly (wrapping with a 3-byte
// stride would produce a corrupt image).
func compressFrame(frame *Frame, quality int) ([]byte, error) {
    img := image.NewRGBA(image.Rect(0, 0, frame.Width, frame.Height))
    for i := 0; i < frame.Width*frame.Height; i++ {
        img.Pix[i*4+0] = frame.Data[i*3+0]
        img.Pix[i*4+1] = frame.Data[i*3+1]
        img.Pix[i*4+2] = frame.Data[i*3+2]
        img.Pix[i*4+3] = 255
    }
    
    var buf bytes.Buffer
    opts := &jpeg.Options{Quality: quality}
    
    if err := jpeg.Encode(&buf, img, opts); err != nil {
        return nil, err
    }
    
    return buf.Bytes(), nil
}

// Quality levels:
// - 90-100: High quality, large file
// - 70-89:  Medium quality, medium file
// - 50-69:  Low quality, small file

18. Deployment and Containerization

18.1 Dockerfile

# Build stage - Cache optimization with multi-stage build
FROM golang:1.23-alpine AS builder

WORKDIR /app

# FFmpeg, GStreamer, and OpenCV build dependencies
RUN apk add --no-cache \
    ffmpeg \
    pkgconfig \
    gstreamer \
    gstreamer-dev \
    opencv-dev

# Dependencies first (for cache - unchanged files)
COPY go.mod go.sum ./
RUN go mod download

# Then code (changing files)
COPY . .

# Build binary with optimization
RUN CGO_ENABLED=1 GOOS=linux go build \
    -ldflags="-w -s" \
    -o video-processor ./cmd/main.go

# Runtime image - Minimal and secure
FROM alpine:latest

RUN apk add --no-cache \
    ffmpeg \
    ca-certificates \
    libc6-compat
# Note: if the binary links OpenCV (GoCV), the matching OpenCV runtime
# libraries must also be installed in this image

# Create non-root user (security best practice)
RUN addgroup -g 1000 appuser && \
    adduser -D -u 1000 -G appuser appuser

WORKDIR /app

# Copy binary from builder
COPY --from=builder /app/video-processor /app/video-processor

# Change ownership
RUN chown -R appuser:appuser /app

# Run as non-root user
USER appuser

CMD ["/app/video-processor"]

18.2 Docker Compose Example

version: '3.8'

services:
  video-processor:
    build: .
    environment:
      - RTSP_URL=rtsp://camera:554/stream
      - MQTT_BROKER=tcp://mqtt-broker:1883
      - LOG_LEVEL=info
    volumes:
      - ./config:/app/config
    restart: unless-stopped
    
  mqtt-broker:
    image: eclipse-mosquitto:latest
    ports:
      - "1883:1883"

18.3 K3s/Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-processor
spec:
  replicas: 1
  selector:
    matchLabels:
      app: video-processor
  template:
    metadata:
      labels:
        app: video-processor
    spec:
      containers:
      - name: video-processor
        image: video-processor:latest
        env:
        - name: RTSP_URL
          valueFrom:
            configMapKeyRef:
              name: video-config
              key: rtsp-url
        resources:
          requests:
            memory: "256Mi"
            cpu: "500m"
          limits:
            memory: "512Mi"
            cpu: "1000m"

18.4 CI/CD Pipeline

CI/CD pipeline example for production deployment:

# .github/workflows/deploy.yml
name: Build and Deploy

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-go@v4
        with:
          go-version: '1.23'
      
      - name: Run tests
        run: |
          go test -v -race -coverprofile=coverage.out ./...          
      
      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.out
  
  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Build Docker image
        run: |
          docker build -t video-processor:${{ github.sha }} .
          docker tag video-processor:${{ github.sha }} video-processor:latest          
      
      - name: Push to registry
        run: |
          echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
          docker push video-processor:${{ github.sha }}
          docker push video-processor:latest          
  
  deploy:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/video-processor \
            video-processor=video-processor:${{ github.sha }} \
            -n production          

GitLab CI example:

# .gitlab-ci.yml
stages:
  - test
  - build
  - deploy

test:
  stage: test
  image: golang:1.23
  script:
    - go test -v -race ./...
    - go vet ./...

build:
  stage: build
  image: docker:latest
  services:
    - docker:dind
  script:
    - docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .
    - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
    - docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA $CI_REGISTRY_IMAGE:latest
    - docker push $CI_REGISTRY_IMAGE:latest

deploy:
  stage: deploy
  image: bitnami/kubectl:latest
  script:
    - kubectl set image deployment/video-processor video-processor=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
  only:
    - main

18.5 Blue-Green Deployment

Blue-green strategy for zero downtime deployment:

# blue-green-deployment.yaml
apiVersion: v1
kind: Service
metadata:
  name: video-processor-service
spec:
  selector:
    app: video-processor
    version: blue  # or green
  ports:
  - port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-processor-blue
spec:
  replicas: 3
  selector:
    matchLabels:
      app: video-processor
      version: blue
  template:
    metadata:
      labels:
        app: video-processor
        version: blue
    spec:
      containers:
      - name: video-processor
        image: video-processor:v1.0.0
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-processor-green
spec:
  replicas: 0  # Scaled down initially
  selector:
    matchLabels:
      app: video-processor
      version: green
  template:
    metadata:
      labels:
        app: video-processor
        version: green
    spec:
      containers:
      - name: video-processor
        image: video-processor:v1.1.0

Blue-green deployment script:

#!/bin/bash
# blue-green-deploy.sh

CURRENT_VERSION=$(kubectl get service video-processor-service -o jsonpath='{.spec.selector.version}')
NEW_VERSION=$([ "$CURRENT_VERSION" == "blue" ] && echo "green" || echo "blue")
NEW_IMAGE="video-processor:v1.2.0"

echo "Current version: $CURRENT_VERSION"
echo "Deploying to: $NEW_VERSION"

# Scale up new version
kubectl scale deployment video-processor-$NEW_VERSION --replicas=3
kubectl set image deployment/video-processor-$NEW_VERSION video-processor=$NEW_IMAGE

# Health check
echo "Waiting for new version to be ready..."
kubectl wait --for=condition=available --timeout=300s deployment/video-processor-$NEW_VERSION

# Route service to new version
kubectl patch service video-processor-service -p "{\"spec\":{\"selector\":{\"version\":\"$NEW_VERSION\"}}}"

# Scale down old version
kubectl scale deployment video-processor-$CURRENT_VERSION --replicas=0

echo "Deployment complete. New version: $NEW_VERSION"

18.6 Rollback Mechanism

Mechanism for quick rollback:

package main

import (
    "context"
    "fmt"
    "os/exec"
    "time"
)

type DeploymentManager struct {
    currentVersion string
    previousVersion string
    kubeconfig     string
}

func (dm *DeploymentManager) Rollback() error {
    if dm.previousVersion == "" {
        return fmt.Errorf("no previous version to rollback to")
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()
    
    // Rollback to old version
    cmd := exec.CommandContext(ctx, "kubectl", 
        "set", "image", "deployment/video-processor",
        fmt.Sprintf("video-processor=video-processor:%s", dm.previousVersion),
    )
    
    if err := cmd.Run(); err != nil {
        return fmt.Errorf("rollback failed: %w", err)
    }
    
    // Health check
    if err := dm.waitForDeployment(ctx); err != nil {
        return fmt.Errorf("rollback health check failed: %w", err)
    }
    
    dm.currentVersion, dm.previousVersion = dm.previousVersion, dm.currentVersion
    return nil
}

func (dm *DeploymentManager) waitForDeployment(ctx context.Context) error {
    cmd := exec.CommandContext(ctx, "kubectl", 
        "wait", "--for=condition=available",
        "--timeout=300s", "deployment/video-processor",
    )
    return cmd.Run()
}

Kubernetes rollback command:

# Rollback to last successful revision
kubectl rollout undo deployment/video-processor

# Rollback to specific revision
kubectl rollout undo deployment/video-processor --to-revision=3

# View rollout history
kubectl rollout history deployment/video-processor

18.7 Canary Deployment

Canary strategy for gradual deployment:

apiVersion: v1
kind: Service
metadata:
  name: video-processor
spec:
  selector:
    app: video-processor
  ports:
  - port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-processor-stable
spec:
  replicas: 9
  selector:
    matchLabels:
      app: video-processor
      version: stable
  template:
    metadata:
      labels:
        app: video-processor
        version: stable
    spec:
      containers:
      - name: video-processor
        image: video-processor:v1.0.0
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: video-processor-canary
spec:
  replicas: 1  # 10% of replicas receive traffic
  selector:
    matchLabels:
      app: video-processor
      version: canary
  template:
    metadata:
      labels:
        app: video-processor
        version: canary
    spec:
      containers:
      - name: video-processor
        image: video-processor:v1.1.0

18.8 Pre-Deployment Checklist

Checklist before deploying to production:

import (
    "fmt"
    "os/exec"
)

type PreDeploymentChecklist struct {
    checks []Check
}

type Check struct {
    Name        string
    Description string
    Validate    func() error
}

func NewPreDeploymentChecklist() *PreDeploymentChecklist {
    return &PreDeploymentChecklist{
        checks: []Check{
            {
                Name:        "Unit Tests",
                Description: "Are all unit tests passing?",
                Validate: func() error {
                    cmd := exec.Command("go", "test", "./...")
                    return cmd.Run()
                },
            },
            {
                Name:        "Integration Tests",
                Description: "Are integration tests passing?",
                Validate: func() error {
                    cmd := exec.Command("go", "test", "-tags=integration", "./...")
                    return cmd.Run()
                },
            },
            {
                Name:        "Security Scan",
                Description: "Has security scan been performed?",
                Validate: func() error {
                    // Scan with Trivy, Snyk, etc.
                    return nil
                },
            },
            {
                Name:        "Performance Test",
                Description: "Are performance tests passing?",
                Validate: func() error {
                    // Check load test results
                    return nil
                },
            },
        },
    }
}

func (pdc *PreDeploymentChecklist) Run() error {
    for _, check := range pdc.checks {
        fmt.Printf("Checking: %s...\n", check.Name)
        if err := check.Validate(); err != nil {
            return fmt.Errorf("%s failed: %w", check.Name, err)
        }
        fmt.Printf("โœ“ %s passed\n", check.Name)
    }
    return nil
}

19. Security

Security is critical in production environments. Since edge processing systems handle sensitive video data, security layers must be implemented.

19.1 Credential Management and Secret Rotation

If username and password are used in RTSP URLs, use environment variables or secret management:

import (
    "fmt"
    "net/url"
    "os"
    "sync"
    "time"
)

type CredentialManager struct {
    secrets     map[string]string
    lastRotated time.Time
    mutex       sync.RWMutex
}

func NewCredentialManager() *CredentialManager {
    cm := &CredentialManager{
        secrets: make(map[string]string),
    }
    cm.loadSecrets()
    return cm
}

func (cm *CredentialManager) loadSecrets() {
    // Load from Kubernetes secrets or HashiCorp Vault
    cm.secrets["rtsp_username"] = os.Getenv("RTSP_USERNAME")
    cm.secrets["rtsp_password"] = os.Getenv("RTSP_PASSWORD")
}

func (cm *CredentialManager) GetRTSPURL(baseURL string) string {
    cm.mutex.RLock()
    defer cm.mutex.RUnlock()
    
    username := cm.secrets["rtsp_username"]
    password := cm.secrets["rtsp_password"]
    
    if username != "" && password != "" {
        return fmt.Sprintf("rtsp://%s:%s@%s", username, password, baseURL)
    }
    return baseURL
}

// Safe logging helper: mask credentials in RTSP URLs before logging
func maskCredentials(rawURL string) string {
    u, err := url.Parse(rawURL)
    if err != nil {
        return rawURL
    }
    if u.User != nil {
        u.User = url.UserPassword("***", "***")
    }
    return u.String()
}

// Secret rotation - periodically refresh secrets
func (cm *CredentialManager) RotateSecrets() {
    cm.mutex.Lock()
    defer cm.mutex.Unlock()
    
    // Load new secrets
    cm.loadSecrets()
    cm.lastRotated = time.Now()
}
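
Usage note: pass RTSP URLs through maskCredentials before they reach any log line, so credentials never end up in log storage:

rtspURL := cm.GetRTSPURL("192.168.1.100:554/stream1")
log.Printf("Connecting to %s", maskCredentials(rtspURL)) // logs rtsp://***:***@...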

19.2 RTSP Authentication and TLS

To secure RTSP connections:

import (
    "crypto/tls"
    "net/url"
)

type SecureRTSPClient struct {
    tlsConfig *tls.Config
}

func NewSecureRTSPClient(certFile, keyFile string) (*SecureRTSPClient, error) {
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        return nil, err
    }
    
    return &SecureRTSPClient{
        tlsConfig: &tls.Config{
            Certificates: []tls.Certificate{cert},
            MinVersion:   tls.VersionTLS12,
        },
    }, nil
}

// Use RTSP over TLS (RTSPS)
func (src *SecureRTSPClient) GetSecureRTSPURL(baseURL string) string {
    u, err := url.Parse(baseURL)
    if err != nil {
        return baseURL
    }
    
    // Use RTSPS (RTSP over TLS)
    if u.Scheme == "rtsp" {
        u.Scheme = "rtsps"
    }
    
    return u.String()
}

19.3 Video Stream Encryption (SRTP)

Use SRTP (Secure Real-time Transport Protocol) to encrypt real-time video streams. The sketch below applies the same idea at the application layer with AES-GCM:

import (
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
    "fmt"
    "io"
)

type StreamEncryptor struct {
    aesgcm cipher.AEAD
}

func NewStreamEncryptor(key []byte) (*StreamEncryptor, error) {
    block, err := aes.NewCipher(key)
    if err != nil {
        return nil, err
    }
    
    aesgcm, err := cipher.NewGCM(block)
    if err != nil {
        return nil, err
    }
    
    return &StreamEncryptor{aesgcm: aesgcm}, nil
}

func (se *StreamEncryptor) EncryptFrame(frameData []byte) ([]byte, error) {
    // AES-GCM requires a fresh nonce per message; reusing one breaks security.
    // Prepend the nonce so the receiver can extract it for decryption.
    nonce := make([]byte, se.aesgcm.NonceSize())
    if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
        return nil, err
    }
    return se.aesgcm.Seal(nonce, nonce, frameData, nil), nil
}

func (se *StreamEncryptor) DecryptFrame(encryptedData []byte) ([]byte, error) {
    nonceSize := se.aesgcm.NonceSize()
    if len(encryptedData) < nonceSize {
        return nil, fmt.Errorf("ciphertext too short")
    }
    nonce, ciphertext := encryptedData[:nonceSize], encryptedData[nonceSize:]
    return se.aesgcm.Open(nil, nonce, ciphertext, nil)
}

19.4 API Authentication & Authorization

Authentication and authorization for HTTP API endpoints:

import (
    "fmt"
    "net/http"
    "strings"
    
    "github.com/golang-jwt/jwt/v5"
)

type AuthMiddleware struct {
    secretKey []byte
}

func NewAuthMiddleware(secretKey string) *AuthMiddleware {
    return &AuthMiddleware{
        secretKey: []byte(secretKey),
    }
}

func (am *AuthMiddleware) Authenticate(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Expect "Authorization: Bearer <token>"
        token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
        if token == "" {
            http.Error(w, "Unauthorized", http.StatusUnauthorized)
            return
        }
        
        // Validate JWT token
        claims, err := am.validateToken(token)
        if err != nil {
            http.Error(w, "Invalid token", http.StatusUnauthorized)
            return
        }
        
        // Role-based authorization
        if !am.hasPermission(claims.Role, r.URL.Path) {
            http.Error(w, "Forbidden", http.StatusForbidden)
            return
        }
        
        next(w, r)
    }
}

func (am *AuthMiddleware) validateToken(tokenString string) (*Claims, error) {
    token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
        // Reject tokens signed with an unexpected algorithm
        if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
            return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
        }
        return am.secretKey, nil
    })
    
    if err != nil {
        return nil, err
    }
    
    if claims, ok := token.Claims.(*Claims); ok && token.Valid {
        return claims, nil
    }
    
    return nil, fmt.Errorf("invalid token")
}

func (am *AuthMiddleware) hasPermission(role, path string) bool {
    // Simple role-based authorization
    // A more elaborate policy engine can be used in production
    
    // Admin can access all endpoints
    if role == "admin" {
        return true
    }
    
    // User can only access read-only endpoints
    if role == "user" {
        return path == "/api/stats" || path == "/api/health"
    }
    
    // Viewer can only access preview endpoints
    if role == "viewer" {
        return path == "/preview" || path == "/preview/list"
    }
    
    return false
}

type Claims struct {
    Username string `json:"username"`
    Role     string `json:"role"`
    jwt.RegisteredClaims
}

19.5 Container Security Best Practices

Security measures for Docker containers:

# Use non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
USER appuser

# Use minimal base image
FROM gcr.io/distroless/base-debian11

# Only expose necessary ports
EXPOSE 8080

# Read-only filesystem (if possible)
# docker run --read-only --tmpfs /tmp

# Security scanning
# docker scan <image-name>

Kubernetes Security Context:

apiVersion: v1
kind: Pod
spec:
  securityContext:
    runAsNonRoot: true
    runAsUser: 1000
    fsGroup: 1000
    seccompProfile:
      type: RuntimeDefault
  containers:
  - name: video-processor
    securityContext:
      allowPrivilegeEscalation: false
      readOnlyRootFilesystem: true
      capabilities:
        drop:
        - ALL

19.6 Certificate Management

TLS certificate management:

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "os"
    "sync"
    "time"
)

type CertificateManager struct {
    cert     *tls.Certificate
    certPool *x509.CertPool
    mutex    sync.RWMutex
}

func NewCertificateManager(certFile, keyFile, caFile string) (*CertificateManager, error) {
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
        return nil, err
    }
    
    caCert, err := os.ReadFile(caFile)
    if err != nil {
        return nil, err
    }
    
    certPool := x509.NewCertPool()
    if !certPool.AppendCertsFromPEM(caCert) {
        return nil, fmt.Errorf("failed to parse CA certificate")
    }
    
    return &CertificateManager{
        cert:     &cert,
        certPool: certPool,
    }, nil
}

// Check certificate expiration
func (cm *CertificateManager) CheckExpiration() (time.Duration, error) {
    cm.mutex.RLock()
    defer cm.mutex.RUnlock()
    
    cert, err := x509.ParseCertificate(cm.cert.Certificate[0])
    if err != nil {
        return 0, err
    }
    
    return time.Until(cert.NotAfter), nil
}

// Placeholder for certificate renewal logic (e.g., ACME / Let's Encrypt)
func (cm *CertificateManager) renewCertificate() error {
    // In a real system, implement ACME client or call an external certificate manager
    return fmt.Errorf("certificate renewal not implemented")
}

// For auto-renewal
func (cm *CertificateManager) StartAutoRenewal() {
    go func() {
        ticker := time.NewTicker(24 * time.Hour)
        defer ticker.Stop()
        
        for range ticker.C {
            timeLeft, err := cm.CheckExpiration()
            if err != nil {
                continue
            }
            
            // Renew if less than 30 days left
            if timeLeft < 30*24*time.Hour {
                cm.renewCertificate()
            }
        }
    }()
}

19.7 Input Validation and Sanitization

Validate frame data before processing:

import (
    "errors"
    "fmt"
)

func validateFrame(frame *Frame) error {
    if frame == nil {
        return errors.New("frame is nil")
    }
    if len(frame.Data) == 0 {
        return errors.New("frame data is empty")
    }
    expectedSize := frame.Width * frame.Height * 3
    if len(frame.Data) != expectedSize {
        return fmt.Errorf("frame size mismatch: expected %d, got %d", expectedSize, len(frame.Data))
    }
    
    // Size limits
    if frame.Width > 4096 || frame.Height > 4096 {
        return errors.New("frame dimensions too large")
    }
    
    return nil
}
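
A sketch of where validation sits in the pipeline (process stands in for the actual stage):

for frame := range frames {
    if err := validateFrame(frame); err != nil {
        log.Printf("Dropping invalid frame: %v", err)
        continue // reject the frame before any processor touches it
    }
    process(frame)
}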

19.8 Security Monitoring and Audit Logging

Logging security events:

import (
    "github.com/sirupsen/logrus"
    "time"
)

type SecurityLogger struct {
    logger *logrus.Logger
}

func (sl *SecurityLogger) LogSecurityEvent(eventType string, details map[string]interface{}) {
    sl.logger.WithFields(logrus.Fields{
        "event_type": eventType,
        "timestamp":  time.Now(),
        "details":    details,
    }).Warn("Security event")
}

// Usage examples:
// sl.LogSecurityEvent("authentication_failure", map[string]interface{}{
//     "ip": "192.168.1.100",
//     "username": "admin",
// })
// sl.LogSecurityEvent("unauthorized_access", map[string]interface{}{
//     "endpoint": "/api/frames",
//     "user": "user123",
// })

20. Complete Example Application

Example bringing all parts together:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    // Signal handling
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    
    go func() {
        <-sigChan
        log.Println("Shutting down...")
        cancel()
    }()
    
    // Create video source
    source := NewVideoSource("rtsp://camera:554/stream", 640, 480)
    
    // Create pipeline
    pipeline := NewPipeline(source)
    
    // Add motion detector
    motionDetector := NewMotionDetector(0.05, 640, 480) // 5% threshold
    pipeline.AddProcessor(motionDetector)
    
    // Add object detector (optional, graceful degradation on error)
    // objectDetector, err := NewObjectDetector("model.onnx", "config.yaml", "classes.txt")
    // if err == nil {
    //     pipeline.AddProcessor(objectDetector)
    // }
    
    // Add edge decision layer
    mqttPub, err := NewMQTTPublisher("tcp://mqtt:1883", "video/events", "video-processor")
    if err != nil {
        log.Fatalf("Failed to create MQTT publisher: %v", err)
    }
    
    decisionLayer := NewEdgeDecisionLayer(0.7, mqttPub, 5*time.Second)
    pipeline.AddProcessor(decisionLayer)
    
    // Start pipeline (in goroutine)
    pipelineDone := make(chan error, 1)
    go func() {
        pipelineDone <- pipeline.Run(ctx)
    }()
    
    // Wait for shutdown signal
    <-ctx.Done()
    log.Println("Shutting down gracefully...")
    
    // Timeout for graceful shutdown
    shutdownTimeout := 30 * time.Second
    shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
    defer shutdownCancel()
    
    // Stop pipeline with hard timeout fallback
    shutdownDone := make(chan error, 1)
    go func() {
        shutdownDone <- pipeline.Shutdown(shutdownCtx)
    }()

    select {
    case err := <-shutdownDone:
        if err != nil {
            log.Printf("Shutdown error: %v", err)
        } else {
            log.Println("Pipeline stopped successfully")
        }
    case <-time.After(shutdownTimeout + 5*time.Second):
        log.Println("Force exiting after shutdown timeout!")
        os.Exit(1)
    }

    // Close MQTT connection (best effort)
    if err := mqttPub.Close(); err != nil {
        log.Printf("MQTT close error: %v", err)
    }
}

21. Frame Recording and Storage

To record frames on important events:

import (
    "fmt"
    "os"
    "sync"
)

type FrameRecorder struct {
    storagePath string
    maxFrames   int
    frames      []*Frame
    mutex       sync.Mutex
}

func (fr *FrameRecorder) RecordFrame(frame *Frame) error {
    fr.mutex.Lock()
    defer fr.mutex.Unlock()
    
    if len(fr.frames) >= fr.maxFrames {
        // Remove oldest frame
        fr.frames = fr.frames[1:]
    }
    
    fr.frames = append(fr.frames, frame)
    return nil
}

func (fr *FrameRecorder) SaveEvent(eventID string) error {
    fr.mutex.Lock()
    frames := make([]*Frame, len(fr.frames))
    copy(frames, fr.frames)
    fr.mutex.Unlock()
    
    // Save frames as JPEG (reusing the frameToJPEG helper from section 17)
    for i, frame := range frames {
        jpegData, err := frameToJPEG(frame, 90)
        if err != nil {
            return err
        }
        filename := fmt.Sprintf("%s/%s_frame_%d.jpg", fr.storagePath, eventID, i)
        if err := os.WriteFile(filename, jpegData, 0644); err != nil {
            return err
        }
    }
    
    return nil
}

22. Alerting and Notifications

Notification mechanism for events:

import (
    "log"
    "time"
)

type AlertManager struct {
    notifiers []Notifier
}

type Notifier interface {
    Send(alert *Alert) error
}

type Alert struct {
    Level       string
    Message     string
    Event       *Event
    Timestamp   time.Time
}

// Email notifier
type EmailNotifier struct {
    smtpServer string
    recipients []string
}

// Slack notifier
type SlackNotifier struct {
    webhookURL string
}

// SMS notifier (Twilio, etc.)
type SMSNotifier struct {
    apiKey string
    phone  string
}

func (am *AlertManager) SendAlert(alert *Alert) {
    for _, notifier := range am.notifiers {
        go func(n Notifier) {
            if err := n.Send(alert); err != nil {
                log.Printf("Failed to send alert: %v", err)
            }
        }(notifier)
    }
}
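
The notifier types above are declared without bodies. As one example, a minimal Send for SlackNotifier could post the alert text to the incoming-webhook URL (payload shape per Slack's webhook API; error handling kept minimal):

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

func (sn *SlackNotifier) Send(alert *Alert) error {
    // Slack incoming webhooks accept a JSON body with a "text" field
    payload := map[string]string{
        "text": fmt.Sprintf("[%s] %s", alert.Level, alert.Message),
    }
    body, err := json.Marshal(payload)
    if err != nil {
        return err
    }
    
    resp, err := http.Post(sn.webhookURL, "application/json", bytes.NewReader(body))
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode >= 300 {
        return fmt.Errorf("slack webhook returned status %d", resp.StatusCode)
    }
    return nil
}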

23. Hardware Requirements

Minimum Requirements (Motion Detection Only)

  • CPU: 2 core, 1.5 GHz+
  • RAM: 512 MB
  • Storage: 1 GB (OS + application)
  • Network: 10 Mbps

Recommended Requirements (With Object Detection)

  • CPU: 4 core, 2.0 GHz+
  • RAM: 2 GB
  • Storage: 5 GB
  • Network: 100 Mbps
  • GPU (optional): NVIDIA Jetson Nano, Intel Neural Compute Stick

Production Requirements (Multi-Camera)

  • CPU: 8+ core, 2.5 GHz+
  • RAM: 8 GB+
  • Storage: 50 GB+ (for event recording)
  • Network: 1 Gbps
  • GPU: NVIDIA Jetson Xavier NX or dedicated AI accelerator

24. Real-World Experiences and Challenges

Some challenges and solutions we encountered while developing this project:

24.1 FFmpeg Process Management

Problem: FFmpeg process can crash unexpectedly or connection can drop.

Solution: Continuous monitoring and automatic restart mechanism:

func (vs *VideoSource) StartWithMonitoring(ctx context.Context) error {
    for {
        if err := vs.Start(ctx); err != nil {
            log.Printf("Failed to start FFmpeg: %v, retrying in 5 seconds...", err)
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(5 * time.Second):
                continue
            }
        }
        
        // Check if process is running
        select {
        case <-ctx.Done():
            return ctx.Err()
        case err := <-vs.Errors():
            log.Printf("FFmpeg error: %v, restarting...", err)
            vs.Stop()
            time.Sleep(2 * time.Second)
        }
    }
}

24.2 Memory Leaks

Problem: Memory usage continuously increases when frame buffers are not properly cleaned.

Solution: Use sync.Pool for frame buffers (see the sketch after the snippet below) and periodic GC calls:

// Call GC every 1000 frames
frameCount := 0
for frame := range frames {
    frameCount++
    if frameCount%1000 == 0 {
        runtime.GC()
    }
    // Frame processing...
}
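
The sync.Pool half of the solution can look like this minimal sketch (frame size assumed to be 640x480 RGB):

import "sync"

// Buffers are reused across frames instead of being reallocated each time
var framePool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 640*480*3)
    },
}

func getFrameBuffer() []byte {
    return framePool.Get().([]byte)
}

func putFrameBuffer(buf []byte) {
    framePool.Put(buf)
}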

24.3 Network Connection Issues

Problem: RTSP connections occasionally drop, especially over WiFi.

Solution: Use TCP transport and connection timeout settings:

# Use TCP instead of UDP (more reliable but slightly slower)
ffmpeg -rtsp_transport tcp -i rtsp://...

24.4 Frame Rate Inconsistencies

Problem: Source provides 30 FPS but pipeline can only process 10-15 FPS.

Solution: Frame skipping and adaptive processing:

type AdaptiveProcessor struct {
    targetFPS     int
    currentFPS    float64
    skipRatio     int
    lastFrameTime time.Time
}

func (ap *AdaptiveProcessor) ShouldProcess() bool {
    now := time.Now()
    if ap.lastFrameTime.IsZero() {
        ap.lastFrameTime = now
        return true
    }
    
    elapsed := now.Sub(ap.lastFrameTime)
    ap.currentFPS = 1.0 / elapsed.Seconds()
    
    if ap.currentFPS > float64(ap.targetFPS)*1.2 {
        // Arriving too fast: process every other frame and update
        // the timestamp only for frames we actually process
        ap.skipRatio++
        if ap.skipRatio%2 == 0 {
            ap.lastFrameTime = now
            return true
        }
        return false
    }
    
    ap.skipRatio = 0
    ap.lastFrameTime = now
    return true
}

24.5 Object Detection Delays

Problem: Object detection model is too slow, blocking the pipeline.

Solution: Async processing and frame buffering:

func (od *ObjectDetector) ProcessAsync(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
    // Detection results keyed by frame ID (prune this map periodically
    // in production, otherwise it grows without bound)
    detectionResults := make(map[int][]Detection)
    resultMutex := sync.RWMutex{}
    
    frameID := 0
    for frame := range in {
        currentID := frameID
        frameID++
        
        // Tag the frame before sending it so downstream stages
        // can look up detections later by ID
        frame.Metadata["frame_id"] = currentID
        
        // Pass frame downstream immediately (without waiting for detection)
        select {
        case out <- frame:
        case <-ctx.Done():
            return
        }
        
        // Run detection in the background; write results into the shared
        // map instead of mutating a frame downstream may already be reading
        go func(f *Frame, id int) {
            detections := od.detect(f)
            resultMutex.Lock()
            detectionResults[id] = detections
            resultMutex.Unlock()
        }(frame, currentID)
    }
}

24.6 Multi-Camera Management

Problem: Managing streams from multiple cameras is complex.

Solution: Camera manager pattern:

type CameraManager struct {
    cameras map[string]*VideoSource
    pipelines map[string]*Pipeline
    mutex sync.RWMutex
}

func (cm *CameraManager) AddCamera(id, url string, width, height int) error {
    cm.mutex.Lock()
    defer cm.mutex.Unlock()
    
    source := NewVideoSource(url, width, height)
    pipeline := NewPipeline(source)
    
    cm.cameras[id] = source
    cm.pipelines[id] = pipeline
    
    return pipeline.Run(context.Background())
}

func (cm *CameraManager) RemoveCamera(id string) error {
    cm.mutex.Lock()
    defer cm.mutex.Unlock()
    
    if pipeline, exists := cm.pipelines[id]; exists {
        pipeline.Stop()
        delete(cm.pipelines, id)
    }
    
    if source, exists := cm.cameras[id]; exists {
        source.Stop()
        delete(cm.cameras, id)
    }
    
    return nil
}

25. Troubleshooting Guide

Common issues and solutions:

25.1 FFmpeg Connection Failed

Symptoms: Cannot connect to RTSP stream

Solutions:

  • Check RTSP URL format: rtsp://username:password@ip:port/stream
  • Verify network connectivity: ping <camera-ip>
  • Test with FFmpeg directly: ffmpeg -i rtsp://...
  • Use TCP transport: -rtsp_transport tcp

25.2 High CPU Usage

Symptoms: CPU usage > 90%

Solutions:

  • Reduce frame rate: -r 10 (10 FPS)
  • Lower resolution: -vf scale=320:240
  • Disable object detection (use only motion detection)
  • Use GPU acceleration if available

25.3 Memory Leaks

Symptoms: Memory usage continuously increases

Solutions:

  • Use sync.Pool for frame buffers
  • Call runtime.GC() periodically
  • Check for goroutine leaks with pprof (see the sketch below)
  • Monitor channel buffer sizes
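
A minimal way to enable the pprof check mentioned above (net/http/pprof registers its handlers on the default mux):

import (
    "log"
    "net/http"
    _ "net/http/pprof"
)

func startPprof() {
    // Inspect live goroutines at http://localhost:6060/debug/pprof/goroutine?debug=2
    go func() {
        log.Println(http.ListenAndServe("localhost:6060", nil))
    }()
}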

25.4 Frame Drops

Symptoms: Many frames are dropped

Solutions:

  • Increase channel buffer sizes
  • Reduce processing load (lower resolution/FPS)
  • Use frame skipping mechanism
  • Check network bandwidth

26. Cost Analysis

26.1 Cloud vs Edge Processing Comparison

Cloud Video Processing (AWS Rekognition, Azure Video Analyzer):

  • Per video: ~$0.10-0.50/minute
  • 10 cameras, 24/7: ~$1,440-7,200/month
  • Network cost: ~$200-500/month
  • Total: ~$1,640-7,700/month

Edge Processing (This System):

  • Edge device: ~$50-200 (one-time)
  • Cloud storage (events only): ~$10-50/month
  • Network (events only): ~$5-20/month
  • Total: ~$15-70/month + one-time hardware

Savings: 95-99% cost reduction (excluding hardware cost)

27. Production Best Practices

27.1 Resource Limits

Always set resource limits in Docker/Kubernetes:

resources:
  requests:
    memory: "512Mi"
    cpu: "1000m"
  limits:
    memory: "1Gi"
    cpu: "2000m"

27.2 Graceful Shutdown

Ensure all goroutines terminate properly when system shuts down:

func (p *Pipeline) Shutdown(ctx context.Context) error {
    // Use external context for timeout / cancellation
    done := make(chan error, 1)
    go func() {
        done <- p.Stop()
    }()

    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

27.3 Circuit Breaker Pattern

Use circuit breaker when connecting to external services (object detection API):

import "github.com/sony/gobreaker"

type CircuitBreakerDetector struct {
    detector *ExternalDetector
    cb       *gobreaker.CircuitBreaker
}

func NewCircuitBreakerDetector(detector *ExternalDetector) *CircuitBreakerDetector {
    cb := gobreaker.NewCircuitBreaker(gobreaker.Settings{
        Name:    "ObjectDetection",
        Timeout: 5 * time.Second,
        ReadyToTrip: func(counts gobreaker.Counts) bool {
            return counts.ConsecutiveFailures > 5
        },
    })
    
    return &CircuitBreakerDetector{
        detector: detector,
        cb:       cb,
    }
}

func (cbd *CircuitBreakerDetector) Detect(frame *Frame) ([]Detection, error) {
    result, err := cbd.cb.Execute(func() (interface{}, error) {
        return cbd.detector.callInferenceService(frame)
    })
    
    if err != nil {
        return nil, err
    }
    
    return result.([]Detection), nil
}

27.4 Rate Limiting

Use rate limiting for event publishing:

import "golang.org/x/time/rate"

type RateLimitedPublisher struct {
    publisher EventPublisher
    limiter   *rate.Limiter
}

func NewRateLimitedPublisher(publisher EventPublisher, eventsPerSecond int) *RateLimitedPublisher {
    return &RateLimitedPublisher{
        publisher: publisher,
        limiter:   rate.NewLimiter(rate.Limit(eventsPerSecond), eventsPerSecond),
    }
}

func (rlp *RateLimitedPublisher) Publish(event *Event) error {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()
    
    if err := rlp.limiter.Wait(ctx); err != nil {
        return fmt.Errorf("rate limit exceeded: %w", err)
    }
    
    return rlp.publisher.Publish(event)
}

27.5 Structured Logging

Keep all logs in structured format:

logger.WithFields(logrus.Fields{
    "camera_id":    cameraID,
    "frame_id":     frameID,
    "event_type":   event.Type,
    "confidence":   event.Confidence,
    "processing_ms": processingTime.Milliseconds(),
    "timestamp":    time.Now().Unix(),
}).Info("Event detected")

27.6 Metrics Collection

Collect and export important metrics:

  • Frame processing rate (FPS)
  • Detection accuracy
  • Error rates
  • Memory usage
  • CPU usage
  • Network latency
  • Event publish success rate
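
A sketch exporting a few of these with the Prometheus client library (metric names are illustrative; the /metrics port matches the metrics_port from section 14):

import (
    "net/http"
    
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
    framesProcessed = promauto.NewCounter(prometheus.CounterOpts{
        Name: "frames_processed_total",
        Help: "Total number of frames processed",
    })
    frameProcessingSeconds = promauto.NewHistogram(prometheus.HistogramOpts{
        Name:    "frame_processing_seconds",
        Help:    "Per-frame processing duration in seconds",
        Buckets: prometheus.DefBuckets,
    })
)

func startMetricsServer(port string) {
    http.Handle("/metrics", promhttp.Handler())
    go http.ListenAndServe(":"+port, nil)
}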

27.7 Configuration Management (Viper)

Get sensitive information from environment variables or secret management:

import "github.com/spf13/viper"

func LoadConfig() (*Config, error) {
    viper.SetConfigName("config")
    viper.SetConfigType("yaml")
    viper.AddConfigPath(".")
    viper.AddConfigPath("/etc/video-processor/")
    
    // Environment variables can override
    viper.SetEnvPrefix("VIDEO")
    viper.AutomaticEnv()
    
    if err := viper.ReadInConfig(); err != nil {
        return nil, err
    }
    
    var config Config
    if err := viper.Unmarshal(&config); err != nil {
        return nil, err
    }
    
    return &config, nil
}

28. Alternative Approaches and Trade-offs

28.1 FFmpeg vs GStreamer

FFmpeg:

  • ✅ More widespread, well documented
  • ✅ Support for many formats
  • ❌ Process overhead (pipe I/O for each frame)
  • ❌ Slightly slower

GStreamer:

  • ✅ More optimized, faster
  • ✅ Plugin architecture
  • ❌ Less documented
  • ❌ More complex setup

Decision: We chose FFmpeg because it’s more widespread and documented. GStreamer may be more performant but FFmpeg is sufficient for our needs.

28.2 GoCV vs External Service

GoCV (In-process):

  • ✅ Low latency
  • ✅ No network dependency
  • ❌ Model included in binary (large binary)
  • ❌ High CPU usage

External Service:

  • ✅ Model runs in a separate service (easy GPU usage)
  • ✅ Smaller binary
  • ❌ Network latency
  • ❌ Additional service management

Decision: We use GoCV for small models, external service for large models.

28.3 MQTT vs Kafka vs Webhook

MQTT:

  • ✅ Lightweight, ideal for edge
  • ✅ Low latency
  • ✅ QoS support
  • ❌ No message ordering guarantee

Kafka:

  • ✅ High throughput
  • ✅ Message ordering
  • ❌ Heavyweight; overkill for most edge deployments
  • ❌ Higher latency

Webhook:

  • ✅ Simple HTTP
  • ✅ Easy integration
  • ❌ No retry mechanism
  • ❌ Network dependency

Decision: We use MQTT for sending events from edge to cloud, Kafka when high throughput is required.
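
For reference, a minimal publish sketch assuming the Eclipse Paho client (github.com/eclipse/paho.mqtt.golang); the topic name and timeout are illustrative:

import (
    "encoding/json"
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func publishEvent(client mqtt.Client, event *Event) error {
    payload, err := json.Marshal(event)
    if err != nil {
        return err
    }

    // QoS 1 (at-least-once) is a reasonable default for analysis events
    token := client.Publish("edge/events", 1, false, payload)
    if !token.WaitTimeout(5 * time.Second) {
        return fmt.Errorf("publish timed out")
    }
    return token.Error()
}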

28.4 Alternative Approaches to FFmpeg

Go-based video processing libraries are also available besides FFmpeg:

github.com/3d0c/gmf (Go Media Framework):

  • ✅ In-process decoding (no per-frame pipe I/O)
  • ❌ cgo bindings to FFmpeg’s libav* libraries, so FFmpeg is still a dependency
  • ❌ Limited documentation
  • ❌ Slow active development

github.com/asticode/go-astits:

  • ✅ MPEG-TS stream parsing
  • ✅ Pure Go
  • ❌ Only MPEG-TS format

Why Was FFmpeg Preferred?

  1. Format support: 100+ video/audio formats
  2. Stability: Used in production for years
  3. Performance: Written in C, optimized
  4. Documentation: Comprehensive and up-to-date
  5. Community support: Large user base

Decision: We chose FFmpeg because format support, stability, and performance were critical. Go-based alternatives may mature in the future.

29. Machine Learning Model Deployment

ML model deployment is a critical topic in edge processing systems. This section covers model versioning, A/B testing, performance monitoring, and hardware acceleration:

29.1 Model Versioning

import (
    "fmt"
    "sync"
    "time"
)

type ModelManager struct {
    models      map[string]*Model
    activeModel string
    mutex       sync.RWMutex
}

// NewModelManager initializes the models map so LoadModel can be called
// on a fresh manager without a nil-map panic.
func NewModelManager() *ModelManager {
    return &ModelManager{models: make(map[string]*Model)}
}

type Model struct {
    Version     string
    Path        string
    LoadedAt    time.Time
    Performance ModelMetrics
}

// Predict is a placeholder for the actual inference logic of the model.
func (m *Model) Predict(frame *Frame) ([]Detection, error) {
    // In a real system, this would run inference using m.Path / loaded net, etc.
    return nil, nil
}

func (mm *ModelManager) LoadModel(version, path string) error {
    mm.mutex.Lock()
    defer mm.mutex.Unlock()
    
    // Load model
    model := &Model{
        Version:  version,
        Path:     path,
        LoadedAt: time.Now(),
    }
    
    mm.models[version] = model
    
    // Make active if first model
    if mm.activeModel == "" {
        mm.activeModel = version
    }
    
    return nil
}

func (mm *ModelManager) SwitchModel(version string) error {
    mm.mutex.Lock()
    defer mm.mutex.Unlock()
    
    if _, exists := mm.models[version]; !exists {
        return fmt.Errorf("model version %s not found", version)
    }
    
    mm.activeModel = version
    return nil
}

29.2 A/B Testing

import (
    "math/rand"
    "sync"
    "time"
)

type ABTester struct {
    modelA      *Model
    modelB      *Model
    splitRatio  float64  // 0.0-1.0, for modelA
    results     map[string]ABResult
    mutex       sync.RWMutex
}

type ABResult struct {
    ModelAAccuracy float64
    ModelBAccuracy float64
    ModelALatency  time.Duration
    ModelBLatency  time.Duration
}

func (ab *ABTester) Predict(frame *Frame) ([]Detection, error) {
    // Randomly select model (based on split ratio)
    useModelA := rand.Float64() < ab.splitRatio
    
    var model *Model
    var modelName string
    if useModelA {
        model = ab.modelA
        modelName = "A"
    } else {
        model = ab.modelB
        modelName = "B"
    }
    
    start := time.Now()
    detections, err := model.Predict(frame)
    latency := time.Since(start)
    
    // Record results
    ab.recordResult(modelName, detections, latency)
    
    return detections, err
}

// recordResult stores basic statistics about A/B test runs (placeholder implementation)
func (ab *ABTester) recordResult(modelName string, detections []Detection, latency time.Duration) {
    ab.mutex.Lock()
    defer ab.mutex.Unlock()

    // Initialize results map if needed
    if ab.results == nil {
        ab.results = make(map[string]ABResult)
    }

    res := ab.results[modelName]
    if modelName == "A" {
        res.ModelALatency = latency
    } else {
        res.ModelBLatency = latency
    }
    ab.results[modelName] = res
}
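
With splitRatio = 0.9, for example, roughly 90% of frames are scored by model A and 10% by model B; once the recorded latency and accuracy figures favor the candidate, it can be promoted via ModelManager.SwitchModel.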

29.3 Model Performance Monitoring

import (
    "sync"
    "time"
)

type ModelMetrics struct {
    TotalInferences int64
    AverageLatency  time.Duration
    Accuracy        float64
    FalsePositives  int64
    FalseNegatives  int64
    mutex           sync.RWMutex
}

func (mm *ModelMetrics) RecordInference(latency time.Duration, correct bool) {
    mm.mutex.Lock()
    defer mm.mutex.Unlock()
    
    mm.TotalInferences++
    
    // Simple running average: each new sample gets weight 1/2, so recent
    // inferences dominate; a windowed average would be more precise
    if mm.AverageLatency == 0 {
        mm.AverageLatency = latency
    } else {
        mm.AverageLatency = (mm.AverageLatency + latency) / 2
    }
    
    if !correct {
        // Without ground-truth labels we cannot distinguish FP from FN here;
        // count it as a false positive for a conservative accuracy estimate
        mm.FalsePositives++
    }
    
    // Recalculate accuracy on every inference
    mm.Accuracy = float64(mm.TotalInferences-mm.FalsePositives-mm.FalseNegatives) / float64(mm.TotalInferences)
}

29.4 Hardware Acceleration and GPU Support

GPU support is critical for integration with edge AI devices:

NVIDIA Jetson Integration

import "gocv.io/x/gocv"

func NewGPUAcceleratedDetector(modelPath string) (*ObjectDetector, error) {
    net := gocv.ReadNet(modelPath, "")
    if net.Empty() {
        return nil, fmt.Errorf("failed to load model")
    }
    
    // Use CUDA backend
    net.SetPreferableBackend(gocv.NetBackendCUDA)
    net.SetPreferableTarget(gocv.NetTargetCUDA)
    
    return &ObjectDetector{net: net}, nil
}

Intel Neural Compute Stick (OpenVINO)

// With OpenVINO backend
net.SetPreferableBackend(gocv.NetBackendOpenVINO)
net.SetPreferableTarget(gocv.NetTargetVPU) // For Intel NCS

CUDA, OpenCL, Vulkan Comparison

API      Platform         Performance   Edge Suitability
CUDA     NVIDIA           ⭐⭐⭐⭐⭐         Jetson series
OpenCL   Cross-platform   ⭐⭐⭐           General GPUs
Vulkan   Cross-platform   ⭐⭐⭐⭐          Modern GPUs

Recommendation: CUDA for NVIDIA Jetson, OpenVINO for general use.

30. Video Quality Metrics

Various metrics can be used to measure video quality:

30.1 PSNR (Peak Signal-to-Noise Ratio)

import "math"

func CalculatePSNR(original, processed *Frame) float64 {
    mse := calculateMSE(original, processed)
    if mse == 0 {
        return math.Inf(1)  // Perfect match
    }
    
    maxPixelValue := 255.0
    psnr := 20 * math.Log10(maxPixelValue/math.Sqrt(mse))
    return psnr
}

func calculateMSE(original, processed *Frame) float64 {
    if len(original.Data) != len(processed.Data) {
        return math.MaxFloat64
    }
    
    var sum float64
    for i := 0; i < len(original.Data); i++ {
        diff := float64(original.Data[i]) - float64(processed.Data[i])
        sum += diff * diff
    }
    
    return sum / float64(len(original.Data))
}
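
As a rough interpretation, PSNR values above about 40 dB are commonly considered visually near-lossless, 30-40 dB acceptable for lossy compression, and below 30 dB visibly degraded.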

30.2 SSIM (Structural Similarity Index)

import "math"

func CalculateSSIM(original, processed *Frame) float64 {
    // Simplified SSIM implementation
    // Full implementation requires luminance, contrast, structure calculations
    
    mu1 := calculateMean(original)
    mu2 := calculateMean(processed)
    
    sigma1 := calculateStdDev(original, mu1)
    sigma2 := calculateStdDev(processed, mu2)
    sigma12 := calculateCovariance(original, processed, mu1, mu2)
    
    c1 := 0.01 * 255 * 0.01 * 255
    c2 := 0.03 * 255 * 0.03 * 255
    
    numerator := (2*mu1*mu2 + c1) * (2*sigma12 + c2)
    denominator := (mu1*mu1 + mu2*mu2 + c1) * (sigma1*sigma1 + sigma2*sigma2 + c2)
    
    return numerator / denominator
}

func calculateMean(frame *Frame) float64 {
    var sum float64
    for _, v := range frame.Data {
        sum += float64(v)
    }
    return sum / float64(len(frame.Data))
}

func calculateStdDev(frame *Frame, mean float64) float64 {
    var sum float64
    for _, v := range frame.Data {
        diff := float64(v) - mean
        sum += diff * diff
    }
    return math.Sqrt(sum / float64(len(frame.Data)))
}

func calculateCovariance(f1, f2 *Frame, mean1, mean2 float64) float64 {
    var sum float64
    minLen := len(f1.Data)
    if len(f2.Data) < minLen {
        minLen = len(f2.Data)
    }
    
    for i := 0; i < minLen; i++ {
        sum += (float64(f1.Data[i]) - mean1) * (float64(f2.Data[i]) - mean2)
    }
    return sum / float64(minLen)
}
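
SSIM ranges from -1 to 1, where 1 means structurally identical frames. Note that the global calculation above is a simplification; standard SSIM is computed over local windows (e.g. 11x11 Gaussian) and averaged across the frame.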

31. Load Balancing and High Availability

Load balancing and failover mechanisms for high availability in production environments:

31.1 Multi-Instance Deployment

import (
    "fmt"
    "sync"
)

type LoadBalancer struct {
    instances []*VideoProcessor
    current   int
    mutex     sync.Mutex
}

func (lb *LoadBalancer) ProcessFrame(frame *Frame) error {
    lb.mutex.Lock()
    if len(lb.instances) == 0 {
        lb.mutex.Unlock()
        return fmt.Errorf("no healthy instances available")
    }
    // Round-robin selection
    instance := lb.instances[lb.current]
    lb.current = (lb.current + 1) % len(lb.instances)
    lb.mutex.Unlock()
    
    return instance.Process(frame)
}

func (lb *LoadBalancer) HealthCheck() {
    lb.mutex.Lock()
    defer lb.mutex.Unlock()
    
    // Keep only healthy instances; filtering in place avoids the
    // index-shifting bug of removing elements while iterating
    healthy := lb.instances[:0]
    for _, instance := range lb.instances {
        if instance.IsHealthy() {
            healthy = append(healthy, instance)
        }
    }
    lb.instances = healthy
    
    // Reset the round-robin cursor if it fell off the end
    if lb.current >= len(lb.instances) {
        lb.current = 0
    }
}
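
Round-robin assumes roughly homogeneous instances; on mixed edge hardware (e.g. a Jetson next to a Raspberry Pi), a least-loaded or weighted strategy would distribute frames more evenly.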

31.2 Failover Mechanism

import "sync"

type FailoverManager struct {
    primary   *VideoProcessor
    secondary *VideoProcessor
    active    *VideoProcessor
    mutex     sync.RWMutex
}

func (fm *FailoverManager) ProcessFrame(frame *Frame) error {
    fm.mutex.RLock()
    active := fm.active
    fm.mutex.RUnlock()
    
    err := active.Process(frame)
    if err != nil {
        // Failover: switch to the standby instance and retry once
        fm.switchToSecondary()
        
        // Re-read the active instance under the lock to avoid a data race
        fm.mutex.RLock()
        active = fm.active
        fm.mutex.RUnlock()
        return active.Process(frame)
    }
    
    return nil
}

func (fm *FailoverManager) switchToSecondary() {
    fm.mutex.Lock()
    defer fm.mutex.Unlock()
    
    if fm.active == fm.primary {
        fm.active = fm.secondary
    } else {
        fm.active = fm.primary
    }
}
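
A production-grade failover manager would additionally probe the failed instance in the background and switch back once it reports healthy again, rather than only toggling on errors.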

32. Conclusion

In this article, we shared the core components and architecture of a real-time video analysis system used in production. The edge processing approach with Go makes it possible to build high-performance, scalable, and cost-effective systems: the goroutine model is a natural fit for video processing pipelines, and channels make managing frame flow between stages straightforward.

32.1 Our Project Experiences

Key experiences we gained while developing and using this system in production:

  1. Modular architecture is critical - Each component being able to work independently makes system maintenance and development easier
  2. Error handling and resilience - Not everything goes smoothly in production, so graceful degradation and retry mechanisms are essential
  3. You’re flying blind without monitoring - System health and performance must be continuously monitored
  4. Edge processing really saves costs - 95%+ savings possible compared to cloud processing
  5. Go’s concurrency model is ideal for video processing - Goroutine and channel structure makes managing pipelines very easy
  6. C to Go migration experience - Rewriting a system written in C with Go provided significant improvements in both performance and developer experience

32.2 Summary

  • Edge processing approach reduces latency and optimizes bandwidth usage
  • Go’s concurrency model allows us to efficiently manage video pipelines
  • Modular architecture allows us to easily add and remove different analysis techniques
  • Graceful degradation and error handling enable building reliable systems in production environments
  • Techniques like ROI detection and network optimization significantly improve system performance
  • Machine learning model deployment strategies ensure continuous system improvement

32.3 Next Steps

  1. GPU support - Integration with edge AI devices like NVIDIA Jetson and Intel Neural Compute Stick (started in Section 29.4)
  2. Distributed processing - Distributed edge processing for multiple cameras
  3. Real-time streaming - Streaming analyzed frames in real-time
  4. Advanced analytics - Face recognition, behavior analysis, tracking algorithms
  5. Edge-to-cloud sync - Combining analysis results in cloud and reporting
  6. WebRTC integration - Browser-based real-time preview and control
  7. Advanced compression - AV1 codec support and adaptive compression

32.4 Advanced Topics (Separate Article Topics)

This article covers basic and intermediate topics. The following topics are recommended for separate articles:

Distributed Edge Processing:

  • Leader election and consensus algorithms
  • Edge-cloud synchronization (eventual consistency)
  • Multi-edge device coordination

CI/CD and GitOps:

  • Multi-architecture Docker images (arm/v7, arm64, amd64)
  • Edge deployment with ArgoCD, Flux
  • Automated testing and deployment pipelines

Monitoring & Observability:

  • Distributed tracing (Jaeger, OpenTelemetry)
  • Metric analysis for anomaly detection
  • Advanced alerting strategies

Security In-Depth:

  • SRTP, DTLS video stream encryption
  • ONVIF, digest auth camera authentication
  • Container security (AppArmor, Seccomp)

Legal and Ethical:

  • GDPR, KVKK compliance
  • Video recording storage and disposal policies
  • Privacy-by-design approaches

Performance Optimization:

  • Model quantization, pruning, distillation
  • Hardware codec integration (VA-API, VDPAU, NVDEC)
  • Memory management optimizations (HugeTLB, alignment)

Network Protocols:

  • SRT (Secure Reliable Transport)
  • Streaming over QUIC protocol
  • Time-Sensitive Networking (TSN) for industrial environments

32.5 Code Examples and Repository

The code examples shared in this article consist of core components of a system used in production.

Note: The full implementation has been customized according to project requirements; the examples in this article demonstrate the system’s basic architecture and approach.

32.6 Learning Resources

32.7 Conclusion and Recommendations

This system has been successfully running in production and provides the following advantages:

  1. 95%+ cost savings - Edge processing instead of cloud processing
  2. <100ms latency - Real-time decision making
  3. Scalable architecture - Each camera works independently
  4. Reliable system - Graceful degradation and error recovery

Recommendations for getting started:

  • Start with a single camera first
  • Test with motion detection
  • Add object detection later
  • Do load testing before going to production
  • Always set up monitoring and alerting

Note: This article has been prepared as a technical guide. Comprehensive testing is recommended before using in production environments.