İçerikler

Go ile Güvenli HLS Stream Yöneticisi

Go ile Güvenli HLS Stream Yöneticisi

Giriş

Bu yazımızda, Go programlama dili kullanılarak geliştirilen güvenli bir HLS (HTTP Live Streaming) Akış Yöneticisi’nin backend uygulamasını detaylı bir şekilde inceleyeceğiz. Projemiz, canlı video akışlarını güvenli bir şekilde yönetmek ve kullanıcılara sunmak için tasarlanmıştır.

Kullanılan Teknolojiler

Projemiz aşağıdaki temel teknolojileri kullanmaktadır:

  • Go: Ana programlama dili
  • Fiber: Hızlı ve verimli bir HTTP web framework’ü
  • GORM: Go için ORM (Object-Relational Mapping) kütüphanesi
  • JWT: JSON Web Token ile kimlik doğrulama
  • SQLite: Veritabanı olarak
  • bcrypt: Şifre hashleme için

Proje Yapısı

Projemiz dört ana bileşenden oluşmaktadır:

  1. Kullanıcı Yönetimi
  2. Akış URL’lerinin Yönetimi
  3. Kimlik Doğrulama ve Yetkilendirme
  4. HLS Akış Proxy’si

Her bir bileşeni detaylı olarak inceleyelim.

1. Kullanıcı Yönetimi

Kullanıcı yönetimi, User struct’ı ve ilgili fonksiyonlarla gerçekleştirilir:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
type User struct {
	ID       uint   `gorm:"primaryKey"`
	Username string `gorm:"unique"`
	Password string
}

func register(c *fiber.Ctx) error {
	log.Println("Received registration request")
	var user User
	if err := c.BodyParser(&user); err != nil {
		log.Println("Failed to parse registration request:", err)
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Cannot parse JSON"})
	}

	hashedPassword, err := bcrypt.GenerateFromPassword([]byte(user.Password), bcrypt.DefaultCost)
	if err != nil {
		log.Println("Failed to hash password:", err)
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Cannot hash password"})
	}

	user.Password = string(hashedPassword)

	if err := db.Create(&user).Error; err != nil {
		log.Println("Failed to create user:", err)
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Cannot create user"})
	}

	log.Println("User registered successfully:", user.Username)
	return c.Status(fiber.StatusCreated).JSON(fiber.Map{"message": "User created successfully"})
}

func login(c *fiber.Ctx) error {
	log.Println("Received login request")
	var loginData struct {
		Username string `json:"username"`
		Password string `json:"password"`
	}

	if err := c.BodyParser(&loginData); err != nil {
		log.Println("Failed to parse login request:", err)
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Cannot parse JSON"})
	}

	var user User
	if err := db.Where("username = ?", loginData.Username).First(&user).Error; err != nil {
		log.Println("User not found:", loginData.Username)
		return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{"error": "Invalid credentials"})
	}

	if err := bcrypt.CompareHashAndPassword([]byte(user.Password), []byte(loginData.Password)); err != nil {
		log.Println("Invalid password for user:", loginData.Username)
		return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{"error": "Invalid credentials"})
	}

	sessionID := generateSessionID()
	token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
		"username":  user.Username,
		"sessionID": sessionID,
		"exp":       time.Now().Add(sessionTimeout).Unix(),
	})

	tokenString, err := token.SignedString(jwtSecret)
	if err != nil {
		log.Println("Failed to generate token:", err)
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Could not generate token"})
	}

	activeSessionsMutex.Lock()
	if _, exists := activeSessionsPerUser[user.Username]; !exists {
		activeSessionsPerUser[user.Username] = make(map[string]time.Time)
	}
	activeSessionsPerUser[user.Username][sessionID] = time.Now().Add(sessionTimeout)
	activeSessionsMutex.Unlock()

	log.Println("User logged in successfully:", user.Username)
	return c.JSON(fiber.Map{"token": tokenString})
}

Bu bölümde, kullanıcı kaydı ve girişi işlemleri gerçekleştirilir. Şifreler bcrypt ile güvenli bir şekilde hashlenir ve JWT token’ları oluşturulur.

2. Akış URL’lerinin Yönetimi

Akış URL’leri, StreamURL struct’ı ile yönetilir:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
type StreamURL struct {
	ID       uint   `gorm:"primaryKey"`
	BaseURL  string `gorm:"not null"`
	EndPoint string `gorm:"not null"`
	UserID   uint   `gorm:"not null"`
}

