İçerikler

AMWA NMOS: SMPTE ST 2110 için Kontrol Düzlemi ve Go ile Uygulama

İçerikler

AMWA NMOS: SMPTE ST 2110 için Kontrol Düzlemi ve Go ile Uygulama

Özet

  • NMOS Genel Bakış: Ağ üzerinden medya kontrolü ve yönetimi için açık spesifikasyonlar
  • IS-04: Cihazların ve kaynakların keşfi ve kaydı
  • IS-05: Otomatik yönlendirme için cihaz bağlantı yönetimi
  • IS-06: Switch’ler ve ağ altyapısı için SDN kontrolü
  • IS-08: Cihazlar içinde ses yönlendirmesi için kanal haritalama
  • IS-09: Global ağ zamanlaması için sistem parametreleri
  • Go Implementasyonu: Production-ready NMOS client ve registry server
  • Gerçek Dünya Kullanım Senaryoları: Otomatik iş akışları, kaynak keşfi, bağlantı yönetimi, SDN entegrasyonu

Not: Bu makale, AMWA NMOS spesifikasyonlarının kapsamlı bir şekilde ele alınmasını ve pratik Go implementasyonlarını içerir. Tüm kod örnekleri gerçek broadcast ortamlarına dayanır ve NMOS best practice’lerini takip eder.


1. Giriş: AMWA NMOS Nedir?

1.1 Geleneksel Broadcast Kontrolünün Sorunu

SMPTE ST 2110 hakkında yazdığım makalede, video, ses ve metadata’nın IP ağları üzerinden nasıl taşındığını tartışmıştık. Ancak ST 2110 sadece veri düzlemini (data plane - medyanın nasıl aktığını) tanımlar. Şunları ele almaz:

  1. Cihaz Keşfi: Cihazlar birbirlerini ağda nasıl bulur?
  2. Bağlantı Yönetimi: Video/sesi cihazlar arasında nasıl yönlendiririz?
  3. Konfigürasyon: Cihaz parametrelerini nasıl yönetiriz?
  4. İzleme: Kaynak kullanılabilirliğini ve sağlığını nasıl takip ederiz?
  5. Birlikte Çalışabilirlik: Farklı üreticilerin cihazları nasıl birlikte çalışır?

Geleneksel SDI ortamlarında, kontrol panelli fiziksel router’larımız vardı. IP tabanlı ST 2110’da, standartlaştırılmış bir kontrol düzlemine (control plane) ihtiyacımız var.

1.2 SDI vs NMOS: Temel Farklar

NMOS’a dalmadan önce, geleneksel SDI iş akışlarıyla nasıl karşılaştırıldığını anlayalım:

Özellik Geleneksel SDI NMOS/ST 2110
Keşif Manuel (etiketler, dokümanlar) Otomatik (IS-04)
Yönlendirme Fiziksel router/patch panel Yazılım tabanlı (IS-05)
Ölçeklenebilirlik Fiziksel portlarla sınırlı Sınırsız (ağ bant genişliği)
Esneklik Sabit kablo yolları Dinamik, yeniden yapılandırılabilir
Maliyet Yüksek (kablolar, router’lar) Düşük (Ethernet altyapısı)
Uzaktan Kontrol Sınırlı Tam API erişimi
Metadata Sınırlı (VANC) Zengin (JSON)
Multi-viewer Özel donanım Yazılım tabanlı
Yedeklilik Çift kablolama Ağ yedekliliği
Entegrasyon Zor API-first, kolay

Gerçek Dünya Etkisi:

  • SDI: 100 kameralı tesis, 100+ kablo, 100+ I/O’lu fiziksel router gerektirir
  • NMOS: Aynı tesis 10GbE ağ üzerinde, sanal yönlendirme, yeniden kablolama olmadan kamera ekleme

1.3 AMWA NMOS: Çözüm

AMWA (Advanced Media Workflow Association), bu zorlukları çözmek için NMOS (Networked Media Open Specifications) geliştirdi. NMOS, şunları sağlayan bir spesifikasyon ailesidir:

  • Otomatik Keşif: Cihazlar kendilerini kaydeder ve yeteneklerini duyurur
  • RESTful API’ler: Cihaz kontrolü için standart HTTP API’leri
  • WebSocket Event’leri: Ağ değişikliklerinin gerçek zamanlı bildirimleri
  • Vendor Agnostic: Herhangi bir üreticinin ST 2110 ekipmanıyla çalışır
  • Ölçeklenebilir: Küçük stüdyolardan büyük broadcast tesislerine

1.4 NMOS Spesifikasyon Ailesi

NMOS, birden fazla spesifikasyondan (“IS” - Interface Specifications) oluşur:

Spesifikasyon Amaç Durum
IS-04 Keşif ve Kayıt Stabil
IS-05 Cihaz Bağlantı Yönetimi Stabil
IS-06 Ağ Kontrolü (SDN) ✅ Makale ile birlikte
IS-07 Event ve Tally Stabil
IS-08 Kanal Haritalama (Ses) Stabil
IS-09 Sistem Parametreleri (Zamanlama) Stabil
IS-10 Yetkilendirme (OAuth2) Stabil
BCP-003 Güvenlik Best Practice’leri Stabil

Bu makalede, IS-04, IS-05, IS-08 ve IS-09 spesifikasyonlarına ve Go implementasyonlarına odaklanacağız.


2. NMOS Mimarisi Genel Bakış

2.1 Üst Seviye Mimari

2.2 Temel Bileşenler

  1. NMOS Node: NMOS’a katılan herhangi bir cihaz (kameralar, encoder’lar, switcher’lar)
  2. NMOS Registry: Kaynak kaydı ve keşfi için merkezi servis
  3. NMOS Controller: Bağlantıları ve iş akışlarını yöneten uygulama

3. IS-04: Keşif ve Kayıt

3.1 IS-04 Genel Bakış

IS-04, NMOS’un temelidir. Cihazların şunları nasıl yaptığını tanımlar:

  • DNS-SD (mDNS/unicast DNS) kullanarak NMOS Registry’yi keşfetme
  • Kaynaklarını kaydetme (nodes, devices, sources, flows, senders, receivers)
  • Mevcut kaynakları sorgulama
  • WebSocket üzerinden gerçek zamanlı güncellemeler alma

3.2 IS-04 Kaynak Modeli

IS-04 kaynak hiyerarşisi:

1
2
3
4
5
6
Node (Cihaz/Sunucu)
  └── Device (Mantıksal İşlem Birimi)
       ├── Source (Medya Kaynağı)
       ├── Flow (Mantıksal Medya Akışı)
       ├── Sender (Flow'u İletir)
       └── Receiver (Flow'u Alır)

Örnek Senaryo: Gömülü encoder’lı kamera

  • Node: Kamera donanımı
  • Device: Encoder chip
  • Source: Kamera sensör çıkışı
  • Flow: Sıkıştırılmış video akışı (H.264)
  • Sender: Akışı ileten ağ arayüzü
  • Receiver: (dönüş feed monitörü varsa)

3.3 Kaynak Nitelikleri

Her kaynağın şunları vardır:

  • id: UUID (örn. 58f6b536-ca4c-43fd-880e-9df2af5d5d94)
  • version: Zaman damgası (örn. 1735545600:0)
  • label: İnsan tarafından okunabilir isim
  • description: Detaylı açıklama
  • tags: Gruplama için metadata

3.4 IS-04 API’leri

IS-04 iki ana API tanımlar:

  1. Registration API (port 3210): Node’ların kaynakları kaydetmesi için
  2. Query API (port 3211): Controller’ların kaynakları keşfetmesi için

3.5 IS-04 Kayıt Akışı

İşte tam kayıt sıralaması:

Önemli Noktalar:

  1. Kaynaklar hiyerarşik sırayla kaydedilmelidir (Node → Device → Source/Flow → Sender)
  2. Heartbeat her 5 saniyede zorunludur (12 saniyelik timeout)
  3. WebSocket aboneleri gerçek zamanlı güncellemeler alır
  4. Graceful shutdown açık DELETE istekleri gerektirir

4. Go ile NMOS Registry Oluşturma

IS-04’ü implement eden production-ready bir NMOS Registry oluşturalım.

4.1 Proje Yapısı

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
nmos-registry/
├── main.go
├── registry/
│   ├── registry.go        # Temel registry mantığı
│   ├── registration.go    # Registration API (IS-04)
│   ├── query.go           # Query API (IS-04)
│   ├── websocket.go       # WebSocket subscription'lar
│   └── models.go          # NMOS kaynak modelleri
├── storage/
│   ├── memory.go          # In-memory depolama
│   └── postgres.go        # PostgreSQL depolama (production)
└── go.mod

4.2 NMOS Kaynak Modelleri

Önce IS-04 kaynak modellerini tanımlayalım:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// registry/models.go
package registry

import (
    "time"
)

// Resource, tüm NMOS kaynakları için temel
type Resource struct {
    ID          string            `json:"id"`
    Version     string            `json:"version"`
    Label       string            `json:"label"`
    Description string            `json:"description"`
    Tags        map[string]string `json:"tags"`
}

// Node, cihazlar için mantıksal host'u temsil eder
type Node struct {
    Resource
    Hostname    string      `json:"hostname"`
    APIVersions []string    `json:"api"`
    Caps        interface{} `json:"caps"`
    Services    []Service   `json:"services"`
    Clocks      []Clock     `json:"clocks"`
    Interfaces  []Interface `json:"interfaces"`
}

// Device, bir işlevsellik birimini temsil eder (encoder, decoder, vb.)
type Device struct {
    Resource
    Type      string   `json:"type"` // generic, pipeline, host
    NodeID    string   `json:"node_id"`
    Senders   []string `json:"senders"`   // Sender ID'leri dizisi
    Receivers []string `json:"receivers"` // Receiver ID'leri dizisi
    Controls  []Control `json:"controls"`
}

// Source, medya içeriğinin kaynağını temsil eder
type Source struct {
    Resource
    Format    string      `json:"format"` // video, audio, data
    DeviceID  string      `json:"device_id"`
    ClockName string      `json:"clock_name"`
    GrainRate *Rational   `json:"grain_rate,omitempty"`
    // ... ek alanlar: caps, parents, ses için channels
}

// Flow, bir medya akışını temsil eder
type Flow struct {
    Resource
    Format       string    `json:"format"` // urn:x-nmos:format:video, audio, data
    SourceID     string    `json:"source_id"`
    DeviceID     string    `json:"device_id"`
    GrainRate    *Rational `json:"grain_rate,omitempty"`
    MediaType    string    `json:"media_type"` // video/raw, audio/L24, vb.
    
    // Video-spesifik: frame_width, frame_height, colorspace, interlace_mode
    // Ses-spesifik: sample_rate, bit_depth, channels
    // ... tam alan listesi için NMOS IS-04 spec'e bakın
}

// Sender bir flow'u iletir
type Sender struct {
    Resource
    FlowID          string         `json:"flow_id"`
    Transport       string         `json:"transport"` // urn:x-nmos:transport:rtp
    DeviceID        string         `json:"device_id"`
    ManifestHref    string         `json:"manifest_href"` // SDP dosya URL'i
    InterfaceBindings []string     `json:"interface_bindings"`
    SubscriptionID  string         `json:"subscription,omitempty"`
}

// Receiver bir flow'u alır
type Receiver struct {
    Resource
    Format       string         `json:"format"` // urn:x-nmos:format:video
    Caps         ReceiverCaps   `json:"caps"`
    Transport    string         `json:"transport"`
    DeviceID     string         `json:"device_id"`
    InterfaceBindings []string  `json:"interface_bindings"`
    SubscriptionID string       `json:"subscription,omitempty"`
}

// Destekleyici yapılar (basitleştirilmiş - tüm alanlar için spec'e bakın)
type Service struct {
    Href string `json:"href"`
    Type string `json:"type"`
}

type Clock struct {
    Name    string `json:"name"`
    RefType string `json:"ref_type"` // internal, ptp, vb.
}

type Interface struct {
    Name string `json:"name"`
    // ... chassis_id, port_id, vb.
}

type Rational struct {
    Numerator   int `json:"numerator"`
    Denominator int `json:"denominator"`
}

type ReceiverCaps struct {
    MediaTypes []string `json:"media_types,omitempty"`
}

// Tam kaynak tanımları için: https://specs.amwa.tv/is-04/

// Version, NMOS version zaman damgası oluşturur
func CreateVersion() string {
    now := time.Now()
    seconds := now.Unix()
    nanos := now.Nanosecond()
    return fmt.Sprintf("%d:%d", seconds, nanos)
}

4.3 Registry Temel Mantığı

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
// registry/registry.go
package registry

import (
    "fmt"
    "sync"
    "time"
)

// Registry, NMOS kaynaklarını yönetir
type Registry struct {
    mu         sync.RWMutex
    nodes      map[string]*Node
    devices    map[string]*Device
    sources    map[string]*Source
    flows      map[string]*Flow
    senders    map[string]*Sender
    receivers  map[string]*Receiver
    
    // Sağlık takibi
    health     map[string]time.Time // node_id -> son heartbeat
    
    // WebSocket subscription'lar
    subscribers map[string]*Subscriber
}

type Subscriber struct {
    ID       string
    Query    SubscriptionQuery
    Updates  chan ResourceUpdate
}

type SubscriptionQuery struct {
    ResourceType string            // node, device, sender, receiver
    Params       map[string]string // sorgu parametreleri
}

type ResourceUpdate struct {
    Action   string      // create, update, delete
    Type     string      // kaynak türü
    Resource interface{} // kaynak
}

// NewRegistry yeni bir NMOS registry oluşturur
func NewRegistry() *Registry {
    r := &Registry{
        nodes:       make(map[string]*Node),
        devices:     make(map[string]*Device),
        sources:     make(map[string]*Source),
        flows:       make(map[string]*Flow),
        senders:     make(map[string]*Sender),
        receivers:   make(map[string]*Receiver),
        health:      make(map[string]time.Time),
        subscribers: make(map[string]*Subscriber),
    }
    
    // Sağlık kontrolü goroutine'ini başlat
    go r.healthCheckLoop()
    
    return r
}

// RegisterNode bir node'u kaydeder veya günceller
func (r *Registry) RegisterNode(node *Node) error {
    r.mu.Lock()
    defer r.mu.Unlock()
    
    existing, exists := r.nodes[node.ID]
    action := "create"
    
    if exists {
        action = "update"
        // Version'ı güncelle
        node.Version = CreateVersion()
    }
    
    r.nodes[node.ID] = node
    r.health[node.ID] = time.Now()
    
    // Aboneleri bilgilendir
    r.notifySubscribers(ResourceUpdate{
        Action:   action,
        Type:     "node",
        Resource: node,
    })
    
    return nil
}

// RegisterDevice, RegisterSource, RegisterFlow, RegisterSender, RegisterReceiver
// Hepsi aynı pattern'i takip eder. İşte generic implementasyon:

func (r *Registry) registerResource(id string, resource interface{}, resourceType string, 
    resourceMap map[string]interface{}, validate func() error) error {
    r.mu.Lock()
    defer r.mu.Unlock()
    
    // Opsiyonel validasyon
    if validate != nil {
        if err := validate(); err != nil {
            return err
        }
    }
    
    action := "create"
    if _, exists := resourceMap[id]; exists {
        action = "update"
        // Version'ı güncelle (kaynak türüne bağlı implementasyon)
    }
    
    resourceMap[id] = resource
    
    r.notifySubscribers(ResourceUpdate{
        Action:   action,
        Type:     resourceType,
        Resource: resource,
    })
    
    return nil
}

// Kolaylık wrapper'ları:
func (r *Registry) RegisterDevice(device *Device) error {
    return r.registerResource(device.ID, device, "device", 
        cast(r.devices), func() error {
            if _, exists := r.nodes[device.NodeID]; !exists {
                return fmt.Errorf("node %s bulunamadı", device.NodeID)
            }
            return nil
        })
}

func (r *Registry) RegisterSender(sender *Sender) error {
    return r.registerResource(sender.ID, sender, "sender", cast(r.senders), nil)
}

func (r *Registry) RegisterReceiver(receiver *Receiver) error {
    return r.registerResource(receiver.ID, receiver, "receiver", cast(r.receivers), nil)
}

// RegisterSource, RegisterFlow için benzer şekilde...

// Query metotları - Generic getter
func getResources[T any](mu *sync.RWMutex, resourceMap map[string]*T) []*T {
    mu.RLock()
    defer mu.RUnlock()
    
    resources := make([]*T, 0, len(resourceMap))
    for _, resource := range resourceMap {
        resources = append(resources, resource)
    }
    return resources
}

// Kolaylık metotları
func (r *Registry) GetNodes() []*Node {
    return getResources(&r.mu, r.nodes)
}

func (r *Registry) GetSenders() []*Sender {
    return getResources(&r.mu, r.senders)
}

func (r *Registry) GetReceivers() []*Receiver {
    return getResources(&r.mu, r.receivers)
}

// GetDevices(), GetSources(), GetFlows() için benzer şekilde...

// Sağlık kontrolü - Eski node'ları temizle
func (r *Registry) healthCheckLoop() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        r.checkHealth()
    }
}

func (r *Registry) checkHealth() {
    r.mu.Lock()
    defer r.mu.Unlock()
    
    staleThreshold := 12 * time.Second // IS-04 spec: node'lar her 5s'de heartbeat göndermeli
    now := time.Now()
    
    for nodeID, lastSeen := range r.health {
        if now.Sub(lastSeen) > staleThreshold {
            // Eski node ve kaynaklarını kaldır
            delete(r.nodes, nodeID)
            delete(r.health, nodeID)
            
            // İlişkili device, sender, receiver'ları kaldır
            r.removeNodeResources(nodeID)
            
            fmt.Printf("Eski node kaldırıldı: %s\n", nodeID)
        }
    }
}

func (r *Registry) removeNodeResources(nodeID string) {
    // Device'ları kaldır
    for id, device := range r.devices {
        if device.NodeID == nodeID {
            delete(r.devices, id)
        }
    }
    
    // Kaldırılan device'lara ait sender/receiver'ları kaldır
    // (basitleştirilmiş - production'da device ilişkilerini takip edin)
}

