Real-Time Video Analysis and Edge Processing with Go
Summary
- Edge Processing: Processing data locally instead of sending it to a central server
- Go + Video Pipeline: High-performance video processing with goroutines and channels
- Production Ready: Motion detection, object detection, event publishing, monitoring
- Cost Savings: 95%+ savings compared to cloud processing
Note: This article shares the core components of a video analysis system used in production. Code examples and architectural decisions are based on real project experiences.
1. Introduction: Video Data and the Need for Edge Processing
Today, video constitutes the largest portion of generated data. CCTV systems, live streaming infrastructure, smart cities, industrial cameras, and IoT devices make real-time video analysis an inevitable need.
At this point, two fundamental problems arise:
- Latency - Sending data to the center takes time
- Bandwidth and cost - Raw video streams are quite expensive
One of the most effective ways to solve these problems is the Edge Processing approach.
Project Story
The need to write this article arose from my experience of rewriting a video analysis system I had previously developed in C language with Go (Golang). The system written in C was delivering excellent results in terms of performance and was successfully running in production. However, during the maintenance and development processes of the system, I realized how valuable some of the advantages that Go provides are for such a project.
During the rewriting process with Go, I was able to achieve performance very close to what I had in C, with a practically acceptable difference. In addition, I discovered how suitable the goroutine and channel structure is for video pipelines. Go’s concurrency model offers a cleaner and safer developer experience compared to C’s thread management and memory management approaches. Also, Go’s cross-platform compilation capability made it very easy for the system to run on different edge devices (Raspberry Pi, x86, ARM). While Go’s GC and runtime do introduce some overhead in theory, the gains in development speed and maintainability more than compensated for this in practice.
As a result of this experience, I saw that Go-based edge video processing systems are a strong alternative both in terms of performance and developer experience. C’s performance advantages are undeniable, but the modern language features and developer-friendly approach that Go offers can be an important reason for preference in such systems. This article has been prepared to guide developers who want to develop similar systems based on these experiences.
2. What is Edge Processing?
Edge processing is the processing of data without sending it to a central server, near the point where it is generated (camera, gateway, edge node).
2.1 Advantages
- Low latency - Results are obtained in milliseconds since data is processed locally
- Less bandwidth usage - Only analysis results (events) are sent, raw video is not sent
- Better data privacy - Sensitive images are not sent to the center
- Scalable architecture - Each edge node works independently, no single point of failure in the central system
- Cost effectiveness - Cloud compute costs decrease
2.2 Use Cases
- Smart security systems - Anomaly detection, face recognition
- Industrial quality control - Fast decision making on production lines
- Traffic management - Vehicle counting, traffic analysis
- Retail analytics - Customer behavior analysis, heat map creation
3. Why Go (Golang)?
Go is a very strong candidate for edge and real-time systems:
- High performance - Native binaries, low GC overhead, near-C speed
- Concurrency - Concurrent video pipelines are easily managed with goroutines and channels
- Low memory usage - Ideal for edge devices’ limited resources
- Cross-platform compilation - Single source code for Linux ARM (Raspberry Pi), x86, Windows
- Easy containerization - Works well with Docker and Kubernetes edge distributions such as K3s
- Static binaries - All dependencies in one binary, easy distribution
- Rich standard library - net, context, and sync packages are a great fit for video processing
4. General Architecture
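At a high level, frames flow from the video source (FFmpeg reading RTSP, USB, or file input) into a goroutine-based processing pipeline (motion detection, ROI extraction, object detection), and the edge decision layer publishes only significant events to the cloud over MQTT or HTTP webhooks.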
5. Quick Start Guide
Step-by-step guide to set up and run the system:
5.1 Requirements
# Go 1.21+ must be installed
go version
# FFmpeg must be installed
ffmpeg -version
# (Optional) For OpenCV and GoCV
# Linux: apt-get install libopencv-dev
# macOS: brew install opencv
5.2 Project Structure
video-processor/
├── cmd/
│   └── main.go
├── internal/
│   ├── pipeline/
│   ├── detector/
│   ├── source/
│   └── publisher/
├── config/
│   └── config.yaml
├── go.mod
└── Dockerfile
5.3 Basic Setup
# Create the project
mkdir video-processor && cd video-processor
go mod init video-processor
# Add dependencies
go get github.com/eclipse/paho.mqtt.golang
go get gopkg.in/yaml.v3
go get github.com/sirupsen/logrus
# (Optional) GoCV
go get gocv.io/x/gocv
GoCV installation varies by platform:
Linux (Ubuntu/Debian)
# Install OpenCV dependencies
sudo apt-get update
sudo apt-get install -y \
libopencv-dev \
pkg-config \
libavcodec-dev \
libavformat-dev \
libavutil-dev \
libswscale-dev
# Install GoCV
go get gocv.io/x/gocv
macOS
# Install OpenCV with Homebrew
brew install opencv pkg-config
# Install GoCV
go get gocv.io/x/gocv
Windows
# Install OpenCV with Chocolatey (or manually)
choco install opencv
# Set environment variables
set CGO_CPPFLAGS=-IC:\opencv\build\include
set CGO_LDFLAGS=-LC:\opencv\build\x64\vc15\lib
# Install GoCV
go get gocv.io/x/gocv
Portability with Docker
Managing OpenCV dependencies with Docker is the easiest method:
# Build stage
FROM golang:1.23 AS builder
# Install OpenCV dependencies
RUN apt-get update && apt-get install -y \
libopencv-dev \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=1 go build -o video-processor ./cmd/main.go
Common Issues and Solutions:
- CGO error: Cannot build without CGO_ENABLED=1
- pkg-config not found: Ensure the pkg-config package is installed
- OpenCV version incompatibility: GoCV works with specific OpenCV versions, check version compatibility
- Cross-compilation difficulty: Build natively for ARM or use Docker
Important Cross-Compilation Note (ARM / Edge Devices):
- Because GoCV uses CGO, simply running GOOS=linux GOARCH=arm64 go build ... from an x86 machine to target Raspberry Pi / Jetson and other ARM devices will often not work out of the box.
- During compilation you must target the correct architecture both for Go and for the native OpenCV libraries, which is non-trivial in practice.
- Recommended approaches:
- Prefer native builds on the ARM device itself (e.g. run go build directly on the Raspberry Pi).
- Or use Docker buildx + QEMU to build multi-arch Docker images (building arm64 images from an amd64 host).
- In production scenarios, defining separate build pipelines per architecture (amd64, armv7, arm64) is usually the most reliable solution.
5.4 First Test
// cmd/main.go
package main
import (
	"context"
	"log"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Test video source (from file)
	source := NewVideoSource("test.mp4", 640, 480)
if err := source.Start(ctx); err != nil {
log.Fatal(err)
}
// Read first 10 frames
frameCount := 0
for frame := range source.Frames() {
log.Printf("Frame %d received: %dx%d", frameCount, frame.Width, frame.Height)
frameCount++
if frameCount >= 10 {
break
}
}
	cancel() // stop the reader goroutine before closing channels
	source.Stop()
log.Println("Test completed!")
}
5.5 Production Readiness Checklist
Before moving to production, check the following checklist:
- Error handling and retry mechanisms added
- Logging and monitoring set up
- Health check endpoints working
- Configuration files ready
- Docker image built and tested
- Load test performed
- Memory leak test performed
- Graceful shutdown working
- Alerting mechanism set up
- Backup and recovery plan ready
6. Codec and Format Selection
Choosing the right codec and format for edge processing is critical. Advantages and disadvantages of different codecs:
6.1 Codec Comparison
| Codec | Compression Ratio | CPU Usage | Edge Suitability | Recommended Usage |
|---|---|---|---|---|
| H.264 | 1.0x (baseline) | Low | ⭐⭐⭐⭐⭐ | General use, most common |
| H.265/HEVC | 1.5-2.0x | Medium-High | ⭐⭐⭐⭐ | High quality, limited bandwidth |
| AV1 | 2.0-2.5x | Very High | ⭐⭐ | Future use, high-performance devices |
| MJPEG | 0.3-0.5x | Very Low | ⭐⭐⭐ | Low latency, frame-by-frame processing |
Recommended format for edge processing: RGB24 or YUV420
RGB24:
- ✅ 3 bytes per pixel (R, G, B)
- ✅ Suitable for direct image processing
- ✅ Easy integration with OpenCV/GoCV
- ❌ Larger memory usage
YUV420:
- ✅ Smaller memory usage (50% less)
- ✅ Native format for video codecs
- ❌ May require conversion to RGB
// Format selection in FFmpeg
// For RGB24:
args := []string{"-pix_fmt", "rgb24"}
// For YUV420 (smaller):
args = []string{"-pix_fmt", "yuv420p"}
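The "may require conversion to RGB" caveat above can be handled in Go. A minimal sketch, assuming a planar yuv420p buffer (Y plane followed by the U and V planes) and full-range BT.601 coefficients:

// Hypothetical helper: converts a planar YUV420p buffer to RGB24
// using full-range BT.601 coefficients.
func yuv420pToRGB24(yuv []byte, width, height int) []byte {
	ySize := width * height
	cSize := ySize / 4
	yPlane := yuv[:ySize]
	uPlane := yuv[ySize : ySize+cSize]
	vPlane := yuv[ySize+cSize : ySize+2*cSize]
	rgb := make([]byte, width*height*3)
	for row := 0; row < height; row++ {
		for col := 0; col < width; col++ {
			y := float64(yPlane[row*width+col])
			u := float64(uPlane[(row/2)*(width/2)+col/2]) - 128
			v := float64(vPlane[(row/2)*(width/2)+col/2]) - 128
			idx := (row*width + col) * 3
			rgb[idx+0] = clampByte(y + 1.402*v)
			rgb[idx+1] = clampByte(y - 0.344*u - 0.714*v)
			rgb[idx+2] = clampByte(y + 1.772*u)
		}
	}
	return rgb
}

func clampByte(v float64) byte {
	if v < 0 {
		return 0
	}
	if v > 255 {
		return 255
	}
	return byte(v)
}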
7. Getting Frames from Video Source
To get frames from a video source, we’ll run FFmpeg as a process within our Go application. FFmpeg can read video from RTSP, USB cameras, files, and many other sources.
7.1 Different Video Sources
In production environments, you may encounter different types of video sources:
RTSP Stream (IP Camera)
The most common use case. IP cameras typically provide video over the RTSP protocol.
USB Camera
For USB cameras, we can use FFmpeg’s video4linux2 (v4l2) support:
ffmpeg -f v4l2 -i /dev/video0 -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1
Video File
Reading from video files for testing and development:
ffmpeg -i test_video.mp4 -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1
HTTP/HTTPS Stream
Some modern cameras provide MJPEG or HLS streams over HTTP.
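An MJPEG-over-HTTP source can be read the same way as the other sources; the URL and path below are illustrative:

ffmpeg -i http://camera-ip:8080/video.mjpg -f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1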
7.2 FFmpeg Command
ffmpeg -rtsp_transport tcp -i rtsp://username:password@camera-ip:554/stream \
-f rawvideo -pix_fmt rgb24 -s 640x480 pipe:1
This command:
- -rtsp_transport tcp: Uses TCP for RTSP connection (reliable)
- -f rawvideo: Raw video format output
- -pix_fmt rgb24: RGB24 pixel format (3 bytes per pixel)
- -s 640x480: Resolution setting (can be changed as needed)
- pipe:1: Writes to stdout (we’ll read it in Go)
7.3 FFmpeg Integration with Go
package main
import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"os/exec"
	"strings"
	"sync"
	"time"
)
type Frame struct {
Data []byte
Width int
Height int
Timestamp time.Time
}
type VideoSource struct {
sourceURL string
width int
height int
cmd *exec.Cmd
stdout io.ReadCloser
frames chan *Frame
errors chan error
wg sync.WaitGroup
}
func NewVideoSource(sourceURL string, width, height int) *VideoSource {
return &VideoSource{
sourceURL: sourceURL,
width: width,
height: height,
frames: make(chan *Frame, 10), // Buffer 10 frames
errors: make(chan error, 1),
}
}
func (vs *VideoSource) Start(ctx context.Context) error {
	// Create FFmpeg command (-rtsp_transport only applies to RTSP inputs)
	args := []string{}
	if strings.HasPrefix(vs.sourceURL, "rtsp://") {
		args = append(args, "-rtsp_transport", "tcp")
	}
	args = append(args,
		"-i", vs.sourceURL,
		"-f", "rawvideo",
		"-pix_fmt", "rgb24",
		"-s", fmt.Sprintf("%dx%d", vs.width, vs.height),
		"pipe:1",
	)
vs.cmd = exec.CommandContext(ctx, "ffmpeg", args...)
var err error
vs.stdout, err = vs.cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("stdout pipe: %w", err)
}
stderr, err := vs.cmd.StderrPipe()
if err != nil {
return fmt.Errorf("stderr pipe: %w", err)
}
if err := vs.cmd.Start(); err != nil {
return fmt.Errorf("start ffmpeg: %w", err)
}
// Log FFmpeg stderr output for debugging
go func() {
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
log.Printf("FFmpeg: %s", scanner.Text())
}
}()
// Frame reading goroutine
vs.wg.Add(1)
go vs.readFrames(ctx)
return nil
}
func (vs *VideoSource) readFrames(ctx context.Context) {
defer vs.wg.Done()
frameSize := vs.width * vs.height * 3 // RGB24: 3 bytes per pixel
frameBuffer := make([]byte, frameSize)
for {
select {
case <-ctx.Done():
// When context is cancelled, also terminate the FFmpeg process
if vs.cmd != nil && vs.cmd.Process != nil {
_ = vs.cmd.Process.Kill()
}
return
default:
// Read a complete frame
n, err := io.ReadFull(vs.stdout, frameBuffer)
if err != nil {
if err != io.EOF && err != io.ErrUnexpectedEOF {
select {
case vs.errors <- fmt.Errorf("read frame: %w", err):
case <-ctx.Done():
}
}
return
}
if n != frameSize {
continue // Incomplete frame, skip
}
// Copy new frame buffer (goroutine safe)
frameData := make([]byte, frameSize)
copy(frameData, frameBuffer)
frame := &Frame{
Data: frameData,
Width: vs.width,
Height: vs.height,
Timestamp: time.Now(),
}
select {
case vs.frames <- frame:
case <-ctx.Done():
return
}
}
}
}
func (vs *VideoSource) Frames() <-chan *Frame {
return vs.frames
}
func (vs *VideoSource) Errors() <-chan error {
return vs.errors
}
func (vs *VideoSource) Stop() error {
	if vs.cmd != nil && vs.cmd.Process != nil {
		vs.cmd.Process.Kill()
	}
	vs.wg.Wait()
	if vs.cmd != nil {
		vs.cmd.Wait() // reap the FFmpeg process to avoid a zombie
	}
	close(vs.frames)
	close(vs.errors)
	return nil
}
8. Video Pipeline with Go
Go’s goroutine and channel structure is perfect for video processing pipelines. Each stage runs as a goroutine and frames are passed through channels.
8.1 Pipeline Architecture
package main
import (
"context"
"fmt"
"sync"
"time"
)
type Pipeline struct {
source *VideoSource
processors []FrameProcessor
wg sync.WaitGroup
}
type FrameProcessor interface {
Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame)
Name() string
}
func NewPipeline(source *VideoSource) *Pipeline {
return &Pipeline{
source: source,
processors: make([]FrameProcessor, 0),
}
}
func (p *Pipeline) AddProcessor(processor FrameProcessor) {
p.processors = append(p.processors, processor)
}
func (p *Pipeline) Run(ctx context.Context) error {
// Start video source
if err := p.source.Start(ctx); err != nil {
return err
}
// Create channels
channels := make([]chan *Frame, len(p.processors)+1)
for i := range channels {
channels[i] = make(chan *Frame, 10)
}
// Copy from source to first channel
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer close(channels[0])
for frame := range p.source.Frames() {
select {
case channels[0] <- frame:
case <-ctx.Done():
return
}
}
}()
// Run each processor
for i, proc := range p.processors {
p.wg.Add(1)
go func(idx int, processor FrameProcessor) {
defer p.wg.Done()
defer func() {
if r := recover(); r != nil {
fmt.Printf("Processor panic: %v\n", r)
}
close(channels[idx+1])
}()
processor.Process(ctx, channels[idx], channels[idx+1])
}(i, proc)
}
// Error handling
go func() {
for err := range p.source.Errors() {
// Log error (example)
fmt.Printf("Video source error: %v\n", err)
}
}()
return nil
}
func (p *Pipeline) Stop() error {
return p.source.Stop()
}
8.2 Frame Struct Enhancement
type Frame struct {
Data []byte
Width int
Height int
Timestamp time.Time
Metadata map[string]interface{} // For analysis results
}
func NewFrame(data []byte, width, height int) *Frame {
return &Frame{
Data: data,
Width: width,
Height: height,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
}
9. Real-Time Video Analysis
9.0 ROI (Region of Interest) Detection
Instead of processing the entire frame, we can significantly reduce CPU usage by processing only relevant regions (ROI):
import (
"image"
"sync"
)
type ROIDetector struct {
regions []image.Rectangle
mutex sync.RWMutex
}
func NewROIDetector() *ROIDetector {
return &ROIDetector{
regions: make([]image.Rectangle, 0),
}
}
// Determine ROI based on motion detection results
func (rd *ROIDetector) UpdateROI(motionMap []bool, width, height int) {
rd.mutex.Lock()
defer rd.mutex.Unlock()
// Detect motion regions and create bounding box
// Simple implementation: bounding box of motion pixels
minX, minY := width, height
maxX, maxY := 0, 0
for y := 0; y < height; y++ {
for x := 0; x < width; x++ {
idx := y*width + x
if idx < len(motionMap) && motionMap[idx] {
if x < minX {
minX = x
}
if x > maxX {
maxX = x
}
if y < minY {
minY = y
}
if y > maxY {
maxY = y
}
}
}
}
if maxX > minX && maxY > minY {
// Add padding
padding := 20
roi := image.Rect(
maxInt(0, minX-padding),
maxInt(0, minY-padding),
minInt(width, maxX+padding),
minInt(height, maxY+padding),
)
rd.regions = []image.Rectangle{roi}
}
}
func maxInt(a, b int) int {
if a > b {
return a
}
return b
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
// Extract only the ROI region from frame
func (rd *ROIDetector) ExtractROI(frame *Frame) []*Frame {
rd.mutex.RLock()
defer rd.mutex.RUnlock()
roiFrames := make([]*Frame, 0, len(rd.regions))
for _, roi := range rd.regions {
roiData := extractRegion(frame.Data, frame.Width, frame.Height, roi)
roiFrame := &Frame{
Data: roiData,
Width: roi.Dx(),
Height: roi.Dy(),
Timestamp: frame.Timestamp,
Metadata: map[string]interface{}{
"roi": roi,
"original_width": frame.Width,
"original_height": frame.Height,
},
}
roiFrames = append(roiFrames, roiFrame)
}
return roiFrames
}
func extractRegion(data []byte, width, height int, roi image.Rectangle) []byte {
roiWidth := roi.Dx()
roiHeight := roi.Dy()
	// Bounds check once (make sure the ROI lies within the frame bounds)
if roi.Min.X < 0 || roi.Min.Y < 0 || roi.Max.X > width || roi.Max.Y > height {
return nil
}
roiData := make([]byte, roiWidth*roiHeight*3)
for y := 0; y < roiHeight; y++ {
srcY := roi.Min.Y + y
srcOffset := (srcY*width + roi.Min.X) * 3
dstOffset := (y * roiWidth) * 3
// Copy whole row at once (faster, fewer bounds checks)
copy(roiData[dstOffset:dstOffset+roiWidth*3],
data[srcOffset:srcOffset+roiWidth*3])
}
return roiData
}
Advantages of ROI Usage:
- 60-80% CPU savings - Process only relevant regions
- Faster object detection - Inference on smaller ROIs
- Less memory usage - Store only ROI frames
9.1 Motion Detection
Motion detection detects movement by calculating differences between frames. It’s a lightweight and fast method for edge systems.
Frame Differencing Algorithm
package main
import (
"context"
"sync"
)
type MotionDetector struct {
threshold float64
previousFrame []byte
width, height int
mutex sync.RWMutex // thread-safety for previousFrame
}
func NewMotionDetector(threshold float64, width, height int) *MotionDetector {
return &MotionDetector{
threshold: threshold,
width: width,
height: height,
}
}
func (md *MotionDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
for frame := range in {
select {
case <-ctx.Done():
return
default:
hasMotion := md.detectMotion(frame)
frame.Metadata["motion_detected"] = hasMotion
if hasMotion {
frame.Metadata["motion_score"] = md.calculateMotionScore(frame)
}
select {
case out <- frame:
case <-ctx.Done():
return
}
}
}
}
func (md *MotionDetector) Name() string {
return "MotionDetector"
}
func (md *MotionDetector) detectMotion(frame *Frame) bool {
md.mutex.Lock()
defer md.mutex.Unlock()
if md.previousFrame == nil {
// First frame, save and no motion
md.previousFrame = make([]byte, len(frame.Data))
copy(md.previousFrame, frame.Data)
return false
}
diff := md.calculateDifference(frame.Data, md.previousFrame)
motionRatio := float64(diff) / float64(len(frame.Data))
	// Reuse the previous-frame buffer (reallocate only if the size changed)
	if len(md.previousFrame) != len(frame.Data) {
		md.previousFrame = make([]byte, len(frame.Data))
	}
copy(md.previousFrame, frame.Data)
return motionRatio > md.threshold
}
func (md *MotionDetector) calculateDifference(current, previous []byte) int {
diff := 0
for i := 0; i < len(current) && i < len(previous); i++ {
if current[i] > previous[i] {
diff += int(current[i] - previous[i])
} else {
diff += int(previous[i] - current[i])
}
}
return diff
}
func (md *MotionDetector) calculateMotionScore(frame *Frame) float64 {
md.mutex.RLock()
defer md.mutex.RUnlock()
if md.previousFrame == nil {
return 0.0
}
diff := md.calculateDifference(frame.Data, md.previousFrame)
maxDiff := len(frame.Data) * 255
return float64(diff) / float64(maxDiff)
}
9.2 Object Detection
For object detection, GoCV (OpenCV binding) or external inference services can be used.
Simple Object Detection with GoCV
package main
import (
"context"
"fmt"
"image"
"os"
"strings"
"gocv.io/x/gocv"
)
type ObjectDetector struct {
net gocv.Net
classes []string
}
func NewObjectDetector(modelPath, configPath, classesPath string) (*ObjectDetector, error) {
net := gocv.ReadNet(modelPath, configPath)
if net.Empty() {
return nil, fmt.Errorf("failed to load model")
}
	// Prefer the OpenVINO backend when available (Intel CPU/VPU acceleration)
	net.SetPreferableBackend(gocv.NetBackendOpenVINO)
	net.SetPreferableTarget(gocv.NetTargetCPU) // or NetTargetVPU
// Read classes file
classes, err := loadClasses(classesPath)
if err != nil {
return nil, fmt.Errorf("failed to load classes: %w", err)
}
return &ObjectDetector{
net: net,
classes: classes,
}, nil
}
func (od *ObjectDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
for frame := range in {
select {
case <-ctx.Done():
return
default:
detections := od.detect(frame)
frame.Metadata["detections"] = detections
select {
case out <- frame:
case <-ctx.Done():
return
}
}
}
}
func (od *ObjectDetector) Name() string {
return "ObjectDetector"
}
func (od *ObjectDetector) detect(frame *Frame) []Detection {
// Convert frame data to gocv.Mat
img, err := gocv.NewMatFromBytes(
frame.Height,
frame.Width,
gocv.MatTypeCV8UC3,
frame.Data,
)
if err != nil {
return nil
}
defer img.Close()
// Create blob (model input format)
blob := gocv.BlobFromImage(img, 1.0/255.0, image.Pt(416, 416), gocv.NewScalar(0, 0, 0, 0), true, false)
defer blob.Close()
// Inference
od.net.SetInput(blob, "")
prob := od.net.Forward("")
defer prob.Close()
// Parse results
detections := parseDetections(prob, frame.Width, frame.Height)
return detections
}
type Detection struct {
Class string
Confidence float64
BBox image.Rectangle
}
func parseDetections(prob gocv.Mat, imgWidth, imgHeight int) []Detection {
// Parse according to model output
// This example is simplified, real implementation depends on model
return []Detection{}
}
func loadClasses(path string) ([]string, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read classes file: %w", err)
}
lines := strings.Split(string(data), "\n")
classes := make([]string, 0, len(lines))
for _, line := range lines {
line = strings.TrimSpace(line)
if line != "" {
classes = append(classes, line)
}
}
return classes, nil
}
External Inference Service (HTTP/REST)
If the model is very large or there’s specialized hardware (NVIDIA Jetson, Intel Neural Compute Stick), we can move inference to a separate service:
package main
import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"net/http"
	"time"
)
type ExternalDetector struct {
client *http.Client
apiURL string
timeout time.Duration
}
func (ed *ExternalDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
for frame := range in {
select {
case <-ctx.Done():
return
default:
detections, err := ed.callInferenceService(frame)
if err == nil {
frame.Metadata["detections"] = detections
} else {
frame.Metadata["detection_error"] = err.Error()
}
select {
case out <- frame:
case <-ctx.Done():
return
}
}
}
}
func (ed *ExternalDetector) callInferenceService(frame *Frame) ([]Detection, error) {
// Base64 encode frame or send binary
reqBody, err := json.Marshal(map[string]interface{}{
"image": base64.StdEncoding.EncodeToString(frame.Data),
"width": frame.Width,
"height": frame.Height,
})
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), ed.timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", ed.apiURL, bytes.NewReader(reqBody))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
resp, err := ed.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var result struct {
Detections []Detection `json:"detections"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return result.Detections, nil
}
10. Edge Decision Layer
The Edge Decision Layer makes decisions based on analysis results and sends only important events to the cloud. This layer:
- Threshold filtering - Filters low confidence detections
- Event deduplication - Doesn’t send the same event repeatedly
- Rate limiting - Prevents sending too many events (a RateLimitedPublisher sketch appears after the publisher implementations below)
- Event aggregation - Combines multiple events into one batch (see the sketch after this list)
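Event aggregation is not implemented in the code that follows; here is a minimal sketch of a windowed aggregator (the EventAggregator type and its batch-publish callback are illustrative, not part of the system above):

import (
	"sync"
	"time"
)

// EventAggregator collects events for a fixed window and flushes them as one batch.
type EventAggregator struct {
	window  time.Duration
	pending []*Event
	mutex   sync.Mutex
	publish func(events []*Event)
}

func NewEventAggregator(window time.Duration, publish func([]*Event)) *EventAggregator {
	ea := &EventAggregator{window: window, publish: publish}
	go ea.flushLoop()
	return ea
}

// Add queues an event for the next flush.
func (ea *EventAggregator) Add(event *Event) {
	ea.mutex.Lock()
	ea.pending = append(ea.pending, event)
	ea.mutex.Unlock()
}

func (ea *EventAggregator) flushLoop() {
	ticker := time.NewTicker(ea.window)
	defer ticker.Stop()
	for range ticker.C {
		ea.mutex.Lock()
		batch := ea.pending
		ea.pending = nil
		ea.mutex.Unlock()
		if len(batch) > 0 {
			ea.publish(batch)
		}
	}
}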
10.1 Event Publisher Implementations
package main
import (
"context"
"fmt"
"sync"
"time"
)
type EdgeDecisionLayer struct {
minConfidence float64
eventPublisher EventPublisher
lastEvents map[string]time.Time
mutex sync.RWMutex
cooldown time.Duration
}
type EventPublisher interface {
PublishWithContext(ctx context.Context, event *Event) error
}
type Event struct {
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Confidence float64 `json:"confidence"`
Detections []Detection `json:"detections,omitempty"`
Metadata map[string]interface{} `json:"metadata"`
}
func NewEdgeDecisionLayer(minConfidence float64, publisher EventPublisher, cooldown time.Duration) *EdgeDecisionLayer {
return &EdgeDecisionLayer{
minConfidence: minConfidence,
eventPublisher: publisher,
lastEvents: make(map[string]time.Time),
cooldown: cooldown,
}
}
func (edl *EdgeDecisionLayer) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
for frame := range in {
select {
case <-ctx.Done():
return
default:
edl.processFrame(frame)
// Continue frame in pipeline
select {
case out <- frame:
case <-ctx.Done():
return
}
}
}
}
func (edl *EdgeDecisionLayer) Name() string {
return "EdgeDecisionLayer"
}
func (edl *EdgeDecisionLayer) processFrame(frame *Frame) {
// Check if detections exist
detections, ok := frame.Metadata["detections"].([]Detection)
if !ok {
return
}
// Filter high confidence detections
significantDetections := edl.filterByConfidence(detections)
if len(significantDetections) == 0 {
return
}
// Determine event type
eventType := edl.determineEventType(significantDetections)
// Cooldown check (don't send same event too frequently)
if !edl.shouldSendEvent(eventType) {
return
}
// Create event
event := &Event{
Type: eventType,
Timestamp: frame.Timestamp,
Confidence: edl.calculateMaxConfidence(significantDetections),
Detections: significantDetections,
Metadata: map[string]interface{}{
"frame_width": frame.Width,
"frame_height": frame.Height,
},
}
// Publish asynchronously with timeout to avoid blocking the pipeline
go func(evt *Event, evtType string) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := edl.eventPublisher.PublishWithContext(ctx, evt); err != nil {
fmt.Printf("Failed to publish event: %v\n", err)
return
}
// Update cooldown on successful publish
edl.mutex.Lock()
edl.lastEvents[evtType] = time.Now()
edl.mutex.Unlock()
}(event, eventType)
}
func (edl *EdgeDecisionLayer) filterByConfidence(detections []Detection) []Detection {
filtered := make([]Detection, 0)
for _, det := range detections {
if det.Confidence >= edl.minConfidence {
filtered = append(filtered, det)
}
}
return filtered
}
func (edl *EdgeDecisionLayer) determineEventType(detections []Detection) string {
// Simple logic: use first detection's class
// Real application can have more complex logic
if len(detections) > 0 {
return detections[0].Class + "_detected"
}
return "unknown"
}
func (edl *EdgeDecisionLayer) shouldSendEvent(eventType string) bool {
edl.mutex.RLock()
lastTime, exists := edl.lastEvents[eventType]
edl.mutex.RUnlock()
if !exists {
return true
}
return time.Since(lastTime) > edl.cooldown
}
func (edl *EdgeDecisionLayer) calculateMaxConfidence(detections []Detection) float64 {
max := 0.0
for _, det := range detections {
if det.Confidence > max {
max = det.Confidence
}
}
return max
}
MQTT Publisher
package main
import (
	"context"
	"encoding/json"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)
type MQTTPublisher struct {
client mqtt.Client
topic string
}
func NewMQTTPublisher(brokerURL, topic, clientID string) (*MQTTPublisher, error) {
opts := mqtt.NewClientOptions()
opts.AddBroker(brokerURL)
opts.SetClientID(clientID)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
return &MQTTPublisher{
client: client,
topic: topic,
}, nil
}
func (mp *MQTTPublisher) PublishWithContext(ctx context.Context, event *Event) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
token := mp.client.Publish(mp.topic, 1, false, data)
done := make(chan struct{})
go func() {
token.Wait()
close(done)
}()
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return token.Error()
}
}
func (mp *MQTTPublisher) Close() error {
if mp.client != nil && mp.client.IsConnected() {
// 250ms timeout for pending operations
mp.client.Disconnect(250)
}
return nil
}
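Wiring the decision layer to this publisher might look like the following snippet (the broker URL, topic, client ID and thresholds are illustrative):

publisher, err := NewMQTTPublisher("tcp://localhost:1883", "video/events", "edge-node-1")
if err != nil {
	log.Fatal(err)
}
decision := NewEdgeDecisionLayer(0.7, publisher, 10*time.Second)
pipeline.AddProcessor(decision)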
HTTP Webhook Publisher
package main
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
)
type WebhookPublisher struct {
client *http.Client
webhookURL string
}
func (wp *WebhookPublisher) PublishWithContext(ctx context.Context, event *Event) error {
data, err := json.Marshal(event)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, "POST", wp.webhookURL, bytes.NewReader(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := wp.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return fmt.Errorf("webhook returned status %d", resp.StatusCode)
}
return nil
}
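The rate limiting mentioned at the start of this section is exercised by the test in 13.9 but not defined anywhere; a minimal sketch using golang.org/x/time/rate, assuming a simple Publish-style interface like the MockPublisher in 13.9:

import (
	"context"

	"golang.org/x/time/rate"
)

// SimplePublisher is the minimal interface the rate limiter wraps.
type SimplePublisher interface {
	Publish(event *Event) error
}

type RateLimitedPublisher struct {
	publisher SimplePublisher
	limiter   *rate.Limiter
}

// Publish blocks until the limiter grants a token, then delegates.
func (rlp *RateLimitedPublisher) Publish(event *Event) error {
	if err := rlp.limiter.Wait(context.Background()); err != nil {
		return err
	}
	return rlp.publisher.Publish(event)
}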
11. Multi-Camera Support
In production environments, you typically need to manage multiple cameras. We can do parallel processing by creating separate pipelines for each camera:
import (
"context"
"fmt"
"log"
"sync"
)
type MultiCameraManager struct {
cameras map[string]*CameraPipeline
mutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
type CameraPipeline struct {
ID string
Source *VideoSource
Pipeline *Pipeline
Config CameraConfig
}
type CameraConfig struct {
URL string
Width int
Height int
MotionEnabled bool
ObjectEnabled bool
}
func NewMultiCameraManager() *MultiCameraManager {
ctx, cancel := context.WithCancel(context.Background())
return &MultiCameraManager{
cameras: make(map[string]*CameraPipeline),
ctx: ctx,
cancel: cancel,
}
}
func (mcm *MultiCameraManager) AddCamera(id string, config CameraConfig) error {
mcm.mutex.Lock()
defer mcm.mutex.Unlock()
source := NewVideoSource(config.URL, config.Width, config.Height)
pipeline := NewPipeline(source)
// Add motion detector
if config.MotionEnabled {
motionDetector := NewMotionDetector(0.05, config.Width, config.Height)
pipeline.AddProcessor(motionDetector)
}
	// Add object detector
	if config.ObjectEnabled {
		objectDetector, err := NewObjectDetector("model.onnx", "config.yaml", "classes.txt")
		if err != nil {
			log.Printf("Camera %s: object detector disabled: %v", id, err)
		} else {
			pipeline.AddProcessor(objectDetector)
		}
	}
cameraPipeline := &CameraPipeline{
ID: id,
Source: source,
Pipeline: pipeline,
Config: config,
}
mcm.cameras[id] = cameraPipeline
// Start pipeline
go func() {
if err := pipeline.Run(mcm.ctx); err != nil {
log.Printf("Camera %s pipeline error: %v", id, err)
}
}()
return nil
}
func (mcm *MultiCameraManager) RemoveCamera(id string) error {
mcm.mutex.Lock()
defer mcm.mutex.Unlock()
if camera, exists := mcm.cameras[id]; exists {
camera.Pipeline.Stop()
delete(mcm.cameras, id)
}
return nil
}
func (mcm *MultiCameraManager) GetCameraStatus(id string) (map[string]interface{}, error) {
mcm.mutex.RLock()
defer mcm.mutex.RUnlock()
camera, exists := mcm.cameras[id]
if !exists {
return nil, fmt.Errorf("camera %s not found", id)
}
return map[string]interface{}{
"id": camera.ID,
"url": camera.Config.URL,
"status": "running",
"width": camera.Config.Width,
"height": camera.Config.Height,
}, nil
}
12. Performance Optimizations
12.1 Memory Pooling
Video processing involves heavy buffer allocation. We can reduce allocation overhead using sync.Pool:
// Pool sized for a fixed 640x480 RGB24 frame; if sources use other
// resolutions, keep one pool per frame size.
var framePool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 640*480*3) // RGB24, 640x480
	},
}
// Get from pool when reading frames (correct usage example)
func (vs *VideoSource) readFrames(ctx context.Context) {
frameSize := vs.width * vs.height * 3
for {
frameBuffer := framePool.Get().([]byte)
n, err := io.ReadFull(vs.stdout, frameBuffer)
if err != nil {
// Immediately return buffer to pool on error
framePool.Put(frameBuffer)
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
}
select {
case vs.errors <- fmt.Errorf("read frame: %w", err):
case <-ctx.Done():
}
return
}
if n != frameSize {
framePool.Put(frameBuffer)
continue
}
// Copy frame data so it is independent from the pool buffer
frameData := make([]byte, frameSize)
copy(frameData, frameBuffer)
// Return buffer to pool
framePool.Put(frameBuffer)
frame := &Frame{
Data: frameData,
Width: vs.width,
Height: vs.height,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
select {
case vs.frames <- frame:
case <-ctx.Done():
return
}
}
}
12.2 Frame Rate Control
In some cases, we don’t need to process all frames. We can reduce CPU usage with frame skipping:
type FrameRateController struct {
targetFPS int
frameCounter int
skipRatio int
}
func NewFrameRateController(targetFPS int) *FrameRateController {
	// For example: to reduce a 30 FPS source to 10 FPS, keep 1 out of every 3 frames
	skipRatio := 30 / targetFPS
	if skipRatio < 1 {
		skipRatio = 1 // never skip below the source rate (also avoids division by zero)
	}
	return &FrameRateController{
		targetFPS: targetFPS,
		skipRatio: skipRatio,
	}
}
func (frc *FrameRateController) ShouldProcess() bool {
frc.frameCounter++
return frc.frameCounter%frc.skipRatio == 0
}
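Usage in a frame loop might look like this (process stands in for the actual pipeline stage):

frc := NewFrameRateController(10) // assumes a ~30 FPS source
for frame := range source.Frames() {
	if !frc.ShouldProcess() {
		continue // skip frames to approach the target FPS
	}
	process(frame)
}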
12.3 Context and Cancellation
Use context cancellation in long-running operations to properly clean up resources:
import (
	"context"
	"os"
	"os/signal"
	"syscall"
)
func (p *Pipeline) Run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Signal handling for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
cancel()
}()
// Run pipeline
// ...
return nil
}
12.4 Benchmarking
To collect performance metrics:
type PerformanceMetrics struct {
FramesProcessed int64
ProcessingTime time.Duration
FrameRate float64
startTime time.Time
mutex sync.RWMutex
}
func NewPerformanceMetrics() *PerformanceMetrics {
return &PerformanceMetrics{
startTime: time.Now(),
}
}
func (pm *PerformanceMetrics) RecordFrame(processingTime time.Duration) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.FramesProcessed++
pm.ProcessingTime += processingTime
elapsed := time.Since(pm.startTime).Seconds()
if elapsed > 0 {
pm.FrameRate = float64(pm.FramesProcessed) / elapsed
}
}
Some real performance values we obtained in the test environment:
- Hardware: Raspberry Pi 4 (4GB RAM), 640x480 resolution
- Motion Detection: ~25-30 FPS (CPU: 15-20%)
- Object Detection (CPU): ~2-3 FPS (CPU: 80-90%)
- Object Detection (GPU/OpenVINO): ~8-10 FPS (CPU: 40-50%)
- Memory Usage: ~150-200 MB (with memory pool)
- Network Latency (MQTT): ~10-50ms (local broker)
Important Note: CPU usage for object detection is very high. In production, using GPU or dedicated AI accelerator (Jetson Nano, Neural Compute Stick) is recommended.
12.6 Error Handling and Resilience
Retry Mechanism
Video source connections can be interrupted. Let’s add a retry mechanism:
func (vs *VideoSource) StartWithRetry(ctx context.Context, maxRetries int) error {
var lastErr error
for i := 0; i < maxRetries; i++ {
if err := vs.Start(ctx); err == nil {
return nil
}
lastErr = err
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second * time.Duration(i+1)):
			// Linear backoff: wait 1s, 2s, 3s, ...
}
}
return fmt.Errorf("failed after %d retries: %w", maxRetries, lastErr)
}
Graceful Degradation
If the object detection service is not working, we can continue with only motion detection:
func (od *ObjectDetector) Process(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
for frame := range in {
select {
case <-ctx.Done():
return
default:
			// assumes a detect variant that also returns an error
			detections, err := od.detect(frame)
if err != nil {
// Log error but don't stop pipeline
frame.Metadata["detection_error"] = err.Error()
} else {
frame.Metadata["detections"] = detections
}
select {
case out <- frame:
case <-ctx.Done():
return
}
}
}
}
12.7 Monitoring and Logging
Structured Logging
import "github.com/sirupsen/logrus"
var logger = logrus.New()
func init() {
logger.SetFormatter(&logrus.JSONFormatter{})
logger.SetLevel(logrus.InfoLevel)
}
// Usage
logger.WithFields(logrus.Fields{
"frame_id": frameID,
"fps": fps,
"detections": len(detections),
"processing_time_ms": processingTime.Milliseconds(),
}).Info("Frame processed")
Metrics Export (Prometheus)
import "github.com/prometheus/client_golang/prometheus"
var (
framesProcessed = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "video_frames_processed_total",
Help: "Total number of frames processed",
},
[]string{"source"},
)
processingDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "video_frame_processing_seconds",
Help: "Frame processing duration",
Buckets: prometheus.DefBuckets,
},
[]string{"stage"},
)
)
func init() {
prometheus.MustRegister(framesProcessed)
prometheus.MustRegister(processingDuration)
}
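Recording into these metrics from a pipeline stage could look like this (the label values and processFrame are illustrative):

start := time.Now()
processFrame(frame) // placeholder for the actual stage work
framesProcessed.WithLabelValues("camera-1").Inc()
processingDuration.WithLabelValues("motion").Observe(time.Since(start).Seconds())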
Metrics HTTP Endpoint
Let’s add an HTTP endpoint to expose Prometheus metrics:
import (
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func startMetricsServer(port string) error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(":"+port, nil)
}
// In main.go
go func() {
if err := startMetricsServer("2112"); err != nil {
log.Printf("Metrics server error: %v", err)
}
}()
12.8 Profiling
We can perform performance analysis with Go’s built-in profiling tools.
CPU Profiling with pprof
import (
	"log"
	"net/http"
	_ "net/http/pprof"
)
func main() {
// Add profiling endpoints
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// ... your application code
}
// To collect profile:
// go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
// go tool pprof http://localhost:6060/debug/pprof/heap
Memory Profiling
import (
"runtime"
"runtime/pprof"
"os"
)
func captureMemoryProfile(filename string) error {
f, err := os.Create(filename)
if err != nil {
return err
}
defer f.Close()
runtime.GC() // Force GC
return pprof.WriteHeapProfile(f)
}
// Usage: periodically capture a memory profile (run inside main; needs the time package)
go func() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
captureMemoryProfile("memprofile.prof")
}
}()
Goroutine Profiling
import (
"runtime/pprof"
"os"
)
func captureGoroutineProfile(filename string) error {
f, err := os.Create(filename)
if err != nil {
return err
}
defer f.Close()
return pprof.Lookup("goroutine").WriteTo(f, 0)
}
13. Testing
For writing tests in video processing systems, we should use mocks and test utilities. A comprehensive test strategy is critical for production systems.
13.1 Unit Test Example
package main
import (
"testing"
"time"
)
func TestMotionDetector(t *testing.T) {
detector := NewMotionDetector(0.05, 640, 480)
// First frame - should not have motion
frame1 := &Frame{
Data: make([]byte, 640*480*3),
Width: 640,
Height: 480,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
hasMotion := detector.detectMotion(frame1)
if hasMotion {
t.Error("First frame should not have motion")
}
// Second frame - make changes
frame2 := &Frame{
Data: make([]byte, 640*480*3),
Width: 640,
Height: 480,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
// Make changes in frame2 (change pixels)
for i := 0; i < len(frame2.Data); i += 100 {
frame2.Data[i] = 255
}
hasMotion = detector.detectMotion(frame2)
if !hasMotion {
t.Error("Frame with changes should have motion")
}
}
13.2 Integration Test
Integration tests that test real components working together:
func TestVideoPipelineIntegration(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Test with real FFmpeg process (use test video file)
source := NewVideoSource("test_video.mp4", 640, 480)
pipeline := NewPipeline(source)
motionDetector := NewMotionDetector(0.05, 640, 480)
objectDetector := NewMockObjectDetector()
	// assumes a test variant of EdgeDecisionLayer that records events and
	// exposes them via Events(); the production constructor also takes a publisher
	edgeDecision := NewEdgeDecisionLayer(0.7, 5*time.Second)
pipeline.AddProcessor(motionDetector)
pipeline.AddProcessor(objectDetector)
pipeline.AddProcessor(edgeDecision)
// Collect events
events := make([]Event, 0)
go func() {
for event := range edgeDecision.Events() {
events = append(events, event)
}
}()
// Run pipeline
if err := pipeline.Run(ctx); err != nil {
t.Fatalf("Failed to start pipeline: %v", err)
}
// Wait 10 seconds
time.Sleep(10 * time.Second)
pipeline.Stop()
// At least one event expected
if len(events) == 0 {
t.Error("No events generated")
}
// Check that events are in correct format
for _, event := range events {
if event.Timestamp.IsZero() {
t.Error("Event timestamp is empty")
}
if event.Type == "" {
t.Error("Event type is empty")
}
}
}
13.3 End-to-End (E2E) Test
E2E tests that test the entire system working together:
func TestE2EVideoProcessing(t *testing.T) {
	// Start test environment (setupTestEnvironment and the other helpers
	// used below are assumed test utilities)
testEnv := setupTestEnvironment(t)
defer testEnv.Cleanup()
// Start MQTT broker (for testing)
mqttBroker := startTestMQTTBroker(t)
defer mqttBroker.Stop()
// Start video processing service
config := &Config{
VideoSource: "rtsp://test-camera:554/stream",
MQTTBroker: "localhost:1883",
}
service := NewVideoProcessingService(config)
if err := service.Start(); err != nil {
t.Fatalf("Failed to start service: %v", err)
}
defer service.Stop()
// Send test video stream
go sendTestVideoStream(t, "rtsp://test-camera:554/stream")
// Listen to events from MQTT
events := subscribeToMQTT(t, "video/events/+")
// Event expected within 30 seconds
timeout := time.After(30 * time.Second)
select {
case event := <-events:
// Validate event
if err := validateEvent(event); err != nil {
t.Errorf("Invalid event: %v", err)
}
t.Logf("Event received: %+v", event)
case <-timeout:
t.Error("Event timeout - no events received")
}
}
13.4 Load Testing
Load tests that test system performance under load:
func TestLoadMultipleCameras(t *testing.T) {
if testing.Short() {
t.Skip("Skipping load test in short mode")
}
numCameras := 10
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Create multi-camera manager
manager := NewMultiCameraManager()
// Add 10 cameras
for i := 0; i < numCameras; i++ {
cameraID := fmt.Sprintf("camera-%d", i)
sourceURL := fmt.Sprintf("rtsp://test-camera-%d:554/stream", i)
		if err := manager.AddCamera(cameraID, CameraConfig{URL: sourceURL, Width: 640, Height: 480}); err != nil {
t.Fatalf("Failed to add camera: %v", err)
}
}
	// Start all cameras (StartAll, GetMetrics and StopAll are assumed
	// testing extensions of MultiCameraManager)
if err := manager.StartAll(ctx); err != nil {
t.Fatalf("Failed to start cameras: %v", err)
}
// Run for 2 minutes
time.Sleep(2 * time.Minute)
// Collect metrics
metrics := manager.GetMetrics()
// Performance check
if metrics.AverageFPS < 20 {
t.Errorf("Low FPS: %f (expected: >= 20)", metrics.AverageFPS)
}
if metrics.CPUUsage > 80 {
t.Errorf("High CPU usage: %f%% (expected: < 80%%)", metrics.CPUUsage)
}
if metrics.MemoryUsage > 500*1024*1024 { // 500MB
t.Errorf("High memory usage: %d bytes", metrics.MemoryUsage)
}
manager.StopAll()
}
13.5 Stress Testing
Stress tests that test system limits:
func TestStressHighFrameRate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping stress test in short mode")
}
// Test with very high frame rate
	// NewMockVideoSourceWithFPS and pipeline.Frames() are assumed test helpers
	highFPSSource := NewMockVideoSourceWithFPS(60) // 60 FPS
pipeline := NewPipeline(highFPSSource)
// Add all processors
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
// Monitor frame drop count
frameDrops := atomic.Int64{}
go func() {
for frame := range pipeline.Frames() {
if frame.Metadata["dropped"] == true {
frameDrops.Add(1)
}
}
}()
if err := pipeline.Run(ctx); err != nil {
t.Fatalf("Failed to start pipeline: %v", err)
}
time.Sleep(30 * time.Second)
pipeline.Stop()
dropRate := float64(frameDrops.Load()) / 1800.0 // 30 seconds * 60 FPS
if dropRate > 0.1 { // More than 10% drop
t.Logf("High frame drop rate: %.2f%%", dropRate*100)
// This means backpressure mechanism is working
}
}
13.6 Pipeline Integration Test
func TestPipeline(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create mock video source
source := &MockVideoSource{
frames: make(chan *Frame, 10),
}
// Add test frames
for i := 0; i < 5; i++ {
source.frames <- &Frame{
Data: make([]byte, 640*480*3),
Width: 640,
Height: 480,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
}
	// source.frames is closed by source.Stop() via pipeline.Stop() below
	// Create pipeline (assumes Pipeline accepts the video-source interface
	// implemented by MockVideoSource in 13.7)
	pipeline := NewPipeline(source)
motionDetector := NewMotionDetector(0.05, 640, 480)
pipeline.AddProcessor(motionDetector)
// Run pipeline
if err := pipeline.Run(ctx); err != nil {
t.Fatalf("Failed to start pipeline: %v", err)
}
// Check results
time.Sleep(100 * time.Millisecond)
pipeline.Stop()
}
13.7 Mock Video Source
type MockVideoSource struct {
	frames    chan *Frame
	errors    chan error
	closeOnce sync.Once
}

func (m *MockVideoSource) Start(ctx context.Context) error {
	return nil
}

func (m *MockVideoSource) Frames() <-chan *Frame {
	return m.frames
}

func (m *MockVideoSource) Errors() <-chan error {
	return m.errors
}

// Stop is idempotent and tolerates a nil errors channel, so tests can
// call it via Pipeline.Stop without double-close panics.
func (m *MockVideoSource) Stop() error {
	m.closeOnce.Do(func() {
		if m.frames != nil {
			close(m.frames)
		}
		if m.errors != nil {
			close(m.errors)
		}
	})
	return nil
}
13.8 Benchmark Test
func BenchmarkMotionDetection(b *testing.B) {
detector := NewMotionDetector(0.05, 640, 480)
frame := &Frame{
Data: make([]byte, 640*480*3),
Width: 640,
Height: 480,
Timestamp: time.Now(),
Metadata: make(map[string]interface{}),
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
detector.detectMotion(frame)
// Change frame
frame.Data[i%len(frame.Data)]++
}
}
// To run: go test -bench=BenchmarkMotionDetection -benchmem
13.9 Rate Limiting Test
To test that the rate limiting mechanism works correctly:
import (
"sync"
"testing"
"time"
"golang.org/x/time/rate"
)
func TestRateLimiting(t *testing.T) {
publisher := &MockPublisher{
published: make([]*Event, 0),
}
// Create rate limiter with 10 events/second limit
limiter := rate.NewLimiter(10, 1) // 10 per second, burst 1
rateLimited := &RateLimitedPublisher{
publisher: publisher,
limiter: limiter,
}
// Send 20 events
start := time.Now()
for i := 0; i < 20; i++ {
event := &Event{
Type: "test",
Timestamp: time.Now(),
			Metadata: map[string]interface{}{"id": i},
}
if err := rateLimited.Publish(event); err != nil {
t.Errorf("Publish error: %v", err)
}
}
elapsed := time.Since(start)
// At least 1 second required for 20 events (10 event/sec limit)
// Give some tolerance (0.9 seconds)
if elapsed < 900*time.Millisecond {
t.Errorf("Rate limiting not working: %v (expected: >= 1s)", elapsed)
}
// Check that all events were published
if len(publisher.published) != 20 {
t.Errorf("Expected 20 events, got: %d", len(publisher.published))
}
}
type MockPublisher struct {
published []*Event
mutex sync.Mutex
}
func (mp *MockPublisher) Publish(event *Event) error {
mp.mutex.Lock()
defer mp.mutex.Unlock()
mp.published = append(mp.published, event)
return nil
}
14. Configuration Management
We can use YAML or TOML for configuration management.
YAML Configuration
import (
"os"
"gopkg.in/yaml.v3"
)
type Config struct {
VideoSource struct {
URL string `yaml:"url"`
Width int `yaml:"width"`
Height int `yaml:"height"`
} `yaml:"video_source"`
Processing struct {
MotionThreshold float64 `yaml:"motion_threshold"`
MinConfidence float64 `yaml:"min_confidence"`
TargetFPS int `yaml:"target_fps"`
} `yaml:"processing"`
MQTT struct {
BrokerURL string `yaml:"broker_url"`
Topic string `yaml:"topic"`
ClientID string `yaml:"client_id"`
} `yaml:"mqtt"`
Server struct {
MetricsPort string `yaml:"metrics_port"`
HealthPort string `yaml:"health_port"`
} `yaml:"server"`
}
func LoadConfig(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, err
}
var config Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, err
}
return &config, nil
}
config.yaml Example
video_source:
url: "rtsp://camera:554/stream"
width: 640
height: 480
processing:
motion_threshold: 0.05
min_confidence: 0.7
target_fps: 10
mqtt:
broker_url: "tcp://mqtt:1883"
topic: "video/events"
client_id: "video-processor"
server:
metrics_port: "2112"
health_port: "8080"
15. Health Checks and Observability
Health check endpoints are critical in production environments.
Health Check Endpoint
import (
	"encoding/json"
	"net/http"
	"sync/atomic"
	"time"
)
type HealthStatus struct {
Status string `json:"status"`
Version string `json:"version"`
Uptime string `json:"uptime"`
Frames int64 `json:"frames_processed"`
Errors int64 `json:"errors"`
Details map[string]string `json:"details,omitempty"`
}
type HealthChecker struct {
startTime time.Time
frames int64
errors int64
version string
}
func NewHealthChecker(version string) *HealthChecker {
return &HealthChecker{
startTime: time.Now(),
version: version,
}
}
func (hc *HealthChecker) RecordFrame() {
atomic.AddInt64(&hc.frames, 1)
}
func (hc *HealthChecker) RecordError() {
atomic.AddInt64(&hc.errors, 1)
}
func (hc *HealthChecker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
status := "healthy"
	// Unhealthy if no frames have been processed yet (a production check
	// would track the time of the last processed frame instead)
frames := atomic.LoadInt64(&hc.frames)
if frames == 0 {
status = "unhealthy"
}
uptime := time.Since(hc.startTime).String()
health := HealthStatus{
Status: status,
Version: hc.version,
Uptime: uptime,
Frames: frames,
Errors: atomic.LoadInt64(&hc.errors),
}
w.Header().Set("Content-Type", "application/json")
if status == "healthy" {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
json.NewEncoder(w).Encode(health)
}
// Usage
healthChecker := NewHealthChecker("1.0.0")
http.Handle("/health", healthChecker)
go http.ListenAndServe(":8080", nil)
Readiness and Liveness Probes (Kubernetes)
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 2
16. Backpressure Handling
When frame rate is high in video pipelines, channel buffers can fill up. Let’s add a backpressure mechanism.
Adaptive Frame Dropping
import (
"math"
"math/rand"
"sync"
"sync/atomic"
)
type FrameDropper struct {
dropRatio float64
bufferFull bool
droppedCount int64
mutex sync.RWMutex
}
func NewFrameDropper() *FrameDropper {
return &FrameDropper{
dropRatio: 0.0,
}
}
func (fd *FrameDropper) ShouldDrop(bufferUsage float64) bool {
fd.mutex.Lock()
defer fd.mutex.Unlock()
// Start frame drop if buffer is more than 80% full
if bufferUsage > 0.8 {
fd.bufferFull = true
fd.dropRatio = math.Min(0.5, (bufferUsage-0.8)*2.5) // Max 50% drop
return true
}
if fd.bufferFull && bufferUsage > 0.5 {
// Buffer still high, continue dropping
return rand.Float64() < fd.dropRatio
}
// Buffer returned to normal
fd.bufferFull = false
fd.dropRatio = 0.0
return false
}
func (fd *FrameDropper) RecordDrop() {
atomic.AddInt64(&fd.droppedCount, 1)
}
Channel Buffer Monitoring
func monitorChannelBuffer(ch <-chan *Frame, bufferSize int) float64 {
currentLen := len(ch)
return float64(currentLen) / float64(bufferSize)
}
// Usage in pipeline
select {
case out <- frame:
// Frame sent
case <-time.After(10 * time.Millisecond):
// Timeout - buffer may be full
bufferUsage := monitorChannelBuffer(out, cap(out))
if frameDropper.ShouldDrop(bufferUsage) {
frameDropper.RecordDrop()
// Skip frame
continue
}
// Retry
select {
case out <- frame:
case <-ctx.Done():
return
}
}
17. Network Optimization and Video Streaming Protocols
Network optimization is critical in edge processing systems. Video streams require high bandwidth and network delays directly affect system performance.
17.1 Video Streaming Protocols
RTSP (Real-Time Streaming Protocol)
RTSP is the most common protocol for IP cameras:
// RTSP URL format
rtspURL := "rtsp://username:password@192.168.1.100:554/stream1"
// RTSP connection with FFmpeg
args := []string{
"-rtsp_transport", "tcp", // Use TCP (instead of UDP)
"-i", rtspURL,
"-vf", "scale=640:480",
"-r", "15", // Frame rate limit
"-pix_fmt", "rgb24",
"-f", "rawvideo",
"pipe:1",
}
RTSP Optimizations:
- Use TCP transport (prevents UDP packet loss)
- Set timeout settings (fast recovery on connection drops; see the sketch below)
- Add reconnection logic (automatic reconnection)
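A minimal sketch of the timeout settings, assuming FFmpeg is driven from Go as in the example above. Note that the RTSP socket-timeout flag is version-dependent ("-stimeout" in microseconds on older FFmpeg releases, "-timeout" on newer ones), so verify with "ffmpeg -h demuxer=rtsp":

import "os/exec"

// Hypothetical helper: FFmpeg command with TCP transport and a socket timeout
func buildRTSPCommand(rtspURL string) *exec.Cmd {
	args := []string{
		"-rtsp_transport", "tcp", // avoid UDP packet loss
		"-stimeout", "5000000", // socket timeout in microseconds (older FFmpeg)
		"-i", rtspURL,
		"-f", "rawvideo",
		"pipe:1",
	}
	return exec.Command("ffmpeg", args...)
}

Reconnection itself is handled one level up, as in the NetworkRetry example in section 17.3.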
WebRTC (Web Real-Time Communication)
WebRTC is ideal for browser-based real-time streaming:
import (
"bytes"
"image"
"image/color"
"image/jpeg"
"sync"
"github.com/gorilla/websocket"
)
// Convert frame to JPEG (helper function)
func frameToJPEG(frame *Frame, quality int) ([]byte, error) {
img := image.NewRGBA(image.Rect(0, 0, frame.Width, frame.Height))
for y := 0; y < frame.Height; y++ {
for x := 0; x < frame.Width; x++ {
idx := (y*frame.Width + x) * 3
if idx+2 >= len(frame.Data) {
continue
}
img.Set(x, y, color.RGBA{
R: frame.Data[idx],
G: frame.Data[idx+1],
B: frame.Data[idx+2],
A: 255,
})
}
}
var buf bytes.Buffer
if err := jpeg.Encode(&buf, img, &jpeg.Options{Quality: quality}); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Stream JPEG frames to browsers over WebSocket
// (a pragmatic stand-in for a full WebRTC media pipeline)
type WebRTCStreamer struct {
	clients map[string]*websocket.Conn
	mutex   sync.RWMutex
}
func (ws *WebRTCStreamer) BroadcastFrame(frame *Frame) {
	// Convert frame to JPEG
	jpegData, err := frameToJPEG(frame, 80)
	if err != nil {
		return
	}
	// Send to all clients; collect failed connections first, because
	// deleting from the map while holding only the read lock is a data race
	ws.mutex.RLock()
	var failed []string
	for id, conn := range ws.clients {
		if err := conn.WriteMessage(websocket.BinaryMessage, jpegData); err != nil {
			failed = append(failed, id)
		}
	}
	ws.mutex.RUnlock()
	// Remove faulty connections under the write lock
	if len(failed) > 0 {
		ws.mutex.Lock()
		for _, id := range failed {
			delete(ws.clients, id)
		}
		ws.mutex.Unlock()
	}
}
17.2 Bandwidth Optimization
Adaptive Bitrate Streaming
Dynamically adjusting quality based on network conditions:
import (
"fmt"
"sync"
)
type AdaptiveBitrate struct {
currentQuality string
bandwidth float64 // Mbps
frameRate int
resolution string
mutex sync.RWMutex
}
func (ab *AdaptiveBitrate) AdjustQuality(measuredBandwidth float64) {
ab.mutex.Lock()
defer ab.mutex.Unlock()
// Determine quality level based on bandwidth
if measuredBandwidth < 1.0 {
ab.resolution = "320x240"
ab.frameRate = 10
} else if measuredBandwidth < 3.0 {
ab.resolution = "640x480"
ab.frameRate = 15
} else {
ab.resolution = "1280x720"
ab.frameRate = 30
}
ab.bandwidth = measuredBandwidth
}
func (ab *AdaptiveBitrate) GetFFmpegArgs() []string {
ab.mutex.RLock()
defer ab.mutex.RUnlock()
width, height := parseResolution(ab.resolution)
return []string{
"-vf", fmt.Sprintf("scale=%d:%d", width, height),
"-r", fmt.Sprintf("%d", ab.frameRate),
}
}
func parseResolution(resolution string) (int, int) {
var width, height int
fmt.Sscanf(resolution, "%dx%d", &width, &height)
return width, height
}
Frame Throttling
Throttling frames to reduce network load:
import (
	"sync"
	"time"
)
type FrameThrottler struct {
	maxFPS      int
	lastFrame   time.Time
	minInterval time.Duration
	mutex       sync.Mutex
}
func NewFrameThrottler(maxFPS int) *FrameThrottler {
return &FrameThrottler{
maxFPS: maxFPS,
minInterval: time.Second / time.Duration(maxFPS),
}
}
func (ft *FrameThrottler) ShouldProcess() bool {
ft.mutex.Lock()
defer ft.mutex.Unlock()
now := time.Now()
if now.Sub(ft.lastFrame) >= ft.minInterval {
ft.lastFrame = now
return true
}
return false
}
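Usage in the pipeline, with frames and process standing in for the actual input channel and processing step:

throttler := NewFrameThrottler(15) // cap processing at 15 FPS
for frame := range frames {
	if !throttler.ShouldProcess() {
		continue // drop the frame to respect the FPS cap
	}
	process(frame)
}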
17.3 Network Resilience
Connection Retry and Backoff
import (
	"fmt"
	"sync"
	"time"
)
type NetworkRetry struct {
	maxRetries int
	backoff    time.Duration
	mutex      sync.Mutex
}
func (nr *NetworkRetry) ConnectWithRetry(connectFn func() error) error {
nr.mutex.Lock()
defer nr.mutex.Unlock()
backoff := nr.backoff
for i := 0; i < nr.maxRetries; i++ {
if err := connectFn(); err == nil {
return nil
}
if i < nr.maxRetries-1 {
time.Sleep(backoff)
backoff *= 2 // Exponential backoff
}
}
return fmt.Errorf("connection failed after %d retries", nr.maxRetries)
}
Network Quality Monitoring
import (
	"sync"
	"time"
)
type NetworkMonitor struct {
	latency    []time.Duration
	packetLoss float64
	bandwidth  float64
	mutex      sync.RWMutex
}
func (nm *NetworkMonitor) MeasureLatency() time.Duration {
start := time.Now()
// Send ping or test packet
// ...
latency := time.Since(start)
nm.mutex.Lock()
nm.latency = append(nm.latency, latency)
if len(nm.latency) > 100 {
nm.latency = nm.latency[1:]
}
nm.mutex.Unlock()
return latency
}
func (nm *NetworkMonitor) GetAverageLatency() time.Duration {
nm.mutex.RLock()
defer nm.mutex.RUnlock()
if len(nm.latency) == 0 {
return 0
}
var sum time.Duration
for _, l := range nm.latency {
sum += l
}
return sum / time.Duration(len(nm.latency))
}
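The MeasureLatency body above is left as a placeholder. One concrete, low-cost option is to measure TCP connect time to the camera or broker address as a proxy for round-trip latency; this helper is an illustration, not part of the original system:

import (
	"net"
	"time"
)

// ProbeTCP measures TCP connect time to the given address as a latency proxy
func (nm *NetworkMonitor) ProbeTCP(address string) (time.Duration, error) {
	start := time.Now()
	conn, err := net.DialTimeout("tcp", address, 2*time.Second)
	if err != nil {
		return 0, err
	}
	conn.Close()
	latency := time.Since(start)
	nm.mutex.Lock()
	nm.latency = append(nm.latency, latency)
	if len(nm.latency) > 100 {
		nm.latency = nm.latency[1:]
	}
	nm.mutex.Unlock()
	return latency, nil
}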
17.4 Video Compression Strategies
Compressing frames in events sent from edge to cloud:
// Compression based on JPEG quality setting.
// Note: frame.Data holds raw RGB24 pixels (3 bytes per pixel), so it must be
// converted to RGBA before JPEG encoding; wrapping the RGB buffer directly in
// image.RGBA (which expects 4 bytes per pixel) would corrupt the output.
func compressFrame(frame *Frame, quality int) ([]byte, error) {
	return frameToJPEG(frame, quality)
}
// Quality levels:
// - 90-100: High quality, large file
// - 70-89: Medium quality, medium file
// - 50-69: Low quality, small file
18. Deployment and Containerization
Dockerfile
# Build stage - Cache optimization with multi-stage build
FROM golang:1.23-alpine AS builder
WORKDIR /app
# FFmpeg and OpenCV dependencies
RUN apk add --no-cache \
ffmpeg \
pkgconfig \
gstreamer \
gstreamer-dev \
opencv-dev
# Dependencies first (for cache - unchanged files)
COPY go.mod go.sum ./
RUN go mod download
# Then code (changing files)
COPY . .
# Build binary with optimization
RUN CGO_ENABLED=1 GOOS=linux go build \
-ldflags="-w -s" \
-o video-processor ./cmd/main.go
# Runtime image - Minimal and secure
FROM alpine:latest
RUN apk add --no-cache \
    ffmpeg \
    ca-certificates \
    libc6-compat
# Note: if the binary links OpenCV (GoCV), the matching OpenCV runtime
# libraries must be installed in this stage as well
# Create non-root user (security best practice)
RUN addgroup -g 1000 appuser && \
adduser -D -u 1000 -G appuser appuser
WORKDIR /app
# Copy binary from builder
COPY --from=builder /app/video-processor /app/video-processor
# Change ownership
RUN chown -R appuser:appuser /app
# Run as non-root user
USER appuser
CMD ["/app/video-processor"]
Docker Compose Example
version: '3.8'
services:
video-processor:
build: .
environment:
- RTSP_URL=rtsp://camera:554/stream
- MQTT_BROKER=mqtt://broker:1883
- LOG_LEVEL=info
volumes:
- ./config:/app/config
restart: unless-stopped
mqtt-broker:
image: eclipse-mosquitto:latest
ports:
- "1883:1883"
K3s/Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
name: video-processor
spec:
replicas: 1
selector:
matchLabels:
app: video-processor
template:
metadata:
labels:
app: video-processor
spec:
containers:
- name: video-processor
image: video-processor:latest
env:
- name: RTSP_URL
valueFrom:
configMapKeyRef:
name: video-config
key: rtsp-url
resources:
requests:
memory: "256Mi"
cpu: "500m"
limits:
memory: "512Mi"
cpu: "1000m"
18.3 CI/CD Pipeline
CI/CD pipeline example for production deployment:
# .github/workflows/deploy.yml
name: Build and Deploy
on:
push:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: '1.23'
- name: Run tests
run: |
go test -v -race -coverprofile=coverage.out ./...
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.out
build:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Build Docker image
run: |
docker build -t video-processor:${{ github.sha }} .
docker tag video-processor:${{ github.sha }} video-processor:latest
- name: Push to registry
run: |
echo "${{ secrets.DOCKER_PASSWORD }}" | docker login -u "${{ secrets.DOCKER_USERNAME }}" --password-stdin
docker push video-processor:${{ github.sha }}
docker push video-processor:latest
deploy:
needs: build
runs-on: ubuntu-latest
steps:
- name: Deploy to Kubernetes
run: |
kubectl set image deployment/video-processor \
video-processor=video-processor:${{ github.sha }} \
-n production
GitLab CI example:
# .gitlab-ci.yml
stages:
- test
- build
- deploy
test:
stage: test
image: golang:1.23
script:
- go test -v -race ./...
- go vet ./...
build:
stage: build
image: docker:latest
services:
- docker:dind
  script:
    - docker login -u "$CI_REGISTRY_USER" -p "$CI_REGISTRY_PASSWORD" "$CI_REGISTRY"
    - docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .
    - docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
    - docker tag $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA $CI_REGISTRY_IMAGE:latest
    - docker push $CI_REGISTRY_IMAGE:latest
deploy:
stage: deploy
image: bitnami/kubectl:latest
script:
- kubectl set image deployment/video-processor video-processor=$CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
only:
- main
18.4 Blue-Green Deployment
Blue-green strategy for zero downtime deployment:
# blue-green-deployment.yaml
apiVersion: v1
kind: Service
metadata:
name: video-processor-service
spec:
selector:
app: video-processor
version: blue # or green
ports:
- port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: video-processor-blue
spec:
  replicas: 3
  selector:
    matchLabels:
      app: video-processor
      version: blue
  template:
metadata:
labels:
app: video-processor
version: blue
spec:
containers:
- name: video-processor
image: video-processor:v1.0.0
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: video-processor-green
spec:
  replicas: 0 # Scaled to zero initially
  selector:
    matchLabels:
      app: video-processor
      version: green
  template:
metadata:
labels:
app: video-processor
version: green
spec:
containers:
- name: video-processor
image: video-processor:v1.1.0
Blue-green deployment script:
#!/bin/bash
# blue-green-deploy.sh
CURRENT_VERSION=$(kubectl get service video-processor-service -o jsonpath='{.spec.selector.version}')
NEW_VERSION=$([ "$CURRENT_VERSION" == "blue" ] && echo "green" || echo "blue")
NEW_IMAGE="video-processor:v1.2.0"
echo "Current version: $CURRENT_VERSION"
echo "Deploying to: $NEW_VERSION"
# Scale up new version
kubectl scale deployment video-processor-$NEW_VERSION --replicas=3
kubectl set image deployment/video-processor-$NEW_VERSION video-processor=$NEW_IMAGE
# Health check
echo "Waiting for new version to be ready..."
kubectl wait --for=condition=available --timeout=300s deployment/video-processor-$NEW_VERSION
# Route service to new version
kubectl patch service video-processor-service -p "{\"spec\":{\"selector\":{\"version\":\"$NEW_VERSION\"}}}"
# Scale down old version
kubectl scale deployment video-processor-$CURRENT_VERSION --replicas=0
echo "Deployment complete. New version: $NEW_VERSION"
18.5 Rollback Mechanism
Mechanism for quick rollback:
package main
import (
"context"
"fmt"
"os/exec"
"time"
)
type DeploymentManager struct {
currentVersion string
previousVersion string
kubeconfig string
}
func (dm *DeploymentManager) Rollback() error {
if dm.previousVersion == "" {
return fmt.Errorf("no previous version to rollback to")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Rollback to old version
cmd := exec.CommandContext(ctx, "kubectl",
"set", "image", "deployment/video-processor",
fmt.Sprintf("video-processor=video-processor:%s", dm.previousVersion),
)
if err := cmd.Run(); err != nil {
return fmt.Errorf("rollback failed: %w", err)
}
// Health check
if err := dm.waitForDeployment(ctx); err != nil {
return fmt.Errorf("rollback health check failed: %w", err)
}
dm.currentVersion, dm.previousVersion = dm.previousVersion, dm.currentVersion
return nil
}
func (dm *DeploymentManager) waitForDeployment(ctx context.Context) error {
cmd := exec.CommandContext(ctx, "kubectl",
"wait", "--for=condition=available",
"--timeout=300s", "deployment/video-processor",
)
return cmd.Run()
}
Kubernetes rollback command:
# Rollback to last successful revision
kubectl rollout undo deployment/video-processor
# Rollback to specific revision
kubectl rollout undo deployment/video-processor --to-revision=3
# View rollout history
kubectl rollout history deployment/video-processor
18.6 Canary Deployment
Canary strategy for gradual rollout. With a plain Service, the traffic split is approximated by the replica ratio (1 of 10 pods receives roughly 10% of traffic); precise weighting requires a service mesh or ingress-level traffic splitting:
apiVersion: v1
kind: Service
metadata:
name: video-processor
spec:
selector:
app: video-processor
ports:
- port: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: video-processor-stable
spec:
  replicas: 9
  selector:
    matchLabels:
      app: video-processor
      version: stable
  template:
metadata:
labels:
app: video-processor
version: stable
spec:
containers:
- name: video-processor
image: video-processor:v1.0.0
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: video-processor-canary
spec:
  replicas: 1 # ~10% of traffic
  selector:
    matchLabels:
      app: video-processor
      version: canary
  template:
metadata:
labels:
app: video-processor
version: canary
spec:
containers:
- name: video-processor
image: video-processor:v1.1.0
18.7 Pre-Deployment Checklist
Checklist before deploying to production:
import (
	"fmt"
	"os/exec"
)
type PreDeploymentChecklist struct {
checks []Check
}
type Check struct {
Name string
Description string
Validate func() error
}
func NewPreDeploymentChecklist() *PreDeploymentChecklist {
return &PreDeploymentChecklist{
checks: []Check{
{
Name: "Unit Tests",
Description: "Are all unit tests passing?",
Validate: func() error {
cmd := exec.Command("go", "test", "./...")
return cmd.Run()
},
},
{
Name: "Integration Tests",
Description: "Are integration tests passing?",
Validate: func() error {
cmd := exec.Command("go", "test", "-tags=integration", "./...")
return cmd.Run()
},
},
{
Name: "Security Scan",
Description: "Has security scan been performed?",
Validate: func() error {
// Scan with Trivy, Snyk, etc.
return nil
},
},
{
Name: "Performance Test",
Description: "Are performance tests passing?",
Validate: func() error {
// Check load test results
return nil
},
},
},
}
}
func (pdc *PreDeploymentChecklist) Run() error {
for _, check := range pdc.checks {
fmt.Printf("Checking: %s...\n", check.Name)
if err := check.Validate(); err != nil {
return fmt.Errorf("%s failed: %w", check.Name, err)
}
fmt.Printf("✓ %s passed\n", check.Name)
}
return nil
}
19. Security
Security is critical in production environments. Since edge processing systems handle sensitive video data, security layers must be implemented.
19.1 Credential Management and Secret Rotation
If username and password are used in RTSP URLs, use environment variables or secret management:
import (
	"fmt"
	"net/url"
	"os"
	"sync"
	"time"
)
type CredentialManager struct {
secrets map[string]string
lastRotated time.Time
mutex sync.RWMutex
}
func NewCredentialManager() *CredentialManager {
cm := &CredentialManager{
secrets: make(map[string]string),
}
cm.loadSecrets()
return cm
}
func (cm *CredentialManager) loadSecrets() {
// Load from Kubernetes secrets or HashiCorp Vault
cm.secrets["rtsp_username"] = os.Getenv("RTSP_USERNAME")
cm.secrets["rtsp_password"] = os.Getenv("RTSP_PASSWORD")
}
func (cm *CredentialManager) GetRTSPURL(baseURL string) string {
cm.mutex.RLock()
defer cm.mutex.RUnlock()
username := cm.secrets["rtsp_username"]
password := cm.secrets["rtsp_password"]
if username != "" && password != "" {
return fmt.Sprintf("rtsp://%s:%s@%s", username, password, baseURL)
}
return baseURL
}
// Safe logging helper: mask credentials in RTSP URLs before logging
func maskCredentials(rawURL string) string {
u, err := url.Parse(rawURL)
if err != nil {
return rawURL
}
if u.User != nil {
u.User = url.UserPassword("***", "***")
}
return u.String()
}
// Secret rotation - periodically refresh secrets
func (cm *CredentialManager) RotateSecrets() {
cm.mutex.Lock()
defer cm.mutex.Unlock()
// Load new secrets
cm.loadSecrets()
cm.lastRotated = time.Now()
}
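RotateSecrets still needs to be scheduled. A minimal sketch using a ticker; the one-hour interval and the credManager variable are illustrative, so tune the interval to your secret-management policy:

// Periodically refresh credentials in the background
go func(cm *CredentialManager) {
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()
	for range ticker.C {
		cm.RotateSecrets()
	}
}(credManager)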
19.2 RTSP Authentication and TLS
To secure RTSP connections:
import (
"crypto/tls"
"net/url"
)
type SecureRTSPClient struct {
tlsConfig *tls.Config
}
func NewSecureRTSPClient(certFile, keyFile string) (*SecureRTSPClient, error) {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
return &SecureRTSPClient{
tlsConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12,
},
}, nil
}
// Use RTSP over TLS (RTSPS)
func (src *SecureRTSPClient) GetSecureRTSPURL(baseURL string) string {
u, err := url.Parse(baseURL)
if err != nil {
return baseURL
}
// Use RTSPS (RTSP over TLS)
if u.Scheme == "rtsp" {
u.Scheme = "rtsps"
}
return u.String()
}
19.3 Video Stream Encryption (SRTP)
SRTP (Secure Real-time Transport Protocol) is the standard for encrypting real-time media streams. The sketch below illustrates the underlying idea with application-level AES-GCM encryption of frame payloads rather than a full SRTP stack:
import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
	"io"
)
type StreamEncryptor struct {
	aesgcm cipher.AEAD
}
func NewStreamEncryptor(key []byte) (*StreamEncryptor, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	aesgcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	return &StreamEncryptor{aesgcm: aesgcm}, nil
}
// EncryptFrame generates a fresh nonce for every frame and prepends it to
// the ciphertext. Reusing a nonce with GCM breaks both confidentiality and
// authenticity, so a single fixed nonce must never be shared across frames.
func (se *StreamEncryptor) EncryptFrame(frameData []byte) ([]byte, error) {
	nonce := make([]byte, se.aesgcm.NonceSize())
	if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
		return nil, err
	}
	return se.aesgcm.Seal(nonce, nonce, frameData, nil), nil
}
func (se *StreamEncryptor) DecryptFrame(encryptedData []byte) ([]byte, error) {
	ns := se.aesgcm.NonceSize()
	if len(encryptedData) < ns {
		return nil, fmt.Errorf("ciphertext too short")
	}
	nonce, ciphertext := encryptedData[:ns], encryptedData[ns:]
	return se.aesgcm.Open(nil, nonce, ciphertext, nil)
}
19.4 API Authentication & Authorization
Authentication and authorization for HTTP API endpoints:
import (
	"fmt"
	"net/http"
	"strings"
	"time"
	"github.com/golang-jwt/jwt/v5"
)
type AuthMiddleware struct {
secretKey []byte
}
func NewAuthMiddleware(secretKey string) *AuthMiddleware {
return &AuthMiddleware{
secretKey: []byte(secretKey),
}
}
func (am *AuthMiddleware) Authenticate(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
		// Accept both "Bearer <token>" and a raw token value
		token := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
		if token == "" {
			http.Error(w, "Unauthorized", http.StatusUnauthorized)
			return
		}
		// Validate JWT token
		claims, err := am.validateToken(token)
if err != nil {
http.Error(w, "Invalid token", http.StatusUnauthorized)
return
}
// Role-based authorization
if !am.hasPermission(claims.Role, r.URL.Path) {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
next(w, r)
}
}
func (am *AuthMiddleware) validateToken(tokenString string) (*Claims, error) {
token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
return am.secretKey, nil
})
if err != nil {
return nil, err
}
if claims, ok := token.Claims.(*Claims); ok && token.Valid {
return claims, nil
}
return nil, fmt.Errorf("invalid token")
}
func (am *AuthMiddleware) hasPermission(role, path string) bool {
// Simple role-based authorization
// More complex system can be used in production
// Admin can access all endpoints
if role == "admin" {
return true
}
// User can only access read-only endpoints
if role == "user" {
return path == "/api/stats" || path == "/api/health"
}
// Viewer can only access preview endpoints
if role == "viewer" {
return path == "/preview" || path == "/preview/list"
}
return false
}
type Claims struct {
Username string `json:"username"`
Role string `json:"role"`
jwt.RegisteredClaims
}
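For completeness, a token carrying these claims can be issued with the same library and secret; a sketch assuming HS256 signing and a 24-hour lifetime:

// issueToken signs a JWT for the given user; the expiry is an example value
func issueToken(secretKey []byte, username, role string) (string, error) {
	claims := &Claims{
		Username: username,
		Role:     role,
		RegisteredClaims: jwt.RegisteredClaims{
			IssuedAt:  jwt.NewNumericDate(time.Now()),
			ExpiresAt: jwt.NewNumericDate(time.Now().Add(24 * time.Hour)),
		},
	}
	return jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(secretKey)
}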
19.5 Container Security Best Practices
Security measures for Docker containers:
# Use non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
USER appuser
# Use minimal base image
FROM gcr.io/distroless/base-debian11
# Only expose necessary ports
EXPOSE 8080
# Read-only filesystem (if possible)
# docker run --read-only --tmpfs /tmp
# Security scanning
# docker scan <image-name>
Kubernetes Security Context:
apiVersion: v1
kind: Pod
spec:
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
seccompProfile:
type: RuntimeDefault
containers:
- name: video-processor
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop:
- ALL
19.6 Certificate Management
TLS certificate management:
import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
	"sync"
	"time"
)
type CertificateManager struct {
cert *tls.Certificate
certPool *x509.CertPool
mutex sync.RWMutex
}
func NewCertificateManager(certFile, keyFile, caFile string) (*CertificateManager, error) {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
caCert, err := os.ReadFile(caFile)
if err != nil {
return nil, err
}
certPool := x509.NewCertPool()
if !certPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to parse CA certificate")
}
return &CertificateManager{
cert: &cert,
certPool: certPool,
}, nil
}
// Check certificate expiration
func (cm *CertificateManager) CheckExpiration() (time.Duration, error) {
cm.mutex.RLock()
defer cm.mutex.RUnlock()
cert, err := x509.ParseCertificate(cm.cert.Certificate[0])
if err != nil {
return 0, err
}
return time.Until(cert.NotAfter), nil
}
// Placeholder for certificate renewal logic (e.g., ACME / Let's Encrypt)
func (cm *CertificateManager) renewCertificate() error {
// In a real system, implement ACME client or call an external certificate manager
return fmt.Errorf("certificate renewal not implemented")
}
// For auto-renewal
func (cm *CertificateManager) StartAutoRenewal() {
go func() {
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
for range ticker.C {
timeLeft, err := cm.CheckExpiration()
if err != nil {
continue
}
// Renew if less than 30 days left
if timeLeft < 30*24*time.Hour {
cm.renewCertificate()
}
}
}()
}
19.7 Input Validation
Validate frame data before processing:
import (
"errors"
"fmt"
)
func validateFrame(frame *Frame) error {
if frame == nil {
return errors.New("frame is nil")
}
if len(frame.Data) == 0 {
return errors.New("frame data is empty")
}
expectedSize := frame.Width * frame.Height * 3
if len(frame.Data) != expectedSize {
return fmt.Errorf("frame size mismatch: expected %d, got %d", expectedSize, len(frame.Data))
}
// Size limits
if frame.Width > 4096 || frame.Height > 4096 {
return errors.New("frame dimensions too large")
}
return nil
}
19.8 Security Monitoring and Audit Logging
Logging security events:
import (
"github.com/sirupsen/logrus"
"time"
)
type SecurityLogger struct {
logger *logrus.Logger
}
func (sl *SecurityLogger) LogSecurityEvent(eventType string, details map[string]interface{}) {
sl.logger.WithFields(logrus.Fields{
"event_type": eventType,
"timestamp": time.Now(),
"details": details,
}).Warn("Security event")
}
// Usage examples:
// sl.LogSecurityEvent("authentication_failure", map[string]interface{}{
// "ip": "192.168.1.100",
// "username": "admin",
// })
// sl.LogSecurityEvent("unauthorized_access", map[string]interface{}{
// "endpoint": "/api/frames",
// "user": "user123",
// })
20. Complete Example Application
Example bringing all parts together:
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("Shutting down...")
cancel()
}()
// Create video source
source := NewVideoSource("rtsp://camera:554/stream", 640, 480)
// Create pipeline
pipeline := NewPipeline(source)
// Add motion detector
motionDetector := NewMotionDetector(0.05, 640, 480) // 5% threshold
pipeline.AddProcessor(motionDetector)
// Add object detector (optional, graceful degradation on error)
// objectDetector, err := NewObjectDetector("model.onnx", "config.yaml", "classes.txt")
// if err == nil {
// pipeline.AddProcessor(objectDetector)
// }
// Add edge decision layer
mqttPub, err := NewMQTTPublisher("tcp://mqtt:1883", "video/events", "video-processor")
if err != nil {
log.Fatalf("Failed to create MQTT publisher: %v", err)
}
decisionLayer := NewEdgeDecisionLayer(0.7, mqttPub, 5*time.Second)
pipeline.AddProcessor(decisionLayer)
// Start pipeline (in goroutine)
pipelineDone := make(chan error, 1)
go func() {
pipelineDone <- pipeline.Run(ctx)
}()
// Wait for shutdown signal
<-ctx.Done()
log.Println("Shutting down gracefully...")
// Timeout for graceful shutdown
shutdownTimeout := 30 * time.Second
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer shutdownCancel()
// Stop pipeline with hard timeout fallback
shutdownDone := make(chan error, 1)
go func() {
shutdownDone <- pipeline.Shutdown(shutdownCtx)
}()
select {
case err := <-shutdownDone:
if err != nil {
log.Printf("Shutdown error: %v", err)
} else {
log.Println("Pipeline stopped successfully")
}
case <-time.After(30 * time.Second):
log.Println("Force killing after shutdown timeout!")
os.Exit(1)
}
// Close MQTT connection (best effort)
if err := mqttPub.Close(); err != nil {
log.Printf("MQTT close error: %v", err)
}
}
21. Frame Recording and Storage
To record frames on important events:
import (
"fmt"
"os"
"sync"
)
type FrameRecorder struct {
storagePath string
maxFrames int
frames []*Frame
mutex sync.Mutex
}
func (fr *FrameRecorder) RecordFrame(frame *Frame) error {
fr.mutex.Lock()
defer fr.mutex.Unlock()
if len(fr.frames) >= fr.maxFrames {
// Remove oldest frame
fr.frames = fr.frames[1:]
}
fr.frames = append(fr.frames, frame)
return nil
}
func (fr *FrameRecorder) SaveEvent(eventID string) error {
fr.mutex.Lock()
frames := make([]*Frame, len(fr.frames))
copy(frames, fr.frames)
fr.mutex.Unlock()
	// Save frames as JPEG (frameToJPEG is the helper from section 17.1)
	for i, frame := range frames {
		jpegData, err := frameToJPEG(frame, 90)
if err != nil {
return err
}
filename := fmt.Sprintf("%s/%s_frame_%d.jpg", fr.storagePath, eventID, i)
if err := os.WriteFile(filename, jpegData, 0644); err != nil {
return err
}
}
return nil
}
22. Alerting and Notifications
Notification mechanism for events:
import (
"log"
"time"
)
type AlertManager struct {
notifiers []Notifier
}
type Notifier interface {
Send(alert *Alert) error
}
type Alert struct {
Level string
Message string
Event *Event
Timestamp time.Time
}
// Email notifier
type EmailNotifier struct {
smtpServer string
recipients []string
}
// Slack notifier
type SlackNotifier struct {
webhookURL string
}
// SMS notifier (Twilio, etc.)
type SMSNotifier struct {
apiKey string
phone string
}
func (am *AlertManager) SendAlert(alert *Alert) {
for _, notifier := range am.notifiers {
go func(n Notifier) {
if err := n.Send(alert); err != nil {
log.Printf("Failed to send alert: %v", err)
}
}(notifier)
}
}
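The notifier types above are declared as stubs. As an illustration, a minimal Send for SlackNotifier could post the alert to a Slack incoming webhook; the payload shape follows Slack's webhook convention, and error handling is deliberately simple:

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// Send posts the alert text to a Slack incoming webhook
func (sn *SlackNotifier) Send(alert *Alert) error {
	payload, err := json.Marshal(map[string]string{
		"text": fmt.Sprintf("[%s] %s", alert.Level, alert.Message),
	})
	if err != nil {
		return err
	}
	resp, err := http.Post(sn.webhookURL, "application/json", bytes.NewReader(payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("slack webhook returned status %d", resp.StatusCode)
	}
	return nil
}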
23. Hardware Requirements
Minimum Requirements (Motion Detection Only)
- CPU: 2 core, 1.5 GHz+
- RAM: 512 MB
- Storage: 1 GB (OS + application)
- Network: 10 Mbps
Recommended Requirements (Object Detection)
- CPU: 4 core, 2.0 GHz+
- RAM: 2 GB
- Storage: 5 GB
- Network: 100 Mbps
- GPU (optional): NVIDIA Jetson Nano, Intel Neural Compute Stick
Production Requirements (Multi-Camera)
- CPU: 8+ core, 2.5 GHz+
- RAM: 8 GB+
- Storage: 50 GB+ (for event recording)
- Network: 1 Gbps
- GPU: NVIDIA Jetson Xavier NX or dedicated AI accelerator
24. Real-World Experiences and Challenges
Some challenges and solutions we encountered while developing this project:
24.1 FFmpeg Process Management
Problem: FFmpeg process can crash unexpectedly or connection can drop.
Solution: Continuous monitoring and automatic restart mechanism:
func (vs *VideoSource) StartWithMonitoring(ctx context.Context) error {
for {
if err := vs.Start(ctx); err != nil {
log.Printf("Failed to start FFmpeg: %v, retrying in 5 seconds...", err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
continue
}
}
// Check if process is running
select {
case <-ctx.Done():
return ctx.Err()
case err := <-vs.Errors():
log.Printf("FFmpeg error: %v, restarting...", err)
vs.Stop()
time.Sleep(2 * time.Second)
}
}
}
24.2 Memory Leaks
Problem: Memory usage continuously increases when frame buffers are not properly cleaned.
Solution: Use sync.Pool and regular GC calls:
// Call GC every 1000 frames
frameCount := 0
for frame := range frames {
frameCount++
if frameCount%1000 == 0 {
runtime.GC()
}
// Frame processing...
}
24.3 Network Connection Issues
Problem: RTSP connections occasionally drop, especially over WiFi.
Solution: Use TCP transport and connection timeout settings:
# Use TCP instead of UDP (more reliable but slightly slower)
ffmpeg -rtsp_transport tcp -i rtsp://...
24.4 Frame Rate Inconsistencies
Problem: Source provides 30 FPS but pipeline can only process 10-15 FPS.
Solution: Frame skipping and adaptive processing:
type AdaptiveProcessor struct {
targetFPS int
currentFPS float64
skipRatio int
lastFrameTime time.Time
}
func (ap *AdaptiveProcessor) ShouldProcess() bool {
	now := time.Now()
	if ap.lastFrameTime.IsZero() {
		ap.lastFrameTime = now
		return true
	}
	elapsed := now.Sub(ap.lastFrameTime)
	currentFPS := 1.0 / elapsed.Seconds()
	if currentFPS > float64(ap.targetFPS)*1.2 {
		// Too fast: process every other frame, and keep the timestamp
		// in sync so the FPS estimate stays accurate
		ap.skipRatio++
		if ap.skipRatio%2 == 0 {
			ap.lastFrameTime = now
			return true
		}
		return false
	}
	ap.skipRatio = 0
	ap.lastFrameTime = now
	return true
}
24.5 Object Detection Delays
Problem: Object detection model is too slow, blocking the pipeline.
Solution: Async processing and frame buffering:
func (od *ObjectDetector) ProcessAsync(ctx context.Context, in <-chan *Frame, out chan<- *Frame) {
	for frame := range in {
		// Pass the frame downstream immediately, without waiting for detection
		select {
		case out <- frame:
		case <-ctx.Done():
			return
		}
		// Run detection in the background. Results are attached to the
		// frame's metadata asynchronously, so downstream consumers must
		// synchronize before reading the "detections" key.
		go func(f *Frame) {
			detections := od.detect(f)
			f.Metadata["detections"] = detections
		}(frame)
	}
}
24.6 Multi-Camera Management
Problem: Managing streams from multiple cameras is complex.
Solution: Camera manager pattern:
type CameraManager struct {
cameras map[string]*VideoSource
pipelines map[string]*Pipeline
mutex sync.RWMutex
}
func (cm *CameraManager) AddCamera(id, url string, width, height int) error {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()
	source := NewVideoSource(url, width, height)
	pipeline := NewPipeline(source)
	cm.cameras[id] = source
	cm.pipelines[id] = pipeline
	// Run the pipeline in its own goroutine: Run blocks until its context is
	// cancelled, so calling it inline would block while holding the mutex
	go func() {
		if err := pipeline.Run(context.Background()); err != nil {
			log.Printf("camera %s pipeline stopped: %v", id, err)
		}
	}()
	return nil
}
func (cm *CameraManager) RemoveCamera(id string) error {
cm.mutex.Lock()
defer cm.mutex.Unlock()
if pipeline, exists := cm.pipelines[id]; exists {
pipeline.Stop()
delete(cm.pipelines, id)
}
if source, exists := cm.cameras[id]; exists {
source.Stop()
delete(cm.cameras, id)
}
return nil
}
25. Troubleshooting Guide
Common issues and solutions:
25.1 FFmpeg Connection Failed
Symptoms: Cannot connect to RTSP stream
Solutions:
- Check the RTSP URL format: rtsp://username:password@ip:port/stream
- Verify network connectivity: ping <camera-ip>
- Test with FFmpeg directly: ffmpeg -i rtsp://...
- Use TCP transport: -rtsp_transport tcp
25.2 High CPU Usage
Symptoms: CPU usage > 90%
Solutions:
- Reduce the frame rate: -r 10 (10 FPS)
- Lower the resolution: -vf scale=320:240
- Disable object detection (use only motion detection)
- Use GPU acceleration if available
25.3 Memory Leaks
Symptoms: Memory usage continuously increases
Solutions:
- Use sync.Pool for frame buffers (see the sketch below)
- Call runtime.GC() periodically
- Check for goroutine leaks with pprof
- Monitor channel buffer sizes
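A minimal sync.Pool sketch for frame buffers, assuming fixed-size RGB24 frames (640x480 here purely for illustration); reusing buffers this way avoids constant large allocations that drive GC pressure:

import "sync"

// Pool of reusable RGB24 buffers for 640x480 frames
var frameBufferPool = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, 640*480*3)
		return &buf
	},
}

func getFrameBuffer() *[]byte { return frameBufferPool.Get().(*[]byte) }

func putFrameBuffer(buf *[]byte) { frameBufferPool.Put(buf) }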
25.4 Frame Drops
Symptoms: Many frames are dropped
Solutions:
- Increase channel buffer sizes
- Reduce processing load (lower resolution/FPS)
- Use frame skipping mechanism
- Check network bandwidth
26. Cost Analysis
26.1 Cloud vs Edge Processing Comparison
Cloud Video Processing (AWS Rekognition, Azure Video Analyzer):
- Per video: ~$0.10-0.50/minute
- 10 cameras, 24/7: ~$1,440-7,200/month
- Network cost: ~$200-500/month
- Total: ~$1,640-7,700/month
Edge Processing (This System):
- Edge device: ~$50-200 (one-time)
- Cloud storage (events only): ~$10-50/month
- Network (events only): ~$5-20/month
- Total: ~$15-70/month + one-time hardware
Savings: 95-99% cost reduction (excluding hardware cost)
27. Production Best Practices
27.1 Resource Limits
Always set resource limits in Docker/Kubernetes:
resources:
requests:
memory: "512Mi"
cpu: "1000m"
limits:
memory: "1Gi"
cpu: "2000m"
27.2 Graceful Shutdown
Ensure all goroutines terminate properly when system shuts down:
func (p *Pipeline) Shutdown(ctx context.Context) error {
// Use external context for timeout / cancellation
done := make(chan error, 1)
go func() {
done <- p.Stop()
}()
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
}
27.3 Circuit Breaker Pattern
Use circuit breaker when connecting to external services (object detection API):
import "github.com/sony/gobreaker"
type CircuitBreakerDetector struct {
detector *ExternalDetector
cb *gobreaker.CircuitBreaker
}
func NewCircuitBreakerDetector(detector *ExternalDetector) *CircuitBreakerDetector {
cb := gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "ObjectDetection",
Timeout: 5 * time.Second,
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.ConsecutiveFailures > 5
},
})
return &CircuitBreakerDetector{
detector: detector,
cb: cb,
}
}
func (cbd *CircuitBreakerDetector) Detect(frame *Frame) ([]Detection, error) {
result, err := cbd.cb.Execute(func() (interface{}, error) {
return cbd.detector.callInferenceService(frame)
})
if err != nil {
return nil, err
}
return result.([]Detection), nil
}
27.4 Rate Limiting
Use rate limiting for event publishing:
import "golang.org/x/time/rate"
type RateLimitedPublisher struct {
publisher EventPublisher
limiter *rate.Limiter
}
func NewRateLimitedPublisher(publisher EventPublisher, eventsPerSecond int) *RateLimitedPublisher {
return &RateLimitedPublisher{
publisher: publisher,
limiter: rate.NewLimiter(rate.Limit(eventsPerSecond), eventsPerSecond),
}
}
func (rlp *RateLimitedPublisher) Publish(event *Event) error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if err := rlp.limiter.Wait(ctx); err != nil {
return fmt.Errorf("rate limit exceeded: %w", err)
}
return rlp.publisher.Publish(event)
}
27.5 Structured Logging
Keep all logs in structured format:
logger.WithFields(logrus.Fields{
"camera_id": cameraID,
"frame_id": frameID,
"event_type": event.Type,
"confidence": event.Confidence,
"processing_ms": processingTime.Milliseconds(),
"timestamp": time.Now().Unix(),
}).Info("Event detected")
27.6 Metrics Collection
Collect and export important metrics (a minimal Prometheus export sketch follows this list):
- Frame processing rate (FPS)
- Detection accuracy
- Error rates
- Memory usage
- CPU usage
- Network latency
- Event publish success rate
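A sketch using the Prometheus Go client (github.com/prometheus/client_golang); the metric names and the :9090 port are illustrative choices, not part of the original system:

import (
	"net/http"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Illustrative metric definitions; names and labels are project-specific
var (
	framesProcessed = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "frames_processed_total", Help: "Frames processed per camera"},
		[]string{"camera_id"},
	)
	processingLatency = prometheus.NewHistogram(
		prometheus.HistogramOpts{Name: "frame_processing_seconds", Help: "Per-frame processing latency"},
	)
)

func init() {
	prometheus.MustRegister(framesProcessed, processingLatency)
}

// Expose /metrics for Prometheus scraping
func serveMetrics() {
	http.Handle("/metrics", promhttp.Handler())
	go http.ListenAndServe(":9090", nil)
}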
27.7 Configuration Management (Viper)
Get sensitive information from environment variables or secret management:
import "github.com/spf13/viper"
func LoadConfig() (*Config, error) {
viper.SetConfigName("config")
viper.SetConfigType("yaml")
viper.AddConfigPath(".")
viper.AddConfigPath("/etc/video-processor/")
// Environment variables can override
viper.SetEnvPrefix("VIDEO")
viper.AutomaticEnv()
if err := viper.ReadInConfig(); err != nil {
return nil, err
}
var config Config
if err := viper.Unmarshal(&config); err != nil {
return nil, err
}
return &config, nil
}
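viper.Unmarshal needs a target struct; a minimal Config matching the settings used throughout this article might look like this (field names and mapstructure tags are illustrative):

type Config struct {
	RTSPURL    string `mapstructure:"rtsp_url"`
	MQTTBroker string `mapstructure:"mqtt_broker"`
	Width      int    `mapstructure:"width"`
	Height     int    `mapstructure:"height"`
	LogLevel   string `mapstructure:"log_level"`
}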
28. Alternative Approaches and Trade-offs
28.1 FFmpeg vs GStreamer
FFmpeg:
- ✅ More widespread, well documented
- ✅ Support for many formats
- ❌ Process overhead (pipe I/O for each frame)
- ❌ Slightly slower
GStreamer:
- ✅ More optimized, faster
- ✅ Plugin architecture
- ❌ Less documented
- ❌ More complex setup
Decision: We chose FFmpeg because it’s more widespread and documented. GStreamer may be more performant but FFmpeg is sufficient for our needs.
28.2 GoCV vs External Service
GoCV (in-process):
- ✅ Low latency
- ✅ No network dependency
- ❌ Model bundled with the binary (large binary)
- ❌ High CPU usage
External service:
- ✅ Model runs in a separate service (easy GPU usage)
- ✅ Smaller binary
- ❌ Network latency
- ❌ Additional service to manage
Decision: We use GoCV for small models, external service for large models.
28.3 MQTT vs Kafka vs Webhook
MQTT:
- ✅ Lightweight, ideal for edge
- ✅ Low latency
- ✅ QoS support
- ❌ No message ordering guarantee
Kafka:
- ✅ High throughput
- ✅ Message ordering
- ❌ Heavier, overkill for edge
- ❌ Higher latency
Webhook:
- ✅ Simple HTTP
- ✅ Easy integration
- ❌ No retry mechanism
- ❌ Network dependency
Decision: We use MQTT for sending events from edge to cloud, Kafka when high throughput is required.
28.4 Alternative Approaches to FFmpeg
Go-based video processing libraries are also available besides FFmpeg:
github.com/3d0c/gmf (Go Media Framework):
- ✅ Go-native API (bindings to FFmpeg's libraries via cgo, no separate FFmpeg process)
- ✅ Frame-level access from Go
- ❌ Limited format support
- ❌ Slow active development
github.com/asticode/go-astits:
- ✅ MPEG-TS stream parsing
- ✅ Pure Go
- ❌ Only the MPEG-TS format
Why FFmpeg Was Preferred?
- Format support: 100+ video/audio formats
- Stability: Used in production for years
- Performance: Written in C, optimized
- Documentation: Comprehensive and up-to-date
- Community support: Large user base
Decision: We chose FFmpeg because format support, stability, and performance were critical. Go-based alternatives may mature in the future.
29. Machine Learning Model Deployment
Deploying ML models to edge processing systems is a critical topic in its own right. Below are strategies for model versioning, A/B testing, and model performance monitoring:
29.1 Model Versioning
import (
	"fmt"
	"sync"
	"time"
)
type ModelManager struct {
	models      map[string]*Model
	activeModel string
	mutex       sync.RWMutex
}
type Model struct {
Version string
Path string
LoadedAt time.Time
Performance ModelMetrics
}
// Predict is a placeholder for the actual inference logic of the model.
func (m *Model) Predict(frame *Frame) ([]Detection, error) {
// In a real system, this would run inference using m.Path / loaded net, etc.
return nil, nil
}
func (mm *ModelManager) LoadModel(version, path string) error {
mm.mutex.Lock()
defer mm.mutex.Unlock()
// Load model
model := &Model{
Version: version,
Path: path,
LoadedAt: time.Now(),
}
mm.models[version] = model
// Make active if first model
if mm.activeModel == "" {
mm.activeModel = version
}
return nil
}
func (mm *ModelManager) SwitchModel(version string) error {
mm.mutex.Lock()
defer mm.mutex.Unlock()
if _, exists := mm.models[version]; !exists {
return fmt.Errorf("model version %s not found", version)
}
mm.activeModel = version
return nil
}
29.2 A/B Testing
import (
	"math/rand"
	"sync"
	"time"
)
type ABTester struct {
modelA *Model
modelB *Model
splitRatio float64 // 0.0-1.0, for modelA
results map[string]ABResult
mutex sync.RWMutex
}
type ABResult struct {
ModelAAccuracy float64
ModelBAccuracy float64
ModelALatency time.Duration
ModelBLatency time.Duration
}
func (ab *ABTester) Predict(frame *Frame) ([]Detection, error) {
// Randomly select model (based on split ratio)
useModelA := rand.Float64() < ab.splitRatio
var model *Model
var modelName string
if useModelA {
model = ab.modelA
modelName = "A"
} else {
model = ab.modelB
modelName = "B"
}
start := time.Now()
detections, err := model.Predict(frame)
latency := time.Since(start)
// Record results
ab.recordResult(modelName, detections, latency)
return detections, err
}
// recordResult stores basic statistics about A/B test runs (placeholder implementation)
func (ab *ABTester) recordResult(modelName string, detections []Detection, latency time.Duration) {
ab.mutex.Lock()
defer ab.mutex.Unlock()
// Initialize results map if needed
if ab.results == nil {
ab.results = make(map[string]ABResult)
}
res := ab.results[modelName]
if modelName == "A" {
res.ModelALatency = latency
} else {
res.ModelBLatency = latency
}
ab.results[modelName] = res
}
29.3 Model Performance Metrics
import (
	"sync"
	"time"
)
type ModelMetrics struct {
	TotalInferences int64
	AverageLatency  time.Duration
	Accuracy        float64
	FalsePositives  int64
	FalseNegatives  int64
	mutex           sync.RWMutex
}
func (mm *ModelMetrics) RecordInference(latency time.Duration, correct bool) {
	mm.mutex.Lock()
	defer mm.mutex.Unlock()
	mm.TotalInferences++
	// Incremental cumulative average: avg += (x - avg) / n
	mm.AverageLatency += (latency - mm.AverageLatency) / time.Duration(mm.TotalInferences)
	// Recalculate accuracy; FalsePositives/FalseNegatives are assumed to be
	// updated elsewhere when ground truth becomes available
	if correct {
		mm.Accuracy = float64(mm.TotalInferences-mm.FalsePositives-mm.FalseNegatives) / float64(mm.TotalInferences)
	}
}
29.4 Hardware Acceleration and GPU Support
GPU support is critical for integration with edge AI devices:
NVIDIA Jetson Integration
import "gocv.io/x/gocv"
func NewGPUAcceleratedDetector(modelPath string) (*ObjectDetector, error) {
net := gocv.ReadNet(modelPath, "")
if net.Empty() {
return nil, fmt.Errorf("failed to load model")
}
// Use CUDA backend
net.SetPreferableBackend(gocv.NetBackendCUDA)
net.SetPreferableTarget(gocv.NetTargetCUDA)
return &ObjectDetector{net: net}, nil
}
Intel Neural Compute Stick (OpenVINO)
// With OpenVINO backend
net.SetPreferableBackend(gocv.NetBackendOpenVINO)
net.SetPreferableTarget(gocv.NetTargetVPU) // For Intel NCS
CUDA, OpenCL, Vulkan Comparison
| API | Platform | Performance | Edge Suitability |
| --- | --- | --- | --- |
| CUDA | NVIDIA | ⭐⭐⭐⭐⭐ | Jetson series |
| OpenCL | Cross-platform | ⭐⭐⭐ | General GPUs |
| Vulkan | Cross-platform | ⭐⭐⭐⭐ | Modern GPUs |
Recommendation: CUDA for NVIDIA Jetson, OpenVINO for general use.
30. Video Quality Metrics
Various metrics can be used to measure video quality:
30.1 PSNR (Peak Signal-to-Noise Ratio)
import "math"
func CalculatePSNR(original, processed *Frame) float64 {
mse := calculateMSE(original, processed)
if mse == 0 {
return math.Inf(1) // Perfect match
}
maxPixelValue := 255.0
psnr := 20 * math.Log10(maxPixelValue/math.Sqrt(mse))
return psnr
}
func calculateMSE(original, processed *Frame) float64 {
if len(original.Data) != len(processed.Data) {
return math.MaxFloat64
}
var sum float64
for i := 0; i < len(original.Data); i++ {
diff := float64(original.Data[i]) - float64(processed.Data[i])
sum += diff * diff
}
return sum / float64(len(original.Data))
}
30.2 SSIM (Structural Similarity Index)
import "math"
func CalculateSSIM(original, processed *Frame) float64 {
// Simplified SSIM implementation
// Full implementation requires luminance, contrast, structure calculations
mu1 := calculateMean(original)
mu2 := calculateMean(processed)
sigma1 := calculateStdDev(original, mu1)
sigma2 := calculateStdDev(processed, mu2)
sigma12 := calculateCovariance(original, processed, mu1, mu2)
c1 := 0.01 * 255 * 0.01 * 255
c2 := 0.03 * 255 * 0.03 * 255
numerator := (2*mu1*mu2 + c1) * (2*sigma12 + c2)
denominator := (mu1*mu1 + mu2*mu2 + c1) * (sigma1*sigma1 + sigma2*sigma2 + c2)
return numerator / denominator
}
func calculateMean(frame *Frame) float64 {
var sum float64
for _, v := range frame.Data {
sum += float64(v)
}
return sum / float64(len(frame.Data))
}
func calculateStdDev(frame *Frame, mean float64) float64 {
var sum float64
for _, v := range frame.Data {
diff := float64(v) - mean
sum += diff * diff
}
return math.Sqrt(sum / float64(len(frame.Data)))
}
func calculateCovariance(f1, f2 *Frame, mean1, mean2 float64) float64 {
var sum float64
minLen := len(f1.Data)
if len(f2.Data) < minLen {
minLen = len(f2.Data)
}
for i := 0; i < minLen; i++ {
sum += (float64(f1.Data[i]) - mean1) * (float64(f2.Data[i]) - mean2)
}
return sum / float64(minLen)
}
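A usage sketch comparing an original frame with its compressed-and-decoded counterpart; the thresholds in the comment are rough rules of thumb, not hard limits:

psnr := CalculatePSNR(original, processed)
ssim := CalculateSSIM(original, processed)
log.Printf("PSNR: %.2f dB, SSIM: %.3f", psnr, ssim)
// As a rough guide, PSNR above ~40 dB and SSIM above ~0.95
// usually indicate visually near-lossless quality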
31. Load Balancing and High Availability
Load balancing and failover mechanisms for high availability in production environments:
31.1 Multi-Instance Deployment
type LoadBalancer struct {
instances []*VideoProcessor
current int
mutex sync.Mutex
}
func (lb *LoadBalancer) ProcessFrame(frame *Frame) error {
lb.mutex.Lock()
instance := lb.instances[lb.current]
lb.current = (lb.current + 1) % len(lb.instances)
lb.mutex.Unlock()
return instance.Process(frame)
}
func (lb *LoadBalancer) HealthCheck() {
	lb.mutex.Lock()
	defer lb.mutex.Unlock()
	// Rebuild the slice instead of deleting in place while iterating,
	// which would skip elements as indexes shift
	healthy := lb.instances[:0]
	for _, instance := range lb.instances {
		if instance.IsHealthy() {
			healthy = append(healthy, instance)
		}
	}
	lb.instances = healthy
	if lb.current >= len(lb.instances) {
		lb.current = 0
	}
}
31.2 Failover Mechanism
type FailoverManager struct {
primary *VideoProcessor
secondary *VideoProcessor
active *VideoProcessor
mutex sync.RWMutex
}
func (fm *FailoverManager) ProcessFrame(frame *Frame) error {
	fm.mutex.RLock()
	active := fm.active
	fm.mutex.RUnlock()
	if err := active.Process(frame); err != nil {
		// Failover to the standby instance and retry once; re-read the
		// active pointer under the lock to avoid a data race
		fm.switchToSecondary()
		fm.mutex.RLock()
		active = fm.active
		fm.mutex.RUnlock()
		return active.Process(frame)
	}
	return nil
}
func (fm *FailoverManager) switchToSecondary() {
fm.mutex.Lock()
defer fm.mutex.Unlock()
if fm.active == fm.primary {
fm.active = fm.secondary
} else {
fm.active = fm.primary
}
}
32. Conclusion
In this article, we shared the core components and architecture of a real-time video analysis system used in production. Go combined with an edge processing approach makes it possible to build high-performance, scalable, and cost-effective systems. Go's goroutine model provides an ideal structure for video processing pipelines and lets us manage frame flow cleanly through channels.
32.1 Our Project Experiences
Key experiences we gained while developing and using this system in production:
- Modular architecture is critical - Each component being able to work independently makes system maintenance and development easier
- Error handling and resilience - Not everything goes smoothly in production, so graceful degradation and retry mechanisms are essential
- You’re flying blind without monitoring - System health and performance must be continuously monitored
- Edge processing really saves costs - 95%+ savings possible compared to cloud processing
- Go’s concurrency model is ideal for video processing - Goroutine and channel structure makes managing pipelines very easy
- C to Go migration experience - Rewriting a system written in C with Go provided significant improvements in both performance and developer experience
32.2 Summary
- Edge processing approach reduces latency and optimizes bandwidth usage
- Go’s concurrency model allows us to efficiently manage video pipelines
- Modular architecture allows us to easily add and remove different analysis techniques
- Graceful degradation and error handling enable building reliable systems in production environments
- Techniques like ROI detection and network optimization significantly improve system performance
- Machine learning model deployment strategies ensure continuous system improvement
32.3 Next Steps
- GPU support - Integration with edge AI devices like NVIDIA Jetson and Intel Neural Compute Stick (introduced in Section 29.4)
- Distributed processing - Distributed edge processing for multiple cameras
- Real-time streaming - Streaming analyzed frames in real-time
- Advanced analytics - Face recognition, behavior analysis, tracking algorithms
- Edge-to-cloud sync - Combining analysis results in cloud and reporting
- WebRTC integration - Browser-based real-time preview and control
- Advanced compression - AV1 codec support and adaptive compression
32.4 Advanced Topics (Separate Article Topics)
This article covers basic and intermediate topics. The following topics are recommended for separate articles:
Distributed Edge Processing:
- Leader election and consensus algorithms
- Edge-cloud synchronization (eventual consistency)
- Multi-edge device coordination
CI/CD and GitOps:
- Multi-architecture Docker images (arm/v7, arm64, amd64)
- Edge deployment with ArgoCD, Flux
- Automated testing and deployment pipelines
Monitoring & Observability:
- Distributed tracing (Jaeger, OpenTelemetry)
- Metric analysis for anomaly detection
- Advanced alerting strategies
Security In-Depth:
- SRTP, DTLS video stream encryption
- ONVIF, digest auth camera authentication
- Container security (AppArmor, Seccomp)
Legal and Ethical:
- GDPR, KVKK compliance
- Video recording storage and disposal policies
- Privacy-by-design approaches
Performance Optimization:
- Model quantization, pruning, distillation
- Hardware codec integration (VA-API, VDPAU, NVDEC)
- Memory management optimizations (HugeTLB, alignment)
Network Protocols:
- SRT (Secure Reliable Transport)
- Streaming over QUIC protocol
- Time-Sensitive Networking (TSN) for industrial environments
32.5 Code Examples and Repository
The code examples shared in this article consist of the core components of a system used in production; the full implementation and additional features remain project-specific and are not published here.
Note: Code examples and full implementation have been customized according to project requirements. The examples in this article demonstrate the basic architecture and approach of the system.
32.6 Learning Resources
32.7 Conclusion and Recommendations
This system has been successfully running in production and provides the following advantages:
- 95%+ cost savings - Edge processing instead of cloud processing
- <100ms latency - Real-time decision making
- Scalable architecture - Each camera works independently
- Reliable system - Graceful degradation and error recovery
Recommendations for getting started:
- Start with a single camera first
- Test with motion detection
- Add object detection later
- Do load testing before going to production
- Always set up monitoring and alerting
Note: This article has been prepared as a technical guide. Comprehensive testing is recommended before using in production environments.