func getAllStreams(c *fiber.Ctx) error {
	log.Println("Received request to get all streams")
	user := c.Locals("user").(*jwt.Token)
	claims := user.Claims.(jwt.MapClaims)
	username := claims["username"].(string)

	var userData User
	if err := db.Where("username = ?", username).First(&userData).Error; err != nil {
		log.Println("User not found:", username)
		return c.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "User not found"})
	}

	var streams []StreamURL
	if err := db.Where("user_id = ?", userData.ID).Find(&streams).Error; err != nil {
		log.Println("Failed to fetch streams for user:", username, err)
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Database error"})
	}

	log.Printf("Fetched %d streams for user: %s", len(streams), username)
	return c.JSON(fiber.Map{"streams": streams})
}

func updateStreamURL(c *fiber.Ctx) error {
	log.Println("Received request to update stream URL")
	user := c.Locals("user").(*jwt.Token)
	claims := user.Claims.(jwt.MapClaims)
	username := claims["username"].(string)

	var userData User
	if err := db.Where("username = ?", username).First(&userData).Error; err != nil {
		log.Println("User not found:", username)
		return c.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "User not found"})
	}

	var streamData struct {
		StreamID uint   `json:"streamID"`
		NewURL   string `json:"newUrl"`
	}
	if err := c.BodyParser(&streamData); err != nil {
		log.Println("Failed to parse stream update request:", err)
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Cannot parse JSON"})
	}

	baseURL, endPoint := splitURL(streamData.NewURL)

	var streamURL StreamURL
	result := db.Where("id = ? AND user_id = ?", streamData.StreamID, userData.ID).First(&streamURL)
	if result.Error != nil {
		if result.Error == gorm.ErrRecordNotFound {
			log.Printf("Creating new stream for user %s: %d", username, streamData.StreamID)
			streamURL = StreamURL{UserID: userData.ID, BaseURL: baseURL, EndPoint: endPoint}
			db.Create(&streamURL)
		} else {
			log.Println("Database error while updating stream:", result.Error)
			return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Database error"})
		}
	} else {
		log.Printf("Updating existing stream for user %s: %d", username, streamData.StreamID)
		streamURL.BaseURL = baseURL
		streamURL.EndPoint = endPoint
		db.Save(&streamURL)
	}

	log.Printf("Stream URL updated successfully for user %s: %d", username, streamData.StreamID)
	return c.JSON(fiber.Map{"message": "Stream URL updated successfully"})
}

Bu bölümde, kullanıcıların akış URL’lerini yönetmeleri sağlanır. Kullanıcılar kendi akışlarını listeleyebilir ve güncelleyebilir.

3. Kimlik Doğrulama ve Yetkilendirme

JWT kullanarak güvenli bir kimlik doğrulama sistemi uygulanmıştır:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func generateStreamToken(username, sessionID, ip, userAgent string) string {
	tokenData := fmt.Sprintf("%s|%s|%s|%s|%d", username, sessionID, ip, userAgent, time.Now().UnixNano())
	hash := sha256.Sum256([]byte(tokenData))
	token := hex.EncodeToString(hash[:])

	tokenMutex.Lock()
	defer tokenMutex.Unlock()
	tokenCache[token] = tokenInfo{
		Username:  username,
		SessionID: sessionID,
		Expiry:    time.Now().Add(streamTokenTimeout),
		IP:        ip,
		UserAgent: userAgent,
	}

	log.Printf("Generated stream token for user %s: %s", username, token)
	return token
}

func isValidToken(token, ip, userAgent string) (tokenInfo, bool, string) {
	tokenMutex.RLock()
	info, exists := tokenCache[token]
	tokenMutex.RUnlock()

	if !exists {
		log.Printf("Token not found in cache: %s", token)
		return tokenInfo{}, false, ""
	}

	now := time.Now()

	activeSessionsMutex.RLock()
	sessionExpiry, sessionExists := activeSessionsPerUser[info.Username][info.SessionID]
	activeSessionsMutex.RUnlock()

	if !sessionExists || now.After(sessionExpiry) {
		log.Printf("Session expired for user %s", info.Username)
		return tokenInfo{}, false, ""
	}

	if now.After(info.Expiry) {
		if info.IP == ip && info.UserAgent == userAgent {
			newToken := generateStreamToken(info.Username, info.SessionID, ip, userAgent)
			log.Printf("Refreshed expired token for user %s: %s", info.Username, newToken)
			return tokenCache[newToken], true, newToken
		}
		log.Printf("Token expired for user %s", info.Username)
		return tokenInfo{}, false, ""
	}

	if info.IP == ip && info.UserAgent == userAgent {
		log.Printf("Valid token for user %s", info.Username)
		return info, true, ""
	}

	log.Printf("IP or User-Agent mismatch for token of user %s", info.Username)
	return tokenInfo{}, false, ""
}