// Heartbeat, node sağlığını günceller
func (r *Registry) Heartbeat(nodeID string) error {
    r.mu.Lock()
    defer r.mu.Unlock()
    
    if _, exists := r.nodes[nodeID]; !exists {
        return fmt.Errorf("node %s kayıtlı değil", nodeID)
    }
    
    r.health[nodeID] = time.Now()
    return nil
}

// Subscriber yönetimi
func (r *Registry) Subscribe(query SubscriptionQuery) *Subscriber {
    r.mu.Lock()
    defer r.mu.Unlock()
    
    sub := &Subscriber{
        ID:      generateUUID(),
        Query:   query,
        Updates: make(chan ResourceUpdate, 100),
    }
    
    r.subscribers[sub.ID] = sub
    return sub
}

func (r *Registry) Unsubscribe(subID string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    
    if sub, exists := r.subscribers[subID]; exists {
        close(sub.Updates)
        delete(r.subscribers, subID)
    }
}

func (r *Registry) notifySubscribers(update ResourceUpdate) {
    for _, sub := range r.subscribers {
        // Subscription query'sine göre filtrele
        if r.matchesQuery(update, sub.Query) {
            select {
            case sub.Updates <- update:
            default:
                // Subscriber yavaş, atla
            }
        }
    }
}

func (r *Registry) matchesQuery(update ResourceUpdate, query SubscriptionQuery) bool {
    // Kaynak türünü eşleştir
    if query.ResourceType != "" && query.ResourceType != update.Type {
        return false
    }
    
    // Ek sorgu parametrelerini eşleştir
    // (basitleştirilmiş - production'da tam sorgu eşleştirmesi implement edin)
    
    return true
}

func generateUUID() string {
    // Production'da github.com/google/uuid kullanın
    return fmt.Sprintf("%d", time.Now().UnixNano())
}

4.4 Registration API (HTTP Handler’lar)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// registry/registration.go
package registry

import (
    "encoding/json"
    "fmt"
    "net/http"
    "strings"
    
    "github.com/gorilla/mux"
)

// RegistrationAPI, IS-04 Registration API'sini yönetir
type RegistrationAPI struct {
    registry *Registry
}

func NewRegistrationAPI(registry *Registry) *RegistrationAPI {
    return &RegistrationAPI{registry: registry}
}

// SetupRoutes, Registration API için HTTP route'larını yapılandırır
func (api *RegistrationAPI) SetupRoutes(router *mux.Router) {
    // IS-04 v1.3 Registration API route'ları
    v13 := router.PathPrefix("/x-nmos/registration/v1.3").Subrouter()
    
    // Kaynak kaydı
    v13.HandleFunc("/resource", api.RegisterResource).Methods("POST")
    v13.HandleFunc("/resource", api.DeleteAllResources).Methods("DELETE")
    
    // Sağlık kontrolü
    v13.HandleFunc("/health/nodes/{nodeId}", api.Heartbeat).Methods("POST")
    
    // Kaynak silme
    v13.HandleFunc("/resource/{resourceType}/{resourceId}", api.DeleteResource).Methods("DELETE")
}

// RegisterResource, kaynak kaydını işler (POST /resource)
func (api *RegistrationAPI) RegisterResource(w http.ResponseWriter, r *http.Request) {
    var rawResource map[string]interface{}
    
    if err := json.NewDecoder(r.Body).Decode(&rawResource); err != nil {
        http.Error(w, "Geçersiz JSON", http.StatusBadRequest)
        return
    }
    
    // Veri'den kaynak türünü belirle
    resourceType := inferResourceType(rawResource)
    
    var err error
    switch resourceType {
    case "node":
        var node Node
        if err = remarshal(rawResource, &node); err == nil {
            err = api.registry.RegisterNode(&node)
        }
    case "device":
        var device Device
        if err = remarshal(rawResource, &device); err == nil {
            err = api.registry.RegisterDevice(&device)
        }
    case "source":
        var source Source
        if err = remarshal(rawResource, &source); err == nil {
            err = api.registry.RegisterSource(&source)
        }
    case "flow":
        var flow Flow
        if err = remarshal(rawResource, &flow); err == nil {
            err = api.registry.RegisterFlow(&flow)
        }
    case "sender":
        var sender Sender
        if err = remarshal(rawResource, &sender); err == nil {
            err = api.registry.RegisterSender(&sender)
        }
    case "receiver":
        var receiver Receiver
        if err = remarshal(rawResource, &receiver); err == nil {
            err = api.registry.RegisterReceiver(&receiver)
        }
    default:
        http.Error(w, "Bilinmeyen kaynak türü", http.StatusBadRequest)
        return
    }
    
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    
    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(map[string]string{
        "status": "registered",
        "type":   resourceType,
    })
}

// Heartbeat, node sağlık kontrollerini işler
func (api *RegistrationAPI) Heartbeat(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    nodeID := vars["nodeId"]
    
    if err := api.registry.Heartbeat(nodeID); err != nil {
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }
    
    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{
        "status": "healthy",
    })
}

// DeleteResource, kaynak silme işlemini yönetir
func (api *RegistrationAPI) DeleteResource(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    resourceType := vars["resourceType"]
    resourceID := vars["resourceId"]
    
    if err := api.registry.DeleteResource(resourceType, resourceID); err != nil {
        http.Error(w, err.Error(), http.StatusNotFound)
        return
    }
    
    w.WriteHeader(http.StatusNoContent)
}

// DeleteAllResources, bir node için tüm kaynakları kaldırır
func (api *RegistrationAPI) DeleteAllResources(w http.ResponseWriter, r *http.Request) {
    // Bu, bir node graceful shutdown yaparken çağrılır
    w.WriteHeader(http.StatusNoContent)
}

// Yardımcı fonksiyonlar
func inferResourceType(data map[string]interface{}) string {
    // NMOS kaynakları farklı zorunlu alanlara sahiptir
    if _, hasHostname := data["hostname"]; hasHostname {
        return "node"
    }
    if _, hasManifest := data["manifest_href"]; hasManifest {
        return "sender"
    }
    if caps, hasCaps := data["caps"]; hasCaps {
        if capsMap, ok := caps.(map[string]interface{}); ok {
            if _, hasMediaTypes := capsMap["media_types"]; hasMediaTypes {
                return "receiver"
            }
        }
        return "source"
    }
    if _, hasTransport := data["transport"]; hasTransport {
        if _, hasFlowID := data["flow_id"]; hasFlowID {
            return "sender"
        }
        return "receiver"
    }
    if _, hasSourceID := data["source_id"]; hasSourceID {
        return "flow"
    }
    if _, hasNodeID := data["node_id"]; hasNodeID {
        return "device"
    }
    
    return "unknown"
}

func remarshal(from interface{}, to interface{}) error {
    data, err := json.Marshal(from)
    if err != nil {
        return err
    }
    return json.Unmarshal(data, to)
}

4.4.1 cURL ile Registration API’yi Test Etme

Registration API’yi test etmek için pratik örnekler:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 1. Node Kaydet
curl -X POST http://localhost:3210/x-nmos/registration/v1.3/resource \
  -H "Content-Type: application/json" \
  -d '{
    "id": "58f6b536-ca4c-43fd-880e-9df2af5d5d94",
    "version": "1735545600:0",
    "label": "Camera Encoder 01",
    "description": "Production camera encoder",
    "hostname": "encoder-01.local",
    "api": ["v1.3"],
    "caps": {},
    "services": [],
    "clocks": [{"name": "clk0", "ref_type": "ptp"}],
    "interfaces": [{"name": "eth0"}],
    "tags": {}
  }'

# 2. Sender Kaydet
curl -X POST http://localhost:3210/x-nmos/registration/v1.3/resource \
  -H "Content-Type: application/json" \
  -d '{
    "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "version": "1735545600:0",
    "label": "Camera 1 Video",
    "description": "1080p59.94 video sender",
    "flow_id": "f1f2f3f4-f5f6-f7f8-f9f0-f1f2f3f4f5f6",
    "transport": "urn:x-nmos:transport:rtp",
    "device_id": "d1d2d3d4-d5d6-d7d8-d9d0-d1d2d3d4d5d6",
    "manifest_href": "http://encoder-01.local:8080/sdp/camera1.sdp",
    "interface_bindings": ["eth0"],
    "tags": {}
  }'

# 3. Heartbeat Gönder
NODE_ID="58f6b536-ca4c-43fd-880e-9df2af5d5d94"
curl -X POST http://localhost:3210/x-nmos/registration/v1.3/health/nodes/$NODE_ID

# 4. Kaynak Sil
curl -X DELETE http://localhost:3210/x-nmos/registration/v1.3/resource/senders/a1b2c3d4-e5f6-7890-abcd-ef1234567890

# Response Örnekleri:
# Başarılı (201 Created):
# {"status":"registered","type":"node"}

# Heartbeat Başarılı (200 OK):
# {"status":"healthy"}

4.5 Query API (HTTP Handler’lar)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// registry/query.go
package registry

import (
    "encoding/json"
    "net/http"
    "strings"
    
    "github.com/gorilla/mux"
)

// QueryAPI, IS-04 Query API'sini yönetir
type QueryAPI struct {
    registry *Registry
}

func NewQueryAPI(registry *Registry) *QueryAPI {
    return &QueryAPI{registry: registry}
}

// SetupRoutes, Query API için HTTP route'larını yapılandırır
func (api *QueryAPI) SetupRoutes(router *mux.Router) {
    // IS-04 v1.3 Query API route'ları
    v13 := router.PathPrefix("/x-nmos/query/v1.3").Subrouter()
    
    // Kaynak sorguları
    v13.HandleFunc("/nodes", api.GetNodes).Methods("GET")
    v13.HandleFunc("/devices", api.GetDevices).Methods("GET")
    v13.HandleFunc("/sources", api.GetSources).Methods("GET")
    v13.HandleFunc("/flows", api.GetFlows).Methods("GET")
    v13.HandleFunc("/senders", api.GetSenders).Methods("GET")
    v13.HandleFunc("/receivers", api.GetReceivers).Methods("GET")
    
    // Tek kaynak sorgusu
    v13.HandleFunc("/nodes/{nodeId}", api.GetNode).Methods("GET")
    v13.HandleFunc("/senders/{senderId}", api.GetSender).Methods("GET")
    v13.HandleFunc("/receivers/{receiverId}", api.GetReceiver).Methods("GET")
    
    // WebSocket subscription
    v13.HandleFunc("/subscriptions", api.CreateSubscription).Methods("POST")
    v13.HandleFunc("/subscriptions/{subscriptionId}", api.DeleteSubscription).Methods("DELETE")
}

// GetNodes, tüm kayıtlı node'ları döndürür
func (api *QueryAPI) GetNodes(w http.ResponseWriter, r *http.Request) {
    nodes := api.registry.GetNodes()
    
    // Query parametrelerini uygula (label, description filtreleri)
    nodes = api.filterNodes(nodes, r.URL.Query())
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(nodes)
}

// GetSenders, tüm kayıtlı sender'ları döndürür
func (api *QueryAPI) GetSenders(w http.ResponseWriter, r *http.Request) {
    senders := api.registry.GetSenders()
    
    // Filtreleri uygula
    if transport := r.URL.Query().Get("transport"); transport != "" {
        filtered := []*Sender{}
        for _, s := range senders {
            if strings.Contains(s.Transport, transport) {
                filtered = append(filtered, s)
            }
        }
        senders = filtered
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(senders)
}

// GetReceivers, tüm kayıtlı receiver'ları döndürür
func (api *QueryAPI) GetReceivers(w http.ResponseWriter, r *http.Request) {
    receivers := api.registry.GetReceivers()
    
    // Filtreleri uygula
    if format := r.URL.Query().Get("format"); format != "" {
        filtered := []*Receiver{}
        for _, rcv := range receivers {
            if strings.Contains(rcv.Format, format) {
                filtered = append(filtered, rcv)
            }
        }
        receivers = filtered
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(receivers)
}

// GetNode, ID'ye göre belirli bir node döndürür
func (api *QueryAPI) GetNode(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    nodeID := vars["nodeId"]
    
    node := api.registry.GetNode(nodeID)
    if node == nil {
        http.Error(w, "Node bulunamadı", http.StatusNotFound)
        return
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(node)
}

// CreateSubscription, WebSocket subscription oluşturur
func (api *QueryAPI) CreateSubscription(w http.ResponseWriter, r *http.Request) {
    var query SubscriptionQuery
    
    if err := json.NewDecoder(r.Body).Decode(&query); err != nil {
        http.Error(w, "Geçersiz subscription sorgusu", http.StatusBadRequest)
        return
    }
    
    sub := api.registry.Subscribe(query)
    
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(map[string]string{
        "id":           sub.ID,
        "ws_href":      fmt.Sprintf("ws://localhost:3211/x-nmos/query/v1.3/subscriptions/%s/ws", sub.ID),
        "resource_path": fmt.Sprintf("/%s", query.ResourceType),
    })
}

// DeleteSubscription, bir subscription'ı kaldırır
func (api *QueryAPI) DeleteSubscription(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    subID := vars["subscriptionId"]
    
    api.registry.Unsubscribe(subID)
    w.WriteHeader(http.StatusNoContent)
}

// Filtre yardımcıları
func (api *QueryAPI) filterNodes(nodes []*Node, params url.Values) []*Node {
    if label := params.Get("label"); label != "" {
        filtered := []*Node{}
        for _, n := range nodes {
            if strings.Contains(strings.ToLower(n.Label), strings.ToLower(label)) {
                filtered = append(filtered, n)
            }
        }
        return filtered
    }
    return nodes
}

4.6 Gerçek Zamanlı Güncellemeler için WebSocket Implementasyonu

NMOS’un güçlü özelliklerinden biri WebSocket üzerinden gerçek zamanlı bildirimlerdir. Haydi implement edelim:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
// registry/websocket.go
package registry

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
    
    "github.com/gorilla/mux"
    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool {
        return true // Production'da origin'i doğrulayın
    },
}

// WebSocketHandler, subscription'lar için WebSocket bağlantılarını yönetir
func (api *QueryAPI) WebSocketHandler(w http.ResponseWriter, r *http.Request) {
    vars := mux.Vars(r)
    subID := vars["subscriptionId"]
    
    // Subscription'ı al
    sub := api.registry.GetSubscription(subID)
    if sub == nil {
        http.Error(w, "Subscription bulunamadı", http.StatusNotFound)
        return
    }
    
    // WebSocket'e yükselt
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Printf("WebSocket upgrade başarısız: %v", err)
        return
    }
    defer conn.Close()
    
    log.Printf("WebSocket client bağlandı: subscription %s", subID)
    
    // İlk grain'i gönder (mevcut durum)
    if err := api.sendInitialGrain(conn, sub); err != nil {
        log.Printf("İlk grain gönderilemedi: %v", err)
        return
    }
    
    // Gerçek zamanlı güncellemeleri yönet
    done := make(chan struct{})
    
    // Read pump (ping/pong ve client disconnect'i yönet)
    go func() {
        defer close(done)
        conn.SetReadDeadline(time.Now().Add(60 * time.Second))
        conn.SetPongHandler(func(string) error {
            conn.SetReadDeadline(time.Now().Add(60 * time.Second))
            return nil
        })
        
        for {
            if _, _, err := conn.ReadMessage(); err != nil {
                log.Printf("WebSocket okuma hatası: %v", err)
                return
            }
        }
    }()
    
    // Write pump (güncellemeleri client'a gönder)
    ticker := time.NewTicker(30 * time.Second) // Ping aralığı
    defer ticker.Stop()
    
    for {
        select {
        case update := <-sub.Updates:
            // Kaynak güncellemeyi client'a gönder
            if err := api.sendUpdate(conn, update); err != nil {
                log.Printf("Güncelleme gönderilemedi: %v", err)
                return
            }
            
        case <-ticker.C:
            // Ping gönder
            if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
            
        case <-done:
            return
        }
    }
}

// sendInitialGrain, kaynakların mevcut durumunu gönderir
func (api *QueryAPI) sendInitialGrain(conn *websocket.Conn, sub *Subscriber) error {
    var resources []interface{}
    
    // Subscription sorguya göre kaynakları al
    switch sub.Query.ResourceType {
    case "nodes":
        for _, node := range api.registry.GetNodes() {
            resources = append(resources, node)
        }
    case "senders":
        for _, sender := range api.registry.GetSenders() {
            resources = append(resources, sender)
        }
    case "receivers":
        for _, receiver := range api.registry.GetReceivers() {
            resources = append(resources, receiver)
        }
    default:
        // Tüm kaynakları al
        for _, node := range api.registry.GetNodes() {
            resources = append(resources, node)
        }
    }
    
    // Grain mesajı oluştur
    grain := map[string]interface{}{
        "grain_type": "event",
        "source_id":  "nmos-registry",
        "flow_id":    "query-api-events",
        "origin_timestamp": time.Now().Format(time.RFC3339Nano),
        "sync_timestamp":   time.Now().Format(time.RFC3339Nano),
        "creation_timestamp": time.Now().Format(time.RFC3339Nano),
        "rate": map[string]int{
            "numerator":   0,
            "denominator": 1,
        },
        "duration": map[string]int{
            "numerator":   0,
            "denominator": 1,
        },
        "grain": map[string]interface{}{
            "type": "urn:x-nmos:format:data.event",
            "topic": fmt.Sprintf("/%s/", sub.Query.ResourceType),
            "data": resources,
        },
    }
    
    return conn.WriteJSON(grain)
}

// sendUpdate, kaynak değişikliği bildirimi gönderir
func (api *QueryAPI) sendUpdate(conn *websocket.Conn, update ResourceUpdate) error {
    // Event grain oluştur
    var eventType string
    switch update.Action {
    case "create":
        eventType = "resource_added"
    case "update":
        eventType = "resource_modified"
    case "delete":
        eventType = "resource_removed"
    }
    
    grain := map[string]interface{}{
        "grain_type": "event",
        "source_id":  "nmos-registry",
        "flow_id":    "query-api-events",
        "origin_timestamp": time.Now().Format(time.RFC3339Nano),
        "sync_timestamp":   time.Now().Format(time.RFC3339Nano),
        "creation_timestamp": time.Now().Format(time.RFC3339Nano),
        "rate": map[string]int{
            "numerator":   0,
            "denominator": 1,
        },
        "duration": map[string]int{
            "numerator":   0,
            "denominator": 1,
        },
        "grain": map[string]interface{}{
            "type": "urn:x-nmos:format:data.event",
            "topic": fmt.Sprintf("/%s/", update.Type),
            "data": []map[string]interface{}{
                {
                    "path": fmt.Sprintf("/%s", update.Type),
                    "pre":  nil, // Önceki durum (update/delete ise)
                    "post": update.Resource, // Yeni durum (create/update ise)
                },
            },
        },
    }
    
    return conn.WriteJSON(grain)
}

// QueryAPI.SetupRoutes'a WebSocket endpoint'i ekleyin
// Bu route'u ekleyin:
// v13.HandleFunc("/subscriptions/{subscriptionId}/ws", api.WebSocketHandler).Methods("GET")

wscat ile WebSocket’i Test Etme:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# wscat yükle
npm install -g wscat

# Önce subscription oluştur
SUB_ID=$(curl -X POST http://localhost:3211/x-nmos/query/v1.3/subscriptions \
  -H "Content-Type: application/json" \
  -d '{"resource_type":"senders"}' | jq -r '.id')

# WebSocket'e bağlan
wscat -c "ws://localhost:3211/x-nmos/query/v1.3/subscriptions/$SUB_ID/ws"

# Sender eklendiğinde/kaldırıldığında/değiştirildiğinde gerçek zamanlı güncellemeler alacaksınız

4.6.1 cURL ile Query API’yi Test Etme

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 1. Tüm node'ları al
curl http://localhost:3211/x-nmos/query/v1.3/nodes | jq

# 2. Tüm sender'ları al
curl http://localhost:3211/x-nmos/query/v1.3/senders | jq

# 3. Belirli sender'ı al
SENDER_ID="a1b2c3d4-e5f6-7890-abcd-ef1234567890"
curl http://localhost:3211/x-nmos/query/v1.3/senders/$SENDER_ID | jq

# 4. Filtrelerle sorgula
# Sadece video sender'ları al
curl "http://localhost:3211/x-nmos/query/v1.3/senders?format=urn:x-nmos:format:video" | jq

# Label'a göre node'ları al
curl "http://localhost:3211/x-nmos/query/v1.3/nodes?label=camera" | jq

# 5. WebSocket subscription oluştur
curl -X POST http://localhost:3211/x-nmos/query/v1.3/subscriptions \
  -H "Content-Type: application/json" \
  -d '{
    "resource_type": "senders",
    "params": {}
  }' | jq

