AMWA NMOS: SMPTE ST 2110 için Kontrol Düzlemi ve Go ile Uygulama
Özet
- NMOS Genel Bakış: Ağ üzerinden medya kontrolü ve yönetimi için açık spesifikasyonlar
- IS-04: Cihazların ve kaynakların keşfi ve kaydı
- IS-05: Otomatik yönlendirme için cihaz bağlantı yönetimi
- IS-06: Switch’ler ve ağ altyapısı için SDN kontrolü
- IS-08: Cihazlar içinde ses yönlendirmesi için kanal haritalama
- IS-09: Global ağ zamanlaması için sistem parametreleri
- Go Implementasyonu: Production-ready NMOS client ve registry server
- Gerçek Dünya Kullanım Senaryoları: Otomatik iş akışları, kaynak keşfi, bağlantı yönetimi, SDN entegrasyonu
Not: Bu makale, AMWA NMOS spesifikasyonlarının kapsamlı bir şekilde ele alınmasını ve pratik Go implementasyonlarını içerir. Tüm kod örnekleri gerçek broadcast ortamlarına dayanır ve NMOS best practice’lerini takip eder.
1. Giriş: AMWA NMOS Nedir?
1.1 Geleneksel Broadcast Kontrolünün Sorunu
SMPTE ST 2110 hakkında yazdığım makalede, video, ses ve metadata’nın IP ağları üzerinden nasıl taşındığını tartışmıştık. Ancak ST 2110 sadece veri düzlemini (data plane - medyanın nasıl aktığını) tanımlar. Şunları ele almaz:
- Cihaz Keşfi: Cihazlar birbirlerini ağda nasıl bulur?
- Bağlantı Yönetimi: Video/sesi cihazlar arasında nasıl yönlendiririz?
- Konfigürasyon: Cihaz parametrelerini nasıl yönetiriz?
- İzleme: Kaynak kullanılabilirliğini ve sağlığını nasıl takip ederiz?
- Birlikte Çalışabilirlik: Farklı üreticilerin cihazları nasıl birlikte çalışır?
Geleneksel SDI ortamlarında, kontrol panelli fiziksel router’larımız vardı. IP tabanlı ST 2110’da, standartlaştırılmış bir kontrol düzlemine (control plane) ihtiyacımız var.
1.2 SDI vs NMOS: Temel Farklar
NMOS’a dalmadan önce, geleneksel SDI iş akışlarıyla nasıl karşılaştırıldığını anlayalım:
| Özellik |
Geleneksel SDI |
NMOS/ST 2110 |
| Keşif |
Manuel (etiketler, dokümanlar) |
Otomatik (IS-04) |
| Yönlendirme |
Fiziksel router/patch panel |
Yazılım tabanlı (IS-05) |
| Ölçeklenebilirlik |
Fiziksel portlarla sınırlı |
Sınırsız (ağ bant genişliği) |
| Esneklik |
Sabit kablo yolları |
Dinamik, yeniden yapılandırılabilir |
| Maliyet |
Yüksek (kablolar, router’lar) |
Düşük (Ethernet altyapısı) |
| Uzaktan Kontrol |
Sınırlı |
Tam API erişimi |
| Metadata |
Sınırlı (VANC) |
Zengin (JSON) |
| Multi-viewer |
Özel donanım |
Yazılım tabanlı |
| Yedeklilik |
Çift kablolama |
Ağ yedekliliği |
| Entegrasyon |
Zor |
API-first, kolay |
Gerçek Dünya Etkisi:
- SDI: 100 kameralı tesis, 100+ kablo, 100+ I/O’lu fiziksel router gerektirir
- NMOS: Aynı tesis 10GbE ağ üzerinde, sanal yönlendirme, yeniden kablolama olmadan kamera ekleme
1.3 AMWA NMOS: Çözüm
AMWA (Advanced Media Workflow Association), bu zorlukları çözmek için NMOS (Networked Media Open Specifications) geliştirdi. NMOS, şunları sağlayan bir spesifikasyon ailesidir:
- Otomatik Keşif: Cihazlar kendilerini kaydeder ve yeteneklerini duyurur
- RESTful API’ler: Cihaz kontrolü için standart HTTP API’leri
- WebSocket Event’leri: Ağ değişikliklerinin gerçek zamanlı bildirimleri
- Vendor Agnostic: Herhangi bir üreticinin ST 2110 ekipmanıyla çalışır
- Ölçeklenebilir: Küçük stüdyolardan büyük broadcast tesislerine
1.4 NMOS Spesifikasyon Ailesi
NMOS, birden fazla spesifikasyondan (“IS” - Interface Specifications) oluşur:
| Spesifikasyon |
Amaç |
Durum |
| IS-04 |
Keşif ve Kayıt |
Stabil |
| IS-05 |
Cihaz Bağlantı Yönetimi |
Stabil |
| IS-06 |
Ağ Kontrolü (SDN) |
✅ Makale ile birlikte |
| IS-07 |
Event ve Tally |
Stabil |
| IS-08 |
Kanal Haritalama (Ses) |
Stabil |
| IS-09 |
Sistem Parametreleri (Zamanlama) |
Stabil |
| IS-10 |
Yetkilendirme (OAuth2) |
Stabil |
| BCP-003 |
Güvenlik Best Practice’leri |
Stabil |
Bu makalede, IS-04, IS-05, IS-08 ve IS-09 spesifikasyonlarına ve Go implementasyonlarına odaklanacağız.
2. NMOS Mimarisi Genel Bakış
2.1 Üst Seviye Mimari
2.2 Temel Bileşenler
- NMOS Node: NMOS’a katılan herhangi bir cihaz (kameralar, encoder’lar, switcher’lar)
- NMOS Registry: Kaynak kaydı ve keşfi için merkezi servis
- NMOS Controller: Bağlantıları ve iş akışlarını yöneten uygulama
3. IS-04: Keşif ve Kayıt
3.1 IS-04 Genel Bakış
IS-04, NMOS’un temelidir. Cihazların şunları nasıl yaptığını tanımlar:
- DNS-SD (mDNS/unicast DNS) kullanarak NMOS Registry’yi keşfetme
- Kaynaklarını kaydetme (nodes, devices, sources, flows, senders, receivers)
- Mevcut kaynakları sorgulama
- WebSocket üzerinden gerçek zamanlı güncellemeler alma
3.2 IS-04 Kaynak Modeli
IS-04 kaynak hiyerarşisi:
1
2
3
4
5
6
|
Node (Cihaz/Sunucu)
└── Device (Mantıksal İşlem Birimi)
├── Source (Medya Kaynağı)
├── Flow (Mantıksal Medya Akışı)
├── Sender (Flow'u İletir)
└── Receiver (Flow'u Alır)
|
Örnek Senaryo: Gömülü encoder’lı kamera
- Node: Kamera donanımı
- Device: Encoder chip
- Source: Kamera sensör çıkışı
- Flow: Sıkıştırılmış video akışı (H.264)
- Sender: Akışı ileten ağ arayüzü
- Receiver: (dönüş feed monitörü varsa)
3.3 Kaynak Nitelikleri
Her kaynağın şunları vardır:
- id: UUID (örn.
58f6b536-ca4c-43fd-880e-9df2af5d5d94)
- version: Zaman damgası (örn.
1735545600:0)
- label: İnsan tarafından okunabilir isim
- description: Detaylı açıklama
- tags: Gruplama için metadata
3.4 IS-04 API’leri
IS-04 iki ana API tanımlar:
- Registration API (port 3210): Node’ların kaynakları kaydetmesi için
- Query API (port 3211): Controller’ların kaynakları keşfetmesi için
3.5 IS-04 Kayıt Akışı
İşte tam kayıt sıralaması:
Önemli Noktalar:
- Kaynaklar hiyerarşik sırayla kaydedilmelidir (Node → Device → Source/Flow → Sender)
- Heartbeat her 5 saniyede zorunludur (12 saniyelik timeout)
- WebSocket aboneleri gerçek zamanlı güncellemeler alır
- Graceful shutdown açık DELETE istekleri gerektirir
4. Go ile NMOS Registry Oluşturma
IS-04’ü implement eden production-ready bir NMOS Registry oluşturalım.
4.1 Proje Yapısı
1
2
3
4
5
6
7
8
9
10
11
12
|
nmos-registry/
├── main.go
├── registry/
│ ├── registry.go # Temel registry mantığı
│ ├── registration.go # Registration API (IS-04)
│ ├── query.go # Query API (IS-04)
│ ├── websocket.go # WebSocket subscription'lar
│ └── models.go # NMOS kaynak modelleri
├── storage/
│ ├── memory.go # In-memory depolama
│ └── postgres.go # PostgreSQL depolama (production)
└── go.mod
|
4.2 NMOS Kaynak Modelleri
Önce IS-04 kaynak modellerini tanımlayalım:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
// registry/models.go
package registry
import (
"time"
)
// Resource, tüm NMOS kaynakları için temel
type Resource struct {
ID string `json:"id"`
Version string `json:"version"`
Label string `json:"label"`
Description string `json:"description"`
Tags map[string]string `json:"tags"`
}
// Node, cihazlar için mantıksal host'u temsil eder
type Node struct {
Resource
Hostname string `json:"hostname"`
APIVersions []string `json:"api"`
Caps interface{} `json:"caps"`
Services []Service `json:"services"`
Clocks []Clock `json:"clocks"`
Interfaces []Interface `json:"interfaces"`
}
// Device, bir işlevsellik birimini temsil eder (encoder, decoder, vb.)
type Device struct {
Resource
Type string `json:"type"` // generic, pipeline, host
NodeID string `json:"node_id"`
Senders []string `json:"senders"` // Sender ID'leri dizisi
Receivers []string `json:"receivers"` // Receiver ID'leri dizisi
Controls []Control `json:"controls"`
}
// Source, medya içeriğinin kaynağını temsil eder
type Source struct {
Resource
Format string `json:"format"` // video, audio, data
DeviceID string `json:"device_id"`
ClockName string `json:"clock_name"`
GrainRate *Rational `json:"grain_rate,omitempty"`
// ... ek alanlar: caps, parents, ses için channels
}
// Flow, bir medya akışını temsil eder
type Flow struct {
Resource
Format string `json:"format"` // urn:x-nmos:format:video, audio, data
SourceID string `json:"source_id"`
DeviceID string `json:"device_id"`
GrainRate *Rational `json:"grain_rate,omitempty"`
MediaType string `json:"media_type"` // video/raw, audio/L24, vb.
// Video-spesifik: frame_width, frame_height, colorspace, interlace_mode
// Ses-spesifik: sample_rate, bit_depth, channels
// ... tam alan listesi için NMOS IS-04 spec'e bakın
}
// Sender bir flow'u iletir
type Sender struct {
Resource
FlowID string `json:"flow_id"`
Transport string `json:"transport"` // urn:x-nmos:transport:rtp
DeviceID string `json:"device_id"`
ManifestHref string `json:"manifest_href"` // SDP dosya URL'i
InterfaceBindings []string `json:"interface_bindings"`
SubscriptionID string `json:"subscription,omitempty"`
}
// Receiver bir flow'u alır
type Receiver struct {
Resource
Format string `json:"format"` // urn:x-nmos:format:video
Caps ReceiverCaps `json:"caps"`
Transport string `json:"transport"`
DeviceID string `json:"device_id"`
InterfaceBindings []string `json:"interface_bindings"`
SubscriptionID string `json:"subscription,omitempty"`
}
// Destekleyici yapılar (basitleştirilmiş - tüm alanlar için spec'e bakın)
type Service struct {
Href string `json:"href"`
Type string `json:"type"`
}
type Clock struct {
Name string `json:"name"`
RefType string `json:"ref_type"` // internal, ptp, vb.
}
type Interface struct {
Name string `json:"name"`
// ... chassis_id, port_id, vb.
}
type Rational struct {
Numerator int `json:"numerator"`
Denominator int `json:"denominator"`
}
type ReceiverCaps struct {
MediaTypes []string `json:"media_types,omitempty"`
}
// Tam kaynak tanımları için: https://specs.amwa.tv/is-04/
// Version, NMOS version zaman damgası oluşturur
func CreateVersion() string {
now := time.Now()
seconds := now.Unix()
nanos := now.Nanosecond()
return fmt.Sprintf("%d:%d", seconds, nanos)
}
|
4.3 Registry Temel Mantığı
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
|
// registry/registry.go
package registry
import (
"fmt"
"sync"
"time"
)
// Registry, NMOS kaynaklarını yönetir
type Registry struct {
mu sync.RWMutex
nodes map[string]*Node
devices map[string]*Device
sources map[string]*Source
flows map[string]*Flow
senders map[string]*Sender
receivers map[string]*Receiver
// Sağlık takibi
health map[string]time.Time // node_id -> son heartbeat
// WebSocket subscription'lar
subscribers map[string]*Subscriber
}
type Subscriber struct {
ID string
Query SubscriptionQuery
Updates chan ResourceUpdate
}
type SubscriptionQuery struct {
ResourceType string // node, device, sender, receiver
Params map[string]string // sorgu parametreleri
}
type ResourceUpdate struct {
Action string // create, update, delete
Type string // kaynak türü
Resource interface{} // kaynak
}
// NewRegistry yeni bir NMOS registry oluşturur
func NewRegistry() *Registry {
r := &Registry{
nodes: make(map[string]*Node),
devices: make(map[string]*Device),
sources: make(map[string]*Source),
flows: make(map[string]*Flow),
senders: make(map[string]*Sender),
receivers: make(map[string]*Receiver),
health: make(map[string]time.Time),
subscribers: make(map[string]*Subscriber),
}
// Sağlık kontrolü goroutine'ini başlat
go r.healthCheckLoop()
return r
}
// RegisterNode bir node'u kaydeder veya günceller
func (r *Registry) RegisterNode(node *Node) error {
r.mu.Lock()
defer r.mu.Unlock()
existing, exists := r.nodes[node.ID]
action := "create"
if exists {
action = "update"
// Version'ı güncelle
node.Version = CreateVersion()
}
r.nodes[node.ID] = node
r.health[node.ID] = time.Now()
// Aboneleri bilgilendir
r.notifySubscribers(ResourceUpdate{
Action: action,
Type: "node",
Resource: node,
})
return nil
}
// RegisterDevice, RegisterSource, RegisterFlow, RegisterSender, RegisterReceiver
// Hepsi aynı pattern'i takip eder. İşte generic implementasyon:
func (r *Registry) registerResource(id string, resource interface{}, resourceType string,
resourceMap map[string]interface{}, validate func() error) error {
r.mu.Lock()
defer r.mu.Unlock()
// Opsiyonel validasyon
if validate != nil {
if err := validate(); err != nil {
return err
}
}
action := "create"
if _, exists := resourceMap[id]; exists {
action = "update"
// Version'ı güncelle (kaynak türüne bağlı implementasyon)
}
resourceMap[id] = resource
r.notifySubscribers(ResourceUpdate{
Action: action,
Type: resourceType,
Resource: resource,
})
return nil
}
// Kolaylık wrapper'ları:
func (r *Registry) RegisterDevice(device *Device) error {
return r.registerResource(device.ID, device, "device",
cast(r.devices), func() error {
if _, exists := r.nodes[device.NodeID]; !exists {
return fmt.Errorf("node %s bulunamadı", device.NodeID)
}
return nil
})
}
func (r *Registry) RegisterSender(sender *Sender) error {
return r.registerResource(sender.ID, sender, "sender", cast(r.senders), nil)
}
func (r *Registry) RegisterReceiver(receiver *Receiver) error {
return r.registerResource(receiver.ID, receiver, "receiver", cast(r.receivers), nil)
}
// RegisterSource, RegisterFlow için benzer şekilde...
// Query metotları - Generic getter
func getResources[T any](mu *sync.RWMutex, resourceMap map[string]*T) []*T {
mu.RLock()
defer mu.RUnlock()
resources := make([]*T, 0, len(resourceMap))
for _, resource := range resourceMap {
resources = append(resources, resource)
}
return resources
}
// Kolaylık metotları
func (r *Registry) GetNodes() []*Node {
return getResources(&r.mu, r.nodes)
}
func (r *Registry) GetSenders() []*Sender {
return getResources(&r.mu, r.senders)
}
func (r *Registry) GetReceivers() []*Receiver {
return getResources(&r.mu, r.receivers)
}
// GetDevices(), GetSources(), GetFlows() için benzer şekilde...
// Sağlık kontrolü - Eski node'ları temizle
func (r *Registry) healthCheckLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
r.checkHealth()
}
}
func (r *Registry) checkHealth() {
r.mu.Lock()
defer r.mu.Unlock()
staleThreshold := 12 * time.Second // IS-04 spec: node'lar her 5s'de heartbeat göndermeli
now := time.Now()
for nodeID, lastSeen := range r.health {
if now.Sub(lastSeen) > staleThreshold {
// Eski node ve kaynaklarını kaldır
delete(r.nodes, nodeID)
delete(r.health, nodeID)
// İlişkili device, sender, receiver'ları kaldır
r.removeNodeResources(nodeID)
fmt.Printf("Eski node kaldırıldı: %s\n", nodeID)
}
}
}
func (r *Registry) removeNodeResources(nodeID string) {
// Device'ları kaldır
for id, device := range r.devices {
if device.NodeID == nodeID {
delete(r.devices, id)
}
}
// Kaldırılan device'lara ait sender/receiver'ları kaldır
// (basitleştirilmiş - production'da device ilişkilerini takip edin)
}
// Heartbeat, node sağlığını günceller
func (r *Registry) Heartbeat(nodeID string) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, exists := r.nodes[nodeID]; !exists {
return fmt.Errorf("node %s kayıtlı değil", nodeID)
}
r.health[nodeID] = time.Now()
return nil
}
// Subscriber yönetimi
func (r *Registry) Subscribe(query SubscriptionQuery) *Subscriber {
r.mu.Lock()
defer r.mu.Unlock()
sub := &Subscriber{
ID: generateUUID(),
Query: query,
Updates: make(chan ResourceUpdate, 100),
}
r.subscribers[sub.ID] = sub
return sub
}
func (r *Registry) Unsubscribe(subID string) {
r.mu.Lock()
defer r.mu.Unlock()
if sub, exists := r.subscribers[subID]; exists {
close(sub.Updates)
delete(r.subscribers, subID)
}
}
func (r *Registry) notifySubscribers(update ResourceUpdate) {
for _, sub := range r.subscribers {
// Subscription query'sine göre filtrele
if r.matchesQuery(update, sub.Query) {
select {
case sub.Updates <- update:
default:
// Subscriber yavaş, atla
}
}
}
}
func (r *Registry) matchesQuery(update ResourceUpdate, query SubscriptionQuery) bool {
// Kaynak türünü eşleştir
if query.ResourceType != "" && query.ResourceType != update.Type {
return false
}
// Ek sorgu parametrelerini eşleştir
// (basitleştirilmiş - production'da tam sorgu eşleştirmesi implement edin)
return true
}
func generateUUID() string {
// Production'da github.com/google/uuid kullanın
return fmt.Sprintf("%d", time.Now().UnixNano())
}
|
4.4 Registration API (HTTP Handler’lar)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
// registry/registration.go
package registry
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/gorilla/mux"
)
// RegistrationAPI, IS-04 Registration API'sini yönetir
type RegistrationAPI struct {
registry *Registry
}
func NewRegistrationAPI(registry *Registry) *RegistrationAPI {
return &RegistrationAPI{registry: registry}
}
// SetupRoutes, Registration API için HTTP route'larını yapılandırır
func (api *RegistrationAPI) SetupRoutes(router *mux.Router) {
// IS-04 v1.3 Registration API route'ları
v13 := router.PathPrefix("/x-nmos/registration/v1.3").Subrouter()
// Kaynak kaydı
v13.HandleFunc("/resource", api.RegisterResource).Methods("POST")
v13.HandleFunc("/resource", api.DeleteAllResources).Methods("DELETE")
// Sağlık kontrolü
v13.HandleFunc("/health/nodes/{nodeId}", api.Heartbeat).Methods("POST")
// Kaynak silme
v13.HandleFunc("/resource/{resourceType}/{resourceId}", api.DeleteResource).Methods("DELETE")
}
// RegisterResource, kaynak kaydını işler (POST /resource)
func (api *RegistrationAPI) RegisterResource(w http.ResponseWriter, r *http.Request) {
var rawResource map[string]interface{}
if err := json.NewDecoder(r.Body).Decode(&rawResource); err != nil {
http.Error(w, "Geçersiz JSON", http.StatusBadRequest)
return
}
// Veri'den kaynak türünü belirle
resourceType := inferResourceType(rawResource)
var err error
switch resourceType {
case "node":
var node Node
if err = remarshal(rawResource, &node); err == nil {
err = api.registry.RegisterNode(&node)
}
case "device":
var device Device
if err = remarshal(rawResource, &device); err == nil {
err = api.registry.RegisterDevice(&device)
}
case "source":
var source Source
if err = remarshal(rawResource, &source); err == nil {
err = api.registry.RegisterSource(&source)
}
case "flow":
var flow Flow
if err = remarshal(rawResource, &flow); err == nil {
err = api.registry.RegisterFlow(&flow)
}
case "sender":
var sender Sender
if err = remarshal(rawResource, &sender); err == nil {
err = api.registry.RegisterSender(&sender)
}
case "receiver":
var receiver Receiver
if err = remarshal(rawResource, &receiver); err == nil {
err = api.registry.RegisterReceiver(&receiver)
}
default:
http.Error(w, "Bilinmeyen kaynak türü", http.StatusBadRequest)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(map[string]string{
"status": "registered",
"type": resourceType,
})
}
// Heartbeat, node sağlık kontrollerini işler
func (api *RegistrationAPI) Heartbeat(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
nodeID := vars["nodeId"]
if err := api.registry.Heartbeat(nodeID); err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]string{
"status": "healthy",
})
}
// DeleteResource, kaynak silme işlemini yönetir
func (api *RegistrationAPI) DeleteResource(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
resourceType := vars["resourceType"]
resourceID := vars["resourceId"]
if err := api.registry.DeleteResource(resourceType, resourceID); err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.WriteHeader(http.StatusNoContent)
}
// DeleteAllResources, bir node için tüm kaynakları kaldırır
func (api *RegistrationAPI) DeleteAllResources(w http.ResponseWriter, r *http.Request) {
// Bu, bir node graceful shutdown yaparken çağrılır
w.WriteHeader(http.StatusNoContent)
}
// Yardımcı fonksiyonlar
func inferResourceType(data map[string]interface{}) string {
// NMOS kaynakları farklı zorunlu alanlara sahiptir
if _, hasHostname := data["hostname"]; hasHostname {
return "node"
}
if _, hasManifest := data["manifest_href"]; hasManifest {
return "sender"
}
if caps, hasCaps := data["caps"]; hasCaps {
if capsMap, ok := caps.(map[string]interface{}); ok {
if _, hasMediaTypes := capsMap["media_types"]; hasMediaTypes {
return "receiver"
}
}
return "source"
}
if _, hasTransport := data["transport"]; hasTransport {
if _, hasFlowID := data["flow_id"]; hasFlowID {
return "sender"
}
return "receiver"
}
if _, hasSourceID := data["source_id"]; hasSourceID {
return "flow"
}
if _, hasNodeID := data["node_id"]; hasNodeID {
return "device"
}
return "unknown"
}
func remarshal(from interface{}, to interface{}) error {
data, err := json.Marshal(from)
if err != nil {
return err
}
return json.Unmarshal(data, to)
}
|
4.4.1 cURL ile Registration API’yi Test Etme
Registration API’yi test etmek için pratik örnekler:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# 1. Node Kaydet
curl -X POST http://localhost:3210/x-nmos/registration/v1.3/resource \
-H "Content-Type: application/json" \
-d '{
"id": "58f6b536-ca4c-43fd-880e-9df2af5d5d94",
"version": "1735545600:0",
"label": "Camera Encoder 01",
"description": "Production camera encoder",
"hostname": "encoder-01.local",
"api": ["v1.3"],
"caps": {},
"services": [],
"clocks": [{"name": "clk0", "ref_type": "ptp"}],
"interfaces": [{"name": "eth0"}],
"tags": {}
}'
# 2. Sender Kaydet
curl -X POST http://localhost:3210/x-nmos/registration/v1.3/resource \
-H "Content-Type: application/json" \
-d '{
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"version": "1735545600:0",
"label": "Camera 1 Video",
"description": "1080p59.94 video sender",
"flow_id": "f1f2f3f4-f5f6-f7f8-f9f0-f1f2f3f4f5f6",
"transport": "urn:x-nmos:transport:rtp",
"device_id": "d1d2d3d4-d5d6-d7d8-d9d0-d1d2d3d4d5d6",
"manifest_href": "http://encoder-01.local:8080/sdp/camera1.sdp",
"interface_bindings": ["eth0"],
"tags": {}
}'
# 3. Heartbeat Gönder
NODE_ID="58f6b536-ca4c-43fd-880e-9df2af5d5d94"
curl -X POST http://localhost:3210/x-nmos/registration/v1.3/health/nodes/$NODE_ID
# 4. Kaynak Sil
curl -X DELETE http://localhost:3210/x-nmos/registration/v1.3/resource/senders/a1b2c3d4-e5f6-7890-abcd-ef1234567890
# Response Örnekleri:
# Başarılı (201 Created):
# {"status":"registered","type":"node"}
# Heartbeat Başarılı (200 OK):
# {"status":"healthy"}
|
4.5 Query API (HTTP Handler’lar)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
// registry/query.go
package registry
import (
"encoding/json"
"net/http"
"strings"
"github.com/gorilla/mux"
)
// QueryAPI, IS-04 Query API'sini yönetir
type QueryAPI struct {
registry *Registry
}
func NewQueryAPI(registry *Registry) *QueryAPI {
return &QueryAPI{registry: registry}
}
// SetupRoutes, Query API için HTTP route'larını yapılandırır
func (api *QueryAPI) SetupRoutes(router *mux.Router) {
// IS-04 v1.3 Query API route'ları
v13 := router.PathPrefix("/x-nmos/query/v1.3").Subrouter()
// Kaynak sorguları
v13.HandleFunc("/nodes", api.GetNodes).Methods("GET")
v13.HandleFunc("/devices", api.GetDevices).Methods("GET")
v13.HandleFunc("/sources", api.GetSources).Methods("GET")
v13.HandleFunc("/flows", api.GetFlows).Methods("GET")
v13.HandleFunc("/senders", api.GetSenders).Methods("GET")
v13.HandleFunc("/receivers", api.GetReceivers).Methods("GET")
// Tek kaynak sorgusu
v13.HandleFunc("/nodes/{nodeId}", api.GetNode).Methods("GET")
v13.HandleFunc("/senders/{senderId}", api.GetSender).Methods("GET")
v13.HandleFunc("/receivers/{receiverId}", api.GetReceiver).Methods("GET")
// WebSocket subscription
v13.HandleFunc("/subscriptions", api.CreateSubscription).Methods("POST")
v13.HandleFunc("/subscriptions/{subscriptionId}", api.DeleteSubscription).Methods("DELETE")
}
// GetNodes, tüm kayıtlı node'ları döndürür
func (api *QueryAPI) GetNodes(w http.ResponseWriter, r *http.Request) {
nodes := api.registry.GetNodes()
// Query parametrelerini uygula (label, description filtreleri)
nodes = api.filterNodes(nodes, r.URL.Query())
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(nodes)
}
// GetSenders, tüm kayıtlı sender'ları döndürür
func (api *QueryAPI) GetSenders(w http.ResponseWriter, r *http.Request) {
senders := api.registry.GetSenders()
// Filtreleri uygula
if transport := r.URL.Query().Get("transport"); transport != "" {
filtered := []*Sender{}
for _, s := range senders {
if strings.Contains(s.Transport, transport) {
filtered = append(filtered, s)
}
}
senders = filtered
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(senders)
}
// GetReceivers, tüm kayıtlı receiver'ları döndürür
func (api *QueryAPI) GetReceivers(w http.ResponseWriter, r *http.Request) {
receivers := api.registry.GetReceivers()
// Filtreleri uygula
if format := r.URL.Query().Get("format"); format != "" {
filtered := []*Receiver{}
for _, rcv := range receivers {
if strings.Contains(rcv.Format, format) {
filtered = append(filtered, rcv)
}
}
receivers = filtered
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(receivers)
}
// GetNode, ID'ye göre belirli bir node döndürür
func (api *QueryAPI) GetNode(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
nodeID := vars["nodeId"]
node := api.registry.GetNode(nodeID)
if node == nil {
http.Error(w, "Node bulunamadı", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(node)
}
// CreateSubscription, WebSocket subscription oluşturur
func (api *QueryAPI) CreateSubscription(w http.ResponseWriter, r *http.Request) {
var query SubscriptionQuery
if err := json.NewDecoder(r.Body).Decode(&query); err != nil {
http.Error(w, "Geçersiz subscription sorgusu", http.StatusBadRequest)
return
}
sub := api.registry.Subscribe(query)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(map[string]string{
"id": sub.ID,
"ws_href": fmt.Sprintf("ws://localhost:3211/x-nmos/query/v1.3/subscriptions/%s/ws", sub.ID),
"resource_path": fmt.Sprintf("/%s", query.ResourceType),
})
}
// DeleteSubscription, bir subscription'ı kaldırır
func (api *QueryAPI) DeleteSubscription(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
subID := vars["subscriptionId"]
api.registry.Unsubscribe(subID)
w.WriteHeader(http.StatusNoContent)
}
// Filtre yardımcıları
func (api *QueryAPI) filterNodes(nodes []*Node, params url.Values) []*Node {
if label := params.Get("label"); label != "" {
filtered := []*Node{}
for _, n := range nodes {
if strings.Contains(strings.ToLower(n.Label), strings.ToLower(label)) {
filtered = append(filtered, n)
}
}
return filtered
}
return nodes
}
|
4.6 Gerçek Zamanlı Güncellemeler için WebSocket Implementasyonu
NMOS’un güçlü özelliklerinden biri WebSocket üzerinden gerçek zamanlı bildirimlerdir. Haydi implement edelim:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
// registry/websocket.go
package registry
import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true // Production'da origin'i doğrulayın
},
}
// WebSocketHandler, subscription'lar için WebSocket bağlantılarını yönetir
func (api *QueryAPI) WebSocketHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
subID := vars["subscriptionId"]
// Subscription'ı al
sub := api.registry.GetSubscription(subID)
if sub == nil {
http.Error(w, "Subscription bulunamadı", http.StatusNotFound)
return
}
// WebSocket'e yükselt
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket upgrade başarısız: %v", err)
return
}
defer conn.Close()
log.Printf("WebSocket client bağlandı: subscription %s", subID)
// İlk grain'i gönder (mevcut durum)
if err := api.sendInitialGrain(conn, sub); err != nil {
log.Printf("İlk grain gönderilemedi: %v", err)
return
}
// Gerçek zamanlı güncellemeleri yönet
done := make(chan struct{})
// Read pump (ping/pong ve client disconnect'i yönet)
go func() {
defer close(done)
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
if _, _, err := conn.ReadMessage(); err != nil {
log.Printf("WebSocket okuma hatası: %v", err)
return
}
}
}()
// Write pump (güncellemeleri client'a gönder)
ticker := time.NewTicker(30 * time.Second) // Ping aralığı
defer ticker.Stop()
for {
select {
case update := <-sub.Updates:
// Kaynak güncellemeyi client'a gönder
if err := api.sendUpdate(conn, update); err != nil {
log.Printf("Güncelleme gönderilemedi: %v", err)
return
}
case <-ticker.C:
// Ping gönder
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
case <-done:
return
}
}
}
// sendInitialGrain, kaynakların mevcut durumunu gönderir
func (api *QueryAPI) sendInitialGrain(conn *websocket.Conn, sub *Subscriber) error {
var resources []interface{}
// Subscription sorguya göre kaynakları al
switch sub.Query.ResourceType {
case "nodes":
for _, node := range api.registry.GetNodes() {
resources = append(resources, node)
}
case "senders":
for _, sender := range api.registry.GetSenders() {
resources = append(resources, sender)
}
case "receivers":
for _, receiver := range api.registry.GetReceivers() {
resources = append(resources, receiver)
}
default:
// Tüm kaynakları al
for _, node := range api.registry.GetNodes() {
resources = append(resources, node)
}
}
// Grain mesajı oluştur
grain := map[string]interface{}{
"grain_type": "event",
"source_id": "nmos-registry",
"flow_id": "query-api-events",
"origin_timestamp": time.Now().Format(time.RFC3339Nano),
"sync_timestamp": time.Now().Format(time.RFC3339Nano),
"creation_timestamp": time.Now().Format(time.RFC3339Nano),
"rate": map[string]int{
"numerator": 0,
"denominator": 1,
},
"duration": map[string]int{
"numerator": 0,
"denominator": 1,
},
"grain": map[string]interface{}{
"type": "urn:x-nmos:format:data.event",
"topic": fmt.Sprintf("/%s/", sub.Query.ResourceType),
"data": resources,
},
}
return conn.WriteJSON(grain)
}
// sendUpdate, kaynak değişikliği bildirimi gönderir
func (api *QueryAPI) sendUpdate(conn *websocket.Conn, update ResourceUpdate) error {
// Event grain oluştur
var eventType string
switch update.Action {
case "create":
eventType = "resource_added"
case "update":
eventType = "resource_modified"
case "delete":
eventType = "resource_removed"
}
grain := map[string]interface{}{
"grain_type": "event",
"source_id": "nmos-registry",
"flow_id": "query-api-events",
"origin_timestamp": time.Now().Format(time.RFC3339Nano),
"sync_timestamp": time.Now().Format(time.RFC3339Nano),
"creation_timestamp": time.Now().Format(time.RFC3339Nano),
"rate": map[string]int{
"numerator": 0,
"denominator": 1,
},
"duration": map[string]int{
"numerator": 0,
"denominator": 1,
},
"grain": map[string]interface{}{
"type": "urn:x-nmos:format:data.event",
"topic": fmt.Sprintf("/%s/", update.Type),
"data": []map[string]interface{}{
{
"path": fmt.Sprintf("/%s", update.Type),
"pre": nil, // Önceki durum (update/delete ise)
"post": update.Resource, // Yeni durum (create/update ise)
},
},
},
}
return conn.WriteJSON(grain)
}
// QueryAPI.SetupRoutes'a WebSocket endpoint'i ekleyin
// Bu route'u ekleyin:
// v13.HandleFunc("/subscriptions/{subscriptionId}/ws", api.WebSocketHandler).Methods("GET")
|
wscat ile WebSocket’i Test Etme:
1
2
3
4
5
6
7
8
9
10
11
12
|
# wscat yükle
npm install -g wscat
# Önce subscription oluştur
SUB_ID=$(curl -X POST http://localhost:3211/x-nmos/query/v1.3/subscriptions \
-H "Content-Type: application/json" \
-d '{"resource_type":"senders"}' | jq -r '.id')
# WebSocket'e bağlan
wscat -c "ws://localhost:3211/x-nmos/query/v1.3/subscriptions/$SUB_ID/ws"
# Sender eklendiğinde/kaldırıldığında/değiştirildiğinde gerçek zamanlı güncellemeler alacaksınız
|
4.6.1 cURL ile Query API’yi Test Etme
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
# 1. Tüm node'ları al
curl http://localhost:3211/x-nmos/query/v1.3/nodes | jq
# 2. Tüm sender'ları al
curl http://localhost:3211/x-nmos/query/v1.3/senders | jq
# 3. Belirli sender'ı al
SENDER_ID="a1b2c3d4-e5f6-7890-abcd-ef1234567890"
curl http://localhost:3211/x-nmos/query/v1.3/senders/$SENDER_ID | jq
# 4. Filtrelerle sorgula
# Sadece video sender'ları al
curl "http://localhost:3211/x-nmos/query/v1.3/senders?format=urn:x-nmos:format:video" | jq
# Label'a göre node'ları al
curl "http://localhost:3211/x-nmos/query/v1.3/nodes?label=camera" | jq
# 5. WebSocket subscription oluştur
curl -X POST http://localhost:3211/x-nmos/query/v1.3/subscriptions \
-H "Content-Type: application/json" \
-d '{
"resource_type": "senders",
"params": {}
}' | jq
# Response:
# {
# "id": "sub-12345",
# "ws_href": "ws://localhost:3211/x-nmos/query/v1.3/subscriptions/sub-12345/ws",
# "resource_path": "/senders"
# }
# 6. Pagination (büyük registry'ler için)
curl "http://localhost:3211/x-nmos/query/v1.3/senders?paging.limit=10&paging.offset=0" | jq
|
4.7 Ana Sunucu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
// main.go
package main
import (
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
"yourproject/registry"
)
func main() {
// Registry oluştur
reg := registry.NewRegistry()
// API'leri kur
registrationAPI := registry.NewRegistrationAPI(reg)
queryAPI := registry.NewQueryAPI(reg)
// Router oluştur
router := mux.NewRouter()
// Route'ları kur
registrationAPI.SetupRoutes(router)
queryAPI.SetupRoutes(router)
// Health check endpoint
router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "OK")
}).Methods("GET")
// Registration API'yi port 3210'da başlat
go func() {
regRouter := mux.NewRouter()
registrationAPI.SetupRoutes(regRouter)
log.Println("Registration API :3210'da dinliyor")
log.Fatal(http.ListenAndServe(":3210", regRouter))
}()
// Query API'yi port 3211'de başlat
log.Println("Query API :3211'de dinliyor")
log.Fatal(http.ListenAndServe(":3211", router))
}
|
5. Go ile NMOS Node Client Oluşturma
Şimdi registry’mize video sender kaydeden bir client oluşturalım.
5.1 NMOS Node Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
|
// client/node.go
package client
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
"yourproject/registry"
)
// NMOSNode, bir NMOS uyumlu cihazı temsil eder
type NMOSNode struct {
RegistryURL string
Node *registry.Node
Devices []*registry.Device
Senders []*registry.Sender
Receivers []*registry.Receiver
httpClient *http.Client
stopChan chan struct{}
}
// NewNMOSNode, yeni bir NMOS node client oluşturur
func NewNMOSNode(registryURL string, hostname string) *NMOSNode {
nodeID := generateUUID()
node := ®istry.Node{
Resource: registry.Resource{
ID: nodeID,
Version: registry.CreateVersion(),
Label: fmt.Sprintf("Video Encoder %s", hostname),
Description: "NMOS-etkin video encoder",
Tags: map[string]string{},
},
Hostname: hostname,
APIVersions: []string{"v1.3"},
Caps: map[string]interface{}{},
Services: []registry.Service{
{
Href: fmt.Sprintf("http://%s:8080/", hostname),
Type: "urn:x-manufacturer:service:encoder",
},
},
Clocks: []registry.Clock{
{
Name: "clk0",
RefType: "ptp",
},
},
Interfaces: []registry.Interface{
{
Name: "eth0",
},
},
}
return &NMOSNode{
RegistryURL: registryURL,
Node: node,
Devices: []*registry.Device{},
Senders: []*registry.Sender{},
Receivers: []*registry.Receiver{},
httpClient: &http.Client{Timeout: 5 * time.Second},
stopChan: make(chan struct{}),
}
}
// Start, node'u kaydeder ve heartbeat'i başlatır
func (n *NMOSNode) Start() error {
// Node'u kaydet
if err := n.registerResource(n.Node, "node"); err != nil {
return fmt.Errorf("node kaydedilemedi: %w", err)
}
// Device'ları kaydet
for _, device := range n.Devices {
if err := n.registerResource(device, "device"); err != nil {
return fmt.Errorf("device kaydedilemedi: %w", err)
}
}
// Sender'ları kaydet
for _, sender := range n.Senders {
if err := n.registerResource(sender, "sender"); err != nil {
return fmt.Errorf("sender kaydedilemedi: %w", err)
}
}
// Receiver'ları kaydet
for _, receiver := range n.Receivers {
if err := n.registerResource(receiver, "receiver"); err != nil {
return fmt.Errorf("receiver kaydedilemedi: %w", err)
}
}
// Heartbeat goroutine'ini başlat (IS-04 spec'e göre her 5 saniyede)
go n.heartbeatLoop()
fmt.Println("NMOS Node başarıyla kaydedildi")
return nil
}
// Stop, node'u graceful olarak kapatır
func (n *NMOSNode) Stop() error {
close(n.stopChan)
// Tüm kaynakların kaydını kaldır
// Production'da registry'ye DELETE istekleri gönder
return nil
}
// AddVideoSender, bu node'a video sender ekler
func (n *NMOSNode) AddVideoSender(label string, sdpURL string) error {
deviceID := generateUUID()
sourceID := generateUUID()
flowID := generateUUID()
senderID := generateUUID()
// Device oluştur
device := ®istry.Device{
Resource: registry.Resource{
ID: deviceID,
Version: registry.CreateVersion(),
Label: label,
Description: "Video encoder device",
},
Type: "urn:x-nmos:device:pipeline",
NodeID: n.Node.ID,
Senders: []string{senderID},
Receivers: []string{},
}
// Source oluştur
source := ®istry.Source{
Resource: registry.Resource{
ID: sourceID,
Version: registry.CreateVersion(),
Label: fmt.Sprintf("%s Source", label),
},
Format: "urn:x-nmos:format:video",
DeviceID: deviceID,
ClockName: "clk0",
GrainRate: ®istry.Rational{Numerator: 25, Denominator: 1}, // 25fps
}
// Flow oluştur
flow := ®istry.Flow{
Resource: registry.Resource{
ID: flowID,
Version: registry.CreateVersion(),
Label: fmt.Sprintf("%s Flow", label),
},
Format: "urn:x-nmos:format:video",
SourceID: sourceID,
DeviceID: deviceID,
GrainRate: ®istry.Rational{Numerator: 25, Denominator: 1},
FrameWidth: 1920,
FrameHeight: 1080,
Colorspace: "BT709",
Interlace: "progressive",
MediaType: "video/raw",
}
// Sender oluştur
sender := ®istry.Sender{
Resource: registry.Resource{
ID: senderID,
Version: registry.CreateVersion(),
Label: fmt.Sprintf("%s Sender", label),
},
FlowID: flowID,
Transport: "urn:x-nmos:transport:rtp",
DeviceID: deviceID,
ManifestHref: sdpURL,
InterfaceBindings: []string{"eth0"},
}
// Node'a ekle
n.Devices = append(n.Devices, device)
n.Senders = append(n.Senders, sender)
// Node zaten başlatıldıysa, hemen kaydet
if n.isRunning() {
n.registerResource(device, "device")
n.registerResource(source, "source")
n.registerResource(flow, "flow")
n.registerResource(sender, "sender")
}
return nil
}
// registerResource, registry'ye POST isteği gönderir
func (n *NMOSNode) registerResource(resource interface{}, resourceType string) error {
url := fmt.Sprintf("%s/x-nmos/registration/v1.3/resource", n.RegistryURL)
data, err := json.Marshal(resource)
if err != nil {
return err
}
resp, err := n.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
return fmt.Errorf("kayıt başarısız, status %d", resp.StatusCode)
}
fmt.Printf("%s kaydedildi: %v\n", resourceType, resource)
return nil
}
// heartbeatLoop, periyodik heartbeat'ler gönderir
func (n *NMOSNode) heartbeatLoop() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := n.sendHeartbeat(); err != nil {
fmt.Printf("Heartbeat başarısız: %v\n", err)
}
case <-n.stopChan:
return
}
}
}
// sendHeartbeat, /health/nodes/{nodeId}'ye POST gönderir
func (n *NMOSNode) sendHeartbeat() error {
url := fmt.Sprintf("%s/x-nmos/registration/v1.3/health/nodes/%s",
n.RegistryURL, n.Node.ID)
resp, err := n.httpClient.Post(url, "application/json", nil)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("heartbeat başarısız, status %d", resp.StatusCode)
}
return nil
}
func (n *NMOSNode) isRunning() bool {
select {
case <-n.stopChan:
return false
default:
return true
}
}
func generateUUID() string {
// Production'da github.com/google/uuid kullanın
return fmt.Sprintf("%d", time.Now().UnixNano())
}
|
5.2 Örnek: Video Encoder Node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
// examples/encoder_node.go
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"yourproject/client"
)
func main() {
// NMOS node oluştur
node := client.NewNMOSNode(
"http://localhost:3210", // Registry URL
"encoder-01.local", // Hostname
)
// Video sender ekle (örn. kamera girişi)
err := node.AddVideoSender(
"Camera 1 HD",
"http://encoder-01.local:8080/sdp/camera1.sdp",
)
if err != nil {
log.Fatalf("Sender eklenemedi: %v", err)
}
// Node'u başlat (registry'ye kaydet)
if err := node.Start(); err != nil {
log.Fatalf("Node başlatılamadı: %v", err)
}
fmt.Println("NMOS Node çalışıyor. Durdurmak için Ctrl+C basın.")
// Interrupt bekle
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
// Graceful shutdown
fmt.Println("\nKapatılıyor...")
node.Stop()
}
|
⚠️ GÜVENLİK NOTU:
Bu örneklerdeki API çağrıları authentication ve TLS kullanmamaktadır.
Production’da:
- ✅ HTTPS kullanın (HTTP yerine)
- ✅ Authorization header ekleyin (
Authorization: Bearer <token>)
- ✅ Registry’den alınan API endpoint’leri doğrulayın
- ✅ Rate limiting ve API throttling uygulayın
Güvenli implementasyon için Section 13 (IS-10) bölümüne bakın.
6. IS-05: Bağlantı Yönetimi
IS-05, sender’lar ve receiver’lar arasında bağlantı kurulmasını tanımlar.
6.1 IS-05 Genel Bakış
IS-05, şunlar için bir REST API sağlar:
- Aktif bağlantılar: Şu anda neyin bağlı olduğu
- Staged bağlantılar: Sırada neyin bağlanacağı
- Bağlantı aktivasyonu: Staged bağlantıları uygulama
6.2 IS-05 Bağlantı Akışı
6.3 IS-05 için SDP Parsing
IS-05’i implement etmeden önce, bağlantı parametrelerini çıkarmak için SDP dosyalarını parse etmemiz gerekiyor:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
// connection/sdp.go
package connection
import (
"bufio"
"fmt"
"strconv"
"strings"
)
// SDPInfo, parse edilmiş SDP bilgisini tutar
type SDPInfo struct {
// Video/Audio parametreleri
MediaType string // video, audio
Encoding string // raw, H264, L24, vb.
// RTP parametreleri
MulticastIP string
Port int
TTL int
PayloadType int
// ST 2110'a özel
PacketTime float64 // ptime milisaniye cinsinden
SamplingRate int // Ses için
Channels int // Ses için
Width int // Video için
Height int // Video için
Framerate string // örn., "25/1"
}
// ParseSDP bir SDP dosyasını parse eder
func ParseSDP(sdpContent string) (*SDPInfo, error) {
info := &SDPInfo{}
scanner := bufio.NewScanner(strings.NewReader(sdpContent))
for scanner.Scan() {
line := scanner.Text()
switch {
case strings.HasPrefix(line, "m="):
// Media satırı: m=video 5004 RTP/AVP 96
parts := strings.Fields(line)
if len(parts) >= 4 {
info.MediaType = parts[0][2:] // "m=" kısmını kaldır
if port, err := strconv.Atoi(parts[1]); err == nil {
info.Port = port
}
if pt, err := strconv.Atoi(parts[3]); err == nil {
info.PayloadType = pt
}
}
case strings.HasPrefix(line, "c="):
// Connection satırı: c=IN IP4 239.0.0.1/32
parts := strings.Fields(line)
if len(parts) >= 3 {
addrParts := strings.Split(parts[2], "/")
info.MulticastIP = addrParts[0]
if len(addrParts) > 1 {
if ttl, err := strconv.Atoi(addrParts[1]); err == nil {
info.TTL = ttl
}
}
}
case strings.HasPrefix(line, "a=rtpmap:"):
// RTP map: a=rtpmap:96 raw/90000
parts := strings.Fields(line)
if len(parts) >= 2 {
encodingInfo := strings.Split(parts[1], "/")
if len(encodingInfo) >= 2 {
info.Encoding = encodingInfo[0]
if rate, err := strconv.Atoi(encodingInfo[1]); err == nil {
info.SamplingRate = rate
}
if len(encodingInfo) >= 3 {
if ch, err := strconv.Atoi(encodingInfo[2]); err == nil {
info.Channels = ch
}
}
}
}
case strings.HasPrefix(line, "a=fmtp:"):
// Format parametreleri: a=fmtp:96 sampling=YCbCr-4:2:2; width=1920; height=1080
if strings.Contains(line, "width=") {
info.Width = extractIntParam(line, "width=")
}
if strings.Contains(line, "height=") {
info.Height = extractIntParam(line, "height=")
}
if strings.Contains(line, "exactframerate=") {
info.Framerate = extractStringParam(line, "exactframerate=")
}
case strings.HasPrefix(line, "a=ptime:"):
// Packet time: a=ptime:1.000
parts := strings.Fields(line)
if len(parts) >= 1 {
ptimeStr := strings.TrimPrefix(parts[0], "a=ptime:")
if pt, err := strconv.ParseFloat(ptimeStr, 64); err == nil {
info.PacketTime = pt
}
}
}
}
if err := scanner.Err(); err != nil {
return nil, err
}
return info, nil
}
func extractIntParam(line, key string) int {
start := strings.Index(line, key)
if start == -1 {
return 0
}
start += len(key)
end := strings.IndexAny(line[start:], "; \t")
if end == -1 {
end = len(line) - start
}
valueStr := line[start : start+end]
value, _ := strconv.Atoi(valueStr)
return value
}
func extractStringParam(line, key string) string {
start := strings.Index(line, key)
if start == -1 {
return ""
}
start += len(key)
end := strings.IndexAny(line[start:], "; \t")
if end == -1 {
return line[start:]
}
return line[start : start+end]
}
// FetchAndParseSDP URL'den SDP'yi alır ve parse eder
func FetchAndParseSDP(sdpURL string) (*SDPInfo, error) {
resp, err := http.Get(sdpURL)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return ParseSDP(string(body))
}
|
Örnek SDP Dosyası (ST 2110-20 Video):
1
2
3
4
5
6
7
8
9
10
11
|
v=0
o=- 1234567890 1234567890 IN IP4 192.168.1.100
s=ST 2110-20 Video Stream
t=0 0
m=video 5004 RTP/AVP 96
c=IN IP4 239.100.0.1/32
a=source-filter: incl IN IP4 239.100.0.1 192.168.1.100
a=rtpmap:96 raw/90000
a=fmtp:96 sampling=YCbCr-4:2:2; width=1920; height=1080; exactframerate=25; depth=10; TCS=SDR; colorimetry=BT709; PM=2110GPM; SSN=ST2110-20:2017; TP=2110TPN
a=mediaclk:direct=0
a=ts-refclk:ptp=IEEE1588-2008:00-00-00-00-00-00-00-00:0
|
6.4 IS-05 Implementasyonu (Receiver Tarafı)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
|
// connection/is05_receiver.go
package connection
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/gorilla/mux"
)
// ConnectionAPI, IS-05 Bağlantı Yönetimini implement eder
type ConnectionAPI struct {
mu sync.RWMutex
receiverID string
activeParams TransportParams
stagedParams TransportParams
activationState ActivationState
}
// TransportParams, SDP ve transport bilgisini tutar
type TransportParams struct {
SenderID string `json:"sender_id"`
ManifestHref string `json:"manifest_href"` // SDP URL
MasterEnable bool `json:"master_enable"`
TransportFile map[string]interface{} `json:"transport_file"` // Parse edilmiş SDP
}
// ActivationState, aktivasyon zamanlamasını tutar
type ActivationState struct {
Mode string `json:"mode"` // activate_immediate, activate_scheduled_absolute
RequestedTime string `json:"requested_time,omitempty"`
ActivationTime string `json:"activation_time,omitempty"`
}
func NewConnectionAPI(receiverID string) *ConnectionAPI {
return &ConnectionAPI{
receiverID: receiverID,
activeParams: TransportParams{
MasterEnable: false,
},
stagedParams: TransportParams{
MasterEnable: false,
},
activationState: ActivationState{
Mode: "activate_immediate",
},
}
}
// SetupRoutes, receiver için IS-05 route'larını yapılandırır
func (api *ConnectionAPI) SetupRoutes(router *mux.Router, receiverID string) {
base := fmt.Sprintf("/x-nmos/connection/v1.1/single/receivers/%s", receiverID)
// Aktif bağlantı
router.HandleFunc(base+"/active", api.GetActive).Methods("GET")
// Staged bağlantı
router.HandleFunc(base+"/staged", api.GetStaged).Methods("GET")
router.HandleFunc(base+"/staged", api.PatchStaged).Methods("PATCH")
// Constraints (yetenekler)
router.HandleFunc(base+"/constraints", api.GetConstraints).Methods("GET")
}
// GetActive, mevcut aktif bağlantıyı döndürür
func (api *ConnectionAPI) GetActive(w http.ResponseWriter, r *http.Request) {
api.mu.RLock()
defer api.mu.RUnlock()
response := map[string]interface{}{
"sender_id": api.activeParams.SenderID,
"master_enable": api.activeParams.MasterEnable,
"activation": api.activationState,
"transport_file": api.activeParams.TransportFile,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// GetStaged, staged bağlantı parametrelerini döndürür
func (api *ConnectionAPI) GetStaged(w http.ResponseWriter, r *http.Request) {
api.mu.RLock()
defer api.mu.RUnlock()
response := map[string]interface{}{
"sender_id": api.stagedParams.SenderID,
"master_enable": api.stagedParams.MasterEnable,
"activation": api.activationState,
"transport_file": api.stagedParams.TransportFile,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// PatchStaged, staged parametreleri günceller ve opsiyonel olarak aktive eder
func (api *ConnectionAPI) PatchStaged(w http.ResponseWriter, r *http.Request) {
var patch map[string]interface{}
if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
http.Error(w, "Geçersiz JSON", http.StatusBadRequest)
return
}
api.mu.Lock()
defer api.mu.Unlock()
// Staged parametreleri güncelle
if senderID, ok := patch["sender_id"].(string); ok {
api.stagedParams.SenderID = senderID
// Sender'dan SDP al
if senderID != "" {
// Sender'ın SDP dosyasını al
sender := getSenderFromRegistry(senderID) // NMOS registry'yi sorgula
if sender != nil {
api.stagedParams.ManifestHref = sender.ManifestHref
// Bağlantı parametrelerini almak için SDP'yi parse et
sdpInfo, err := FetchAndParseSDP(sender.ManifestHref)
if err == nil {
// Parse edilmiş SDP'yi daha sonra kullanmak üzere sakla
api.stagedParams.TransportFile = map[string]interface{}{
"multicast_ip": sdpInfo.MulticastIP,
"port": sdpInfo.Port,
"rtp_enabled": true,
}
}
}
}
}
if masterEnable, ok := patch["master_enable"].(bool); ok {
api.stagedParams.MasterEnable = masterEnable
}
// Aktivasyonu yönet
if activation, ok := patch["activation"].(map[string]interface{}); ok {
mode := activation["mode"].(string)
api.activationState.Mode = mode
switch mode {
case "activate_immediate":
// Şimdi aktive et
api.activateConnection()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "activated",
})
return
case "activate_scheduled_absolute":
requestedTime := activation["requested_time"].(string)
api.activationState.RequestedTime = requestedTime
// Aktivasyonu zamanla
go api.scheduleActivation(requestedTime)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "scheduled",
})
return
}
}
// Sadece staged, aktive edilmedi
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "staged",
})
}
// GetConstraints, receiver yeteneklerini döndürür
func (api *ConnectionAPI) GetConstraints(w http.ResponseWriter, r *http.Request) {
constraints := []map[string]interface{}{
{
"parameter": "sender_id",
"type": "string",
},
{
"parameter": "master_enable",
"type": "boolean",
},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(constraints)
}
// activateConnection, staged parametreleri aktif hale getirir
func (api *ConnectionAPI) activateConnection() {
api.activeParams = api.stagedParams
api.activationState.ActivationTime = time.Now().Format(time.RFC3339Nano)
fmt.Printf("Bağlantı aktive edildi: Sender %s -> Receiver %s\n",
api.activeParams.SenderID, api.receiverID)
// Production'da: gerçekten RTP stream almaya başla
if api.activeParams.MasterEnable {
// GStreamer pipeline, FFmpeg veya başka medya receiver başlat
api.startReceiving()
} else {
api.stopReceiving()
}
}
// scheduleActivation, gelecekte bir zamanda aktive eder
func (api *ConnectionAPI) scheduleActivation(timeStr string) {
activationTime, err := time.Parse(time.RFC3339Nano, timeStr)
if err != nil {
fmt.Printf("Geçersiz aktivasyon zamanı: %v\n", err)
return
}
// Aktivasyon zamanına kadar bekle
time.Sleep(time.Until(activationTime))
api.mu.Lock()
api.activateConnection()
api.mu.Unlock()
}
// startReceiving, gerçek medya pipeline'ını başlatır
func (api *ConnectionAPI) startReceiving() {
fmt.Println("Medya receiver başlatılıyor...")
// Örnek: exec.Command("gst-launch-1.0", "udpsrc", "port=5004", "...")
}
func (api *ConnectionAPI) stopReceiving() {
fmt.Println("Medya receiver durduruluyor...")
}
func getSenderFromRegistry(senderID string) *registry.Sender {
// Sender detayları için NMOS registry'yi sorgula
// Production'da: registry'ye HTTP GET
return nil
}
|
6.5 IS-05 Receiver cURL Örnekleri
1. Mevcut aktif bağlantıyı göster:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
curl http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/active | jq
# Response:
# {
# "sender_id": "sender-123",
# "master_enable": true,
# "activation": {
# "mode": "activate_immediate",
# "activation_time": "2025-12-30T10:15:30.123Z"
# },
# "transport_file": {
# "multicast_ip": "239.100.0.1",
# "port": 5004,
# "rtp_enabled": true
# }
# }
|
2. Staged parametreleri göster:
1
|
curl http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/staged | jq
|
3. Sender’a bağlan (immediate activation):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
curl -X PATCH http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/staged \
-H "Content-Type: application/json" \
-d '{
"sender_id": "sender-789",
"master_enable": true,
"activation": {
"mode": "activate_immediate"
}
}' | jq
# Response:
# {
# "status": "activated"
# }
|
4. Zamanlanmış aktivasyon:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
curl -X PATCH http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/staged \
-H "Content-Type: application/json" \
-d '{
"sender_id": "sender-789",
"master_enable": true,
"activation": {
"mode": "activate_scheduled_absolute",
"requested_time": "2025-12-30T15:00:00.000Z"
}
}' | jq
# Response:
# {
# "status": "scheduled"
# }
|
5. Bağlantıyı kes:
1
2
3
4
5
6
7
8
9
|
curl -X PATCH http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/staged \
-H "Content-Type: application/json" \
-d '{
"sender_id": null,
"master_enable": false,
"activation": {
"mode": "activate_immediate"
}
}' | jq
|
6. Receiver yeteneklerini kontrol et:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
curl http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/constraints | jq
# Response:
# [
# {
# "parameter": "sender_id",
# "type": "string"
# },
# {
# "parameter": "master_enable",
# "type": "boolean"
# }
# ]
|
6.4.1 cURL ile IS-05 Test Etme
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
# 1. Receiver'ın mevcut aktif bağlantısını kontrol et
RECEIVER_ID="a8b4c2d1-5f6e-4a3b-9c8d-1e2f3a4b5c6d"
curl http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/active | jq
# Response:
# {
# "sender_id": null,
# "master_enable": false,
# "activation": {...},
# "transport_file": {}
# }
# 2. Staged parametreleri kontrol et
curl http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/staged | jq
# 3. Bir bağlantıyı stage et (anında aktivasyon)
SENDER_ID="a1b2c3d4-e5f6-7890-abcd-ef1234567890"
curl -X PATCH http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/staged \
-H "Content-Type: application/json" \
-d '{
"sender_id": "'$SENDER_ID'",
"master_enable": true,
"activation": {
"mode": "activate_immediate"
}
}' | jq
# Response (200 OK):
# {"status": "activated"}
# 4. Bir bağlantıyı stage et (zamanlanmış aktivasyon)
ACTIVATION_TIME=$(date -u -v+10S +"%Y-%m-%dT%H:%M:%S.%NZ") # 10 saniye sonra
curl -X PATCH http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/staged \
-H "Content-Type: application/json" \
-d '{
"sender_id": "'$SENDER_ID'",
"master_enable": true,
"activation": {
"mode": "activate_scheduled_absolute",
"requested_time": "'$ACTIVATION_TIME'"
}
}' | jq
# Response (202 Accepted):
# {"status": "scheduled"}
# 5. Bağlantıyı kes (sender_id null yap)
curl -X PATCH http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/staged \
-H "Content-Type: application/json" \
-d '{
"sender_id": null,
"master_enable": false,
"activation": {
"mode": "activate_immediate"
}
}' | jq
# 6. Receiver constraints kontrol et
curl http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/constraints | jq
|
6.5 IS-05 Controller Örneği
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
// examples/is05_controller.go
package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
)
func main() {
// Camera 1 Sender'ı Monitor Receiver'a bağla
senderID := "58f6b536-ca4c-43fd-880e-9df2af5d5d94"
receiverID := "a8b4c2d1-5f6e-4a3b-9c8d-1e2f3a4b5c6d"
// Bağlantıyı stage et
stageRequest := map[string]interface{}{
"sender_id": senderID,
"master_enable": true,
"activation": map[string]interface{}{
"mode": "activate_immediate",
},
}
data, _ := json.Marshal(stageRequest)
url := fmt.Sprintf("http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/%s/staged", receiverID)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(data))
if err != nil {
log.Fatalf("Bağlantı başarısız: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
fmt.Println("Bağlantı başarıyla aktive edildi!")
} else {
fmt.Printf("Bağlantı başarısız status: %d\n", resp.StatusCode)
}
}
|
⚠️ GÜVENLİK NOTU:
Bu örneklerde IS-10 (Authorization) ve BCP-003 (Secure Communication) uygulanmamıştır.
Production ortamlarında mutlaka:
- ✅ TLS/HTTPS kullanın (BCP-003)
- ✅ OAuth2/JWT token authentication ekleyin (IS-10)
- ✅ mTLS (mutual TLS) için client sertifikaları yapılandırın
- ✅ HTTPS-only registry ve node endpoint’leri
Örnek güvenli bağlantı:
1
2
3
|
curl -H "Authorization: Bearer $TOKEN" \
--cacert /path/to/ca.crt \
https://registry.studio.local:443/x-nmos/registration/v1.3/resource
|
Detaylar için makalenin Section 13 (IS-10 Authentication) bölümüne bakın.
7. IS-08: Channel Mapping (Ses Yönlendirmesi)
IS-08, cihazlar içinde esnek ses kanalı yönlendirmesine izin verir.
7.1 IS-08 Kullanım Senaryosu
Şu özelliklere sahip bir cihaz düşünün:
- Giriş: 16 kanallı ses (8 stereo çift)
- Çıkış: 8 kanallı ses (4 stereo çift)
IS-08, hangi giriş kanallarının hangi çıkış kanallarına gideceğini haritalandırmanıza izin verir.
7.2 IS-08 Implementasyonu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
// channel/is08.go
package channel
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"github.com/gorilla/mux"
)
// ChannelMappingAPI, IS-08'i implement eder
type ChannelMappingAPI struct {
mu sync.RWMutex
deviceID string
inputs []IOChannel
outputs []IOChannel
activeMap map[string][]ChannelMapping // output_id -> input mappings
stagedMap map[string][]ChannelMapping
}
type IOChannel struct {
Name string `json:"name"`
Description string `json:"description"`
ChannelID string `json:"channel_id"`
}
type ChannelMapping struct {
InputChannelID string `json:"input"`
OutputChannelID string `json:"output"`
GainDB float64 `json:"gain_db"`
}
func NewChannelMappingAPI(deviceID string) *ChannelMappingAPI {
api := &ChannelMappingAPI{
deviceID: deviceID,
inputs: make([]IOChannel, 0),
outputs: make([]IOChannel, 0),
activeMap: make(map[string][]ChannelMapping),
stagedMap: make(map[string][]ChannelMapping),
}
// Örnek: 16 giriş kanalı, 8 çıkış kanalı
for i := 0; i < 16; i++ {
api.inputs = append(api.inputs, IOChannel{
Name: fmt.Sprintf("Input %d", i+1),
ChannelID: fmt.Sprintf("in%d", i),
})
}
for i := 0; i < 8; i++ {
api.outputs = append(api.outputs, IOChannel{
Name: fmt.Sprintf("Output %d", i+1),
ChannelID: fmt.Sprintf("out%d", i),
})
}
return api
}
func (api *ChannelMappingAPI) SetupRoutes(router *mux.Router, deviceID string) {
base := fmt.Sprintf("/x-nmos/channelmapping/v1.0/map/%s", deviceID)
router.HandleFunc(base+"/inputs", api.GetInputs).Methods("GET")
router.HandleFunc(base+"/outputs", api.GetOutputs).Methods("GET")
router.HandleFunc(base+"/active", api.GetActiveMap).Methods("GET")
router.HandleFunc(base+"/staged", api.GetStagedMap).Methods("GET")
router.HandleFunc(base+"/staged", api.PatchStagedMap).Methods("PATCH")
}
func (api *ChannelMappingAPI) GetInputs(w http.ResponseWriter, r *http.Request) {
api.mu.RLock()
defer api.mu.RUnlock()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(api.inputs)
}
func (api *ChannelMappingAPI) GetOutputs(w http.ResponseWriter, r *http.Request) {
api.mu.RLock()
defer api.mu.RUnlock()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(api.outputs)
}
func (api *ChannelMappingAPI) GetActiveMap(w http.ResponseWriter, r *http.Request) {
api.mu.RLock()
defer api.mu.RUnlock()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(api.activeMap)
}
func (api *ChannelMappingAPI) PatchStagedMap(w http.ResponseWriter, r *http.Request) {
var patch map[string][]ChannelMapping
if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
http.Error(w, "Geçersiz JSON", http.StatusBadRequest)
return
}
api.mu.Lock()
defer api.mu.Unlock()
// Staged mapping'leri güncelle
for outputID, mappings := range patch {
api.stagedMap[outputID] = mappings
}
// Anında aktive et (veya IS-05 gibi aktivasyonu yönet)
api.activeMap = api.stagedMap
fmt.Println("Kanal haritalama güncellendi")
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{
"status": "activated",
})
}
|
8. IS-09: System Parameters (Global Zamanlama)
IS-09, PTP zamanlama gibi global sistem parametrelerini yönetir.
8.1 IS-09 Genel Bakış
IS-09 şunları sağlar:
- Global PTP domain konfigürasyonu
- Sistem çapında zamanlama parametreleri
- Senkronizasyon ayarları
8.2 Basit IS-09 Implementasyonu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
// system/is09.go
package system
import (
"encoding/json"
"net/http"
"sync"
"github.com/gorilla/mux"
)
type SystemAPI struct {
mu sync.RWMutex
globalParams GlobalParams
}
type GlobalParams struct {
PTPDomain int `json:"ptp_domain"`
PTPProfile string `json:"ptp_profile"` // AES67, SMPTE 2059-2
}
func NewSystemAPI() *SystemAPI {
return &SystemAPI{
globalParams: GlobalParams{
PTPDomain: 127,
PTPProfile: "SMPTE 2059-2",
},
}
}
func (api *SystemAPI) SetupRoutes(router *mux.Router) {
base := "/x-nmos/system/v1.0"
router.HandleFunc(base+"/global", api.GetGlobal).Methods("GET")
router.HandleFunc(base+"/global", api.PatchGlobal).Methods("PATCH")
}
func (api *SystemAPI) GetGlobal(w http.ResponseWriter, r *http.Request) {
api.mu.RLock()
defer api.mu.RUnlock()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(api.globalParams)
}
func (api *SystemAPI) PatchGlobal(w http.ResponseWriter, r *http.Request) {
var patch GlobalParams
if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
http.Error(w, "Geçersiz JSON", http.StatusBadRequest)
return
}
api.mu.Lock()
defer api.mu.Unlock()
if patch.PTPDomain != 0 {
api.globalParams.PTPDomain = patch.PTPDomain
}
if patch.PTPProfile != "" {
api.globalParams.PTPProfile = patch.PTPProfile
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(api.globalParams)
}
|
9. Tam Entegrasyon Örneği
Hepsini bir araya getirelim ve tam bir workflow oluşturalım.
9.1 Tam Broadcast Workflow’u
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
// examples/full_workflow.go
package main
import (
"fmt"
"log"
"time"
"yourproject/client"
)
func main() {
fmt.Println("=== NMOS Broadcast Workflow Demo ===\n")
// 1. NMOS Registry'yi başlat (zaten :3210/:3211'de çalışıyor)
fmt.Println("✓ NMOS Registry çalışıyor")
// 2. Camera Node'u kaydet (Sender)
cameraNode := client.NewNMOSNode(
"http://localhost:3210",
"camera-01.studio.local",
)
cameraNode.AddVideoSender(
"Studio Camera 1",
"http://camera-01:8080/sdp/video.sdp",
)
if err := cameraNode.Start(); err != nil {
log.Fatal(err)
}
fmt.Println("✓ Camera Node kaydedildi")
time.Sleep(2 * time.Second)
// 3. Monitor Node'u kaydet (Receiver)
monitorNode := client.NewNMOSNode(
"http://localhost:3210",
"monitor-01.studio.local",
)
monitorNode.AddVideoReceiver(
"Program Monitor",
)
if err := monitorNode.Start(); err != nil {
log.Fatal(err)
}
fmt.Println("✓ Monitor Node kaydedildi")
time.Sleep(2 * time.Second)
// 4. Kullanılabilir sender'lar için registry'yi sorgula
senders := queryRegistry("http://localhost:3211/x-nmos/query/v1.3/senders")
fmt.Printf("✓ %d sender bulundu\n", len(senders))
receivers := queryRegistry("http://localhost:3211/x-nmos/query/v1.3/receivers")
fmt.Printf("✓ %d receiver bulundu\n", len(receivers))
// 5. IS-05 ile bağlantı kur
if len(senders) > 0 && len(receivers) > 0 {
senderID := senders[0]["id"].(string)
receiverID := receivers[0]["id"].(string)
fmt.Printf("\n✓ Sender %s -> Receiver %s bağlanıyor\n", senderID, receiverID)
makeConnection(receiverID, senderID)
fmt.Println("✓ Bağlantı aktive edildi!")
}
// 6. WebSocket ile izle
fmt.Println("\n✓ Registry değişikliklerini izliyor (Ctrl+C ile durdurun)...")
select {}
}
func queryRegistry(url string) []map[string]interface{} {
// Query API'ye HTTP GET isteği
// JSON response'u parse et
return []map[string]interface{}{}
}
func makeConnection(receiverID, senderID string) {
// Receiver'ın /staged endpoint'ine IS-05 PATCH isteği
}
|
9.1 IS-06: Network Control (SDN Entegrasyonu)
IS-06, NMOS’un ağ altyapısını (switch’ler, router’lar) otomatik kontrol etmesini sağlar.
9.1.1 IS-06 Neden Önemli?
Geleneksel Yöntem:
1
2
3
4
|
1. Manuel VLAN konfigürasyonu
2. Elle multicast routing kurulumu
3. Statik bandwidth tahsisi
4. Fiziksel değişiklikler gerektirir
|
IS-06 ile:
1
2
3
4
|
✅ Otomatik VLAN yönetimi
✅ Dinamik bandwidth rezervasyonu
✅ Otomatik multicast routing
✅ Yazılım-tanımlı ağ kontrolü
|
9.1.2 IS-06 Kullanım Senaryoları
Senaryo 1: Otomatik VLAN Yönetimi
1
2
3
4
5
|
Kamera 1 (VLAN 100) → Switch → Monitor (VLAN 200)
↓
IS-06 API
↓
"Bu akış için VLAN 100'e izin ver"
|
Senaryo 2: Bandwidth Rezervasyonu
1
2
3
4
5
|
4K Stream = 8 Gbps gerekiyor
↓
IS-06 API
↓
Switch'te 10 Gbps rezerve et
|
9.1.3 Go ile IS-06 Network Controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
|
// network/is06.go
package network
import (
"encoding/json"
"fmt"
"net/http"
"github.com/gorilla/mux"
)
// NetworkController, ağ cihazlarını yönetir
type NetworkController struct {
switches map[string]*Switch
}
// Switch, yönetilen bir network switch'i temsil eder
type Switch struct {
ID string `json:"id"`
Label string `json:"label"`
Hostname string `json:"hostname"`
Management string `json:"management_address"`
Ports []SwitchPort `json:"ports"`
VLANs []VLAN `json:"vlans"`
}
// SwitchPort, switch port'u temsil eder
type SwitchPort struct {
PortID string `json:"port_id"`
Label string `json:"label"`
Speed string `json:"speed"` // "10G", "25G", "100G"
State string `json:"state"` // "up", "down"
VLANs []int `json:"vlans"`
Bandwidth int64 `json:"bandwidth_mbps"`
Reserved int64 `json:"reserved_mbps"` // Rezerve edilmiş bant genişliği
}
// VLAN konfigürasyonu
type VLAN struct {
VLANID int `json:"vlan_id"`
Name string `json:"name"`
Ports []string `json:"ports"`
Multicast bool `json:"multicast_enabled"`
}
// FlowReservation, bir akış için ağ kaynağı rezervasyonu
type FlowReservation struct {
FlowID string `json:"flow_id"`
SenderID string `json:"sender_id"`
ReceiverID string `json:"receiver_id"`
BandwidthMbps int64 `json:"bandwidth_mbps"`
VLANID int `json:"vlan_id"`
MulticastGroup string `json:"multicast_group"`
SourcePort string `json:"source_port"`
DestPort string `json:"dest_port"`
}
func NewNetworkController() *NetworkController {
return &NetworkController{
switches: make(map[string]*Switch),
}
}
// SetupRoutes, IS-06 API route'larını yapılandırır
func (nc *NetworkController) SetupRoutes(router *mux.Router) {
base := "/x-nmos/netctrl/v1.0"
// Network endpoints
router.HandleFunc(base+"/endpoints", nc.GetEndpoints).Methods("GET")
// Switch yönetimi
router.HandleFunc(base+"/switches", nc.GetSwitches).Methods("GET")
router.HandleFunc(base+"/switches/{switchId}/ports", nc.GetSwitchPorts).Methods("GET")
// Flow rezervasyonu
router.HandleFunc(base+"/reservations", nc.CreateReservation).Methods("POST")
router.HandleFunc(base+"/reservations/{resId}", nc.DeleteReservation).Methods("DELETE")
}
// CreateReservation, bir NMOS flow için ağ kaynakları rezerve eder
func (nc *NetworkController) CreateReservation(w http.ResponseWriter, r *http.Request) {
var reservation FlowReservation
if err := json.NewDecoder(r.Body).Decode(&reservation); err != nil {
http.Error(w, "Geçersiz JSON", http.StatusBadRequest)
return
}
// 1. Bandwidth kontrolü
if err := nc.checkBandwidthAvailability(reservation); err != nil {
http.Error(w, err.Error(), http.StatusConflict)
return
}
// 2. VLAN konfigürasyonu
if err := nc.configureVLAN(reservation); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 3. Multicast routing
if err := nc.configureMulticast(reservation); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// 4. QoS ayarları
if err := nc.configureQoS(reservation); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(map[string]interface{}{
"reservation_id": reservation.FlowID,
"status": "active",
"bandwidth": reservation.BandwidthMbps,
"vlan": reservation.VLANID,
})
}
// checkBandwidthAvailability, yeterli bandwidth olup olmadığını kontrol eder
func (nc *NetworkController) checkBandwidthAvailability(res FlowReservation) error {
sw, exists := nc.switches[extractSwitchFromPort(res.SourcePort)]
if !exists {
return fmt.Errorf("switch bulunamadı")
}
for _, port := range sw.Ports {
if port.PortID == res.SourcePort {
available := port.Bandwidth - port.Reserved
if available < res.BandwidthMbps {
return fmt.Errorf("yetersiz bandwidth: mevcut %d Mbps, gerekli %d Mbps",
available, res.BandwidthMbps)
}
// Bandwidth'i rezerve et
port.Reserved += res.BandwidthMbps
return nil
}
}
return fmt.Errorf("port bulunamadı: %s", res.SourcePort)
}
// configureVLAN, switch'te VLAN konfigürasyonu yapar
func (nc *NetworkController) configureVLAN(res FlowReservation) error {
// Gerçek switch'e NETCONF/REST API ile bağlan
// Örnek: Cisco, Arista, Juniper API'leri
fmt.Printf("VLAN %d konfigüre ediliyor: port %s → %s\n",
res.VLANID, res.SourcePort, res.DestPort)
// NETCONF XML örneği (Cisco için)
netconfCommand := fmt.Sprintf(`
<config>
<interface>
<name>%s</name>
<switchport>
<mode>trunk</mode>
<trunk>
<allowed>
<vlan>%d</vlan>
</allowed>
</trunk>
</switchport>
</interface>
</config>
`, res.SourcePort, res.VLANID)
// Switch API'sine gönder
return nc.sendToSwitch(extractSwitchFromPort(res.SourcePort), netconfCommand)
}
// configureMulticast, multicast routing konfigürasyonu yapar
func (nc *NetworkController) configureMulticast(res FlowReservation) error {
fmt.Printf("Multicast group %s için routing konfigüre ediliyor\n", res.MulticastGroup)
// IGMP snooping aktivasyonu
// PIM (Protocol Independent Multicast) konfigürasyonu
command := fmt.Sprintf(`
ip multicast-routing
interface %s
ip pim sparse-mode
ip igmp version 3
`, res.SourcePort)
return nc.sendToSwitch(extractSwitchFromPort(res.SourcePort), command)
}
// configureQoS, Quality of Service ayarları yapar
func (nc *NetworkController) configureQoS(res FlowReservation) error {
fmt.Printf("QoS konfigüre ediliyor: %d Mbps garantili\n", res.BandwidthMbps)
// DSCP marking (ST 2110 için genelde EF - Expedited Forwarding)
// Rate limiting
// Priority queuing
command := fmt.Sprintf(`
class-map match-all NMOS-VIDEO
match dscp ef
policy-map NMOS-QOS
class NMOS-VIDEO
priority %d
police %d000000 conform-action transmit exceed-action drop
`, res.BandwidthMbps/10, res.BandwidthMbps)
return nc.sendToSwitch(extractSwitchFromPort(res.SourcePort), command)
}
// sendToSwitch, switch'e komut gönderir (NETCONF, REST API, SSH)
func (nc *NetworkController) sendToSwitch(switchID, command string) error {
sw, exists := nc.switches[switchID]
if !exists {
return fmt.Errorf("switch bulunamadı: %s", switchID)
}
// Gerçek implementasyonda:
// - NETCONF client kullan (github.com/Juniper/go-netconf)
// - Veya vendor-specific REST API
// - Veya SSH ile CLI komutları
fmt.Printf("Switch %s (%s) komut gönderiliyor...\n", sw.Label, sw.Management)
fmt.Printf("Command: %s\n", command)
return nil
}
func extractSwitchFromPort(portID string) string {
// Port ID formatı: "switch-01:eth1/1"
// Switch ID'yi çıkar
return "switch-01" // Basitleştirilmiş
}
|
9.1.4 Switch Entegrasyonu Örnekleri
Cisco Switch için:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// Cisco IOS-XE REST API
func (nc *NetworkController) configureCiscoSwitch(switchIP, vlanID, port string) error {
url := fmt.Sprintf("https://%s/restconf/data/Cisco-IOS-XE-native:native/vlan", switchIP)
vlanConfig := map[string]interface{}{
"Cisco-IOS-XE-vlan:vlan": map[string]interface{}{
"vlan-list": []map[string]interface{}{
{
"id": vlanID,
"name": fmt.Sprintf("NMOS-VLAN-%s", vlanID),
},
},
},
}
// REST API call
return sendRestAPI(url, vlanConfig)
}
|
Arista Switch için:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// Arista eAPI
func (nc *NetworkController) configureAristaSwitch(switchIP, vlanID string) error {
url := fmt.Sprintf("https://%s/command-api", switchIP)
commands := []string{
"enable",
fmt.Sprintf("configure"),
fmt.Sprintf("vlan %s", vlanID),
fmt.Sprintf("name NMOS-VLAN-%s", vlanID),
}
return nc.sendAristaCommand(url, commands)
}
|
9.1.5 Tam Entegrasyon: NMOS + SDN
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
// examples/nmos_sdn_integration.go
package main
import (
"fmt"
"yourproject/registry"
"yourproject/network"
)
func main() {
// 1. NMOS Registry başlat
reg := registry.NewRegistry()
// 2. Network Controller başlat
netCtrl := network.NewNetworkController()
// 3. Switch'leri ekle
netCtrl.AddSwitch(&network.Switch{
ID: "switch-core-01",
Label: "Core Switch 1",
Hostname: "core-sw-01.studio.local",
Management: "10.0.1.10",
Ports: []network.SwitchPort{
{PortID: "eth1/1", Speed: "100G", Bandwidth: 100000},
{PortID: "eth1/2", Speed: "100G", Bandwidth: 100000},
},
})
// 4. NMOS connection yapıldığında otomatik ağ konfigürasyonu
reg.OnConnection(func(senderID, receiverID string) {
// Flow bilgilerini al
sender := reg.GetSender(senderID)
// Bandwidth hesapla (1080p50 = 1.66 Gbps)
bandwidth := calculateBandwidth(sender)
// Ağ rezervasyonu yap
reservation := network.FlowReservation{
FlowID: sender.FlowID,
SenderID: senderID,
ReceiverID: receiverID,
BandwidthMbps: bandwidth,
VLANID: 100, // ST 2110 video VLAN
MulticastGroup: extractMulticast(sender.ManifestHref),
}
if err := netCtrl.CreateReservation(reservation); err != nil {
fmt.Printf("Ağ rezervasyonu başarısız: %v\n", err)
return
}
fmt.Println("✅ Ağ otomatik konfigüre edildi!")
})
}
func calculateBandwidth(sender *registry.Sender) int64 {
// 1080p50 10-bit 4:2:2 = 1.66 Gbps = 1660 Mbps
return 1660
}
|
9.1.6 IS-06 cURL Örnekleri
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# 1. Mevcut switch'leri listele
curl http://localhost:8080/x-nmos/netctrl/v1.0/switches | jq
# 2. Flow için bandwidth rezerve et
curl -X POST http://localhost:8080/x-nmos/netctrl/v1.0/reservations \
-H "Content-Type: application/json" \
-d '{
"flow_id": "flow-123",
"sender_id": "sender-456",
"receiver_id": "receiver-789",
"bandwidth_mbps": 1660,
"vlan_id": 100,
"multicast_group": "239.100.0.1"
}' | jq
# Response:
# {
# "reservation_id": "flow-123",
# "status": "active",
# "bandwidth": 1660,
# "vlan": 100
# }
# 3. Rezervasyonu sil
curl -X DELETE http://localhost:8080/x-nmos/netctrl/v1.0/reservations/flow-123
|
9.1.7 Production Deployment
Desteklenen Switch’ler:
- ✅ Cisco Nexus (NXAPI)
- ✅ Arista (eAPI)
- ✅ Juniper (NETCONF)
- ✅ Mellanox/NVIDIA (REST API)
- ✅ Dell (NETCONF)
Gereksinimler:
1
2
3
4
5
6
7
8
9
10
11
|
switch_requirements:
protocols:
- NETCONF
- REST API
- SSH (fallback)
features:
- VLAN tagging
- IGMP snooping v3
- PIM sparse-mode
- QoS/DSCP marking
- Rate limiting
|
9.1.8 Gerçek Dünya Faydaları
Eski Yöntem (Manuel):
1
2
3
4
5
|
1. Network engineer switch'e SSH ile bağlanır
2. Manuel VLAN ekler
3. Port'ları konfigüre eder
4. Multicast routing ayarlar
5. 30-60 dakika sürer ⏱️
|
IS-06 ile (Otomatik):
1
2
3
4
|
1. NMOS connection yap
2. IS-06 otomatik switch'i konfigüre eder
3. 5 saniyede tamamlanır ⚡
4. Hata riski minimum
|
ROI:
1
2
3
4
|
Manuel: 100 bağlantı × 30 dk = 50 saat işçilik
IS-06: 100 bağlantı × 5 sn = 8 dakika
Tasarruf: ~49 saat 🎯
|
10. Hata Yönetimi ve Kurtarma Pattern’leri
10.1 Yaygın Hata Senaryoları
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
// errors/nmos_errors.go
package errors
import (
"errors"
"fmt"
"time"
)
var (
ErrRegistryUnreachable = errors.New("NMOS registry erişilemez")
ErrHeartbeatTimeout = errors.New("heartbeat timeout")
ErrInvalidResource = errors.New("geçersiz kaynak")
ErrConnectionFailed = errors.New("bağlantı başarısız")
)
// ErrorHandler, retry mantığıyla NMOS hatalarını yönetir
type ErrorHandler struct {
maxRetries int
retryDelay time.Duration
onError func(error)
}
func NewErrorHandler() *ErrorHandler {
return &ErrorHandler{
maxRetries: 3,
retryDelay: 5 * time.Second,
onError: func(err error) { fmt.Printf("Hata: %v\n", err) },
}
}
// WithRetry, retry mantığıyla bir fonksiyonu çalıştırır
func (h *ErrorHandler) WithRetry(fn func() error) error {
var err error
for i := 0; i <= h.maxRetries; i++ {
err = fn()
if err == nil {
return nil
}
h.onError(fmt.Errorf("deneme %d/%d başarısız: %w", i+1, h.maxRetries+1, err))
if i < h.maxRetries {
time.Sleep(h.retryDelay)
}
}
return fmt.Errorf("tüm retry denemeleri başarısız: %w", err)
}
|
10.2 Registry Erişilemez - Fallback Stratejisi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
// client/resilient_node.go
package client
import (
"fmt"
"time"
)
// ResilientNode, registry başarısızlıklarını zarif bir şekilde yönetir
type ResilientNode struct {
*NMOSNode
fallbackRegistry string
cacheEnabled bool
lastKnownState map[string]interface{}
}
func (n *ResilientNode) StartWithResilience() error {
handler := NewErrorHandler()
// Retry ile kayıt olmayı dene
err := handler.WithRetry(func() error {
return n.registerResource(n.Node, "node")
})
if err != nil {
// Fallback: İkincil registry'yi dene
if n.fallbackRegistry != "" {
fmt.Println("Primary registry başarısız, fallback deneniyor...")
n.RegistryURL = n.fallbackRegistry
return n.Start()
}
// Son çare: Registry olmadan standalone modda çalış
fmt.Println("Standalone modda çalışıyor (registry yok)")
return n.runStandalone()
}
// Recovery ile heartbeat'i başlat
go n.resilientHeartbeat()
return nil
}
func (n *ResilientNode) resilientHeartbeat() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
consecutiveFailures := 0
maxFailures := 3
for range ticker.C {
err := n.sendHeartbeat()
if err != nil {
consecutiveFailures++
fmt.Printf("Heartbeat başarısız (%d/%d): %v\n", consecutiveFailures, maxFailures, err)
if consecutiveFailures >= maxFailures {
// Node'u yeniden kaydet
fmt.Println("Çok fazla heartbeat başarısızlığı, yeniden kaydediliyor...")
if err := n.Start(); err == nil {
consecutiveFailures = 0
}
}
} else {
consecutiveFailures = 0
}
}
}
func (n *ResilientNode) runStandalone() error {
// Registry olmadan çalış, local keşif sun
fmt.Println("Local mDNS duyurusu başlatılıyor...")
// Burada mDNS/DNS-SD duyurusunu implement et
return nil
}
|
10.3 Network Partition Yönetimi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// Split-brain önleme
func (r *Registry) DetectNetworkPartition() bool {
// Birden fazla registry'nin primary olduğunu düşünüp düşünmediğini kontrol et
// Consensus algoritması kullan (Raft, etcd, vb.)
// Registry instance'ları arasında basit heartbeat kontrolü
peers := []string{"registry-01:3210", "registry-02:3210"}
aliveCount := 0
for _, peer := range peers {
if r.ping(peer) {
aliveCount++
}
}
// Quorum kontrolü (çoğunluk erişilebilir olmalı)
quorum := (len(peers) + 1) / 2
return aliveCount >= quorum
}
func (r *Registry) ping(peer string) bool {
resp, err := http.Get(fmt.Sprintf("http://%s/health", peer))
if err != nil {
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}
|
10.4 Disconnect’te Kaynak Temizliği
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
// Kaynak temizliğiyle graceful shutdown
func (n *NMOSNode) GracefulShutdown() error {
fmt.Println("Graceful shutdown başlatılıyor...")
// 1. Yeni bağlantıları kabul etmeyi durdur
close(n.stopChan)
// 2. Tüm kaynakları registry'den sil
for _, sender := range n.Senders {
url := fmt.Sprintf("%s/x-nmos/registration/v1.3/resource/senders/%s",
n.RegistryURL, sender.ID)
req, _ := http.NewRequest("DELETE", url, nil)
n.httpClient.Do(req)
}
// 3. Node'u sil
url := fmt.Sprintf("%s/x-nmos/registration/v1.3/resource/nodes/%s",
n.RegistryURL, n.Node.ID)
req, _ := http.NewRequest("DELETE", url, nil)
n.httpClient.Do(req)
fmt.Println("Shutdown tamamlandı")
return nil
}
|
11. Docker ve Kubernetes Deployment
11.1 NMOS Registry için Dockerfile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
# Dockerfile
FROM golang:1.21-alpine AS builder
WORKDIR /app
# Go mod dosyalarını kopyala
COPY go.mod go.sum ./
RUN go mod download
# Kaynak kodunu kopyala
COPY . .
# Uygulamayı build et
RUN CGO_ENABLED=0 GOOS=linux go build -o nmos-registry ./cmd/registry
# Final stage
FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
# Binary'yi builder'dan kopyala
COPY --from=builder /app/nmos-registry .
# Portları expose et
EXPOSE 3210 3211
# Uygulamayı çalıştır
CMD ["./nmos-registry"]
|
11.2 Docker Compose Kurulumu
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# docker-compose.yml
version: '3.8'
services:
# Registry depolama için PostgreSQL
postgres:
image: postgres:15-alpine
environment:
POSTGRES_DB: nmos
POSTGRES_USER: nmos
POSTGRES_PASSWORD: nmos_secret
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- nmos_network
# NMOS Registry
nmos-registry:
build: .
ports:
- "3210:3210" # Registration API
- "3211:3211" # Query API
environment:
DATABASE_URL: "postgres://nmos:nmos_secret@postgres:5432/nmos?sslmode=disable"
LOG_LEVEL: "info"
depends_on:
- postgres
networks:
- nmos_network
restart: unless-stopped
# Monitoring için Prometheus
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
networks:
- nmos_network
# Dashboard'lar için Grafana
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: admin
volumes:
- grafana_data:/var/lib/grafana
networks:
- nmos_network
volumes:
postgres_data:
prometheus_data:
grafana_data:
networks:
nmos_network:
driver: bridge
|
11.3 Kubernetes Deployment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# k8s/registry-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: nmos-registry
namespace: broadcast
spec:
replicas: 3 # Yüksek erişilebilirlik
selector:
matchLabels:
app: nmos-registry
template:
metadata:
labels:
app: nmos-registry
spec:
containers:
- name: nmos-registry
image: yourregistry/nmos-registry:latest
ports:
- containerPort: 3210
name: registration
- containerPort: 3211
name: query
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: nmos-secrets
key: database-url
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 3210
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 3210
initialDelaySeconds: 5
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: nmos-registry
namespace: broadcast
spec:
selector:
app: nmos-registry
ports:
- name: registration
port: 3210
targetPort: 3210
- name: query
port: 3211
targetPort: 3211
type: LoadBalancer # Veya Ingress ile ClusterIP
---
apiVersion: v1
kind: Service
metadata:
name: nmos-registry-headless
namespace: broadcast
spec:
clusterIP: None
selector:
app: nmos-registry
ports:
- name: registration
port: 3210
- name: query
port: 3211
|
11.4 PostgreSQL için Kubernetes StatefulSet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# k8s/postgres-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgres
namespace: broadcast
spec:
serviceName: postgres
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:15-alpine
ports:
- containerPort: 5432
env:
- name: POSTGRES_DB
value: nmos
- name: POSTGRES_USER
value: nmos
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: nmos-secrets
key: postgres-password
volumeMounts:
- name: postgres-storage
mountPath: /var/lib/postgresql/data
volumeClaimTemplates:
- metadata:
name: postgres-storage
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 20Gi
|
11.5 Kubernetes’e Deploy Etme
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# Namespace oluştur
kubectl create namespace broadcast
# Secret'ları oluştur
kubectl create secret generic nmos-secrets \
--from-literal=database-url='postgres://nmos:secret@postgres:5432/nmos' \
--from-literal=postgres-password='secret' \
-n broadcast
# PostgreSQL'i deploy et
kubectl apply -f k8s/postgres-statefulset.yaml
# NMOS Registry'yi deploy et
kubectl apply -f k8s/registry-deployment.yaml
# Durumu kontrol et
kubectl get pods -n broadcast
kubectl logs -f deployment/nmos-registry -n broadcast
# Local erişim için port forward
kubectl port-forward svc/nmos-registry 3210:3210 3211:3211 -n broadcast
|
12. IS-10: Authentication ve Authorization
12.1 Temel OAuth2 Token Doğrulama
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
// auth/is10.go
package auth
import (
"crypto/rsa"
"errors"
"fmt"
"net/http"
"strings"
"github.com/golang-jwt/jwt/v5"
)
// IS10Middleware OAuth2 JWT token'larını doğrular
type IS10Middleware struct {
publicKey *rsa.PublicKey
issuer string
audience string
}
func NewIS10Middleware(publicKeyPEM string, issuer, audience string) (*IS10Middleware, error) {
publicKey, err := jwt.ParseRSAPublicKeyFromPEM([]byte(publicKeyPEM))
if err != nil {
return nil, err
}
return &IS10Middleware{
publicKey: publicKey,
issuer: issuer,
audience: audience,
}, nil
}
// ValidateToken middleware
func (m *IS10Middleware) ValidateToken(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Authorization header'dan token çıkar
authHeader := r.Header.Get("Authorization")
if authHeader == "" {
http.Error(w, "Authorization header eksik", http.StatusUnauthorized)
return
}
tokenString := strings.TrimPrefix(authHeader, "Bearer ")
// JWT'yi parse et ve doğrula
token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
// Signing method'u doğrula
if _, ok := token.Method.(*jwt.SigningMethodRSA); !ok {
return nil, fmt.Errorf("beklenmeyen signing method: %v", token.Header["alg"])
}
return m.publicKey, nil
})
if err != nil || !token.Valid {
http.Error(w, "Geçersiz token", http.StatusUnauthorized)
return
}
// Claim'leri doğrula
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
http.Error(w, "Geçersiz token claim'leri", http.StatusUnauthorized)
return
}
// Issuer kontrol et
if iss, ok := claims["iss"].(string); !ok || iss != m.issuer {
http.Error(w, "Geçersiz issuer", http.StatusUnauthorized)
return
}
// Audience kontrol et
if aud, ok := claims["aud"].(string); !ok || aud != m.audience {
http.Error(w, "Geçersiz audience", http.StatusUnauthorized)
return
}
// Scope'ları kontrol et (read, write)
scopes, _ := claims["scope"].(string)
if !strings.Contains(scopes, "nmos:read") {
http.Error(w, "Yetersiz izinler", http.StatusForbidden)
return
}
// Token geçerli, devam et
next.ServeHTTP(w, r)
})
}
// Router'a uygula
func ApplyIS10Security(router *mux.Router, middleware *IS10Middleware) {
// Public endpoint'ler (auth gerekmez)
router.HandleFunc("/health", healthHandler).Methods("GET")
// Korunan endpoint'ler
protected := router.PathPrefix("/x-nmos").Subrouter()
protected.Use(middleware.ValidateToken)
// Tüm NMOS endpoint'leri artık geçerli JWT gerektirir
}
|
12.2 cURL ile Test Etme
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# 1. Authorization server'dan OAuth2 token al
TOKEN=$(curl -X POST https://auth.example.com/oauth/token \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=client_credentials" \
-d "client_id=your_client_id" \
-d "client_secret=your_secret" \
-d "scope=nmos:read nmos:write" \
| jq -r '.access_token')
# 2. Korunan NMOS API'ye eriş
curl http://localhost:3211/x-nmos/query/v1.3/nodes \
-H "Authorization: Bearer $TOKEN" \
| jq
# Token olmadan (başarısız olmalı):
curl http://localhost:3211/x-nmos/query/v1.3/nodes
# Response: 401 Unauthorized
|
13. Troubleshooting Rehberi
13.1 Hızlı Tanı Kontrol Listesi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# ✅ Registry çalışıyor mu?
curl -s http://localhost:3210/health && echo "✓ Registration API OK" || echo "✗ Registration API DOWN"
curl -s http://localhost:3211/health && echo "✓ Query API OK" || echo "✗ Query API DOWN"
# ✅ Node'lar kaydoluyor mu?
NODE_COUNT=$(curl -s http://localhost:3211/x-nmos/query/v1.3/nodes | jq length)
echo "Kayıtlı node'lar: $NODE_COUNT"
# ✅ Heartbeat'ler çalışıyor mu?
# Test node kaydet ve 12 saniye bekle
# Kayboluyorsa, heartbeat başarısız
# ✅ mDNS/DNS-SD kontrol et
avahi-browse -a -t # Linux
dns-sd -B _nmos-registration._tcp # macOS
# ✅ Ağ bağlantısı
ping registry.local
telnet registry.local 3210
|
13.2 Yaygın Sorunlar ve Çözümler
| Sorun |
Belirti |
Çözüm |
| Registry erişilemez |
Node kayıt olamıyor |
Firewall, ağ bağlantısı, DNS kontrol et |
| Node’lar kayboluyor |
12s sonra node’lar yok oluyor |
Heartbeat implementasyonu kontrol et, ağ gecikmesi |
| WebSocket bağlanamıyor |
WS upgrade başarısız |
Proxy ayarları kontrol et, HTTP/1.1 gerekli |
| SDP fetch başarısız |
IS-05 bağlantı başarısız |
SDP URL erişilebilir mi kontrol et, CORS ayarları |
| Version uyuşmazlığı |
“Invalid version” hatası |
seconds:nanoseconds formatı kullan (örn., 1735545600:0) |
| UUID çakışması |
Duplicate resource ID |
Düzgün UUID’ler oluştur (github.com/google/uuid kullan) |
| PTP senkronize olmuyor |
Zamanlama sorunları |
PTP domain kontrol et, switch’ler PTP desteklemeli |
| Yüksek CPU kullanımı |
Registry yavaş |
Database index’leri ekle, connection pooling kullan |
13.3 Debug Logging
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// Detaylı logging'i etkinleştir
import "github.com/sirupsen/logrus"
logrus.SetLevel(logrus.DebugLevel)
logrus.SetFormatter(&logrus.JSONFormatter{})
// Handler'larınızda
logrus.WithFields(logrus.Fields{
"node_id": node.ID,
"version": node.Version,
"hostname": node.Hostname,
}).Info("Node kaydedildi")
logrus.WithFields(logrus.Fields{
"sender_id": sender.ID,
"receiver_id": receiver.ID,
"action": "connection",
}).Debug("IS-05 bağlantı istendi")
|
13.4 Ağ Debug’ı
1
2
3
4
5
6
7
8
9
10
11
12
|
# tcpdump ile NMOS trafiğini yakala
sudo tcpdump -i eth0 -w nmos-traffic.pcap 'port 3210 or port 3211'
# Wireshark ile analiz et
wireshark nmos-traffic.pcap
# Multicast routing kontrol et (ST 2110 stream'leri için)
ip mroute show
netstat -g
# WebSocket bağlantılarını izle
ss -t | grep 3211
|
14. Yaygın Hatalar ve Best Practice’ler
14.1 Yaygın Hatalar
❌ YAPMAYIN: Sıralı ID kullanın
1
2
|
// Kötü
node.ID = fmt.Sprintf("node-%d", counter++)
|
✅ YAPIN: Düzgün UUID kullanın
1
2
3
|
// İyi
import "github.com/google/uuid"
node.ID = uuid.New().String()
|
❌ YAPMAYIN: Değişikliklerde version güncellemeyi unutun
1
2
3
|
// Kötü - version aynı kalıyor
node.Label = "Yeni Label"
registry.RegisterNode(node)
|
✅ YAPIN: Version timestamp’ini güncelleyin
1
2
3
4
|
// İyi
node.Label = "Yeni Label"
node.Version = CreateVersion() // Mevcut timestamp'e günceller
registry.RegisterNode(node)
|
❌ YAPMAYIN: Heartbeat hatalarını görmezden gelin
1
2
|
// Kötü - sessiz başarısızlık
sendHeartbeat() // Hatayı görmezden gelir
|
✅ YAPIN: Heartbeat hatalarını yönetin
1
2
3
4
5
|
// İyi
if err := sendHeartbeat(); err != nil {
log.Printf("Heartbeat başarısız: %v, yeniden kaydediliyor...", err)
node.Start() // Yeniden kaydet
}
|
- Database Index’leri (PostgreSQL):
1
2
3
4
|
CREATE INDEX idx_nodes_id ON nodes(id);
CREATE INDEX idx_senders_device_id ON senders(device_id);
CREATE INDEX idx_resources_type ON resources(type);
CREATE INDEX idx_health_updated_at ON health(updated_at);
|
- Connection Pooling:
1
2
3
|
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(25)
db.SetConnMaxLifetime(5 * time.Minute)
|
- Caching (Redis):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// Sık erişilen kaynakları önbelleğe al
func (r *Registry) GetSenderCached(id string) (*Sender, error) {
// Önce cache'i dene
cached, err := redisClient.Get(ctx, "sender:"+id).Result()
if err == nil {
var sender Sender
json.Unmarshal([]byte(cached), &sender)
return &sender, nil
}
// Cache miss, DB'den al
sender := r.GetSender(id)
data, _ := json.Marshal(sender)
redisClient.Set(ctx, "sender:"+id, data, 30*time.Second)
return sender, nil
}
|
14.3 Güvenlik Best Practice’leri
- ✅ Production’da her zaman TLS kullanın (
https://)
- ✅ IS-10 OAuth2 authentication implement edin
- ✅ Tüm input’ları doğrulayın (JSON schema validation)
- ✅ API endpoint’lerini rate limit edin
- ✅ Güçlü UUID’ler kullanın (tahmin edilemez)
- ✅ Log çıktısını temizleyin (hassas veri yok)
- ✅ CORS’u düzgün implement edin
- ✅ Secrets management kullanın (Vault, K8s secrets)
14.4 Ölçeklenebilirlik İpuçları
Büyük Tesisler İçin (1000+ cihaz):
- Kaynak türüne göre shard’lama:
1
2
3
|
// Farklı kaynak türleri için ayrı database'ler
nodesDB := connectDB("nodes_db")
sendersDB := connectDB("senders_db")
|
- Event’ler için message queue kullan:
1
2
3
4
|
// In-memory channel'lar yerine, Kafka/RabbitMQ kullan
func (r *Registry) PublishEvent(event ResourceUpdate) {
kafkaProducer.Send("nmos-events", event)
}
|
- Load balancer ile horizontal ölçekleme:
1
2
3
4
5
|
┌─> Registry Instance 1
Client -> LB -> ┼─> Registry Instance 2
└─> Registry Instance 3
↓
Paylaşılan PostgreSQL
|
15. SDI’dan NMOS’a Geçiş: Adım Adım Rehber
15.1 Geçiş Stratejisi Genel Bakış
SDI’dan NMOS/ST 2110’a geçiş “big bang” olmak zorunda değil. İşte pratik bir aşamalı yaklaşım:
1
2
3
4
5
6
7
|
Faz 1: Değerlendirme (1-2 ay)
↓
Faz 2: Pilot (2-3 ay)
↓
Faz 3: Hibrit İşletme (6-12 ay)
↓
Faz 4: Tam IP Geçişi (6-12 ay)
|
15.2 Faz 1: Değerlendirme
Amaç: Mevcut altyapıyı anlamak ve geçişi planlamak
Görevler:
- SDI Ekipman Envanteri
1
2
3
4
5
|
# Bir spreadsheet oluşturun:
- Cihaz adı/türü
- I/O port sayısı
- Sinyal yolları (kimin kime bağlandığı)
- Kritik vs kritik olmayan cihazlar
|
- Sinyal Akışlarını Haritalama
1
2
3
|
Örnek mevcut akış:
Kamera 1 (SDI) → Router → Encoder → Playout
Kamera 2 (SDI) → Router → Monitor Duvarı
|
- Ağ Değerlendirmesi
1
2
3
4
5
6
7
8
|
# Mevcut ağ altyapısını kontrol edin:
- Switch kapasitesi (ST 2110 için minimum 10GbE gerekli)
- PTP desteği (IEEE 1588)
- Multicast routing yetenekleri
- Bant genişliği hesaplamaları
# Örnek: 1080p50 sıkıştırılmamış (ST 2110-20)
# = stream başına ~1.66 Gbps
|
- Yükseltme Önceliklerini Belirleyin
- Kritik olmayan yollardan başlayın (monitoring, QC)
- Kritik yolları (on-air playout) daha sonra bırakın
- Hızlı kazanımları belirleyin (örn., SDI dağıtım amplifikatörlerini IP multicast ile değiştirin)
Çıktı: Zaman çizelgesi ve bütçe ile geçiş yol haritası
15.3 Faz 2: Pilot Deployment
Amaç: Teknolojiyi küçük ölçekli deployment ile doğrulama
Görevler:
- NMOS Registry Deploy Edin
1
2
3
4
5
|
# Kolay kurulum için Docker kullanın
docker-compose up -d nmos-registry
# Registration API'yi doğrulayın
curl http://registry:3210/health
|
- 2-3 IP Cihaz Kurun
- Monitoring/multiviewer sistemleriyle başlayın
- Bunlar genellikle on-air için kritik değildir
- SDI ↔ IP Gateway Ekleyin
1
2
3
|
SDI Kamera → Gateway → NMOS Registry
↓
IP Monitor (test)
|
Önerilen Gateway’ler:
- Grass Valley IQUCP25
- Evertz 570IPG
- Embrionix emSFP
- Workflow’ları Doğrulayın
1
2
3
4
5
6
|
# Test kontrol listesi:
✓ Cihaz otomatik keşfi
✓ IS-05 ile bağlantı kurma
✓ IS-08 ile ses yönlendirmesi
✓ PTP senkronizasyonu
✓ Failover senaryoları
|
Süre: 2-3 ay
Bütçe: $50k-$100k (küçük pilot)
15.4 Faz 3: Hibrit İşletme
Amaç: SDI ve IP’yi paralel çalıştırma
Mimari:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
┌─────────────────────────────────────────┐
│ NMOS Registry (HA) │
│ Primary: 10.0.1.10 Backup: 10.0.1.11│
└─────────────────────────────────────────┘
↓
┌─────────┴─────────┐
↓ ↓
[IP Cihazlar] [SDI→IP Gateway'ler]
↓ ↓
ST 2110 SDI Cihazlar
Akışlar (Legacy)
↓ ↓
└─────────┬─────────┘
↓
[IP→SDI Gateway]
↓
Legacy Playout
|
Temel Stratejiler:
- Kritik Yolları Başlangıçta SDI’da Tutun
1
2
3
4
5
|
On-Air Zinciri (SDI'da tutun):
Canlı Kamera → SDI Router → Master Control → Playout
Kritik Olmayan (IP'ye taşıyın):
Monitoring, editing, QC, grafikler
|
- Gateway’leri Stratejik Olarak Kullanın
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// Örnek: SDI kamerayı IP workflow'a köprüleyin
type SDItoIPGateway struct {
SDIInput string
IPOutput *registry.Sender
NMOSNode *client.NMOSNode
}
func (g *SDItoIPGateway) Start() {
// Gateway'i NMOS node olarak kaydedin
g.NMOSNode.AddVideoSender(
"Gateway SDI Input 1",
"http://gateway:8080/sdp/input1.sdp",
)
g.NMOSNode.Start()
}
|
- Personeli Eğitin
- Eski yol: Fiziksel patch panel
- Yeni yol: Yazılım kontrolü (NMOS controller UI)
Süre: 6-12 ay
Bütçe: $200k-$500k (tesis boyutuna bağlı)
15.5 Faz 4: Tam IP Geçişi
Amaç: Tüm SDI’yi kaldır, saf IP tesisi
Görevler:
- SDI Cihazları Değiştirin
1
2
3
4
|
Eski: 10 SDI kamera → Yeni: NMOS'lu 10 IP kamera
Eski: SDI router → Yeni: 10GbE ağ switch'leri
Eski: SDI monitoring → Yeni: IP multiviewer
Eski: SDI encoder → Yeni: Yazılım encoder (NMOS-aware)
|
- Gateway’leri Devre Dışı Bırakın
- SDI→IP gateway’lerini kaldırın
- Eski SDI ekipmanını satın/yeniden kullanın
- Ağı Optimize Edin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
# Son ağ tasarımı
core_switches:
- model: Arista 7280R3
capacity: 100GbE
ptp: true
edge_switches:
- model: Cisco Nexus 93180YC-FX
ports: 48x 25GbE
ptp: true
igmp_snooping: true
# VLAN tasarımı
vlans:
management: 10
st2110_video: 100
st2110_audio: 101
control: 200
|
- Otomasyon ve Orkestrasyon
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
// NMOS ile tam workflow otomasyonu
func AutomateNewsShow() {
// 1. Kullanılabilir kameraları keşfet
cameras := queryRegistry("senders?format=video")
// 2. Kamera 1'i program çıkışına yönlendir
connectSenderToReceiver(cameras[0].ID, programReceiverID)
// 3. Tüm kameraları multiviewer'a yönlendir
for _, cam := range cameras {
connectToMultiviewer(cam.ID)
}
// 4. Ses yönlendirmesini yapılandır (mikrofonlar mikser'e)
audioSources := queryRegistry("senders?format=audio")
for i, src := range audioSources {
routeAudioChannel(src.ID, mixerInputs[i])
}
}
|
Süre: 6-12 ay
Bütçe: $500k-$2M (tam tesis değiştirme)
15.6 Geçiş Kontrol Listesi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
## Geçiş Öncesi
- [ ] Ağ altyapısı denetimi (minimum 10GbE)
- [ ] PTP-uyumlu switch'ler kurulu
- [ ] Personel eğitimi tamamlandı
- [ ] Yedekleme/geri alma planı dokümante edildi
## Geçiş Sırasında
- [ ] NMOS registry deploy edildi (HA)
- [ ] DNS-SD/mDNS yapılandırıldı
- [ ] Monitoring sistemi (Prometheus/Grafana)
- [ ] Önce kritik olmayan yollarda bağlantıları test edin
- [ ] Tüm cihaz konfigürasyonlarını dokümante edin
## Geçiş Sonrası
- [ ] Eski SDI ekipmanı kaldırıldı
- [ ] Dokümantasyon güncellendi
- [ ] Disaster recovery test edildi
- [ ] Performance benchmark'ları doğrulandı
- [ ] Personel yeni workflow'larla rahat
|
15.7 Yaygın Geçiş Hataları
| Hata |
Etki |
Çözüm |
| PTP desteği yok |
Ses/video senkronizasyon sorunları |
Switch’lerin IEEE 1588 desteklediğinden emin olun |
| Yetersiz bant genişliği |
Frame kayıpları |
Bant genişliğini hesaplayın: 1080p50 = ~1.66Gbps |
| Yedeklilik yok |
Tek hata noktası |
Çift registry, yedekli switch’ler |
| Zayıf ağ tasarımı |
Yüksek gecikme, jitter |
Broadcast ağ danışmanı işe alın |
| Personel direnci |
Operasyonel sorunlar |
Eğitim, değişim yönetimi |
15.6 Maliyet Karşılaştırması: SDI vs NMOS
Küçük Stüdyo (5 kamera, 10 I/O noktası):
| Bileşen |
SDI Maliyeti |
NMOS Maliyeti |
Tasarruf |
| Router |
$50k |
$0 (ağ) |
$50k |
| Kablolama |
$10k |
$2k (Ethernet) |
$8k |
| Monitoring |
$30k |
$5k (yazılım) |
$25k |
| Toplam |
$90k |
$7k |
$83k |
Büyük Tesis (100 kamera, 500 I/O noktası):
| Bileşen |
SDI Maliyeti |
NMOS Maliyeti |
Tasarruf |
| Router’lar |
$500k |
$150k (ağ) |
$350k |
| Kablolama |
$200k |
$30k |
$170k |
| Monitoring |
$300k |
$50k |
$250k |
| Toplam |
$1M |
$230k |
$770k |
ROI Zaman Çizelgesi: 18-24 ay
Gerçek dünya NMOS Registry implementasyonu performance testi.
16.1 Test Ortamı
1
2
3
4
5
6
7
|
Donanım:
- Cloud: AWS c5.2xlarge (8 vCPU, 16GB RAM)
- Storage: PostgreSQL on io2 EBS (10K IOPS)
- Cache: Redis 7.0 (r6g.large)
Load Testing Tool: k6 (Grafana)
Test Süresi: Senaryo başına 1 saat
|
| Senaryo |
Node’lar |
Sender’lar |
İstek/sn |
Ort. Gecikme |
P95 Gecikme |
P99 Gecikme |
| Küçük |
100 |
1,000 |
5,000 |
8ms |
15ms |
25ms |
| Orta |
500 |
5,000 |
4,200 |
18ms |
28ms |
45ms |
| Büyük |
1,000 |
10,000 |
3,500 |
25ms |
45ms |
80ms |
| Çok Büyük |
5,000 |
50,000 |
2,800 |
40ms |
85ms |
150ms |
Notlar:
- PostgreSQL connection pooling ile (25 bağlantı)
- Redis cache etkin (30s TTL)
- Heartbeat yükü: 100 node × 0.2 req/s = 20 req/s temel
| Senaryo |
Toplam Kaynak |
Sorgu Türü |
İstek/sn |
Ort. Gecikme |
P95 Gecikme |
| Küçük |
10,000 |
GET /senders |
8,000 |
5ms |
12ms |
| Orta |
50,000 |
GET /senders |
6,500 |
12ms |
25ms |
| Büyük |
100,000 |
GET /senders |
5,000 |
22ms |
40ms |
| Filtreli |
100,000 |
GET /senders?format=video |
4,500 |
28ms |
50ms |
Optimizasyon Etkisi:
| Optimizasyon |
Gecikme İyileştirmesi |
Throughput İyileştirmesi |
| Redis cache ekle |
-60% |
+150% |
| PostgreSQL index’leri |
-40% |
+80% |
| Connection pooling |
-30% |
+50% |
| Gzip sıkıştırma |
+5% (hafif artış) |
+200% (bant genişliği) |
16.4 WebSocket Ölçeklenebilirliği
| Eşzamanlı Bağlantılar |
CPU Kullanımı |
Bellek Kullanımı |
Event Teslim Süresi |
| 100 |
15% |
200MB |
<10ms |
| 500 |
35% |
600MB |
<20ms |
| 1,000 |
60% |
1.2GB |
<50ms |
| 5,000 |
85% |
5GB |
<200ms |
Öneri: Registry instance başına 1,000 WebSocket bağlantısı
16.5 Heartbeat İşleme
| Node’lar |
Heartbeat/sn |
CPU Kullanımı |
Bellek Kullanımı |
| 100 |
20 |
5% |
100MB |
| 500 |
100 |
12% |
300MB |
| 1,000 |
200 |
20% |
500MB |
| 5,000 |
1,000 |
45% |
2GB |
Formül: Heartbeat oranı = (Node’lar × 0.2) req/s (her 5 saniyede)
En Yaygın Sorgular:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
-- Sorgu 1: Tüm sender'ları al (1000x/sn çalışır)
SELECT * FROM senders;
-- Index'lerden önce: 150ms
-- Index'lerden sonra: 8ms
-- Sorgu 2: Device'a göre sender'ları al (500x/sn çalışır)
SELECT * FROM senders WHERE device_id = ?;
-- Index'den önce: 200ms
-- Index'den sonra: 5ms
-- Sorgu 3: Tür ve label'a göre kaynakları al (300x/sn çalışır)
SELECT * FROM resources WHERE type = ? AND label LIKE ?;
-- Index'den önce: 300ms
-- Index'den sonra: 15ms
|
Kritik Index’ler:
1
2
3
4
5
6
|
CREATE INDEX idx_senders_id ON senders(id);
CREATE INDEX idx_senders_device_id ON senders(device_id);
CREATE INDEX idx_resources_type ON resources(type);
CREATE INDEX idx_resources_label ON resources(label);
CREATE INDEX idx_health_updated_at ON health(updated_at);
CREATE INDEX idx_nodes_version ON nodes(version DESC);
|
16.7 Ölçekleme Önerileri
Benchmark’lara dayalı:
| Tesis Boyutu |
Kaynaklar |
Registry Instance’ları |
Database |
Cache |
| Küçük |
<10k |
1 (HA yok) |
PostgreSQL (small) |
Opsiyonel |
| Orta |
10k-50k |
2 (HA) |
PostgreSQL (medium) + Read replicas |
Redis |
| Büyük |
50k-200k |
3+ (HA) |
PostgreSQL (large) + Read replicas |
Redis Cluster |
| Enterprise |
200k+ |
5+ (HA) |
PostgreSQL Cluster + Sharding |
Redis Cluster |
Dikey Ölçekleme (Tek Instance):
- 100k kaynağa kadar: c5.2xlarge (8 vCPU, 16GB RAM)
- 200k kaynağa kadar: c5.4xlarge (16 vCPU, 32GB RAM)
Yatay Ölçekleme (Birden Fazla Instance):
1
2
3
4
5
6
7
8
9
10
11
|
Load Balancer (Round Robin)
↓
┌───────────────┼───────────────┐
↓ ↓ ↓
Registry 1 Registry 2 Registry 3
↓ ↓ ↓
└───────────────┼───────────────┘
↓
PostgreSQL Primary
↓ ↓
Read Replica 1 Read Replica 2
|
16.8 Gerçek Dünya Production İstatistikleri
BBC Production Tesisi (kamuya açık bilgilere dayalı tahmin):
- 500+ NMOS-etkin cihaz
- 30,000+ kayıtlı kaynak
- 200+ WebSocket bağlantısı (controller’lar, monitoring)
- %99.99 uptime gereksinimi
- 3-node registry cluster (active-active-active)
Performance:
- Query gecikmesi: <20ms (p95)
- Registration gecikmesi: <30ms (p95)
- Event teslimatı: <50ms (p95)
17. Terimler Sözlüğü
A-E:
AMWA: Advanced Media Workflow Association
ANC (Ancillary Data): Görüntü dışı veri (altyazılar, timecode, vb.)
Device: Bir Node içindeki işlevsellik birimi (encoder, decoder, mixer)
DNS-SD: NMOS registry’lerini bulmak için DNS Service Discovery
Essence: Gerçek medya içeriği (video/ses örnekleri)
F-N:
Flow: Belirli encoding/format ile mantıksal medya akışı
Grain: Bireysel örnek/frame veya NMOS event mesajı
Heartbeat: Periyodik sağlık kontrolü (NMOS’ta her 5 saniye)
IS-04: NMOS Keşif ve Kayıt spesifikasyonu
IS-05: NMOS Bağlantı Yönetimi spesifikasyonu
IS-08: NMOS Kanal Haritalama spesifikasyonu
IS-09: NMOS Sistem Parametreleri spesifikasyonu
IS-10: NMOS Yetkilendirme spesifikasyonu (OAuth2)
Node: NMOS kaynaklarını barındıran fiziksel/sanal cihaz
O-Z:
PTP: Senkronizasyon için Precision Time Protocol (IEEE 1588)
Receiver: Medya akışlarını alan cihaz/arayüz
Registry: Kaynak kaydı ve keşfi için merkezi NMOS servisi
Sender: Medya akışlarını ileten cihaz/arayüz
Source: Medya içeriğinin kaynağı (kamera sensörü, dosya, vb.)
ST 2110: IP üzerinden profesyonel medya suite (video, ses, veri)
ST 2110-20: Sıkıştırılmamış video
ST 2110-30: PCM ses
ST 2110-40: Ancillary data
UUID: Universally Unique Identifier (128-bit)
WebSocket: Gerçek zamanlı NMOS event’leri için full-duplex iletişim
18. Sonuç
18.1 Ne Oluşturduk
Bu makale, production-ready Go implementasyonları ile kapsamlı bir AMWA NMOS rehberi sağladı:
✅ Temel NMOS Spesifikasyonları:
- IS-04: WebSocket subscription’larla Keşif ve Kayıt
- IS-05: SDP parsing ile Bağlantı Yönetimi
- IS-08: Ses yönlendirmesi için Kanal Haritalama
- IS-09: PTP zamanlama için Sistem Parametreleri
✅ Production Özellikleri:
- Error handling ve recovery pattern’leri
- Docker ve Kubernetes deployment
- IS-10 OAuth2 authentication
- Performance optimization (PostgreSQL, Redis, caching)
- Prometheus/Grafana ile monitoring
✅ Pratik Bilgi:
- SDI → NMOS geçiş stratejisi
- Yaygın sorunlarla troubleshooting rehberi
- Performance benchmark’ları (50k kaynak’a kadar)
- Gerçek dünya maliyet analizi ve ROI
18.2 NMOS Neden Önemli
NMOS, broadcast workflow’larını dönüştürüyor:
| Geleneksel SDI |
Modern NMOS |
| Manuel konfigürasyon |
Otomatik keşif |
| Fiziksel patching |
Yazılım tabanlı yönlendirme |
| Sınırlı ölçeklenebilirlik |
Cloud-native, sınırsız ölçek |
| Yüksek capex |
Düşük maliyetler |
| Vendor lock-in |
Açık standartlar, birlikte çalışabilirlik |
Gerçek Etki:
- BBC: IP’ye geçti, kablolama maliyetlerini %70 azalttı
- Discovery Networks: Otomatik workflow’lar, %50 daha hızlı
- Bölgesel Yayıncılar: NMOS ile cloud production, %60 maliyet tasarrufu
18.3 Okuyucular İçin Sonraki Adımlar
Başlayanlar:
- Docker Compose ile registry’yi kurun
- Bu makaledeki curl örneklerini test edin
- Basit bir node client deploy edin
- IS-05 bağlantılarıyla deney yapın
Orta Seviye:
- IS-07 (Event & Tally) implement edin
- Production güvenliği için IS-10 OAuth2 ekleyin
- Görsel kontrol için web UI oluşturun
- Mevcut broadcast otomasyonu ile entegre edin
İleri Seviye:
- SDN entegrasyonu için IS-06 implement edin
- Büyük tesisler için BCP-002-02 (Natural Grouping) ekleyin
- Workflow otomasyonu ile özel controller oluşturun
- AMWA NMOS açık kaynak projelerine katkıda bulunun
18.4 Broadcast IT’nin Geleceği
NMOS + ST 2110 şunları mümkün kılar:
Cloud-Native Broadcasting:
1
2
3
4
|
On-Premise Kameralar → NMOS Registry → Cloud İşleme → CDN → İzleyiciler
↓
Otomatik Workflow'lar
(Kubernetes + NMOS)
|
Yazılım Tanımlı Tesisler:
- Infrastructure as Code (Terraform + NMOS API’leri)
- Broadcast konfigürasyonları için GitOps
- Talebe göre otomatik ölçekleme
- Multi-cloud yedeklilik
AI/ML Entegrasyonu:
- İçerik analizine dayalı otomatik kamera seçimi
- Öngörücü bakım (NMOS istatistikleri ile cihaz sağlığı)
- Ağ koşullarına göre akıllı yönlendirme
- Gerçek zamanlı kalite izleme
18.5 Öğrenmeye Devam Etmek İçin Kaynaklar
Resmi Spesifikasyonlar:
Açık Kaynak:
Topluluk:
18.6 Son Düşünceler
NMOS, broadcast teknolojisinde temel bir değişimi temsil ediyor. Sadece kabloları Ethernet ile değiştirmekle ilgili değil:
- Otomasyon: Manuel patching ve konfigürasyonu ortadan kaldırın
- Esneklik: Tesisleri saatler yerine saniyeler içinde yeniden yapılandırın
- Ölçeklenebilirlik: Yeniden tasarım olmadan 10’dan 10,000 cihaza büyüyün
- Birlikte Çalışabilirlik: Vendor’ları sorunsuzca karıştırıp eşleştirin
- İnovasyon: SDI ile imkansız olan yeni workflow’ları etkinleştirin
Bu makaledeki Go implementasyonları, production broadcast sistemleri oluşturmak için sağlam bir temel sağlar. NMOS vendor-agnostic, API-first ve modern DevOps pratikleri için tasarlanmıştır—yeni nesil broadcast tesisleri için mükemmel.
Broadcast’in geleceği IP’dir. NMOS bunu pratik hale getirir.
19. Production Dikkat Edilmesi Gerekenler
19.1 Registry Keşfi için DNS-SD
Production’da, node’lar DNS-SD (mDNS veya unicast DNS) kullanarak registry’yi keşfeder:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
// discovery/dns_sd.go
package discovery
import (
"context"
"fmt"
"time"
"github.com/grandcat/zeroconf"
)
// DiscoverRegistry mDNS ile NMOS registry bulur
func DiscoverRegistry() (string, error) {
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
return "", err
}
entries := make(chan *zeroconf.ServiceEntry)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// _nmos-registration._tcp servisini ara
err = resolver.Browse(ctx, "_nmos-registration._tcp", "local.", entries)
if err != nil {
return "", err
}
select {
case entry := <-entries:
registryURL := fmt.Sprintf("http://%s:%d", entry.HostName, entry.Port)
return registryURL, nil
case <-ctx.Done():
return "", fmt.Errorf("registry bulunamadı")
}
}
|
19.2 Registry için PostgreSQL Depolama
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
// storage/postgres.go
package storage
import (
"database/sql"
"encoding/json"
_ "github.com/lib/pq"
)
type PostgresStorage struct {
db *sql.DB
}
func NewPostgresStorage(connStr string) (*PostgresStorage, error) {
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, err
}
return &PostgresStorage{db: db}, nil
}
func (s *PostgresStorage) SaveNode(node *registry.Node) error {
data, _ := json.Marshal(node)
_, err := s.db.Exec(`
INSERT INTO nodes (id, data, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (id) DO UPDATE SET data = $2, updated_at = NOW()
`, node.ID, data)
return err
}
func (s *PostgresStorage) GetNodes() ([]*registry.Node, error) {
rows, err := s.db.Query("SELECT data FROM nodes")
if err != nil {
return nil, err
}
defer rows.Close()
var nodes []*registry.Node
for rows.Next() {
var data []byte
rows.Scan(&data)
var node registry.Node
json.Unmarshal(data, &node)
nodes = append(nodes, &node)
}
return nodes, nil
}
|
19.3 Prometheus ile Monitoring
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// monitoring/metrics.go
package monitoring
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
registeredNodes = promauto.NewGauge(prometheus.GaugeOpts{
Name: "nmos_registered_nodes",
Help: "Kayıtlı NMOS node sayısı",
})
registeredSenders = promauto.NewGauge(prometheus.GaugeOpts{
Name: "nmos_registered_senders",
Help: "Kayıtlı sender sayısı",
})
activeConnections = promauto.NewGauge(prometheus.GaugeOpts{
Name: "nmos_active_connections",
Help: "Aktif IS-05 bağlantı sayısı",
})
)
func UpdateMetrics(reg *registry.Registry) {
registeredNodes.Set(float64(len(reg.GetNodes())))
registeredSenders.Set(float64(len(reg.GetSenders())))
}
|
19.1 Test ve Debug
AMWA resmi bir test aracı sağlar: nmos-testing
1
2
3
4
5
6
7
|
# nmos-testing yükle
git clone https://github.com/AMWA-TV/nmos-testing.git
cd nmos-testing
pip install -r requirements.txt
# Registry'nize karşı testleri çalıştırın
python nmos-test.py --host localhost --port 3210 --version v1.3
|
19.1.2 Debug İpuçları
- Wireshark: Keşfi görmek için mDNS trafiğini yakalayın
- Postman: API endpoint’lerini manuel olarak test edin
- WebSocket Inspector: Gerçek zamanlı güncellemeleri izleyin
- Logging: Kayıt akışını takip etmek için yapılandırılmış logging ekleyin
1
2
3
4
5
6
7
|
// Yapılandırılmış logging için logrus veya zap kullanın
import "github.com/sirupsen/logrus"
logrus.WithFields(logrus.Fields{
"node_id": node.ID,
"action": "register",
}).Info("Node kaydedildi")
|
19.2 NMOS’u Eylem Halinde Görselleştirme
19.2.1 Önerilen Görselleştirmeler
Bu makale implementasyona odaklansa da, görsel yardımcılar anlamayı büyük ölçüde kolaylaştırır:
1. WebSocket Gerçek Zamanlı Güncellemeler Demo’su
1
2
3
4
5
6
7
8
9
10
11
|
# Bunu eylem halinde kaydedin:
# Terminal 1: Registry'yi başlat
./nmos-registry
# Terminal 2: WebSocket event'lerini izle
wscat -c ws://localhost:3211/x-nmos/query/v1.3/subscriptions/xxx/ws
# Terminal 3: Yeni bir sender kaydet
curl -X POST http://localhost:3210/x-nmos/registration/v1.3/resource -d @sender.json
# Event'in Terminal 2'de anında göründüğünü görün!
|
2. Grafana Dashboard Screenshot
Önerilen paneller:
- Kayıtlı kaynaklar (timeseries)
- Query API gecikmesi (gauge)
- Heartbeat başarı oranı (%)
- WebSocket bağlantıları (graph)
- Database sorgu zamanı (heatmap)
3. NMOS Controller Web UI
Popüler açık kaynak controller’lar:
4. Network Packet Capture
NMOS için Wireshark filtreleri:
1
2
3
4
5
6
7
8
9
10
11
|
# mDNS keşfi
dns.qry.name contains "nmos-registration"
# Registration API
http.host == "localhost:3210"
# Query API
http.host == "localhost:3211"
# WebSocket
websocket
|
19.2.2 Canlı Demo Kurulumu
Hızlı demo ortamı:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# Demo repo'yu klonla (varsayımsal)
git clone https://github.com/yourusername/nmos-demo
cd nmos-demo
# Her şeyi Docker Compose ile başlat
docker-compose up -d
# Erişim:
# - Registry: http://localhost:3210, http://localhost:3211
# - Grafana: http://localhost:3000 (admin/admin)
# - Mock cihazlar: 5 sanal kamera, 3 monitör
# Grafana dashboard'unda otomatik kaydı izle
|
20. Gerçek Dünya Kullanım Senaryoları
20.1 Canlı Haber Prodüksiyonu
Senaryo: 5 kamera, grafik sistemi, ses mikseri olan haber stüdyosu
NMOS Faydaları:
- Otomatik Keşif: Tüm cihazlar kendilerini kaydeder
- Hızlı Yeniden Konfigürasyon: Kamera feed’leri arasında anında geçiş IS-05 ile
- Ses Yönlendirmesi: Belirli mikrofonları belirli çıkışlara yönlendirme IS-08 ile
- Merkezi Kontrol: Bir controller tüm tesisi yönetir
20.2 Spor Yayıncılığı
Senaryo: 20+ kamera, yorum pozisyonları, OB van olan stadyum
NMOS Faydaları:
- Ölçeklenebilirlik: Yüzlerce sender/receiver’ı yönet
- Uzaktan Prodüksiyon (REMI): Kameralar stadyumda, prodüksiyon merkezi tesiste
- Yedeklilik: IS-04 heartbeat monitoring kullanarak otomatik failover
- Multi-viewer: Controller’lar monitoring duvarları için tüm kaynakları keşfedebilir
20.3 Cloud Production
Senaryo: On-premise kameralar ile cloud-tabanlı video işleme
NMOS Faydaları:
- Hibrit Workflow’lar: On-premise node’lar cloud registry ile kaydolur
- Dinamik Ölçekleme: Cloud instance’ları ekle/kaldır, otomatik kayıt
- API-First: Orkestrasyon sistemleriyle (Kubernetes, vb.) kolay entegrasyon
21. Referanslar ve Kaynaklar
21.1 Resmi Spesifikasyonlar
AMWA NMOS:
SMPTE Standartları:
İlgili Standartlar:
- AES67 - IP Üzerinden Ses
- IEEE 1588 - Precision Time Protocol
- RFC 4566 - SDP: Session Description Protocol
- RFC 3550 - RTP: Real-time Transport Protocol
21.2 Açık Kaynak Projeleri
NMOS Implementasyonları:
NMOS Controller’lar:
Araçlar ve Yardımcı Programlar:
21.3 Kullanılan Go Kütüphaneleri
Temel Bağımlılıklar:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
require (
github.com/gorilla/mux v1.8.1 // HTTP routing
github.com/gorilla/websocket v1.5.1 // WebSocket desteği
github.com/google/uuid v1.5.0 // UUID oluşturma
github.com/lib/pq v1.10.9 // PostgreSQL driver
github.com/go-redis/redis/v8 v8.11.5 // Redis client
github.com/sirupsen/logrus v1.9.3 // Yapılandırılmış logging
github.com/prometheus/client_golang v1.18.0 // Metrikler
github.com/golang-jwt/jwt/v5 v5.2.0 // JWT işleme
github.com/grandcat/zeroconf v1.0.0 // mDNS/DNS-SD
gorm.io/gorm v1.25.5 // ORM
gorm.io/driver/postgres v1.5.4 // GORM PostgreSQL
)
|
21.4 Kitaplar ve Yayınlar
Broadcast IP:
- “IP in Broadcast Production” - Gerard Phillips
- “Networked Media Systems” - Brad Gilmer
- “Cloud-Based Broadcasting” - Tony Brown
Go Programlama:
- “The Go Programming Language” - Donovan & Kernighan
- “Concurrency in Go” - Katherine Cox-Buday
- “Building Microservices with Go” - Nic Jackson
21.5 Online Kurslar ve Eğitim
Broadcast IP:
Go Programlama:
21.6 Topluluklar ve Forumlar
NMOS/Broadcast:
Go:
21.7 Faydalı Araçlar
Geliştirme:
- Postman - API testing
- wscat - WebSocket testing
- jq - JSON işleme
- k6 - Load testing
Ağ:
Monitoring:
21.8 İlgili Makaleler (Yazar Tarafından)
21.9 Güncel Kalın
Abone olun:
Konferanslar:
- NAB Show (Las Vegas, Nisan)
- IBC (Amsterdam, Eylül)
- SMPTE Technical Conference
Sorularınız veya geri bildiriminiz mi var? GitHub veya LinkedIn‘de bana ulaşabilirsiniz.
Yasal Uyarı: Kod örnekleri eğitim amaçlıdır. Production kullanımından önce kapsamlı test yapın. NMOS spesifikasyonları güncellemelere tabidir—her zaman en son yayınlanan versiyonlara başvurun.