Bu bölümde, akış erişimi için kısa ömürlü tokenlar oluşturulur ve doğrulanır. Bu, her istek için güvenli bir erişim sağlar.

4. HLS Akış Proxy’si

HLS akışlarını güvenli bir şekilde sunmak için bir proxy sistemi uygulanmıştır:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
func streamHLS(c *fiber.Ctx) error {
	log.Println("Received HLS stream request")
	token := c.Params("token")
	streamIDStr := c.Params("streamID")

	streamID, err := strconv.Atoi(streamIDStr)
	if err != nil {
		log.Printf("Invalid stream ID: %s", streamIDStr)
		return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Invalid stream ID"})
	}

	info, valid, newToken := isValidToken(token, c.IP(), c.Get("User-Agent"))
	if !valid {
		log.Printf("Invalid token for stream request: %s", token)
		return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{"error": "Invalid or expired token"})
	}

	if newToken != "" {
		log.Printf("Issuing new token for stream request: %s", newToken)
		c.Set("X-New-Token", newToken)
		token = newToken
	} else {
		newToken = generateStreamToken(info.Username, info.SessionID, c.IP(), c.Get("User-Agent"))
		c.Set("X-New-Token", newToken)
	}

	var streamURL StreamURL
	if err := db.Where("id = ? AND user_id = ?", streamID, getUserIDFromUsername(info.Username)).First(&streamURL).Error; err != nil {
		log.Printf("Stream URL not found for user %s and stream ID %d", info.Username, streamID)
		return c.Status(fiber.StatusNotFound).JSON(fiber.Map{"error": "Stream URL not found"})
	}

	path := c.Params("*")
	url := streamURL.BaseURL + "/" + path

	log.Printf("Fetching content from URL: %s", url)
	resp, err := http.Get(url)
	if err != nil {
		log.Printf("Failed to fetch content from URL %s: %v", url, err)
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to fetch content"})
	}
	defer resp.Body.Close()

	contentType := resp.Header.Get("Content-Type")
	c.Set("Content-Type", contentType)

	if strings.HasSuffix(path, ".m3u8") {
		log.Println("Modifying m3u8 content")
		modifiedContent, err := modifyM3U8(resp.Body, token, streamIDStr)
		if err != nil {
			log.Printf("Failed to modify m3u8 content: %v", err)
			return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to modify m3u8"})
		}
		return c.Send(modifiedContent)
	}

	log.Println("Streaming content")
	_, err = io.Copy(c.Response().BodyWriter(), resp.Body)
	if err != nil {
		log.Printf("Failed to stream content: %v", err)
		return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Failed to stream content"})
	}

	return nil
}

func modifyM3U8(content io.Reader, token, streamID string) ([]byte, error) {
	scanner := bufio.NewScanner(content)
	var modifiedContent bytes.Buffer

	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "#") {
			modifiedContent.WriteString(line + "\n")
		} else if strings.HasSuffix(line, ".ts") || strings.HasSuffix(line, ".m3u8") {
			if !strings.HasPrefix(line, "http") {
				modifiedLine := fmt.Sprintf("/api/hls/%s/%s/%s\n", token, streamID, line)
				modifiedContent.WriteString(modifiedLine)
			} else {
				modifiedContent.WriteString(line + "\n")
			}
		} else {
			modifiedContent.WriteString(line + "\n")
		}
	}

	if err := scanner.Err(); err != nil {
		log.Printf("Error scanning m3u8 content: %v", err)
		return nil, err
	}

	return modifiedContent.Bytes(), nil
}

Bu bölümde, HLS akışlarının güvenli bir şekilde sunulması sağlanır. streamHLS fonksiyonu, gelen istekleri işler ve gerekli güvenlik kontrollerini yapar. modifyM3U8 fonksiyonu ise, m3u8 dosyalarını dinamik olarak değiştirerek her segment için güvenli URL’ler oluşturur.

Güvenlik Önlemleri

Projemizde çeşitli güvenlik önlemleri uygulanmıştır:

  1. Şifre Hashleme: Kullanıcı şifreleri, bcrypt kullanılarak güvenli bir şekilde hashlenir ve saklanır.

  2. JWT ile Kimlik Doğrulama: Kullanıcı oturumları, JWT token’ları kullanılarak yönetilir.

  3. Kısa Ömürlü Akış Tokenları: Her HLS isteği için özel, kısa ömürlü tokenlar oluşturulur.

  4. IP ve User-Agent Kontrolü: Token doğrulaması sırasında IP adresi ve User-Agent bilgileri kontrol edilir.

  5. HTTPS Desteği: Projeye HTTPS desteği eklenerek, tüm iletişimin şifreli olması sağlanabilir.