# Response:
# {
#   "id": "sub-12345",
#   "ws_href": "ws://localhost:3211/x-nmos/query/v1.3/subscriptions/sub-12345/ws",
#   "resource_path": "/senders"
# }

# 6. Pagination (büyük registry'ler için)
curl "http://localhost:3211/x-nmos/query/v1.3/senders?paging.limit=10&paging.offset=0" | jq

4.7 Ana Sunucu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// main.go
package main

import (
    "fmt"
    "log"
    "net/http"
    
    "github.com/gorilla/mux"
    "yourproject/registry"
)

func main() {
    // Registry oluştur
    reg := registry.NewRegistry()
    
    // API'leri kur
    registrationAPI := registry.NewRegistrationAPI(reg)
    queryAPI := registry.NewQueryAPI(reg)
    
    // Router oluştur
    router := mux.NewRouter()
    
    // Route'ları kur
    registrationAPI.SetupRoutes(router)
    queryAPI.SetupRoutes(router)
    
    // Health check endpoint
    router.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        fmt.Fprintf(w, "OK")
    }).Methods("GET")
    
    // Registration API'yi port 3210'da başlat
    go func() {
        regRouter := mux.NewRouter()
        registrationAPI.SetupRoutes(regRouter)
        log.Println("Registration API :3210'da dinliyor")
        log.Fatal(http.ListenAndServe(":3210", regRouter))
    }()
    
    // Query API'yi port 3211'de başlat
    log.Println("Query API :3211'de dinliyor")
    log.Fatal(http.ListenAndServe(":3211", router))
}

5. Go ile NMOS Node Client Oluşturma

Şimdi registry’mize video sender kaydeden bir client oluşturalım.

5.1 NMOS Node Client

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// client/node.go
package client

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "time"
    
    "yourproject/registry"
)

// NMOSNode, bir NMOS uyumlu cihazı temsil eder
type NMOSNode struct {
    RegistryURL string
    Node        *registry.Node
    Devices     []*registry.Device
    Senders     []*registry.Sender
    Receivers   []*registry.Receiver
    
    httpClient  *http.Client
    stopChan    chan struct{}
}

// NewNMOSNode, yeni bir NMOS node client oluşturur
func NewNMOSNode(registryURL string, hostname string) *NMOSNode {
    nodeID := generateUUID()
    
    node := &registry.Node{
        Resource: registry.Resource{
            ID:          nodeID,
            Version:     registry.CreateVersion(),
            Label:       fmt.Sprintf("Video Encoder %s", hostname),
            Description: "NMOS-etkin video encoder",
            Tags:        map[string]string{},
        },
        Hostname: hostname,
        APIVersions: []string{"v1.3"},
        Caps:        map[string]interface{}{},
        Services: []registry.Service{
            {
                Href: fmt.Sprintf("http://%s:8080/", hostname),
                Type: "urn:x-manufacturer:service:encoder",
            },
        },
        Clocks: []registry.Clock{
            {
                Name:    "clk0",
                RefType: "ptp",
            },
        },
        Interfaces: []registry.Interface{
            {
                Name: "eth0",
            },
        },
    }
    
    return &NMOSNode{
        RegistryURL: registryURL,
        Node:        node,
        Devices:     []*registry.Device{},
        Senders:     []*registry.Sender{},
        Receivers:   []*registry.Receiver{},
        httpClient:  &http.Client{Timeout: 5 * time.Second},
        stopChan:    make(chan struct{}),
    }
}

// Start, node'u kaydeder ve heartbeat'i başlatır
func (n *NMOSNode) Start() error {
    // Node'u kaydet
    if err := n.registerResource(n.Node, "node"); err != nil {
        return fmt.Errorf("node kaydedilemedi: %w", err)
    }
    
    // Device'ları kaydet
    for _, device := range n.Devices {
        if err := n.registerResource(device, "device"); err != nil {
            return fmt.Errorf("device kaydedilemedi: %w", err)
        }
    }
    
    // Sender'ları kaydet
    for _, sender := range n.Senders {
        if err := n.registerResource(sender, "sender"); err != nil {
            return fmt.Errorf("sender kaydedilemedi: %w", err)
        }
    }
    
    // Receiver'ları kaydet
    for _, receiver := range n.Receivers {
        if err := n.registerResource(receiver, "receiver"); err != nil {
            return fmt.Errorf("receiver kaydedilemedi: %w", err)
        }
    }
    
    // Heartbeat goroutine'ini başlat (IS-04 spec'e göre her 5 saniyede)
    go n.heartbeatLoop()
    
    fmt.Println("NMOS Node başarıyla kaydedildi")
    return nil
}

// Stop, node'u graceful olarak kapatır
func (n *NMOSNode) Stop() error {
    close(n.stopChan)
    
    // Tüm kaynakların kaydını kaldır
    // Production'da registry'ye DELETE istekleri gönder
    
    return nil
}

// AddVideoSender, bu node'a video sender ekler
func (n *NMOSNode) AddVideoSender(label string, sdpURL string) error {
    deviceID := generateUUID()
    sourceID := generateUUID()
    flowID := generateUUID()
    senderID := generateUUID()
    
    // Device oluştur
    device := &registry.Device{
        Resource: registry.Resource{
            ID:          deviceID,
            Version:     registry.CreateVersion(),
            Label:       label,
            Description: "Video encoder device",
        },
        Type:     "urn:x-nmos:device:pipeline",
        NodeID:   n.Node.ID,
        Senders:  []string{senderID},
        Receivers: []string{},
    }
    
    // Source oluştur
    source := &registry.Source{
        Resource: registry.Resource{
            ID:      sourceID,
            Version: registry.CreateVersion(),
            Label:   fmt.Sprintf("%s Source", label),
        },
        Format:    "urn:x-nmos:format:video",
        DeviceID:  deviceID,
        ClockName: "clk0",
        GrainRate: &registry.Rational{Numerator: 25, Denominator: 1}, // 25fps
    }
    
    // Flow oluştur
    flow := &registry.Flow{
        Resource: registry.Resource{
            ID:      flowID,
            Version: registry.CreateVersion(),
            Label:   fmt.Sprintf("%s Flow", label),
        },
        Format:      "urn:x-nmos:format:video",
        SourceID:    sourceID,
        DeviceID:    deviceID,
        GrainRate:   &registry.Rational{Numerator: 25, Denominator: 1},
        FrameWidth:  1920,
        FrameHeight: 1080,
        Colorspace:  "BT709",
        Interlace:   "progressive",
        MediaType:   "video/raw",
    }
    
    // Sender oluştur
    sender := &registry.Sender{
        Resource: registry.Resource{
            ID:      senderID,
            Version: registry.CreateVersion(),
            Label:   fmt.Sprintf("%s Sender", label),
        },
        FlowID:            flowID,
        Transport:         "urn:x-nmos:transport:rtp",
        DeviceID:          deviceID,
        ManifestHref:      sdpURL,
        InterfaceBindings: []string{"eth0"},
    }
    
    // Node'a ekle
    n.Devices = append(n.Devices, device)
    n.Senders = append(n.Senders, sender)
    
    // Node zaten başlatıldıysa, hemen kaydet
    if n.isRunning() {
        n.registerResource(device, "device")
        n.registerResource(source, "source")
        n.registerResource(flow, "flow")
        n.registerResource(sender, "sender")
    }
    
    return nil
}

// registerResource, registry'ye POST isteği gönderir
func (n *NMOSNode) registerResource(resource interface{}, resourceType string) error {
    url := fmt.Sprintf("%s/x-nmos/registration/v1.3/resource", n.RegistryURL)
    
    data, err := json.Marshal(resource)
    if err != nil {
        return err
    }
    
    resp, err := n.httpClient.Post(url, "application/json", bytes.NewBuffer(data))
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusOK {
        return fmt.Errorf("kayıt başarısız, status %d", resp.StatusCode)
    }
    
    fmt.Printf("%s kaydedildi: %v\n", resourceType, resource)
    return nil
}

// heartbeatLoop, periyodik heartbeat'ler gönderir
func (n *NMOSNode) heartbeatLoop() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            if err := n.sendHeartbeat(); err != nil {
                fmt.Printf("Heartbeat başarısız: %v\n", err)
            }
        case <-n.stopChan:
            return
        }
    }
}

// sendHeartbeat, /health/nodes/{nodeId}'ye POST gönderir
func (n *NMOSNode) sendHeartbeat() error {
    url := fmt.Sprintf("%s/x-nmos/registration/v1.3/health/nodes/%s", 
        n.RegistryURL, n.Node.ID)
    
    resp, err := n.httpClient.Post(url, "application/json", nil)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("heartbeat başarısız, status %d", resp.StatusCode)
    }
    
    return nil
}

func (n *NMOSNode) isRunning() bool {
    select {
    case <-n.stopChan:
        return false
    default:
        return true
    }
}

func generateUUID() string {
    // Production'da github.com/google/uuid kullanın
    return fmt.Sprintf("%d", time.Now().UnixNano())
}

5.2 Örnek: Video Encoder Node

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// examples/encoder_node.go
package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    
    "yourproject/client"
)

func main() {
    // NMOS node oluştur
    node := client.NewNMOSNode(
        "http://localhost:3210", // Registry URL
        "encoder-01.local",      // Hostname
    )
    
    // Video sender ekle (örn. kamera girişi)
    err := node.AddVideoSender(
        "Camera 1 HD",
        "http://encoder-01.local:8080/sdp/camera1.sdp",
    )
    if err != nil {
        log.Fatalf("Sender eklenemedi: %v", err)
    }
    
    // Node'u başlat (registry'ye kaydet)
    if err := node.Start(); err != nil {
        log.Fatalf("Node başlatılamadı: %v", err)
    }
    
    fmt.Println("NMOS Node çalışıyor. Durdurmak için Ctrl+C basın.")
    
    // Interrupt bekle
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan
    
    // Graceful shutdown
    fmt.Println("\nKapatılıyor...")
    node.Stop()
}

⚠️ GÜVENLİK NOTU:
Bu örneklerdeki API çağrıları authentication ve TLS kullanmamaktadır.
Production’da:

  • HTTPS kullanın (HTTP yerine)
  • Authorization header ekleyin (Authorization: Bearer <token>)
  • Registry’den alınan API endpoint’leri doğrulayın
  • Rate limiting ve API throttling uygulayın

Güvenli implementasyon için Section 13 (IS-10) bölümüne bakın.


6. IS-05: Bağlantı Yönetimi

IS-05, sender’lar ve receiver’lar arasında bağlantı kurulmasını tanımlar.

6.1 IS-05 Genel Bakış

IS-05, şunlar için bir REST API sağlar:

  • Aktif bağlantılar: Şu anda neyin bağlı olduğu
  • Staged bağlantılar: Sırada neyin bağlanacağı
  • Bağlantı aktivasyonu: Staged bağlantıları uygulama

6.2 IS-05 Bağlantı Akışı

6.3 IS-05 için SDP Parsing

IS-05’i implement etmeden önce, bağlantı parametrelerini çıkarmak için SDP dosyalarını parse etmemiz gerekiyor:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// connection/sdp.go
package connection

import (
    "bufio"
    "fmt"
    "strconv"
    "strings"
)

// SDPInfo, parse edilmiş SDP bilgisini tutar
type SDPInfo struct {
    // Video/Audio parametreleri
    MediaType   string // video, audio
    Encoding    string // raw, H264, L24, vb.
    
    // RTP parametreleri
    MulticastIP string
    Port        int
    TTL         int
    PayloadType int
    
    // ST 2110'a özel
    PacketTime  float64 // ptime milisaniye cinsinden
    SamplingRate int    // Ses için
    Channels    int     // Ses için
    Width       int     // Video için
    Height      int     // Video için
    Framerate   string  // örn., "25/1"
}

// ParseSDP bir SDP dosyasını parse eder
func ParseSDP(sdpContent string) (*SDPInfo, error) {
    info := &SDPInfo{}
    scanner := bufio.NewScanner(strings.NewReader(sdpContent))
    
    for scanner.Scan() {
        line := scanner.Text()
        
        switch {
        case strings.HasPrefix(line, "m="):
            // Media satırı: m=video 5004 RTP/AVP 96
            parts := strings.Fields(line)
            if len(parts) >= 4 {
                info.MediaType = parts[0][2:] // "m=" kısmını kaldır
                if port, err := strconv.Atoi(parts[1]); err == nil {
                    info.Port = port
                }
                if pt, err := strconv.Atoi(parts[3]); err == nil {
                    info.PayloadType = pt
                }
            }
            
        case strings.HasPrefix(line, "c="):
            // Connection satırı: c=IN IP4 239.0.0.1/32
            parts := strings.Fields(line)
            if len(parts) >= 3 {
                addrParts := strings.Split(parts[2], "/")
                info.MulticastIP = addrParts[0]
                if len(addrParts) > 1 {
                    if ttl, err := strconv.Atoi(addrParts[1]); err == nil {
                        info.TTL = ttl
                    }
                }
            }
            
        case strings.HasPrefix(line, "a=rtpmap:"):
            // RTP map: a=rtpmap:96 raw/90000
            parts := strings.Fields(line)
            if len(parts) >= 2 {
                encodingInfo := strings.Split(parts[1], "/")
                if len(encodingInfo) >= 2 {
                    info.Encoding = encodingInfo[0]
                    if rate, err := strconv.Atoi(encodingInfo[1]); err == nil {
                        info.SamplingRate = rate
                    }
                    if len(encodingInfo) >= 3 {
                        if ch, err := strconv.Atoi(encodingInfo[2]); err == nil {
                            info.Channels = ch
                        }
                    }
                }
            }
            
        case strings.HasPrefix(line, "a=fmtp:"):
            // Format parametreleri: a=fmtp:96 sampling=YCbCr-4:2:2; width=1920; height=1080
            if strings.Contains(line, "width=") {
                info.Width = extractIntParam(line, "width=")
            }
            if strings.Contains(line, "height=") {
                info.Height = extractIntParam(line, "height=")
            }
            if strings.Contains(line, "exactframerate=") {
                info.Framerate = extractStringParam(line, "exactframerate=")
            }
            
        case strings.HasPrefix(line, "a=ptime:"):
            // Packet time: a=ptime:1.000
            parts := strings.Fields(line)
            if len(parts) >= 1 {
                ptimeStr := strings.TrimPrefix(parts[0], "a=ptime:")
                if pt, err := strconv.ParseFloat(ptimeStr, 64); err == nil {
                    info.PacketTime = pt
                }
            }
        }
    }
    
    if err := scanner.Err(); err != nil {
        return nil, err
    }
    
    return info, nil
}

func extractIntParam(line, key string) int {
    start := strings.Index(line, key)
    if start == -1 {
        return 0
    }
    start += len(key)
    end := strings.IndexAny(line[start:], "; \t")
    if end == -1 {
        end = len(line) - start
    }
    valueStr := line[start : start+end]
    value, _ := strconv.Atoi(valueStr)
    return value
}

func extractStringParam(line, key string) string {
    start := strings.Index(line, key)
    if start == -1 {
        return ""
    }
    start += len(key)
    end := strings.IndexAny(line[start:], "; \t")
    if end == -1 {
        return line[start:]
    }
    return line[start : start+end]
}

