Design: Livestream Feature

1. Architecture overview

flowchart TB
    subgraph Client
        Viewer["Viewer App"]
        Streamer["Streamer App"]
        OBS["OBS Studio"]
    end

    subgraph Livestream["livestreamweb module (port 8086)"]
        LC["LivestreamController\n/api/livestream/*"]
        LGC["LivestreamGiftController\n/api/livestream-gift/*"]
        ATS["AgoraTokenService\n(@Cacheable credentials)"]
        LFS["LivestreamFirebaseService"]
    end

    subgraph GiftPipeline["Gift Aggregation Pipeline"]
        Redis[("Redis\nlivestream:pending_gifts:{streamerId}")]
        GAJ["GiftAggregationJob\nfixedDelay=5000ms"]
        MQ[("ActiveMQ\nlivestream-viewer-gift-firebase-update\nJMSXGroupID=streamerId")]
        JL["JMSLivestreamViewerGiftFirebaseUpdateListener"]
    end

    subgraph Firebase["Firebase RTDB"]
        FB_Live["livestreams/{id}\n(counters, viewers, gifts)"]
        FB_Cmd["commands/\n(room sync)"]
        FB_Evt["event_types/\n(game events)"]
        FB_Yell["yell_points_history/"]
    end

    subgraph FBListener["firebaselistener module (port 8087)"]
        FCL["FirebaseCommandListener"]
        FEL["FirebaseEventTypesListener"]
        FLL["FirebaseLivestreamEventListener"]
        FYL["FirebaseLivestreamsYellPointListener"]
    end

    Viewer -->|"viewer-get-livestream"| LC
    Streamer -->|"get-by-user-id"| LC
    LC --> ATS
    LC --> LFS
    LFS -->|"read counters"| FB_Live

    Viewer -->|"send gift"| LGC
    LGC -->|"INCRBY"| Redis
    GAJ -->|"GETSET per streamer"| Redis
    GAJ -->|"JMSXGroupID=streamerId"| MQ
    MQ --> JL
    JL -->|"deposit B-Coin"| Core["UserSystemCoinService"]
    JL -->|"update counters"| LFS
    LFS -->|"write"| FB_Live

    FB_Cmd --> FCL
    FB_Evt --> FEL
    FB_Live --> FLL
    FB_Yell --> FYL
    FCL & FEL & FLL & FYL --> Core2["application-core handlers"]

2. Gift aggregation sequence

sequenceDiagram
    participant V1 as Viewer 1
    participant V2 as Viewer 2
    participant LGC as LivestreamGiftController
    participant R as Redis
    participant GAJ as GiftAggregationJob
    participant Q as ActiveMQ
    participant JL as JMS Gift Listener
    participant Coin as UserSystemCoinService
    participant FB as Firebase

    par Viewer 1 sends gift
        V1->>LGC: POST /api/livestream-gift/send\n{streamerId, giftId, value=100}
        LGC->>R: INCRBY livestream:pending_gifts:{streamerId} 100
    and Viewer 2 sends gift
        V2->>LGC: POST /api/livestream-gift/send\n{streamerId, giftId, value=200}
        LGC->>R: INCRBY livestream:pending_gifts:{streamerId} 200
    end

    Note over GAJ: 5 seconds later (fixedDelay)
    GAJ->>R: scan keys "livestream:pending_gifts:*"
    GAJ->>R: GETSET livestream:pending_gifts:{streamerId} 0
    R-->>GAJ: 300 (total accumulated)
    GAJ->>Q: send LivestreamGiftAggregateMessageDto\n{streamerId, totalBcoinValue=300}\nJMSXGroupID=streamerId

    Q->>JL: consume (sequential per streamerId)
    JL->>Coin: deposit(streamerId, 300)
    JL->>FB: update total_bcoins += 300
    JL->>FB: update streamer_points
    JL->>FB: update wish_orb progress
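
The aggregation step above can be sketched without a Redis server. This is a minimal in-memory stand-in, assuming the counters behave like Redis INCRBY and GETSET; the class name GiftAggregationSketch and its methods are hypothetical and are not taken from the project.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** In-memory stand-in for the Redis counters (illustrative, not project code). */
final class GiftAggregationSketch {
    static final String KEY_PREFIX = "livestream:pending_gifts:";

    // Stand-in for Redis: key -> pending B-Coin total for one streamer.
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    /** Mirrors INCRBY: atomically adds one gift's value to the pending total. */
    long addGift(String streamerId, long value) {
        return counters
                .computeIfAbsent(KEY_PREFIX + streamerId, k -> new AtomicLong())
                .addAndGet(value);
    }

    /** Mirrors one job tick: GETSET key 0 per streamer, keeping non-zero totals only. */
    Map<String, Long> drain() {
        Map<String, Long> batch = new LinkedHashMap<>();
        for (Map.Entry<String, AtomicLong> e : counters.entrySet()) {
            long total = e.getValue().getAndSet(0); // atomic read-and-reset
            if (total > 0) {
                batch.put(e.getKey().substring(KEY_PREFIX.length()), total);
            }
        }
        return batch;
    }
}
```

Because the read-and-reset is a single atomic operation, a gift that arrives between two job ticks is counted in exactly one batch. In this sketch, two gifts of 100 and 200 for the same streamer produce one batch entry of 300, matching the sequence above.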

3. Agora UID scheme

flowchart LR
    UserIndex["getUserIndex(userId)\n(integer ID)"] --> Cam["uid_camera = userIndex"]
    UserIndex -->|"+ UID_SCREEN_PREFIX"| Screen["uid_screen = 1000000 + userIndex"]
    UserIndex -->|"+ UID_VIEW_PREFIX"| View["uid_view = 2000000 + userIndex"]
    UserIndex -->|"+ UID_VOICE_PREFIX"| Voice["uid_voice = 3000000 + userIndex"]
    UserIndex -->|"+ UID_SUB_CAMERA_PREFIX"| Sub["uid_sub_camera = 4000000 + userIndex\n(for viewer watching sub-channel)"]
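
The arithmetic in the diagram can be written out as a small sketch. The constant names follow the diagram labels, but their numeric values are an assumption inferred from the node formulas, and the class AgoraUidSketch is hypothetical.

```java
/** Illustrative UID arithmetic; constant values are inferred from the diagram. */
final class AgoraUidSketch {
    static final int UID_SCREEN_PREFIX = 1_000_000;     // assumed value
    static final int UID_VIEW_PREFIX = 2_000_000;       // assumed value
    static final int UID_VOICE_PREFIX = 3_000_000;      // assumed value
    static final int UID_SUB_CAMERA_PREFIX = 4_000_000; // assumed value

    static int camera(int userIndex)    { return userIndex; }
    static int screen(int userIndex)    { return UID_SCREEN_PREFIX + userIndex; }
    static int view(int userIndex)      { return UID_VIEW_PREFIX + userIndex; }
    static int voice(int userIndex)     { return UID_VOICE_PREFIX + userIndex; }
    static int subCamera(int userIndex) { return UID_SUB_CAMERA_PREFIX + userIndex; }

    /** Recovers the user index from any derived UID, valid only while userIndex < 1,000,000. */
    static int userIndexOf(int uid) {
        return uid % 1_000_000;
    }
}
```

For userIndex = 12345 this gives uid_screen = 1012345 and uid_sub_camera = 4012345. Under these assumed offsets the ranges stay disjoint only while userIndex is below 1,000,000; a larger index would collide with the next role's range.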

4. viewer-get-livestream performance design

sequenceDiagram
    participant C as Client
    participant LC as LivestreamController
    participant FBBan as FirebaseLivestreamBanHandler (async)
    participant LS as LivestreamService
    participant US as UserService
    participant ATS as AgoraTokenService (cached)
    participant LFS as LivestreamFirebaseService (cached)

    C->>LC: GET /api/livestream/viewer-get-livestream/{id}

    par Non-blocking ban check
        LC->>FBBan: getBannedLivestreamsByUserIdAsync() [fire-and-forget]
    and Main flow
        LC->>LS: findById(id)
        LS-->>LC: LivestreamModel
        LC->>US: getSimpleUserById(streamerId)
        US-->>LC: UserModel
        LC->>US: getCurrentUserIndex(userId)
        US-->>LC: uid (integer)
        LC->>ATS: getAppId() [cached]
        LC->>ATS: getAppCertificate() [cached]
        ATS-->>LC: credentials
        LC->>ATS: generateTokenWithCredentials(channel, uid, SUBSCRIBER)
        LC->>LFS: getLivestreamDataByChildNodes(id) [cached]
        LFS-->>LC: LivestreamFirebaseDto
    end

    LC-->>C: ViewerGetLivestreamResp (with agora + firebase data)
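
The fire-and-forget ban check in the diagram can be approximated with CompletableFuture. This is a sketch under the assumption that the ban check's result is not needed to build the response; FireAndForgetSketch and handleRequest are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Illustrative fire-and-forget: the ban check never delays the response. */
final class FireAndForgetSketch {

    /** Starts the ban check on another thread and returns the main-flow result without waiting. */
    static <T> T handleRequest(Runnable banCheck, Supplier<T> mainFlow) {
        CompletableFuture.runAsync(banCheck)
                .exceptionally(ex -> {
                    // A failure in the side task is only logged; it cannot fail the request.
                    System.err.println("ban check failed: " + ex);
                    return null;
                });
        return mainFlow.get();
    }
}
```

The trade-off of this pattern is that the response is built without knowing the ban check's outcome, so any enforcement has to happen through a separate path.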

5. Firebase listener pattern

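// FirebaseCommandListener pattern
// Listens to Firebase RTDB changes and processes commands.
// The firebase, envPrefix and commandRouter fields are injected; their declarations are omitted here.

import com.google.firebase.database.ChildEventListener;
import com.google.firebase.database.DataSnapshot;
import com.google.firebase.database.DatabaseError;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on newer Spring versions
import org.springframework.stereotype.Service;

@Service
public class FirebaseCommandListener {

    @PostConstruct
    public void startListening() {
        // Attach Firebase listener to "commands/" path
        firebase.getReference("commands/" + envPrefix)
            .addChildEventListener(new ChildEventListener() {
                @Override
                public void onChildAdded(DataSnapshot snapshot, String previousChildName) {
                    processCommand(snapshot);
                    snapshot.getRef().removeValue(); // clean up after processing
                }

                // ChildEventListener is an interface, so the remaining callbacks must be implemented too.
                @Override public void onChildChanged(DataSnapshot snapshot, String previousChildName) { }
                @Override public void onChildRemoved(DataSnapshot snapshot) { }
                @Override public void onChildMoved(DataSnapshot snapshot, String previousChildName) { }
                @Override public void onCancelled(DatabaseError error) { } // listener detached; log in real code
            });
    }

    private void processCommand(DataSnapshot snapshot) {
        // Route to appropriate handler (roomSyncRegister, roomSyncCancel, etc.)
        String commandType = snapshot.child("type").getValue(String.class);
        commandRouter.route(commandType, snapshot.getValue());
    }
}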
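
The routing step in processCommand can be illustrated with a plain map from command type to handler. This is a hypothetical sketch: the source only shows the call commandRouter.route(type, payload), so the class CommandRouterSketch, its register method and the boolean return value are assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Illustrative router: maps a command type string to a handler. */
final class CommandRouterSketch {
    private final Map<String, Consumer<Object>> handlers = new ConcurrentHashMap<>();

    /** Registers the handler for one command type, e.g. "roomSyncRegister". */
    void register(String commandType, Consumer<Object> handler) {
        handlers.put(commandType, handler);
    }

    /** Dispatches the payload; returns false if the type is null or has no handler. */
    boolean route(String commandType, Object payload) {
        Consumer<Object> handler = commandType == null ? null : handlers.get(commandType);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }
}
```

Returning false for an unknown or missing type lets the caller decide whether such a command should still be removed from Firebase or kept for inspection.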

6. Performance notes

| Bottleneck | Mitigation |
| --- | --- |
| Agora credentials DB lookup per request | @Cacheable on getAppId() and getAppCertificate() |
| Firebase read per viewer request | getLivestreamDataByChildNodes() with caching |
| Gift processing per individual gift | Redis aggregation → batch JMS |
| Ban check blocking response | Async fire-and-forget |
| Multiple DB queries in viewer response | Parallel fetches where possible |
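
The effect of caching the credential lookups can be shown with a memoizing supplier. This is only an analogy for what @Cacheable achieves on a no-argument method, not how Spring implements it; CachedSupplierSketch and cached are hypothetical names.

```java
import java.util.function.Supplier;

/** Illustrative memoization: the loader runs once, later calls reuse the value. */
final class CachedSupplierSketch {

    /** Wraps a loader so that its first non-null result is stored and returned afterwards. */
    static <T> Supplier<T> cached(Supplier<T> loader) {
        return new Supplier<T>() {
            private volatile T value;

            @Override
            public T get() {
                T v = value;
                if (v == null) {
                    synchronized (this) {
                        if (value == null) {
                            value = loader.get(); // e.g. the database lookup
                        }
                        v = value;
                    }
                }
                return v;
            }
        };
    }
}
```

With this wrapper, repeated calls to a credential getter trigger the underlying lookup once. A null result is not stored in this sketch, so a loader that returns null is invoked again on the next call.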

7. Points to note

| # | Issue | Details |
| --- | --- | --- |
| 1 | LOGGER.error used for debug logging | viewer-get-livestream uses LOGGER.error for performance logs → should be changed to DEBUG |
| 2 | GiftAggregationJob fixedDelay | Defaults to 5000 ms but is configurable → review against traffic |
| 3 | Firebase listener cleanup | snapshot.getRef().removeValue() is called after processing → verify that it behaves correctly |
| 4 | Room sync v1 vs v2 | Both /room-sync-register (v1, via Firebase) and /sync-register (v2, via handler) exist |