Performans ve Ölçeklenebilirlik

Projemiz, Go’nun concurrency özelliklerinden yararlanarak yüksek performans sağlar. Fiber framework’ünün hızlı route işleme kabiliyeti, uygulamanın çok sayıda eşzamanlı isteği işleyebilmesine olanak tanır.

Veritabanı işlemleri için GORM kullanımı, veritabanı sorgularının optimize edilmesini ve yönetilmesini kolaylaştırır. İleriki aşamalarda, veritabanı ölçeklenebilirliği için sharding veya replikasyon teknikleri uygulanabilir.

Sonuç ve Gelecek Geliştirmeler

Bu HLS Akış Yöneticisi projesi, Go’nun güçlü özelliklerini kullanarak güvenli ve ölçeklenebilir bir backend çözümü sunmaktadır. Proje, canlı video akışlarının güvenli bir şekilde yönetilmesi ve sunulması gereken senaryolarda kullanılabilir.

Gelecekteki geliştirmeler şunları içerebilir:

  1. Daha gelişmiş analitik ve izleme özellikleri
  2. Çoklu CDN desteği
  3. Otomatik ölçeklendirme için cloud-native mimariye geçiş
  4. WebSocket kullanarak gerçek zamanlı bildirimler
  5. API rate limiting ve daha gelişmiş güvenlik önlemleri

Bu proje, modern web uygulamalarında güvenli video streaming çözümleri için sağlam bir temel oluşturmaktadır.

API’yi Postman ile Test Etme ve Kullanma

HLS Akış Yöneticisi API’sini test etmek ve kullanmak için Postman’de aşağıdaki yönergeleri takip edebilirsiniz:

  1. Postman’i Ayarlama

    • Postman’i açın ve “HLS Akış Yöneticisi API” adında yeni bir koleksiyon oluşturun.
    • Koleksiyon değişkenlerinizde temel URL’yi http://localhost:8080/api olarak ayarlayın.
  2. Kullanıcı Kaydı

    • Metod: POST
    • URL: {{baseUrl}}/register
    • Gövde (raw JSON):
      1
      2
      3
      4
      
      {
        "username": "testkullanici",
        "password": "guvenliparola"
      }
      
    • İsteği gönderin ve başarılı bir yanıt aldığınızdan emin olun.
  3. Kullanıcı Girişi

    • Metod: POST
    • URL: {{baseUrl}}/login
    • Gövde (raw JSON):
      1
      2
      3
      4
      
      {
        "username": "testkullanici",
        "password": "guvenliparola"
      }
      
    • İsteği gönderin ve dönen JWT token’ını kaydedin.
  4. Kimlik Doğrulamayı Ayarlama

    • Koleksiyonunuzun Authorization sekmesinde, “Bearer Token” seçeneğini seçin ve giriş isteğinden aldığınız JWT token’ını yapıştırın.
  5. Tüm Akışları Alma

    • Metod: GET
    • URL: {{baseUrl}}/streams
    • Authorization sekmesinde Bearer Token’ın ayarlandığından emin olun.
    • Kimliği doğrulanmış kullanıcı için tüm akışları almak üzere isteği gönderin.
  6. Akış URL’sini Güncelleme

    • Metod: POST
    • URL: {{baseUrl}}/update-stream
    • Gövde (raw JSON):
      1
      2
      3
      4
      
      {
        "streamID": 1,
        "newUrl": "https://ornek.com/yeni_akis.m3u8"
      }
      
    • Mevcut bir akışı güncellemek veya yeni bir akış eklemek için isteği gönderin.
  7. Akış URL’sini Alma

    • Metod: GET
    • URL: {{baseUrl}}/stream?streamID=1
    • Belirli bir akış için güvenli URL’yi almak üzere isteği gönderin.
  8. HLS Akışına Erişim

    • Önceki istekten dönen URL’yi uyumlu bir HLS oynatıcıda veya doğrudan web tarayıcısında kullanarak akış erişimini test edin.

JWT token’ını güvenli bir şekilde kullanmayı ve asla herkese açık bir şekilde paylaşmamayı unutmayın. Kimlik doğrulama gerektiren her istek için, Authorization başlığında Bearer Token’ın doğru şekilde ayarlandığından emin olun.

Bu adımları takip ederek, HLS Akış Yöneticisi API’sini Postman kullanarak kapsamlı bir şekilde test edebilir ve etkileşimde bulunabilirsiniz. Bu süreç, işlemlerin akışını anlamanıza ve her bir endpoint’in işlevselliğini doğrulamanıza yardımcı olacaktır.