// FetchAndParseSDP URL'den SDP'yi alır ve parse eder
func FetchAndParseSDP(sdpURL string) (*SDPInfo, error) {
    resp, err := http.Get(sdpURL)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    
    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }
    
    return ParseSDP(string(body))
}

Örnek SDP Dosyası (ST 2110-20 Video):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
v=0
o=- 1234567890 1234567890 IN IP4 192.168.1.100
s=ST 2110-20 Video Stream
t=0 0
m=video 5004 RTP/AVP 96
c=IN IP4 239.100.0.1/32
a=source-filter: incl IN IP4 239.100.0.1 192.168.1.100
a=rtpmap:96 raw/90000
a=fmtp:96 sampling=YCbCr-4:2:2; width=1920; height=1080; exactframerate=25; depth=10; TCS=SDR; colorimetry=BT709; PM=2110GPM; SSN=ST2110-20:2017; TP=2110TPN
a=mediaclk:direct=0
a=ts-refclk:ptp=IEEE1588-2008:00-00-00-00-00-00-00-00:0

6.4 IS-05 Implementasyonu (Receiver Tarafı)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// connection/is05_receiver.go
package connection

import (
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
    "time"
    
    "github.com/gorilla/mux"
)

// ConnectionAPI, IS-05 Bağlantı Yönetimini implement eder
type ConnectionAPI struct {
    mu              sync.RWMutex
    receiverID      string
    activeParams    TransportParams
    stagedParams    TransportParams
    activationState ActivationState
}

// TransportParams, SDP ve transport bilgisini tutar
type TransportParams struct {
    SenderID      string                 `json:"sender_id"`
    ManifestHref  string                 `json:"manifest_href"` // SDP URL
    MasterEnable  bool                   `json:"master_enable"`
    TransportFile map[string]interface{} `json:"transport_file"` // Parse edilmiş SDP
}

// ActivationState, aktivasyon zamanlamasını tutar
type ActivationState struct {
    Mode          string `json:"mode"` // activate_immediate, activate_scheduled_absolute
    RequestedTime string `json:"requested_time,omitempty"`
    ActivationTime string `json:"activation_time,omitempty"`
}

func NewConnectionAPI(receiverID string) *ConnectionAPI {
    return &ConnectionAPI{
        receiverID: receiverID,
        activeParams: TransportParams{
            MasterEnable: false,
        },
        stagedParams: TransportParams{
            MasterEnable: false,
        },
        activationState: ActivationState{
            Mode: "activate_immediate",
        },
    }
}

// SetupRoutes, receiver için IS-05 route'larını yapılandırır
func (api *ConnectionAPI) SetupRoutes(router *mux.Router, receiverID string) {
    base := fmt.Sprintf("/x-nmos/connection/v1.1/single/receivers/%s", receiverID)
    
    // Aktif bağlantı
    router.HandleFunc(base+"/active", api.GetActive).Methods("GET")
    
    // Staged bağlantı
    router.HandleFunc(base+"/staged", api.GetStaged).Methods("GET")
    router.HandleFunc(base+"/staged", api.PatchStaged).Methods("PATCH")
    
    // Constraints (yetenekler)
    router.HandleFunc(base+"/constraints", api.GetConstraints).Methods("GET")
}

// GetActive, mevcut aktif bağlantıyı döndürür
func (api *ConnectionAPI) GetActive(w http.ResponseWriter, r *http.Request) {
    api.mu.RLock()
    defer api.mu.RUnlock()
    
    response := map[string]interface{}{
        "sender_id":      api.activeParams.SenderID,
        "master_enable":  api.activeParams.MasterEnable,
        "activation":     api.activationState,
        "transport_file": api.activeParams.TransportFile,
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

// GetStaged, staged bağlantı parametrelerini döndürür
func (api *ConnectionAPI) GetStaged(w http.ResponseWriter, r *http.Request) {
    api.mu.RLock()
    defer api.mu.RUnlock()
    
    response := map[string]interface{}{
        "sender_id":      api.stagedParams.SenderID,
        "master_enable":  api.stagedParams.MasterEnable,
        "activation":     api.activationState,
        "transport_file": api.stagedParams.TransportFile,
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

// PatchStaged, staged parametreleri günceller ve opsiyonel olarak aktive eder
func (api *ConnectionAPI) PatchStaged(w http.ResponseWriter, r *http.Request) {
    var patch map[string]interface{}
    
    if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
        http.Error(w, "Geçersiz JSON", http.StatusBadRequest)
        return
    }
    
    api.mu.Lock()
    defer api.mu.Unlock()
    
    // Staged parametreleri güncelle
    if senderID, ok := patch["sender_id"].(string); ok {
        api.stagedParams.SenderID = senderID
        
        // Sender'dan SDP al
        if senderID != "" {
            // Sender'ın SDP dosyasını al
            sender := getSenderFromRegistry(senderID) // NMOS registry'yi sorgula
            if sender != nil {
                api.stagedParams.ManifestHref = sender.ManifestHref
                
                // Bağlantı parametrelerini almak için SDP'yi parse et
                sdpInfo, err := FetchAndParseSDP(sender.ManifestHref)
                if err == nil {
                    // Parse edilmiş SDP'yi daha sonra kullanmak üzere sakla
                    api.stagedParams.TransportFile = map[string]interface{}{
                        "multicast_ip": sdpInfo.MulticastIP,
                        "port":         sdpInfo.Port,
                        "rtp_enabled":  true,
                    }
                }
            }
        }
    }
    
    if masterEnable, ok := patch["master_enable"].(bool); ok {
        api.stagedParams.MasterEnable = masterEnable
    }
    
    // Aktivasyonu yönet
    if activation, ok := patch["activation"].(map[string]interface{}); ok {
        mode := activation["mode"].(string)
        api.activationState.Mode = mode
        
        switch mode {
        case "activate_immediate":
            // Şimdi aktive et
            api.activateConnection()
            
            w.Header().Set("Content-Type", "application/json")
            w.WriteHeader(http.StatusOK)
            json.NewEncoder(w).Encode(map[string]interface{}{
                "status": "activated",
            })
            return
            
        case "activate_scheduled_absolute":
            requestedTime := activation["requested_time"].(string)
            api.activationState.RequestedTime = requestedTime
            
            // Aktivasyonu zamanla
            go api.scheduleActivation(requestedTime)
            
            w.Header().Set("Content-Type", "application/json")
            w.WriteHeader(http.StatusAccepted)
            json.NewEncoder(w).Encode(map[string]interface{}{
                "status": "scheduled",
            })
            return
        }
    }
    
    // Sadece staged, aktive edilmedi
    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]interface{}{
        "status": "staged",
    })
}

// GetConstraints, receiver yeteneklerini döndürür
func (api *ConnectionAPI) GetConstraints(w http.ResponseWriter, r *http.Request) {
    constraints := []map[string]interface{}{
        {
            "parameter": "sender_id",
            "type":      "string",
        },
        {
            "parameter": "master_enable",
            "type":      "boolean",
        },
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(constraints)
}

// activateConnection, staged parametreleri aktif hale getirir
func (api *ConnectionAPI) activateConnection() {
    api.activeParams = api.stagedParams
    api.activationState.ActivationTime = time.Now().Format(time.RFC3339Nano)
    
    fmt.Printf("Bağlantı aktive edildi: Sender %s -> Receiver %s\n", 
        api.activeParams.SenderID, api.receiverID)
    
    // Production'da: gerçekten RTP stream almaya başla
    if api.activeParams.MasterEnable {
        // GStreamer pipeline, FFmpeg veya başka medya receiver başlat
        api.startReceiving()
    } else {
        api.stopReceiving()
    }
}

// scheduleActivation, gelecekte bir zamanda aktive eder
func (api *ConnectionAPI) scheduleActivation(timeStr string) {
    activationTime, err := time.Parse(time.RFC3339Nano, timeStr)
    if err != nil {
        fmt.Printf("Geçersiz aktivasyon zamanı: %v\n", err)
        return
    }
    
    // Aktivasyon zamanına kadar bekle
    time.Sleep(time.Until(activationTime))
    
    api.mu.Lock()
    api.activateConnection()
    api.mu.Unlock()
}

// startReceiving, gerçek medya pipeline'ını başlatır
func (api *ConnectionAPI) startReceiving() {
    fmt.Println("Medya receiver başlatılıyor...")
    // Örnek: exec.Command("gst-launch-1.0", "udpsrc", "port=5004", "...")
}

func (api *ConnectionAPI) stopReceiving() {
    fmt.Println("Medya receiver durduruluyor...")
}

func getSenderFromRegistry(senderID string) *registry.Sender {
    // Sender detayları için NMOS registry'yi sorgula
    // Production'da: registry'ye HTTP GET
    return nil
}

6.5 IS-05 Receiver cURL Örnekleri

1. Mevcut aktif bağlantıyı göster:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
curl http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/active | jq

# Response:
# {
#   "sender_id": "sender-123",
#   "master_enable": true,
#   "activation": {
#     "mode": "activate_immediate",
#     "activation_time": "2025-12-30T10:15:30.123Z"
#   },
#   "transport_file": {
#     "multicast_ip": "239.100.0.1",
#     "port": 5004,
#     "rtp_enabled": true
#   }
# }

2. Staged parametreleri göster:

1
curl http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/staged | jq

3. Sender’a bağlan (immediate activation):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
curl -X PATCH http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/staged \
  -H "Content-Type: application/json" \
  -d '{
    "sender_id": "sender-789",
    "master_enable": true,
    "activation": {
      "mode": "activate_immediate"
    }
  }' | jq

# Response:
# {
#   "status": "activated"
# }

4. Zamanlanmış aktivasyon:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
curl -X PATCH http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/staged \
  -H "Content-Type: application/json" \
  -d '{
    "sender_id": "sender-789",
    "master_enable": true,
    "activation": {
      "mode": "activate_scheduled_absolute",
      "requested_time": "2025-12-30T15:00:00.000Z"
    }
  }' | jq

# Response:
# {
#   "status": "scheduled"
# }

5. Bağlantıyı kes:

1
2
3
4
5
6
7
8
9
curl -X PATCH http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/staged \
  -H "Content-Type: application/json" \
  -d '{
    "sender_id": null,
    "master_enable": false,
    "activation": {
      "mode": "activate_immediate"
    }
  }' | jq

6. Receiver yeteneklerini kontrol et:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
curl http://localhost:8080/x-nmos/connection/v1.1/single/receivers/receiver-456/constraints | jq

# Response:
# [
#   {
#     "parameter": "sender_id",
#     "type": "string"
#   },
#   {
#     "parameter": "master_enable",
#     "type": "boolean"
#   }
# ]

6.4.1 cURL ile IS-05 Test Etme

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 1. Receiver'ın mevcut aktif bağlantısını kontrol et
RECEIVER_ID="a8b4c2d1-5f6e-4a3b-9c8d-1e2f3a4b5c6d"
curl http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/active | jq

# Response:
# {
#   "sender_id": null,
#   "master_enable": false,
#   "activation": {...},
#   "transport_file": {}
# }

# 2. Staged parametreleri kontrol et
curl http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/staged | jq

# 3. Bir bağlantıyı stage et (anında aktivasyon)
SENDER_ID="a1b2c3d4-e5f6-7890-abcd-ef1234567890"
curl -X PATCH http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/staged \
  -H "Content-Type: application/json" \
  -d '{
    "sender_id": "'$SENDER_ID'",
    "master_enable": true,
    "activation": {
      "mode": "activate_immediate"
    }
  }' | jq

# Response (200 OK):
# {"status": "activated"}

# 4. Bir bağlantıyı stage et (zamanlanmış aktivasyon)
ACTIVATION_TIME=$(date -u -v+10S +"%Y-%m-%dT%H:%M:%S.%NZ") # 10 saniye sonra
curl -X PATCH http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/staged \
  -H "Content-Type: application/json" \
  -d '{
    "sender_id": "'$SENDER_ID'",
    "master_enable": true,
    "activation": {
      "mode": "activate_scheduled_absolute",
      "requested_time": "'$ACTIVATION_TIME'"
    }
  }' | jq

# Response (202 Accepted):
# {"status": "scheduled"}

# 5. Bağlantıyı kes (sender_id null yap)
curl -X PATCH http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/staged \
  -H "Content-Type: application/json" \
  -d '{
    "sender_id": null,
    "master_enable": false,
    "activation": {
      "mode": "activate_immediate"
    }
  }' | jq

# 6. Receiver constraints kontrol et
curl http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/$RECEIVER_ID/constraints | jq

6.5 IS-05 Controller Örneği

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// examples/is05_controller.go
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
)

func main() {
    // Camera 1 Sender'ı Monitor Receiver'a bağla
    
    senderID := "58f6b536-ca4c-43fd-880e-9df2af5d5d94"
    receiverID := "a8b4c2d1-5f6e-4a3b-9c8d-1e2f3a4b5c6d"
    
    // Bağlantıyı stage et
    stageRequest := map[string]interface{}{
        "sender_id":     senderID,
        "master_enable": true,
        "activation": map[string]interface{}{
            "mode": "activate_immediate",
        },
    }
    
    data, _ := json.Marshal(stageRequest)
    
    url := fmt.Sprintf("http://receiver-node:8080/x-nmos/connection/v1.1/single/receivers/%s/staged", receiverID)
    
    resp, err := http.Post(url, "application/json", bytes.NewBuffer(data))
    if err != nil {
        log.Fatalf("Bağlantı başarısız: %v", err)
    }
    defer resp.Body.Close()
    
    if resp.StatusCode == http.StatusOK {
        fmt.Println("Bağlantı başarıyla aktive edildi!")
    } else {
        fmt.Printf("Bağlantı başarısız status: %d\n", resp.StatusCode)
    }
}

⚠️ GÜVENLİK NOTU:
Bu örneklerde IS-10 (Authorization) ve BCP-003 (Secure Communication) uygulanmamıştır.
Production ortamlarında mutlaka:

  • TLS/HTTPS kullanın (BCP-003)
  • OAuth2/JWT token authentication ekleyin (IS-10)
  • mTLS (mutual TLS) için client sertifikaları yapılandırın
  • HTTPS-only registry ve node endpoint’leri

Örnek güvenli bağlantı:

1
2
3
curl -H "Authorization: Bearer $TOKEN" \
     --cacert /path/to/ca.crt \
     https://registry.studio.local:443/x-nmos/registration/v1.3/resource

Detaylar için makalenin Section 13 (IS-10 Authentication) bölümüne bakın.


7. IS-08: Channel Mapping (Ses Yönlendirmesi)

IS-08, cihazlar içinde esnek ses kanalı yönlendirmesine izin verir.

7.1 IS-08 Kullanım Senaryosu

Şu özelliklere sahip bir cihaz düşünün:

  • Giriş: 16 kanallı ses (8 stereo çift)
  • Çıkış: 8 kanallı ses (4 stereo çift)

IS-08, hangi giriş kanallarının hangi çıkış kanallarına gideceğini haritalandırmanıza izin verir.

7.2 IS-08 Implementasyonu

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// channel/is08.go
package channel

import (
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
    
    "github.com/gorilla/mux"
)

// ChannelMappingAPI, IS-08'i implement eder
type ChannelMappingAPI struct {
    mu             sync.RWMutex
    deviceID       string
    inputs         []IOChannel
    outputs        []IOChannel
    activeMap      map[string][]ChannelMapping // output_id -> input mappings
    stagedMap      map[string][]ChannelMapping
}

type IOChannel struct {
    Name        string `json:"name"`
    Description string `json:"description"`
    ChannelID   string `json:"channel_id"`
}

type ChannelMapping struct {
    InputChannelID  string  `json:"input"`
    OutputChannelID string  `json:"output"`
    GainDB          float64 `json:"gain_db"`
}

func NewChannelMappingAPI(deviceID string) *ChannelMappingAPI {
    api := &ChannelMappingAPI{
        deviceID:  deviceID,
        inputs:    make([]IOChannel, 0),
        outputs:   make([]IOChannel, 0),
        activeMap: make(map[string][]ChannelMapping),
        stagedMap: make(map[string][]ChannelMapping),
    }
    
    // Örnek: 16 giriş kanalı, 8 çıkış kanalı
    for i := 0; i < 16; i++ {
        api.inputs = append(api.inputs, IOChannel{
            Name:      fmt.Sprintf("Input %d", i+1),
            ChannelID: fmt.Sprintf("in%d", i),
        })
    }
    
    for i := 0; i < 8; i++ {
        api.outputs = append(api.outputs, IOChannel{
            Name:      fmt.Sprintf("Output %d", i+1),
            ChannelID: fmt.Sprintf("out%d", i),
        })
    }
    
    return api
}

func (api *ChannelMappingAPI) SetupRoutes(router *mux.Router, deviceID string) {
    base := fmt.Sprintf("/x-nmos/channelmapping/v1.0/map/%s", deviceID)
    
    router.HandleFunc(base+"/inputs", api.GetInputs).Methods("GET")
    router.HandleFunc(base+"/outputs", api.GetOutputs).Methods("GET")
    router.HandleFunc(base+"/active", api.GetActiveMap).Methods("GET")
    router.HandleFunc(base+"/staged", api.GetStagedMap).Methods("GET")
    router.HandleFunc(base+"/staged", api.PatchStagedMap).Methods("PATCH")
}

func (api *ChannelMappingAPI) GetInputs(w http.ResponseWriter, r *http.Request) {
    api.mu.RLock()
    defer api.mu.RUnlock()
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(api.inputs)
}

func (api *ChannelMappingAPI) GetOutputs(w http.ResponseWriter, r *http.Request) {
    api.mu.RLock()
    defer api.mu.RUnlock()
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(api.outputs)
}

func (api *ChannelMappingAPI) GetActiveMap(w http.ResponseWriter, r *http.Request) {
    api.mu.RLock()
    defer api.mu.RUnlock()
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(api.activeMap)
}

