Design: Livestream Feature
1. Architecture overview
flowchart TB
subgraph Client
Viewer["Viewer App"]
Streamer["Streamer App"]
OBS["OBS Studio"]
end
subgraph Livestream["livestreamweb module (port 8086)"]
LC["LivestreamController\n/api/livestream/*"]
LGC["LivestreamGiftController\n/api/livestream-gift/*"]
ATS["AgoraTokenService\n(@Cacheable credentials)"]
LFS["LivestreamFirebaseService"]
end
subgraph GiftPipeline["Gift Aggregation Pipeline"]
Redis[("Redis\nlivestream:pending_gifts:{streamerId}")]
GAJ["GiftAggregationJob\nfixedDelay=5000ms"]
MQ[("ActiveMQ\nlivestream-viewer-gift-firebase-update\nJMSXGroupID=streamerId")]
JL["JMSLivestreamViewerGiftFirebaseUpdateListener"]
end
subgraph Firebase["Firebase RTDB"]
FB_Live["livestreams/{id}\n(counters, viewers, gifts)"]
FB_Cmd["commands/\n(room sync)"]
FB_Evt["event_types/\n(game events)"]
FB_Yell["yell_points_history/"]
end
subgraph FBListener["firebaselistener module (port 8087)"]
FCL["FirebaseCommandListener"]
FEL["FirebaseEventTypesListener"]
FLL["FirebaseLivestreamEventListener"]
FYL["FirebaseLivestreamsYellPointListener"]
end
Viewer -->|"viewer-get-livestream"| LC
Streamer -->|"get-by-user-id"| LC
LC --> ATS
LC --> LFS
LFS -->|"read counters"| FB_Live
Viewer -->|"send gift"| LGC
LGC -->|"INCRBY"| Redis
GAJ -->|"GETSET per streamer"| Redis
GAJ -->|"JMSXGroupID=streamerId"| MQ
MQ --> JL
JL -->|"deposit B-Coin"| Core["UserSystemCoinService"]
JL -->|"update counters"| LFS
LFS -->|"write"| FB_Live
FB_Cmd --> FCL
FB_Evt --> FEL
FB_Live --> FLL
FB_Yell --> FYL
FCL & FEL & FLL & FYL --> Core2["application-core handlers"]
2. Gift aggregation sequence
sequenceDiagram
participant V1 as Viewer 1
participant V2 as Viewer 2
participant LGC as LivestreamGiftController
participant R as Redis
participant GAJ as GiftAggregationJob
participant Q as ActiveMQ
participant JL as JMS Gift Listener
participant Coin as UserSystemCoinService
participant FB as Firebase
par Viewer 1 sends gift
V1->>LGC: POST /api/livestream-gift/send\n{streamerId, giftId, value=100}
LGC->>R: INCRBY livestream:pending_gifts:{streamerId} 100
and Viewer 2 sends gift
V2->>LGC: POST /api/livestream-gift/send\n{streamerId, giftId, value=200}
LGC->>R: INCRBY livestream:pending_gifts:{streamerId} 200
end
Note over GAJ: 5 seconds later (fixedDelay)
GAJ->>R: scan keys "livestream:pending_gifts:*"
GAJ->>R: GETSET livestream:pending_gifts:{streamerId} 0
R-->>GAJ: 300 (total accumulated)
GAJ->>Q: send LivestreamGiftAggregateMessageDto\n{streamerId, totalBcoinValue=300}\nJMSXGroupID=streamerId
Q->>JL: consume (sequential per streamerId)
JL->>Coin: deposit(streamerId, 300)
JL->>FB: update total_bcoins += 300
JL->>FB: update streamer_points
JL->>FB: update wish_orb progress
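The accumulate-then-drain step in the sequence above can be sketched in plain Java. This is a minimal illustration, not the project's code: `PendingGiftStore` is a hypothetical in-memory stand-in for Redis, `incrBy` mirrors `INCRBY`, and `getSetZero` mirrors `GETSET key 0`. The key scan from the diagram is replaced here by iterating the map's keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory stand-in for the Redis counters; not a real Redis client. */
final class PendingGiftStore {
    private static final String KEY_PREFIX = "livestream:pending_gifts:";
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    /** Mirrors INCRBY: atomically adds the gift value to the streamer's pending total. */
    long incrBy(String streamerId, long value) {
        return counters.computeIfAbsent(KEY_PREFIX + streamerId, k -> new AtomicLong())
                .addAndGet(value);
    }

    /** Mirrors GETSET key 0: atomically reads the pending total and resets it to zero. */
    long getSetZero(String streamerId) {
        AtomicLong counter = counters.get(KEY_PREFIX + streamerId);
        return counter == null ? 0L : counter.getAndSet(0L);
    }

    /** One job tick: drains every streamer's counter into a single aggregate per streamer. */
    Map<String, Long> drainAll() {
        Map<String, Long> batch = new LinkedHashMap<>();
        for (String key : counters.keySet()) {
            String streamerId = key.substring(KEY_PREFIX.length());
            long total = getSetZero(streamerId);
            if (total > 0) {
                batch.put(streamerId, total); // in the real flow: one JMS message per streamer
            }
        }
        return batch;
    }
}
```

Because the read and the reset happen in one atomic operation, a gift that arrives between two ticks lands in either the current batch or the next one, and is never counted twice or lost.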
3. Agora UID scheme
flowchart LR
UserIndex["getUserIndex(userId)\n(integer ID)"] --> Cam["uid_camera = userIndex"]
Cam -->|"+ UID_SCREEN_PREFIX"| Screen["uid_screen = 1000000 + userIndex"]
Cam -->|"+ UID_VIEW_PREFIX"| View["uid_view = 2000000 + userIndex"]
Cam -->|"+ UID_VOICE_PREFIX"| Voice["uid_voice = 3000000 + userIndex"]
UserIndex -->|"+ UID_SUB_CAMERA_PREFIX"| Sub["uid_sub_camera = 4000000 + userIndex\n(for viewer watching sub-channel)"]
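The offsets in the diagram can be written as a small helper. This is a sketch under assumptions: the class name `AgoraUidScheme` and the range guard are hypothetical, while the constant names and values come from the diagram. The guard reflects the observation that once `userIndex` reaches 1000000, one user's camera UID would equal another user's screen UID.

```java
/** Sketch of the UID offsets from the diagram; class name and guard are assumptions. */
final class AgoraUidScheme {
    static final int UID_SCREEN_PREFIX = 1_000_000;
    static final int UID_VIEW_PREFIX = 2_000_000;
    static final int UID_VOICE_PREFIX = 3_000_000;
    static final int UID_SUB_CAMERA_PREFIX = 4_000_000;

    private AgoraUidScheme() { }

    static int camera(int userIndex) { return checked(userIndex); }

    static int screen(int userIndex) { return UID_SCREEN_PREFIX + checked(userIndex); }

    static int view(int userIndex) { return UID_VIEW_PREFIX + checked(userIndex); }

    static int voice(int userIndex) { return UID_VOICE_PREFIX + checked(userIndex); }

    static int subCamera(int userIndex) { return UID_SUB_CAMERA_PREFIX + checked(userIndex); }

    /** Assumed guard: keeps each UID family inside its own 1,000,000-wide range. */
    private static int checked(int userIndex) {
        if (userIndex < 0 || userIndex >= UID_SCREEN_PREFIX) {
            throw new IllegalArgumentException("userIndex out of range: " + userIndex);
        }
        return userIndex;
    }
}
```

For example, `AgoraUidScheme.screen(42)` yields 1000042 and `AgoraUidScheme.subCamera(42)` yields 4000042.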
4. viewer-get-livestream request flow
sequenceDiagram
participant C as Client
participant LC as LivestreamController
participant FBBan as FirebaseLivestreamBanHandler (async)
participant LS as LivestreamService
participant US as UserService
participant ATS as AgoraTokenService (cached)
participant LFS as LivestreamFirebaseService (cached)
C->>LC: GET /api/livestream/viewer-get-livestream/{id}
par Non-blocking ban check
LC->>FBBan: getBannedLivestreamsByUserIdAsync() [fire-and-forget]
and Main flow
LC->>LS: findById(id)
LS-->>LC: LivestreamModel
LC->>US: getSimpleUserById(streamerId)
US-->>LC: UserModel
LC->>US: getCurrentUserIndex(userId)
US-->>LC: uid (integer)
LC->>ATS: getAppId() [cached]
LC->>ATS: getAppCertificate() [cached]
ATS-->>LC: credentials
LC->>ATS: generateTokenWithCredentials(channel, uid, SUBSCRIBER)
LC->>LFS: getLivestreamDataByChildNodes(id) [cached]
LFS-->>LC: LivestreamFirebaseDto
end
LC-->>C: ViewerGetLivestreamResp (with agora + firebase data)
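The non-blocking ban check in the flow above can be sketched with `CompletableFuture`. This is an illustration only: `ViewerFlowSketch` and `respondWithoutWaiting` are hypothetical names, and how the real controller schedules the ban handler or uses its result is not stated in this document.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical helper showing a fire-and-forget side task next to the main flow. */
final class ViewerFlowSketch {
    private ViewerFlowSketch() { }

    /**
     * Submits the ban check to the executor without waiting for it,
     * then builds the response on the calling thread.
     */
    static <T> T respondWithoutWaiting(Runnable banCheck, Supplier<T> mainFlow, Executor executor) {
        CompletableFuture.runAsync(banCheck, executor)
                .exceptionally(error -> null); // assumed policy: a failed ban check never fails the response
        return mainFlow.get();
    }
}
```

The trade-off of this design is that the response can be sent before the ban check finishes, so any enforcement has to happen through another path.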
5. Firebase listener pattern
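// FirebaseCommandListener pattern: listens for new children under "commands/"
// in Firebase RTDB and routes each command to its handler.
import com.google.firebase.database.ChildEventListener;
import com.google.firebase.database.DataSnapshot;
import com.google.firebase.database.DatabaseError;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import org.springframework.stereotype.Service;

@Service
public class FirebaseCommandListener {
    // firebase, envPrefix and commandRouter are injected fields, omitted from this excerpt.

    @PostConstruct
    public void startListening() {
        // Attach a child listener to the "commands/" path
        firebase.getReference("commands/" + envPrefix)
            .addChildEventListener(new ChildEventListener() {
                @Override
                public void onChildAdded(DataSnapshot snapshot, String previousChildName) {
                    processCommand(snapshot);
                    snapshot.getRef().removeValue(); // clean up after processing
                }

                // The interface requires the remaining callbacks; commands are consumed on add only.
                @Override public void onChildChanged(DataSnapshot snapshot, String previousChildName) { }
                @Override public void onChildRemoved(DataSnapshot snapshot) { }
                @Override public void onChildMoved(DataSnapshot snapshot, String previousChildName) { }
                @Override public void onCancelled(DatabaseError error) { }
            });
    }

    private void processCommand(DataSnapshot snapshot) {
        // Route to the appropriate handler (roomSyncRegister, roomSyncCancel, etc.)
        String commandType = snapshot.child("type").getValue(String.class);
        commandRouter.route(commandType, snapshot.getValue());
    }
}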
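The routing step in `processCommand` can be sketched as a lookup from command type to handler. This is a minimal sketch: `CommandRouterSketch` is a hypothetical stand-in, since the real `commandRouter` is not shown in this document, and the boolean return value is an assumption added so callers can tell whether a command was handled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical router: maps a command "type" string to a handler for its payload. */
final class CommandRouterSketch {
    private final Map<String, Consumer<Object>> handlers = new HashMap<>();

    /** Registers the handler for one command type, e.g. "roomSyncRegister". */
    void register(String commandType, Consumer<Object> handler) {
        handlers.put(commandType, handler);
    }

    /** Returns true when a handler ran, false for a null or unknown command type. */
    boolean route(String commandType, Object payload) {
        Consumer<Object> handler = commandType == null ? null : handlers.get(commandType);
        if (handler == null) {
            return false; // caller decides whether to log, keep, or delete the command
        }
        handler.accept(payload);
        return true;
    }
}
```

Returning a result makes it possible to skip the cleanup for commands that no handler recognized, rather than deleting them unconditionally.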
6. Bottlenecks and mitigations
| Bottleneck | Mitigation |
| --- | --- |
| Agora credentials DB lookup per request | @Cacheable on getAppId() and getAppCertificate() |
| Firebase read per viewer request | getLivestreamDataByChildNodes() with caching |
| Gift processing per individual gift | Redis aggregation → batch JMS |
| Ban check blocking response | Async fire-and-forget |
| Multiple DB queries in viewer response | Parallel fetches where possible |
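The effect of the caching rows above can be illustrated without Spring. This is a plain-Java sketch of the idea behind `@Cacheable`, not Spring's implementation: `CachedLookup` is a hypothetical class, and it has no expiry, so a cached value stays until it is evicted explicitly.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for a cached lookup; the loader runs at most once per key until eviction. */
final class CachedLookup<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;

    CachedLookup(Function<K, V> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, calling the loader only on a cache miss. */
    V get(K key) {
        return cache.computeIfAbsent(key, loader);
    }

    /** Drops one entry so the next get() reloads it. */
    void evict(K key) {
        cache.remove(key);
    }
}
```

This fits values that rarely change, such as the Agora credentials. For fast-moving Firebase counters, the cache lifetime determines how stale the viewer response can be.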
7. Points to note
| # | Issue | Details |
| --- | --- | --- |
| 1 | LOGGER.error used for debug logging | viewer-get-livestream uses LOGGER.error for performance logs; these should be changed to DEBUG |
| 2 | GiftAggregationJob fixedDelay | Defaults to 5000 ms but is configurable; review it against actual traffic |
| 3 | Firebase listener cleanup | snapshot.getRef().removeValue() is called after processing; verify that this behaves correctly |
| 4 | Room sync v1 vs v2 | Both /room-sync-register (v1, via Firebase) and /sync-register (v2, via handler) exist |