func (api *ChannelMappingAPI) PatchStagedMap(w http.ResponseWriter, r *http.Request) {
    var patch map[string][]ChannelMapping
    
    if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
        http.Error(w, "Geçersiz JSON", http.StatusBadRequest)
        return
    }
    
    api.mu.Lock()
    defer api.mu.Unlock()
    
    // Staged mapping'leri güncelle
    for outputID, mappings := range patch {
        api.stagedMap[outputID] = mappings
    }
    
    // Anında aktive et (veya IS-05 gibi aktivasyonu yönet)
    api.activeMap = api.stagedMap
    
    fmt.Println("Kanal haritalama güncellendi")
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(map[string]string{
        "status": "activated",
    })
}

8. IS-09: System Parameters (Global Zamanlama)

IS-09, PTP zamanlama gibi global sistem parametrelerini yönetir.

8.1 IS-09 Genel Bakış

IS-09 şunları sağlar:

  • Global PTP domain konfigürasyonu
  • Sistem çapında zamanlama parametreleri
  • Senkronizasyon ayarları

8.2 Basit IS-09 Implementasyonu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// system/is09.go
package system

import (
    "encoding/json"
    "net/http"
    "sync"
    
    "github.com/gorilla/mux"
)

type SystemAPI struct {
    mu              sync.RWMutex
    globalParams    GlobalParams
}

type GlobalParams struct {
    PTPDomain  int    `json:"ptp_domain"`
    PTPProfile string `json:"ptp_profile"` // AES67, SMPTE 2059-2
}

func NewSystemAPI() *SystemAPI {
    return &SystemAPI{
        globalParams: GlobalParams{
            PTPDomain:  127,
            PTPProfile: "SMPTE 2059-2",
        },
    }
}

func (api *SystemAPI) SetupRoutes(router *mux.Router) {
    base := "/x-nmos/system/v1.0"
    
    router.HandleFunc(base+"/global", api.GetGlobal).Methods("GET")
    router.HandleFunc(base+"/global", api.PatchGlobal).Methods("PATCH")
}

func (api *SystemAPI) GetGlobal(w http.ResponseWriter, r *http.Request) {
    api.mu.RLock()
    defer api.mu.RUnlock()
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(api.globalParams)
}

func (api *SystemAPI) PatchGlobal(w http.ResponseWriter, r *http.Request) {
    var patch GlobalParams
    
    if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
        http.Error(w, "Geçersiz JSON", http.StatusBadRequest)
        return
    }
    
    api.mu.Lock()
    defer api.mu.Unlock()
    
    if patch.PTPDomain != 0 {
        api.globalParams.PTPDomain = patch.PTPDomain
    }
    
    if patch.PTPProfile != "" {
        api.globalParams.PTPProfile = patch.PTPProfile
    }
    
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(api.globalParams)
}

9. Tam Entegrasyon Örneği

Hepsini bir araya getirelim ve tam bir workflow oluşturalım.

9.1 Tam Broadcast Workflow’u

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// examples/full_workflow.go
package main

import (
    "fmt"
    "log"
    "time"
    
    "yourproject/client"
)

func main() {
    fmt.Println("=== NMOS Broadcast Workflow Demo ===\n")
    
    // 1. NMOS Registry'yi başlat (zaten :3210/:3211'de çalışıyor)
    fmt.Println("✓ NMOS Registry çalışıyor")
    
    // 2. Camera Node'u kaydet (Sender)
    cameraNode := client.NewNMOSNode(
        "http://localhost:3210",
        "camera-01.studio.local",
    )
    
    cameraNode.AddVideoSender(
        "Studio Camera 1",
        "http://camera-01:8080/sdp/video.sdp",
    )
    
    if err := cameraNode.Start(); err != nil {
        log.Fatal(err)
    }
    fmt.Println("✓ Camera Node kaydedildi")
    
    time.Sleep(2 * time.Second)
    
    // 3. Monitor Node'u kaydet (Receiver)
    monitorNode := client.NewNMOSNode(
        "http://localhost:3210",
        "monitor-01.studio.local",
    )
    
    monitorNode.AddVideoReceiver(
        "Program Monitor",
    )
    
    if err := monitorNode.Start(); err != nil {
        log.Fatal(err)
    }
    fmt.Println("✓ Monitor Node kaydedildi")
    
    time.Sleep(2 * time.Second)
    
    // 4. Kullanılabilir sender'lar için registry'yi sorgula
    senders := queryRegistry("http://localhost:3211/x-nmos/query/v1.3/senders")
    fmt.Printf("✓ %d sender bulundu\n", len(senders))
    
    receivers := queryRegistry("http://localhost:3211/x-nmos/query/v1.3/receivers")
    fmt.Printf("✓ %d receiver bulundu\n", len(receivers))
    
    // 5. IS-05 ile bağlantı kur
    if len(senders) > 0 && len(receivers) > 0 {
        senderID := senders[0]["id"].(string)
        receiverID := receivers[0]["id"].(string)
        
        fmt.Printf("\n✓ Sender %s -> Receiver %s bağlanıyor\n", senderID, receiverID)
        
        makeConnection(receiverID, senderID)
        
        fmt.Println("✓ Bağlantı aktive edildi!")
    }
    
    // 6. WebSocket ile izle
    fmt.Println("\n✓ Registry değişikliklerini izliyor (Ctrl+C ile durdurun)...")
    
    select {}
}

func queryRegistry(url string) []map[string]interface{} {
    // Query API'ye HTTP GET isteği
    // JSON response'u parse et
    return []map[string]interface{}{}
}

func makeConnection(receiverID, senderID string) {
    // Receiver'ın /staged endpoint'ine IS-05 PATCH isteği
}

9.1 IS-06: Network Control (SDN Entegrasyonu)

IS-06, NMOS’un ağ altyapısını (switch’ler, router’lar) otomatik kontrol etmesini sağlar.

9.1.1 IS-06 Neden Önemli?

Geleneksel Yöntem:

1
2
3
4
1. Manuel VLAN konfigürasyonu
2. Elle multicast routing kurulumu
3. Statik bandwidth tahsisi
4. Fiziksel değişiklikler gerektirir

IS-06 ile:

1
2
3
4
✅ Otomatik VLAN yönetimi
✅ Dinamik bandwidth rezervasyonu
✅ Otomatik multicast routing
✅ Yazılım-tanımlı ağ kontrolü

9.1.2 IS-06 Kullanım Senaryoları

Senaryo 1: Otomatik VLAN Yönetimi

1
2
3
4
5
Kamera 1 (VLAN 100) → Switch → Monitor (VLAN 200)
         IS-06 API
     "Bu akış için VLAN 100'e izin ver"

Senaryo 2: Bandwidth Rezervasyonu

1
2
3
4
5
4K Stream = 8 Gbps gerekiyor
   IS-06 API
Switch'te 10 Gbps rezerve et

9.1.3 Go ile IS-06 Network Controller

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
// network/is06.go
package network

import (
    "encoding/json"
    "fmt"
    "net/http"
    
    "github.com/gorilla/mux"
)

// NetworkController, ağ cihazlarını yönetir
type NetworkController struct {
    switches map[string]*Switch
}

// Switch, yönetilen bir network switch'i temsil eder
type Switch struct {
    ID          string        `json:"id"`
    Label       string        `json:"label"`
    Hostname    string        `json:"hostname"`
    Management  string        `json:"management_address"`
    Ports       []SwitchPort  `json:"ports"`
    VLANs       []VLAN        `json:"vlans"`
}

// SwitchPort, switch port'u temsil eder
type SwitchPort struct {
    PortID      string   `json:"port_id"`
    Label       string   `json:"label"`
    Speed       string   `json:"speed"`        // "10G", "25G", "100G"
    State       string   `json:"state"`        // "up", "down"
    VLANs       []int    `json:"vlans"`
    Bandwidth   int64    `json:"bandwidth_mbps"`
    Reserved    int64    `json:"reserved_mbps"` // Rezerve edilmiş bant genişliği
}

// VLAN konfigürasyonu
type VLAN struct {
    VLANID      int      `json:"vlan_id"`
    Name        string   `json:"name"`
    Ports       []string `json:"ports"`
    Multicast   bool     `json:"multicast_enabled"`
}

// FlowReservation, bir akış için ağ kaynağı rezervasyonu
type FlowReservation struct {
    FlowID          string `json:"flow_id"`
    SenderID        string `json:"sender_id"`
    ReceiverID      string `json:"receiver_id"`
    BandwidthMbps   int64  `json:"bandwidth_mbps"`
    VLANID          int    `json:"vlan_id"`
    MulticastGroup  string `json:"multicast_group"`
    SourcePort      string `json:"source_port"`
    DestPort        string `json:"dest_port"`
}

func NewNetworkController() *NetworkController {
    return &NetworkController{
        switches: make(map[string]*Switch),
    }
}

// SetupRoutes, IS-06 API route'larını yapılandırır
func (nc *NetworkController) SetupRoutes(router *mux.Router) {
    base := "/x-nmos/netctrl/v1.0"
    
    // Network endpoints
    router.HandleFunc(base+"/endpoints", nc.GetEndpoints).Methods("GET")
    
    // Switch yönetimi
    router.HandleFunc(base+"/switches", nc.GetSwitches).Methods("GET")
    router.HandleFunc(base+"/switches/{switchId}/ports", nc.GetSwitchPorts).Methods("GET")
    
    // Flow rezervasyonu
    router.HandleFunc(base+"/reservations", nc.CreateReservation).Methods("POST")
    router.HandleFunc(base+"/reservations/{resId}", nc.DeleteReservation).Methods("DELETE")
}

// CreateReservation, bir NMOS flow için ağ kaynakları rezerve eder
func (nc *NetworkController) CreateReservation(w http.ResponseWriter, r *http.Request) {
    var reservation FlowReservation
    
    if err := json.NewDecoder(r.Body).Decode(&reservation); err != nil {
        http.Error(w, "Geçersiz JSON", http.StatusBadRequest)
        return
    }
    
    // 1. Bandwidth kontrolü
    if err := nc.checkBandwidthAvailability(reservation); err != nil {
        http.Error(w, err.Error(), http.StatusConflict)
        return
    }
    
    // 2. VLAN konfigürasyonu
    if err := nc.configureVLAN(reservation); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    // 3. Multicast routing
    if err := nc.configureMulticast(reservation); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    // 4. QoS ayarları
    if err := nc.configureQoS(reservation); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    w.WriteHeader(http.StatusCreated)
    json.NewEncoder(w).Encode(map[string]interface{}{
        "reservation_id": reservation.FlowID,
        "status":         "active",
        "bandwidth":      reservation.BandwidthMbps,
        "vlan":           reservation.VLANID,
    })
}

// checkBandwidthAvailability, yeterli bandwidth olup olmadığını kontrol eder
func (nc *NetworkController) checkBandwidthAvailability(res FlowReservation) error {
    sw, exists := nc.switches[extractSwitchFromPort(res.SourcePort)]
    if !exists {
        return fmt.Errorf("switch bulunamadı")
    }
    
    for _, port := range sw.Ports {
        if port.PortID == res.SourcePort {
            available := port.Bandwidth - port.Reserved
            if available < res.BandwidthMbps {
                return fmt.Errorf("yetersiz bandwidth: mevcut %d Mbps, gerekli %d Mbps", 
                    available, res.BandwidthMbps)
            }
            // Bandwidth'i rezerve et
            port.Reserved += res.BandwidthMbps
            return nil
        }
    }
    
    return fmt.Errorf("port bulunamadı: %s", res.SourcePort)
}

// configureVLAN, switch'te VLAN konfigürasyonu yapar
func (nc *NetworkController) configureVLAN(res FlowReservation) error {
    // Gerçek switch'e NETCONF/REST API ile bağlan
    // Örnek: Cisco, Arista, Juniper API'leri
    
    fmt.Printf("VLAN %d konfigüre ediliyor: port %s → %s\n", 
        res.VLANID, res.SourcePort, res.DestPort)
    
    // NETCONF XML örneği (Cisco için)
    netconfCommand := fmt.Sprintf(`
        <config>
          <interface>
            <name>%s</name>
            <switchport>
              <mode>trunk</mode>
              <trunk>
                <allowed>
                  <vlan>%d</vlan>
                </allowed>
              </trunk>
            </switchport>
          </interface>
        </config>
    `, res.SourcePort, res.VLANID)
    
    // Switch API'sine gönder
    return nc.sendToSwitch(extractSwitchFromPort(res.SourcePort), netconfCommand)
}

// configureMulticast, multicast routing konfigürasyonu yapar
func (nc *NetworkController) configureMulticast(res FlowReservation) error {
    fmt.Printf("Multicast group %s için routing konfigüre ediliyor\n", res.MulticastGroup)
    
    // IGMP snooping aktivasyonu
    // PIM (Protocol Independent Multicast) konfigürasyonu
    
    command := fmt.Sprintf(`
        ip multicast-routing
        interface %s
          ip pim sparse-mode
          ip igmp version 3
    `, res.SourcePort)
    
    return nc.sendToSwitch(extractSwitchFromPort(res.SourcePort), command)
}

// configureQoS, Quality of Service ayarları yapar
func (nc *NetworkController) configureQoS(res FlowReservation) error {
    fmt.Printf("QoS konfigüre ediliyor: %d Mbps garantili\n", res.BandwidthMbps)
    
    // DSCP marking (ST 2110 için genelde EF - Expedited Forwarding)
    // Rate limiting
    // Priority queuing
    
    command := fmt.Sprintf(`
        class-map match-all NMOS-VIDEO
          match dscp ef
        policy-map NMOS-QOS
          class NMOS-VIDEO
            priority %d
            police %d000000 conform-action transmit exceed-action drop
    `, res.BandwidthMbps/10, res.BandwidthMbps)
    
    return nc.sendToSwitch(extractSwitchFromPort(res.SourcePort), command)
}

// sendToSwitch, switch'e komut gönderir (NETCONF, REST API, SSH)
func (nc *NetworkController) sendToSwitch(switchID, command string) error {
    sw, exists := nc.switches[switchID]
    if !exists {
        return fmt.Errorf("switch bulunamadı: %s", switchID)
    }
    
    // Gerçek implementasyonda:
    // - NETCONF client kullan (github.com/Juniper/go-netconf)
    // - Veya vendor-specific REST API
    // - Veya SSH ile CLI komutları
    
    fmt.Printf("Switch %s (%s) komut gönderiliyor...\n", sw.Label, sw.Management)
    fmt.Printf("Command: %s\n", command)
    
    return nil
}

func extractSwitchFromPort(portID string) string {
    // Port ID formatı: "switch-01:eth1/1"
    // Switch ID'yi çıkar
    return "switch-01" // Basitleştirilmiş
}

9.1.4 Switch Entegrasyonu Örnekleri

Cisco Switch için:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Cisco IOS-XE REST API
func (nc *NetworkController) configureCiscoSwitch(switchIP, vlanID, port string) error {
    url := fmt.Sprintf("https://%s/restconf/data/Cisco-IOS-XE-native:native/vlan", switchIP)
    
    vlanConfig := map[string]interface{}{
        "Cisco-IOS-XE-vlan:vlan": map[string]interface{}{
            "vlan-list": []map[string]interface{}{
                {
                    "id":   vlanID,
                    "name": fmt.Sprintf("NMOS-VLAN-%s", vlanID),
                },
            },
        },
    }
    
    // REST API call
    return sendRestAPI(url, vlanConfig)
}

Arista Switch için:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Arista eAPI
func (nc *NetworkController) configureAristaSwitch(switchIP, vlanID string) error {
    url := fmt.Sprintf("https://%s/command-api", switchIP)
    
    commands := []string{
        "enable",
        fmt.Sprintf("configure"),
        fmt.Sprintf("vlan %s", vlanID),
        fmt.Sprintf("name NMOS-VLAN-%s", vlanID),
    }
    
    return nc.sendAristaCommand(url, commands)
}

9.1.5 Tam Entegrasyon: NMOS + SDN

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// examples/nmos_sdn_integration.go
package main

import (
    "fmt"
    "yourproject/registry"
    "yourproject/network"
)

func main() {
    // 1. NMOS Registry başlat
    reg := registry.NewRegistry()
    
    // 2. Network Controller başlat
    netCtrl := network.NewNetworkController()
    
    // 3. Switch'leri ekle
    netCtrl.AddSwitch(&network.Switch{
        ID:       "switch-core-01",
        Label:    "Core Switch 1",
        Hostname: "core-sw-01.studio.local",
        Management: "10.0.1.10",
        Ports: []network.SwitchPort{
            {PortID: "eth1/1", Speed: "100G", Bandwidth: 100000},
            {PortID: "eth1/2", Speed: "100G", Bandwidth: 100000},
        },
    })
    
    // 4. NMOS connection yapıldığında otomatik ağ konfigürasyonu
    reg.OnConnection(func(senderID, receiverID string) {
        // Flow bilgilerini al
        sender := reg.GetSender(senderID)
        
        // Bandwidth hesapla (1080p50 = 1.66 Gbps)
        bandwidth := calculateBandwidth(sender)
        
        // Ağ rezervasyonu yap
        reservation := network.FlowReservation{
            FlowID:        sender.FlowID,
            SenderID:      senderID,
            ReceiverID:    receiverID,
            BandwidthMbps: bandwidth,
            VLANID:        100, // ST 2110 video VLAN
            MulticastGroup: extractMulticast(sender.ManifestHref),
        }
        
        if err := netCtrl.CreateReservation(reservation); err != nil {
            fmt.Printf("Ağ rezervasyonu başarısız: %v\n", err)
            return
        }
        
        fmt.Println("✅ Ağ otomatik konfigüre edildi!")
    })
}

func calculateBandwidth(sender *registry.Sender) int64 {
    // 1080p50 10-bit 4:2:2 = 1.66 Gbps = 1660 Mbps
    return 1660
}

9.1.6 IS-06 cURL Örnekleri

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 1. Mevcut switch'leri listele
curl http://localhost:8080/x-nmos/netctrl/v1.0/switches | jq

# 2. Flow için bandwidth rezerve et
curl -X POST http://localhost:8080/x-nmos/netctrl/v1.0/reservations \
  -H "Content-Type: application/json" \
  -d '{
    "flow_id": "flow-123",
    "sender_id": "sender-456",
    "receiver_id": "receiver-789",
    "bandwidth_mbps": 1660,
    "vlan_id": 100,
    "multicast_group": "239.100.0.1"
  }' | jq

# Response:
# {
#   "reservation_id": "flow-123",
#   "status": "active",
#   "bandwidth": 1660,
#   "vlan": 100
# }

# 3. Rezervasyonu sil
curl -X DELETE http://localhost:8080/x-nmos/netctrl/v1.0/reservations/flow-123

9.1.7 Production Deployment

Desteklenen Switch’ler:

  • ✅ Cisco Nexus (NXAPI)
  • ✅ Arista (eAPI)
  • ✅ Juniper (NETCONF)
  • ✅ Mellanox/NVIDIA (REST API)
  • ✅ Dell (NETCONF)

Gereksinimler:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
switch_requirements:
  protocols:
    - NETCONF
    - REST API
    - SSH (fallback)
  features:
    - VLAN tagging
    - IGMP snooping v3
    - PIM sparse-mode
    - QoS/DSCP marking
    - Rate limiting

9.1.8 Gerçek Dünya Faydaları

Eski Yöntem (Manuel):

1
2
3
4
5
1. Network engineer switch'e SSH ile bağlanır
2. Manuel VLAN ekler
3. Port'ları konfigüre eder
4. Multicast routing ayarlar
5. 30-60 dakika sürer ⏱️

IS-06 ile (Otomatik):

1
2
3
4
1. NMOS connection yap
2. IS-06 otomatik switch'i konfigüre eder
3. 5 saniyede tamamlanır ⚡
4. Hata riski minimum

ROI:

1
2
3
4
Manuel: 100 bağlantı × 30 dk = 50 saat işçilik
IS-06:  100 bağlantı × 5 sn = 8 dakika

Tasarruf: ~49 saat 🎯

10. Hata Yönetimi ve Kurtarma Pattern’leri

10.1 Yaygın Hata Senaryoları

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// errors/nmos_errors.go
package errors

import (
    "errors"
    "fmt"
    "time"
)

var (
    ErrRegistryUnreachable = errors.New("NMOS registry erişilemez")
    ErrHeartbeatTimeout    = errors.New("heartbeat timeout")
    ErrInvalidResource     = errors.New("geçersiz kaynak")
    ErrConnectionFailed    = errors.New("bağlantı başarısız")
)

// ErrorHandler, retry mantığıyla NMOS hatalarını yönetir
type ErrorHandler struct {
    maxRetries    int
    retryDelay    time.Duration
    onError       func(error)
}

func NewErrorHandler() *ErrorHandler {
    return &ErrorHandler{
        maxRetries: 3,
        retryDelay: 5 * time.Second,
        onError:    func(err error) { fmt.Printf("Hata: %v\n", err) },
    }
}

// WithRetry, retry mantığıyla bir fonksiyonu çalıştırır
func (h *ErrorHandler) WithRetry(fn func() error) error {
    var err error
    for i := 0; i <= h.maxRetries; i++ {
        err = fn()
        if err == nil {
            return nil
        }
        
        h.onError(fmt.Errorf("deneme %d/%d başarısız: %w", i+1, h.maxRetries+1, err))
        
        if i < h.maxRetries {
            time.Sleep(h.retryDelay)
        }
    }
    return fmt.Errorf("tüm retry denemeleri başarısız: %w", err)
}

10.2 Registry Erişilemez - Fallback Stratejisi

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// client/resilient_node.go
package client

import (
    "fmt"
    "time"
)

// ResilientNode, registry başarısızlıklarını zarif bir şekilde yönetir
type ResilientNode struct {
    *NMOSNode
    fallbackRegistry string
    cacheEnabled     bool
    lastKnownState   map[string]interface{}
}

func (n *ResilientNode) StartWithResilience() error {
    handler := NewErrorHandler()
    
    // Retry ile kayıt olmayı dene
    err := handler.WithRetry(func() error {
        return n.registerResource(n.Node, "node")
    })
    
    if err != nil {
        // Fallback: İkincil registry'yi dene
        if n.fallbackRegistry != "" {
            fmt.Println("Primary registry başarısız, fallback deneniyor...")
            n.RegistryURL = n.fallbackRegistry
            return n.Start()
        }
        
        // Son çare: Registry olmadan standalone modda çalış
        fmt.Println("Standalone modda çalışıyor (registry yok)")
        return n.runStandalone()
    }
    
    // Recovery ile heartbeat'i başlat
    go n.resilientHeartbeat()
    
    return nil
}

func (n *ResilientNode) resilientHeartbeat() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    consecutiveFailures := 0
    maxFailures := 3
    
    for range ticker.C {
        err := n.sendHeartbeat()
        if err != nil {
            consecutiveFailures++
            fmt.Printf("Heartbeat başarısız (%d/%d): %v\n", consecutiveFailures, maxFailures, err)
            
            if consecutiveFailures >= maxFailures {
                // Node'u yeniden kaydet
                fmt.Println("Çok fazla heartbeat başarısızlığı, yeniden kaydediliyor...")
                if err := n.Start(); err == nil {
                    consecutiveFailures = 0
                }
            }
        } else {
            consecutiveFailures = 0
        }
    }
}

func (n *ResilientNode) runStandalone() error {
    // Registry olmadan çalış, local keşif sun
    fmt.Println("Local mDNS duyurusu başlatılıyor...")
    // Burada mDNS/DNS-SD duyurusunu implement et
    return nil
}

10.3 Network Partition Yönetimi

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Split-brain önleme
func (r *Registry) DetectNetworkPartition() bool {
    // Birden fazla registry'nin primary olduğunu düşünüp düşünmediğini kontrol et
    // Consensus algoritması kullan (Raft, etcd, vb.)
    
    // Registry instance'ları arasında basit heartbeat kontrolü
    peers := []string{"registry-01:3210", "registry-02:3210"}
    
    aliveCount := 0
    for _, peer := range peers {
        if r.ping(peer) {
            aliveCount++
        }
    }
    
    // Quorum kontrolü (çoğunluk erişilebilir olmalı)
    quorum := (len(peers) + 1) / 2
    return aliveCount >= quorum
}

func (r *Registry) ping(peer string) bool {
    resp, err := http.Get(fmt.Sprintf("http://%s/health", peer))
    if err != nil {
        return false
    }
    defer resp.Body.Close()
    return resp.StatusCode == http.StatusOK
}

10.4 Disconnect’te Kaynak Temizliği

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Kaynak temizliğiyle graceful shutdown
func (n *NMOSNode) GracefulShutdown() error {
    fmt.Println("Graceful shutdown başlatılıyor...")
    
    // 1. Yeni bağlantıları kabul etmeyi durdur
    close(n.stopChan)
    
    // 2. Tüm kaynakları registry'den sil
    for _, sender := range n.Senders {
        url := fmt.Sprintf("%s/x-nmos/registration/v1.3/resource/senders/%s", 
            n.RegistryURL, sender.ID)
        req, _ := http.NewRequest("DELETE", url, nil)
        n.httpClient.Do(req)
    }
    
    // 3. Node'u sil
    url := fmt.Sprintf("%s/x-nmos/registration/v1.3/resource/nodes/%s", 
        n.RegistryURL, n.Node.ID)
    req, _ := http.NewRequest("DELETE", url, nil)
    n.httpClient.Do(req)
    
    fmt.Println("Shutdown tamamlandı")
    return nil
}

11. Docker ve Kubernetes Deployment

11.1 NMOS Registry için Dockerfile

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Dockerfile
FROM golang:1.21-alpine AS builder

WORKDIR /app

# Go mod dosyalarını kopyala
COPY go.mod go.sum ./
RUN go mod download

# Kaynak kodunu kopyala
COPY . .

# Uygulamayı build et
RUN CGO_ENABLED=0 GOOS=linux go build -o nmos-registry ./cmd/registry

# Final stage
FROM alpine:latest

RUN apk --no-cache add ca-certificates

WORKDIR /root/

# Binary'yi builder'dan kopyala
COPY --from=builder /app/nmos-registry .

# Portları expose et
EXPOSE 3210 3211

# Uygulamayı çalıştır
CMD ["./nmos-registry"]

11.2 Docker Compose Kurulumu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# docker-compose.yml
version: '3.8'

services:
  # Registry depolama için PostgreSQL
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: nmos
      POSTGRES_USER: nmos
      POSTGRES_PASSWORD: nmos_secret
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - nmos_network

  # NMOS Registry
  nmos-registry:
    build: .
    ports:
      - "3210:3210"  # Registration API
      - "3211:3211"  # Query API
    environment:
      DATABASE_URL: "postgres://nmos:nmos_secret@postgres:5432/nmos?sslmode=disable"
      LOG_LEVEL: "info"
    depends_on:
      - postgres
    networks:
      - nmos_network
    restart: unless-stopped

  # Monitoring için Prometheus
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - nmos_network

  # Dashboard'lar için Grafana
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - nmos_network

volumes:
  postgres_data:
  prometheus_data:
  grafana_data:

networks:
  nmos_network:
    driver: bridge

11.3 Kubernetes Deployment

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# k8s/registry-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nmos-registry
  namespace: broadcast
spec:
  replicas: 3  # Yüksek erişilebilirlik
  selector:
    matchLabels:
      app: nmos-registry
  template:
    metadata:
      labels:
        app: nmos-registry
    spec:
      containers:
      - name: nmos-registry
        image: yourregistry/nmos-registry:latest
        ports:
        - containerPort: 3210
          name: registration
        - containerPort: 3211
          name: query
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: nmos-secrets
              key: database-url
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 3210
          initialDelaySeconds: 10
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 3210
          initialDelaySeconds: 5
          periodSeconds: 10

---
apiVersion: v1
kind: Service
metadata:
  name: nmos-registry
  namespace: broadcast
spec:
  selector:
    app: nmos-registry
  ports:
  - name: registration
    port: 3210
    targetPort: 3210
  - name: query
    port: 3211
    targetPort: 3211
  type: LoadBalancer  # Veya Ingress ile ClusterIP

---
apiVersion: v1
kind: Service
metadata:
  name: nmos-registry-headless
  namespace: broadcast
spec:
  clusterIP: None
  selector:
    app: nmos-registry
  ports:
  - name: registration
    port: 3210
  - name: query
    port: 3211

11.4 PostgreSQL için Kubernetes StatefulSet

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# k8s/postgres-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: postgres
  namespace: broadcast
spec:
  serviceName: postgres
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:15-alpine
        ports:
        - containerPort: 5432
        env:
        - name: POSTGRES_DB
          value: nmos
        - name: POSTGRES_USER
          value: nmos
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: nmos-secrets
              key: postgres-password
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
  volumeClaimTemplates:
  - metadata:
      name: postgres-storage
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 20Gi

11.5 Kubernetes’e Deploy Etme

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
# Namespace oluştur
kubectl create namespace broadcast

# Secret'ları oluştur
kubectl create secret generic nmos-secrets \
  --from-literal=database-url='postgres://nmos:secret@postgres:5432/nmos' \
  --from-literal=postgres-password='secret' \
  -n broadcast

# PostgreSQL'i deploy et
kubectl apply -f k8s/postgres-statefulset.yaml

# NMOS Registry'yi deploy et
kubectl apply -f k8s/registry-deployment.yaml

# Durumu kontrol et
kubectl get pods -n broadcast
kubectl logs -f deployment/nmos-registry -n broadcast

# Local erişim için port forward
kubectl port-forward svc/nmos-registry 3210:3210 3211:3211 -n broadcast

12. IS-10: Authentication ve Authorization

12.1 Temel OAuth2 Token Doğrulama

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
// auth/is10.go
package auth

import (
    "crypto/rsa"
    "errors"
    "fmt"
    "net/http"
    "strings"
    
    "github.com/golang-jwt/jwt/v5"
)

// IS10Middleware OAuth2 JWT token'larını doğrular
type IS10Middleware struct {
    publicKey    *rsa.PublicKey
    issuer       string
    audience     string
}

func NewIS10Middleware(publicKeyPEM string, issuer, audience string) (*IS10Middleware, error) {
    publicKey, err := jwt.ParseRSAPublicKeyFromPEM([]byte(publicKeyPEM))
    if err != nil {
        return nil, err
    }
    
    return &IS10Middleware{
        publicKey: publicKey,
        issuer:    issuer,
        audience:  audience,
    }, nil
}

// ValidateToken middleware
func (m *IS10Middleware) ValidateToken(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // Authorization header'dan token çıkar
        authHeader := r.Header.Get("Authorization")
        if authHeader == "" {
            http.Error(w, "Authorization header eksik", http.StatusUnauthorized)
            return
        }
        
        tokenString := strings.TrimPrefix(authHeader, "Bearer ")
        
        // JWT'yi parse et ve doğrula
        token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
            // Signing method'u doğrula
            if _, ok := token.Method.(*jwt.SigningMethodRSA); !ok {
                return nil, fmt.Errorf("beklenmeyen signing method: %v", token.Header["alg"])
            }
            return m.publicKey, nil
        })
        
        if err != nil || !token.Valid {
            http.Error(w, "Geçersiz token", http.StatusUnauthorized)
            return
        }
        
        // Claim'leri doğrula
        claims, ok := token.Claims.(jwt.MapClaims)
        if !ok {
            http.Error(w, "Geçersiz token claim'leri", http.StatusUnauthorized)
            return
        }
        
        // Issuer kontrol et
        if iss, ok := claims["iss"].(string); !ok || iss != m.issuer {
            http.Error(w, "Geçersiz issuer", http.StatusUnauthorized)
            return
        }
        
        // Audience kontrol et
        if aud, ok := claims["aud"].(string); !ok || aud != m.audience {
            http.Error(w, "Geçersiz audience", http.StatusUnauthorized)
            return
        }
        
        // Scope'ları kontrol et (read, write)
        scopes, _ := claims["scope"].(string)
        if !strings.Contains(scopes, "nmos:read") {
            http.Error(w, "Yetersiz izinler", http.StatusForbidden)
            return
        }
        
        // Token geçerli, devam et
        next.ServeHTTP(w, r)
    })
}

// Router'a uygula
func ApplyIS10Security(router *mux.Router, middleware *IS10Middleware) {
    // Public endpoint'ler (auth gerekmez)
    router.HandleFunc("/health", healthHandler).Methods("GET")
    
    // Korunan endpoint'ler
    protected := router.PathPrefix("/x-nmos").Subrouter()
    protected.Use(middleware.ValidateToken)
    
    // Tüm NMOS endpoint'leri artık geçerli JWT gerektirir
}

12.2 cURL ile Test Etme

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
# 1. Authorization server'dan OAuth2 token al
TOKEN=$(curl -X POST https://auth.example.com/oauth/token \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials" \
  -d "client_id=your_client_id" \
  -d "client_secret=your_secret" \
  -d "scope=nmos:read nmos:write" \
  | jq -r '.access_token')

# 2. Korunan NMOS API'ye eriş
curl http://localhost:3211/x-nmos/query/v1.3/nodes \
  -H "Authorization: Bearer $TOKEN" \
  | jq

# Token olmadan (başarısız olmalı):
curl http://localhost:3211/x-nmos/query/v1.3/nodes
# Response: 401 Unauthorized

13. Troubleshooting Rehberi

13.1 Hızlı Tanı Kontrol Listesi

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# ✅ Registry çalışıyor mu?
curl -s http://localhost:3210/health && echo "✓ Registration API OK" || echo "✗ Registration API DOWN"
curl -s http://localhost:3211/health && echo "✓ Query API OK" || echo "✗ Query API DOWN"

# ✅ Node'lar kaydoluyor mu?
NODE_COUNT=$(curl -s http://localhost:3211/x-nmos/query/v1.3/nodes | jq length)
echo "Kayıtlı node'lar: $NODE_COUNT"

# ✅ Heartbeat'ler çalışıyor mu?
# Test node kaydet ve 12 saniye bekle
# Kayboluyorsa, heartbeat başarısız

# ✅ mDNS/DNS-SD kontrol et
avahi-browse -a -t  # Linux
dns-sd -B _nmos-registration._tcp  # macOS

# ✅ Ağ bağlantısı
ping registry.local
telnet registry.local 3210

13.2 Yaygın Sorunlar ve Çözümler

Sorun Belirti Çözüm
Registry erişilemez Node kayıt olamıyor Firewall, ağ bağlantısı, DNS kontrol et
Node’lar kayboluyor 12s sonra node’lar yok oluyor Heartbeat implementasyonu kontrol et, ağ gecikmesi
WebSocket bağlanamıyor WS upgrade başarısız Proxy ayarları kontrol et, HTTP/1.1 gerekli
SDP fetch başarısız IS-05 bağlantı başarısız SDP URL erişilebilir mi kontrol et, CORS ayarları
Version uyuşmazlığı “Invalid version” hatası seconds:nanoseconds formatı kullan (örn., 1735545600:0)
UUID çakışması Duplicate resource ID Düzgün UUID’ler oluştur (github.com/google/uuid kullan)
PTP senkronize olmuyor Zamanlama sorunları PTP domain kontrol et, switch’ler PTP desteklemeli
Yüksek CPU kullanımı Registry yavaş Database index’leri ekle, connection pooling kullan

13.3 Debug Logging

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Detaylı logging'i etkinleştir
import "github.com/sirupsen/logrus"

logrus.SetLevel(logrus.DebugLevel)
logrus.SetFormatter(&logrus.JSONFormatter{})

// Handler'larınızda
logrus.WithFields(logrus.Fields{
    "node_id": node.ID,
    "version": node.Version,
    "hostname": node.Hostname,
}).Info("Node kaydedildi")

logrus.WithFields(logrus.Fields{
    "sender_id": sender.ID,
    "receiver_id": receiver.ID,
    "action": "connection",
}).Debug("IS-05 bağlantı istendi")

13.4 Ağ Debug’ı

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# tcpdump ile NMOS trafiğini yakala
sudo tcpdump -i eth0 -w nmos-traffic.pcap 'port 3210 or port 3211'

# Wireshark ile analiz et
wireshark nmos-traffic.pcap

# Multicast routing kontrol et (ST 2110 stream'leri için)
ip mroute show
netstat -g

# WebSocket bağlantılarını izle
ss -t | grep 3211

14. Yaygın Hatalar ve Best Practice’ler

14.1 Yaygın Hatalar

❌ YAPMAYIN: Sıralı ID kullanın

1
2
// Kötü
node.ID = fmt.Sprintf("node-%d", counter++)

✅ YAPIN: Düzgün UUID kullanın

1
2
3
// İyi
import "github.com/google/uuid"
node.ID = uuid.New().String()

❌ YAPMAYIN: Değişikliklerde version güncellemeyi unutun

1
2
3
// Kötü - version aynı kalıyor
node.Label = "Yeni Label"
registry.RegisterNode(node)

✅ YAPIN: Version timestamp’ini güncelleyin

1
2
3
4
// İyi
node.Label = "Yeni Label"
node.Version = CreateVersion() // Mevcut timestamp'e günceller
registry.RegisterNode(node)

❌ YAPMAYIN: Heartbeat hatalarını görmezden gelin

1
2
// Kötü - sessiz başarısızlık
sendHeartbeat() // Hatayı görmezden gelir

✅ YAPIN: Heartbeat hatalarını yönetin

1
2
3
4
5
// İyi
if err := sendHeartbeat(); err != nil {
    log.Printf("Heartbeat başarısız: %v, yeniden kaydediliyor...", err)
    node.Start() // Yeniden kaydet
}

14.2 Performance Optimizasyon İpuçları

  1. Database Index’leri (PostgreSQL):
1
2
3
4
CREATE INDEX idx_nodes_id ON nodes(id);
CREATE INDEX idx_senders_device_id ON senders(device_id);
CREATE INDEX idx_resources_type ON resources(type);
CREATE INDEX idx_health_updated_at ON health(updated_at);
  1. Connection Pooling:
1
2
3
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(25)
db.SetConnMaxLifetime(5 * time.Minute)
  1. Caching (Redis):
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Sık erişilen kaynakları önbelleğe al
func (r *Registry) GetSenderCached(id string) (*Sender, error) {
    // Önce cache'i dene
    cached, err := redisClient.Get(ctx, "sender:"+id).Result()
    if err == nil {
        var sender Sender
        json.Unmarshal([]byte(cached), &sender)
        return &sender, nil
    }
    
    // Cache miss, DB'den al
    sender := r.GetSender(id)
    data, _ := json.Marshal(sender)
    redisClient.Set(ctx, "sender:"+id, data, 30*time.Second)
    return sender, nil
}

14.3 Güvenlik Best Practice’leri

  • ✅ Production’da her zaman TLS kullanın (https://)
  • ✅ IS-10 OAuth2 authentication implement edin
  • ✅ Tüm input’ları doğrulayın (JSON schema validation)
  • ✅ API endpoint’lerini rate limit edin
  • ✅ Güçlü UUID’ler kullanın (tahmin edilemez)
  • ✅ Log çıktısını temizleyin (hassas veri yok)
  • ✅ CORS’u düzgün implement edin
  • ✅ Secrets management kullanın (Vault, K8s secrets)

14.4 Ölçeklenebilirlik İpuçları

Büyük Tesisler İçin (1000+ cihaz):

  1. Kaynak türüne göre shard’lama:
1
2
3
// Farklı kaynak türleri için ayrı database'ler
nodesDB := connectDB("nodes_db")
sendersDB := connectDB("senders_db")
  1. Event’ler için message queue kullan:
1
2
3
4
// In-memory channel'lar yerine, Kafka/RabbitMQ kullan
func (r *Registry) PublishEvent(event ResourceUpdate) {
    kafkaProducer.Send("nmos-events", event)
}
  1. Load balancer ile horizontal ölçekleme:
1
2
3
4
5
                    ┌─> Registry Instance 1
Client -> LB -> ┼─> Registry Instance 2
                    └─> Registry Instance 3
                    Paylaşılan PostgreSQL

15. SDI’dan NMOS’a Geçiş: Adım Adım Rehber

15.1 Geçiş Stratejisi Genel Bakış

SDI’dan NMOS/ST 2110’a geçiş “big bang” olmak zorunda değil. İşte pratik bir aşamalı yaklaşım:

1
2
3
4
5
6
7
Faz 1: Değerlendirme (1-2 ay)
Faz 2: Pilot (2-3 ay)
Faz 3: Hibrit İşletme (6-12 ay)
Faz 4: Tam IP Geçişi (6-12 ay)

15.2 Faz 1: Değerlendirme

Amaç: Mevcut altyapıyı anlamak ve geçişi planlamak

Görevler:

  1. SDI Ekipman Envanteri
1
2
3
4
5
# Bir spreadsheet oluşturun:
- Cihaz adı/türü
- I/O port sayısı
- Sinyal yolları (kimin kime bağlandığı)
- Kritik vs kritik olmayan cihazlar
  1. Sinyal Akışlarını Haritalama
1
2
3
Örnek mevcut akış:
Kamera 1 (SDI)  Router  Encoder  Playout
Kamera 2 (SDI)  Router  Monitor Duvarı
  1. Ağ Değerlendirmesi
1
2
3
4
5
6
7
8
# Mevcut ağ altyapısını kontrol edin:
- Switch kapasitesi (ST 2110 için minimum 10GbE gerekli)
- PTP desteği (IEEE 1588)
- Multicast routing yetenekleri
- Bant genişliği hesaplamaları

# Örnek: 1080p50 sıkıştırılmamış (ST 2110-20)
# = stream başına ~1.66 Gbps
  1. Yükseltme Önceliklerini Belirleyin
  • Kritik olmayan yollardan başlayın (monitoring, QC)
  • Kritik yolları (on-air playout) daha sonra bırakın
  • Hızlı kazanımları belirleyin (örn., SDI dağıtım amplifikatörlerini IP multicast ile değiştirin)

Çıktı: Zaman çizelgesi ve bütçe ile geçiş yol haritası

15.3 Faz 2: Pilot Deployment

Amaç: Teknolojiyi küçük ölçekli deployment ile doğrulama

Görevler:

  1. NMOS Registry Deploy Edin
1
2
3
4
5
# Kolay kurulum için Docker kullanın
docker-compose up -d nmos-registry

# Registration API'yi doğrulayın
curl http://registry:3210/health
  1. 2-3 IP Cihaz Kurun
  • Monitoring/multiviewer sistemleriyle başlayın
  • Bunlar genellikle on-air için kritik değildir
  1. SDI ↔ IP Gateway Ekleyin
1
2
3
SDI Kamera → Gateway → NMOS Registry
           IP Monitor (test)

Önerilen Gateway’ler:

  • Grass Valley IQUCP25
  • Evertz 570IPG
  • Embrionix emSFP
  1. Workflow’ları Doğrulayın
1
2
3
4
5
6
# Test kontrol listesi:
✓ Cihaz otomatik keşfi
✓ IS-05 ile bağlantı kurma
✓ IS-08 ile ses yönlendirmesi
✓ PTP senkronizasyonu
✓ Failover senaryoları

Süre: 2-3 ay
Bütçe: $50k-$100k (küçük pilot)

15.4 Faz 3: Hibrit İşletme

Amaç: SDI ve IP’yi paralel çalıştırma

Mimari:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
┌─────────────────────────────────────────┐
│         NMOS Registry (HA)              │
│   Primary: 10.0.1.10   Backup: 10.0.1.11│
└─────────────────────────────────────────┘
    ┌─────────┴─────────┐
    ↓                   ↓
[IP Cihazlar]      [SDI→IP Gateway'ler]
    ↓                   ↓
 ST 2110              SDI Cihazlar
  Akışlar              (Legacy)
    ↓                   ↓
    └─────────┬─────────┘
      [IP→SDI Gateway]
       Legacy Playout

Temel Stratejiler:

  1. Kritik Yolları Başlangıçta SDI’da Tutun
1
2
3
4
5
On-Air Zinciri (SDI'da tutun):
Canlı Kamera → SDI Router → Master Control → Playout

Kritik Olmayan (IP'ye taşıyın):
Monitoring, editing, QC, grafikler
  1. Gateway’leri Stratejik Olarak Kullanın
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Örnek: SDI kamerayı IP workflow'a köprüleyin
type SDItoIPGateway struct {
    SDIInput  string
    IPOutput  *registry.Sender
    NMOSNode  *client.NMOSNode
}

func (g *SDItoIPGateway) Start() {
    // Gateway'i NMOS node olarak kaydedin
    g.NMOSNode.AddVideoSender(
        "Gateway SDI Input 1",
        "http://gateway:8080/sdp/input1.sdp",
    )
    g.NMOSNode.Start()
}
  1. Personeli Eğitin
  • Eski yol: Fiziksel patch panel
  • Yeni yol: Yazılım kontrolü (NMOS controller UI)

Süre: 6-12 ay
Bütçe: $200k-$500k (tesis boyutuna bağlı)

15.5 Faz 4: Tam IP Geçişi

Amaç: Tüm SDI’yi kaldır, saf IP tesisi

Görevler:

  1. SDI Cihazları Değiştirin
1
2
3
4
Eski: 10 SDI kamera → Yeni: NMOS'lu 10 IP kamera
Eski: SDI router → Yeni: 10GbE ağ switch'leri
Eski: SDI monitoring → Yeni: IP multiviewer
Eski: SDI encoder → Yeni: Yazılım encoder (NMOS-aware)
  1. Gateway’leri Devre Dışı Bırakın
  • SDI→IP gateway’lerini kaldırın
  • Eski SDI ekipmanını satın/yeniden kullanın
  1. Ağı Optimize Edin
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# Son ağ tasarımı
core_switches:
  - model: Arista 7280R3
    capacity: 100GbE
    ptp: true
    
edge_switches:
  - model: Cisco Nexus 93180YC-FX
    ports: 48x 25GbE
    ptp: true
    igmp_snooping: true

# VLAN tasarımı
vlans:
  management: 10
  st2110_video: 100
  st2110_audio: 101
  control: 200
  1. Otomasyon ve Orkestrasyon
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// NMOS ile tam workflow otomasyonu
func AutomateNewsShow() {
    // 1. Kullanılabilir kameraları keşfet
    cameras := queryRegistry("senders?format=video")
    
    // 2. Kamera 1'i program çıkışına yönlendir
    connectSenderToReceiver(cameras[0].ID, programReceiverID)
    
    // 3. Tüm kameraları multiviewer'a yönlendir
    for _, cam := range cameras {
        connectToMultiviewer(cam.ID)
    }
    
    // 4. Ses yönlendirmesini yapılandır (mikrofonlar mikser'e)
    audioSources := queryRegistry("senders?format=audio")
    for i, src := range audioSources {
        routeAudioChannel(src.ID, mixerInputs[i])
    }
}

Süre: 6-12 ay
Bütçe: $500k-$2M (tam tesis değiştirme)

15.6 Geçiş Kontrol Listesi

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
## Geçiş Öncesi
- [ ] Ağ altyapısı denetimi (minimum 10GbE)
- [ ] PTP-uyumlu switch'ler kurulu
- [ ] Personel eğitimi tamamlandı
- [ ] Yedekleme/geri alma planı dokümante edildi

## Geçiş Sırasında
- [ ] NMOS registry deploy edildi (HA)
- [ ] DNS-SD/mDNS yapılandırıldı
- [ ] Monitoring sistemi (Prometheus/Grafana)
- [ ] Önce kritik olmayan yollarda bağlantıları test edin
- [ ] Tüm cihaz konfigürasyonlarını dokümante edin

## Geçiş Sonrası
- [ ] Eski SDI ekipmanı kaldırıldı
- [ ] Dokümantasyon güncellendi
- [ ] Disaster recovery test edildi
- [ ] Performance benchmark'ları doğrulandı
- [ ] Personel yeni workflow'larla rahat

15.7 Yaygın Geçiş Hataları

Hata Etki Çözüm
PTP desteği yok Ses/video senkronizasyon sorunları Switch’lerin IEEE 1588 desteklediğinden emin olun
Yetersiz bant genişliği Frame kayıpları Bant genişliğini hesaplayın: 1080p50 = ~1.66Gbps
Yedeklilik yok Tek hata noktası Çift registry, yedekli switch’ler
Zayıf ağ tasarımı Yüksek gecikme, jitter Broadcast ağ danışmanı işe alın
Personel direnci Operasyonel sorunlar Eğitim, değişim yönetimi

15.6 Maliyet Karşılaştırması: SDI vs NMOS

Küçük Stüdyo (5 kamera, 10 I/O noktası):

Bileşen SDI Maliyeti NMOS Maliyeti Tasarruf
Router $50k $0 (ağ) $50k
Kablolama $10k $2k (Ethernet) $8k
Monitoring $30k $5k (yazılım) $25k
Toplam $90k $7k $83k

Büyük Tesis (100 kamera, 500 I/O noktası):

Bileşen SDI Maliyeti NMOS Maliyeti Tasarruf
Router’lar $500k $150k (ağ) $350k
Kablolama $200k $30k $170k
Monitoring $300k $50k $250k
Toplam $1M $230k $770k

ROI Zaman Çizelgesi: 18-24 ay


16. Performance Benchmark’ları

Gerçek dünya NMOS Registry implementasyonu performance testi.

16.1 Test Ortamı

1
2
3
4
5
6
7
Donanım:
  - Cloud: AWS c5.2xlarge (8 vCPU, 16GB RAM)
  - Storage: PostgreSQL on io2 EBS (10K IOPS)
  - Cache: Redis 7.0 (r6g.large)

Load Testing Tool: k6 (Grafana)
Test Süresi: Senaryo başına 1 saat

16.2 Registration API Performance

Senaryo Node’lar Sender’lar İstek/sn Ort. Gecikme P95 Gecikme P99 Gecikme
Küçük 100 1,000 5,000 8ms 15ms 25ms
Orta 500 5,000 4,200 18ms 28ms 45ms
Büyük 1,000 10,000 3,500 25ms 45ms 80ms
Çok Büyük 5,000 50,000 2,800 40ms 85ms 150ms

Notlar:

  • PostgreSQL connection pooling ile (25 bağlantı)
  • Redis cache etkin (30s TTL)
  • Heartbeat yükü: 100 node × 0.2 req/s = 20 req/s temel

16.3 Query API Performance

Senaryo Toplam Kaynak Sorgu Türü İstek/sn Ort. Gecikme P95 Gecikme
Küçük 10,000 GET /senders 8,000 5ms 12ms
Orta 50,000 GET /senders 6,500 12ms 25ms
Büyük 100,000 GET /senders 5,000 22ms 40ms
Filtreli 100,000 GET /senders?format=video 4,500 28ms 50ms

Optimizasyon Etkisi:

Optimizasyon Gecikme İyileştirmesi Throughput İyileştirmesi
Redis cache ekle -60% +150%
PostgreSQL index’leri -40% +80%
Connection pooling -30% +50%
Gzip sıkıştırma +5% (hafif artış) +200% (bant genişliği)

16.4 WebSocket Ölçeklenebilirliği

Eşzamanlı Bağlantılar CPU Kullanımı Bellek Kullanımı Event Teslim Süresi
100 15% 200MB <10ms
500 35% 600MB <20ms
1,000 60% 1.2GB <50ms
5,000 85% 5GB <200ms

Öneri: Registry instance başına 1,000 WebSocket bağlantısı

16.5 Heartbeat İşleme

Node’lar Heartbeat/sn CPU Kullanımı Bellek Kullanımı
100 20 5% 100MB
500 100 12% 300MB
1,000 200 20% 500MB
5,000 1,000 45% 2GB

Formül: Heartbeat oranı = (Node’lar × 0.2) req/s (her 5 saniyede)

16.6 Database Sorgu Performance’ı

En Yaygın Sorgular:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
-- Sorgu 1: Tüm sender'ları al (1000x/sn çalışır)
SELECT * FROM senders;
-- Index'lerden önce: 150ms
-- Index'lerden sonra: 8ms

-- Sorgu 2: Device'a göre sender'ları al (500x/sn çalışır)
SELECT * FROM senders WHERE device_id = ?;
-- Index'den önce: 200ms
-- Index'den sonra: 5ms

-- Sorgu 3: Tür ve label'a göre kaynakları al (300x/sn çalışır)
SELECT * FROM resources WHERE type = ? AND label LIKE ?;
-- Index'den önce: 300ms
-- Index'den sonra: 15ms

Kritik Index’ler:

1
2
3
4
5
6
CREATE INDEX idx_senders_id ON senders(id);
CREATE INDEX idx_senders_device_id ON senders(device_id);
CREATE INDEX idx_resources_type ON resources(type);
CREATE INDEX idx_resources_label ON resources(label);
CREATE INDEX idx_health_updated_at ON health(updated_at);
CREATE INDEX idx_nodes_version ON nodes(version DESC);

16.7 Ölçekleme Önerileri

Benchmark’lara dayalı:

Tesis Boyutu Kaynaklar Registry Instance’ları Database Cache
Küçük <10k 1 (HA yok) PostgreSQL (small) Opsiyonel
Orta 10k-50k 2 (HA) PostgreSQL (medium) + Read replicas Redis
Büyük 50k-200k 3+ (HA) PostgreSQL (large) + Read replicas Redis Cluster
Enterprise 200k+ 5+ (HA) PostgreSQL Cluster + Sharding Redis Cluster

Dikey Ölçekleme (Tek Instance):

  • 100k kaynağa kadar: c5.2xlarge (8 vCPU, 16GB RAM)
  • 200k kaynağa kadar: c5.4xlarge (16 vCPU, 32GB RAM)

Yatay Ölçekleme (Birden Fazla Instance):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
                Load Balancer (Round Robin)
        ┌───────────────┼───────────────┐
        ↓               ↓               ↓
   Registry 1      Registry 2      Registry 3
        ↓               ↓               ↓
        └───────────────┼───────────────┘
              PostgreSQL Primary
                   ↓       ↓
           Read Replica 1  Read Replica 2

16.8 Gerçek Dünya Production İstatistikleri

BBC Production Tesisi (kamuya açık bilgilere dayalı tahmin):

  • 500+ NMOS-etkin cihaz
  • 30,000+ kayıtlı kaynak
  • 200+ WebSocket bağlantısı (controller’lar, monitoring)
  • %99.99 uptime gereksinimi
  • 3-node registry cluster (active-active-active)

Performance:

  • Query gecikmesi: <20ms (p95)
  • Registration gecikmesi: <30ms (p95)
  • Event teslimatı: <50ms (p95)

17. Terimler Sözlüğü

A-E:

AMWA: Advanced Media Workflow Association
ANC (Ancillary Data): Görüntü dışı veri (altyazılar, timecode, vb.)

Device: Bir Node içindeki işlevsellik birimi (encoder, decoder, mixer)

DNS-SD: NMOS registry’lerini bulmak için DNS Service Discovery

Essence: Gerçek medya içeriği (video/ses örnekleri)

F-N:

Flow: Belirli encoding/format ile mantıksal medya akışı

Grain: Bireysel örnek/frame veya NMOS event mesajı

Heartbeat: Periyodik sağlık kontrolü (NMOS’ta her 5 saniye)

IS-04: NMOS Keşif ve Kayıt spesifikasyonu
IS-05: NMOS Bağlantı Yönetimi spesifikasyonu
IS-08: NMOS Kanal Haritalama spesifikasyonu
IS-09: NMOS Sistem Parametreleri spesifikasyonu
IS-10: NMOS Yetkilendirme spesifikasyonu (OAuth2)

Node: NMOS kaynaklarını barındıran fiziksel/sanal cihaz

O-Z:

PTP: Senkronizasyon için Precision Time Protocol (IEEE 1588)

Receiver: Medya akışlarını alan cihaz/arayüz

Registry: Kaynak kaydı ve keşfi için merkezi NMOS servisi

Sender: Medya akışlarını ileten cihaz/arayüz

Source: Medya içeriğinin kaynağı (kamera sensörü, dosya, vb.)

ST 2110: IP üzerinden profesyonel medya suite (video, ses, veri)
ST 2110-20: Sıkıştırılmamış video
ST 2110-30: PCM ses
ST 2110-40: Ancillary data

UUID: Universally Unique Identifier (128-bit)

WebSocket: Gerçek zamanlı NMOS event’leri için full-duplex iletişim


18. Sonuç

18.1 Ne Oluşturduk

Bu makale, production-ready Go implementasyonları ile kapsamlı bir AMWA NMOS rehberi sağladı:

✅ Temel NMOS Spesifikasyonları:

  • IS-04: WebSocket subscription’larla Keşif ve Kayıt
  • IS-05: SDP parsing ile Bağlantı Yönetimi
  • IS-08: Ses yönlendirmesi için Kanal Haritalama
  • IS-09: PTP zamanlama için Sistem Parametreleri

✅ Production Özellikleri:

  • Error handling ve recovery pattern’leri
  • Docker ve Kubernetes deployment
  • IS-10 OAuth2 authentication
  • Performance optimization (PostgreSQL, Redis, caching)
  • Prometheus/Grafana ile monitoring

✅ Pratik Bilgi:

  • SDI → NMOS geçiş stratejisi
  • Yaygın sorunlarla troubleshooting rehberi
  • Performance benchmark’ları (50k kaynak’a kadar)
  • Gerçek dünya maliyet analizi ve ROI

18.2 NMOS Neden Önemli

NMOS, broadcast workflow’larını dönüştürüyor:

Geleneksel SDI Modern NMOS
Manuel konfigürasyon Otomatik keşif
Fiziksel patching Yazılım tabanlı yönlendirme
Sınırlı ölçeklenebilirlik Cloud-native, sınırsız ölçek
Yüksek capex Düşük maliyetler
Vendor lock-in Açık standartlar, birlikte çalışabilirlik

Gerçek Etki:

  • BBC: IP’ye geçti, kablolama maliyetlerini %70 azalttı
  • Discovery Networks: Otomatik workflow’lar, %50 daha hızlı
  • Bölgesel Yayıncılar: NMOS ile cloud production, %60 maliyet tasarrufu

18.3 Okuyucular İçin Sonraki Adımlar

Başlayanlar:

  1. Docker Compose ile registry’yi kurun
  2. Bu makaledeki curl örneklerini test edin
  3. Basit bir node client deploy edin
  4. IS-05 bağlantılarıyla deney yapın

Orta Seviye:

  1. IS-07 (Event & Tally) implement edin
  2. Production güvenliği için IS-10 OAuth2 ekleyin
  3. Görsel kontrol için web UI oluşturun
  4. Mevcut broadcast otomasyonu ile entegre edin

İleri Seviye:

  1. SDN entegrasyonu için IS-06 implement edin
  2. Büyük tesisler için BCP-002-02 (Natural Grouping) ekleyin
  3. Workflow otomasyonu ile özel controller oluşturun
  4. AMWA NMOS açık kaynak projelerine katkıda bulunun

18.4 Broadcast IT’nin Geleceği

NMOS + ST 2110 şunları mümkün kılar:

Cloud-Native Broadcasting:

1
2
3
4
On-Premise Kameralar → NMOS Registry → Cloud İşleme → CDN → İzleyiciler
                    Otomatik Workflow'lar
                    (Kubernetes + NMOS)

Yazılım Tanımlı Tesisler:

  • Infrastructure as Code (Terraform + NMOS API’leri)
  • Broadcast konfigürasyonları için GitOps
  • Talebe göre otomatik ölçekleme
  • Multi-cloud yedeklilik

AI/ML Entegrasyonu:

  • İçerik analizine dayalı otomatik kamera seçimi
  • Öngörücü bakım (NMOS istatistikleri ile cihaz sağlığı)
  • Ağ koşullarına göre akıllı yönlendirme
  • Gerçek zamanlı kalite izleme

18.5 Öğrenmeye Devam Etmek İçin Kaynaklar

Resmi Spesifikasyonlar:

Açık Kaynak:

Topluluk:

18.6 Son Düşünceler

NMOS, broadcast teknolojisinde temel bir değişimi temsil ediyor. Sadece kabloları Ethernet ile değiştirmekle ilgili değil:

  • Otomasyon: Manuel patching ve konfigürasyonu ortadan kaldırın
  • Esneklik: Tesisleri saatler yerine saniyeler içinde yeniden yapılandırın
  • Ölçeklenebilirlik: Yeniden tasarım olmadan 10’dan 10,000 cihaza büyüyün
  • Birlikte Çalışabilirlik: Vendor’ları sorunsuzca karıştırıp eşleştirin
  • İnovasyon: SDI ile imkansız olan yeni workflow’ları etkinleştirin

Bu makaledeki Go implementasyonları, production broadcast sistemleri oluşturmak için sağlam bir temel sağlar. NMOS vendor-agnostic, API-first ve modern DevOps pratikleri için tasarlanmıştır—yeni nesil broadcast tesisleri için mükemmel.

Broadcast’in geleceği IP’dir. NMOS bunu pratik hale getirir.


19. Production Dikkat Edilmesi Gerekenler

19.1 Registry Keşfi için DNS-SD

Production’da, node’lar DNS-SD (mDNS veya unicast DNS) kullanarak registry’yi keşfeder:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// discovery/dns_sd.go
package discovery

import (
    "context"
    "fmt"
    "time"
    
    "github.com/grandcat/zeroconf"
)

// DiscoverRegistry mDNS ile NMOS registry bulur
func DiscoverRegistry() (string, error) {
    resolver, err := zeroconf.NewResolver(nil)
    if err != nil {
        return "", err
    }
    
    entries := make(chan *zeroconf.ServiceEntry)
    
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    // _nmos-registration._tcp servisini ara
    err = resolver.Browse(ctx, "_nmos-registration._tcp", "local.", entries)
    if err != nil {
        return "", err
    }
    
    select {
    case entry := <-entries:
        registryURL := fmt.Sprintf("http://%s:%d", entry.HostName, entry.Port)
        return registryURL, nil
    case <-ctx.Done():
        return "", fmt.Errorf("registry bulunamadı")
    }
}

19.2 Registry için PostgreSQL Depolama

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// storage/postgres.go
package storage

import (
    "database/sql"
    "encoding/json"
    
    _ "github.com/lib/pq"
)

type PostgresStorage struct {
    db *sql.DB
}

func NewPostgresStorage(connStr string) (*PostgresStorage, error) {
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        return nil, err
    }
    
    return &PostgresStorage{db: db}, nil
}

func (s *PostgresStorage) SaveNode(node *registry.Node) error {
    data, _ := json.Marshal(node)
    
    _, err := s.db.Exec(`
        INSERT INTO nodes (id, data, updated_at)
        VALUES ($1, $2, NOW())
        ON CONFLICT (id) DO UPDATE SET data = $2, updated_at = NOW()
    `, node.ID, data)
    
    return err
}

func (s *PostgresStorage) GetNodes() ([]*registry.Node, error) {
    rows, err := s.db.Query("SELECT data FROM nodes")
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    
    var nodes []*registry.Node
    for rows.Next() {
        var data []byte
        rows.Scan(&data)
        
        var node registry.Node
        json.Unmarshal(data, &node)
        nodes = append(nodes, &node)
    }
    
    return nodes, nil
}

19.3 Prometheus ile Monitoring

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// monitoring/metrics.go
package monitoring

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    registeredNodes = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "nmos_registered_nodes",
        Help: "Kayıtlı NMOS node sayısı",
    })
    
    registeredSenders = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "nmos_registered_senders",
        Help: "Kayıtlı sender sayısı",
    })
    
    activeConnections = promauto.NewGauge(prometheus.GaugeOpts{
        Name: "nmos_active_connections",
        Help: "Aktif IS-05 bağlantı sayısı",
    })
)

func UpdateMetrics(reg *registry.Registry) {
    registeredNodes.Set(float64(len(reg.GetNodes())))
    registeredSenders.Set(float64(len(reg.GetSenders())))
}

19.1 Test ve Debug

19.1.1 NMOS Testing Tool

AMWA resmi bir test aracı sağlar: nmos-testing

1
2
3
4
5
6
7
# nmos-testing yükle
git clone https://github.com/AMWA-TV/nmos-testing.git
cd nmos-testing
pip install -r requirements.txt

# Registry'nize karşı testleri çalıştırın
python nmos-test.py --host localhost --port 3210 --version v1.3

19.1.2 Debug İpuçları

  1. Wireshark: Keşfi görmek için mDNS trafiğini yakalayın
  2. Postman: API endpoint’lerini manuel olarak test edin
  3. WebSocket Inspector: Gerçek zamanlı güncellemeleri izleyin
  4. Logging: Kayıt akışını takip etmek için yapılandırılmış logging ekleyin
1
2
3
4
5
6
7
// Yapılandırılmış logging için logrus veya zap kullanın
import "github.com/sirupsen/logrus"

logrus.WithFields(logrus.Fields{
    "node_id": node.ID,
    "action":  "register",
}).Info("Node kaydedildi")

19.2 NMOS’u Eylem Halinde Görselleştirme

19.2.1 Önerilen Görselleştirmeler

Bu makale implementasyona odaklansa da, görsel yardımcılar anlamayı büyük ölçüde kolaylaştırır:

1. WebSocket Gerçek Zamanlı Güncellemeler Demo’su

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# Bunu eylem halinde kaydedin:
# Terminal 1: Registry'yi başlat
./nmos-registry

# Terminal 2: WebSocket event'lerini izle
wscat -c ws://localhost:3211/x-nmos/query/v1.3/subscriptions/xxx/ws

# Terminal 3: Yeni bir sender kaydet
curl -X POST http://localhost:3210/x-nmos/registration/v1.3/resource -d @sender.json

# Event'in Terminal 2'de anında göründüğünü görün!

2. Grafana Dashboard Screenshot

Önerilen paneller:

  • Kayıtlı kaynaklar (timeseries)
  • Query API gecikmesi (gauge)
  • Heartbeat başarı oranı (%)
  • WebSocket bağlantıları (graph)
  • Database sorgu zamanı (heatmap)

3. NMOS Controller Web UI

Popüler açık kaynak controller’lar:

4. Network Packet Capture

NMOS için Wireshark filtreleri:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# mDNS keşfi
dns.qry.name contains "nmos-registration"

# Registration API
http.host == "localhost:3210"

# Query API
http.host == "localhost:3211"

# WebSocket
websocket

19.2.2 Canlı Demo Kurulumu

Hızlı demo ortamı:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# Demo repo'yu klonla (varsayımsal)
git clone https://github.com/yourusername/nmos-demo
cd nmos-demo

# Her şeyi Docker Compose ile başlat
docker-compose up -d

# Erişim:
# - Registry: http://localhost:3210, http://localhost:3211
# - Grafana: http://localhost:3000 (admin/admin)
# - Mock cihazlar: 5 sanal kamera, 3 monitör

# Grafana dashboard'unda otomatik kaydı izle

20. Gerçek Dünya Kullanım Senaryoları

20.1 Canlı Haber Prodüksiyonu

Senaryo: 5 kamera, grafik sistemi, ses mikseri olan haber stüdyosu

NMOS Faydaları:

  • Otomatik Keşif: Tüm cihazlar kendilerini kaydeder
  • Hızlı Yeniden Konfigürasyon: Kamera feed’leri arasında anında geçiş IS-05 ile
  • Ses Yönlendirmesi: Belirli mikrofonları belirli çıkışlara yönlendirme IS-08 ile
  • Merkezi Kontrol: Bir controller tüm tesisi yönetir

20.2 Spor Yayıncılığı

Senaryo: 20+ kamera, yorum pozisyonları, OB van olan stadyum

NMOS Faydaları:

  • Ölçeklenebilirlik: Yüzlerce sender/receiver’ı yönet
  • Uzaktan Prodüksiyon (REMI): Kameralar stadyumda, prodüksiyon merkezi tesiste
  • Yedeklilik: IS-04 heartbeat monitoring kullanarak otomatik failover
  • Multi-viewer: Controller’lar monitoring duvarları için tüm kaynakları keşfedebilir

20.3 Cloud Production

Senaryo: On-premise kameralar ile cloud-tabanlı video işleme

NMOS Faydaları:

  • Hibrit Workflow’lar: On-premise node’lar cloud registry ile kaydolur
  • Dinamik Ölçekleme: Cloud instance’ları ekle/kaldır, otomatik kayıt
  • API-First: Orkestrasyon sistemleriyle (Kubernetes, vb.) kolay entegrasyon

21. Referanslar ve Kaynaklar

21.1 Resmi Spesifikasyonlar

AMWA NMOS:

SMPTE Standartları:

İlgili Standartlar:

  • AES67 - IP Üzerinden Ses
  • IEEE 1588 - Precision Time Protocol
  • RFC 4566 - SDP: Session Description Protocol
  • RFC 3550 - RTP: Real-time Transport Protocol

21.2 Açık Kaynak Projeleri

NMOS Implementasyonları:

  • nmos-cpp - Sony’nin production C++ implementasyonu
  • nmos-js - JavaScript kütüphanesi
  • pynmos - BBC’nin Python kütüphanesi
  • nmos-testing - Resmi uygunluk testleri

NMOS Controller’lar:

Araçlar ve Yardımcı Programlar:

21.3 Kullanılan Go Kütüphaneleri

Temel Bağımlılıklar:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
require (
    github.com/gorilla/mux v1.8.1          // HTTP routing
    github.com/gorilla/websocket v1.5.1    // WebSocket desteği
    github.com/google/uuid v1.5.0          // UUID oluşturma
    github.com/lib/pq v1.10.9              // PostgreSQL driver
    github.com/go-redis/redis/v8 v8.11.5   // Redis client
    github.com/sirupsen/logrus v1.9.3      // Yapılandırılmış logging
    github.com/prometheus/client_golang v1.18.0 // Metrikler
    github.com/golang-jwt/jwt/v5 v5.2.0    // JWT işleme
    github.com/grandcat/zeroconf v1.0.0    // mDNS/DNS-SD
    gorm.io/gorm v1.25.5                   // ORM
    gorm.io/driver/postgres v1.5.4         // GORM PostgreSQL
)

21.4 Kitaplar ve Yayınlar

Broadcast IP:

  • “IP in Broadcast Production” - Gerard Phillips
  • “Networked Media Systems” - Brad Gilmer
  • “Cloud-Based Broadcasting” - Tony Brown

Go Programlama:

  • “The Go Programming Language” - Donovan & Kernighan
  • “Concurrency in Go” - Katherine Cox-Buday
  • “Building Microservices with Go” - Nic Jackson

21.5 Online Kurslar ve Eğitim

Broadcast IP:

Go Programlama:

21.6 Topluluklar ve Forumlar

NMOS/Broadcast:

Go:

21.7 Faydalı Araçlar

Geliştirme:

  • Postman - API testing
  • wscat - WebSocket testing
  • jq - JSON işleme
  • k6 - Load testing

Ağ:

Monitoring:

21.8 İlgili Makaleler (Yazar Tarafından)

21.9 Güncel Kalın

Abone olun:

Konferanslar:

  • NAB Show (Las Vegas, Nisan)
  • IBC (Amsterdam, Eylül)
  • SMPTE Technical Conference

Sorularınız veya geri bildiriminiz mi var? GitHub veya LinkedIn‘de bana ulaşabilirsiniz.


Yasal Uyarı: Kod örnekleri eğitim amaçlıdır. Production kullanımından önce kapsamlı test yapın. NMOS spesifikasyonları güncellemelere tabidir—her zaman en son yayınlanan versiyonlara başvurun.