Design: Batch & Async Processing Feature
1. Architecture overview
flowchart TB
subgraph Startup["Batch App Startup"]
REG["ProgrammaticEndpointRegistration\n@PostConstruct"]
end
subgraph Schedulers["Spring @Scheduled Jobs"]
GAME["Game loop:\nMatchMaking, MachineWaiting, OvertimePing"]
PAYMENT["Payment:\nCardExpired, CryptoExpired, TokenExpired,\nCardConfirm, TokenConfirm"]
NFT["NFT:\nRentalExpiry, LifeGauge, Revenue"]
LEADERBOARD["Leaderboard:\nEvent, Tournament, Daily"]
MAINTENANCE["Maintenance:\nSite, Cleanup, Archive, DeleteAccount"]
GIFT["Gift:\nGiftAggregationJob (fixedDelay=5s)"]
end
subgraph JMSQueues["ActiveMQ Artemis"]
Q_PA["Prize Allocation Queues (×9)"]
Q_MATCH["Matching Queues (per setting)"]
Q_MACHINE["Machine Queues (per machine)"]
Q_INVITE["queue-invitation"]
Q_GACHA["Gacha Queues (×5)"]
Q_LIVE["livestream-viewer-gift"]
Q_OTHER["Other queues"]
end
subgraph Listeners["JMS Listeners (batch module)"]
L_PA["Prize Allocation Listeners (×9)"]
L_MATCH["Matching Listeners (dynamic)"]
L_MACHINE["Machine Listeners (dynamic)"]
L_GACHA["Gacha Listeners (×5)"]
L_LIVE["JMSLivestreamViewerGiftFirebaseUpdateListener"]
L_RECOVERY["JMSListenerRecoveryScheduler"]
end
subgraph Firebase["Firebase RTDB"]
FB_CMD["commands/"]
FB_EVT["event_types/"]
FB_LIVE["livestreams/"]
FB_YELL["yell_points_history/"]
end
subgraph FBListeners["firebaselistener module"]
FBL_CMD["FirebaseCommandListener"]
FBL_EVT["FirebaseEventTypesListener"]
FBL_LIVE["FirebaseLivestreamEventListener"]
FBL_YELL["FirebaseLivestreamsYellPointListener"]
end
REG -->|"register listeners"| Q_MATCH & Q_MACHINE
GIFT -->|"Redis → MQ"| Q_LIVE
Schedulers -->|"trigger"| Q_PA
Q_PA --> L_PA
Q_MATCH --> L_MATCH
Q_MACHINE --> L_MACHINE
Q_GACHA --> L_GACHA
Q_LIVE --> L_LIVE
L_RECOVERY -->|"monitor"| L_PA & L_MATCH & L_MACHINE
FB_CMD --> FBL_CMD
FB_EVT --> FBL_EVT
FB_LIVE --> FBL_LIVE
FB_YELL --> FBL_YELL
2. Dynamic listener registration
sequenceDiagram
participant App as Batch App (@PostConstruct)
participant REG as ProgrammaticEndpointRegistration
participant DB as MySQL
participant CTX as JmsListenerEndpointRegistry
participant MQ as ActiveMQ
App->>REG: init()
REG->>DB: SELECT all playableGameBoothSettings
loop per settingId
REG->>CTX: registerListenerContainer(\n queue-game-mode-type-pool-{settingId},\n GameModeCreateRoomJMSListener\n)
REG->>CTX: registerListenerContainer(\n matching-pvp-{settingId},\n MatchingGameModePvPJMSListener\n)
REG->>CTX: registerListenerContainer(\n matching-pvp-multi-{settingId},\n MatchingGameModePvPMultiMatchJMSListener\n)
end
REG->>DB: SELECT all machines
loop per machineId
REG->>CTX: registerListenerContainer(\n machine-start-{machineId},\n CreateMachineStartRoomJMSListener\n)
end
CTX->>MQ: create/bind queues
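The registration loop above can be sketched with Spring's `JmsListenerEndpointRegistry` and `SimpleJmsListenerEndpoint`. The repository interface and the listener bean's exact type are assumptions (only the queue-name formats and `GameModeCreateRoomJMSListener` come from the diagram); the other queue types are registered the same way.

```java
import java.util.List;

import jakarta.annotation.PostConstruct;
import jakarta.jms.MessageListener;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.jms.config.SimpleJmsListenerEndpoint;
import org.springframework.stereotype.Component;

// Hypothetical repository abstraction, only for this sketch
interface GameBoothSettingRepository {
    List<Long> findAllPlayableSettingIds();
}

// Sketch of ProgrammaticEndpointRegistration; bean names are assumptions,
// queue-name formats follow the sequence diagram above.
@Component
public class ProgrammaticEndpointRegistration {

    private final JmsListenerEndpointRegistry registry;
    private final JmsListenerContainerFactory<?> containerFactory;
    private final GameBoothSettingRepository settingRepository;
    private final MessageListener createRoomListener; // the GameModeCreateRoomJMSListener bean

    public ProgrammaticEndpointRegistration(JmsListenerEndpointRegistry registry,
                                            JmsListenerContainerFactory<?> containerFactory,
                                            GameBoothSettingRepository settingRepository,
                                            MessageListener createRoomListener) {
        this.registry = registry;
        this.containerFactory = containerFactory;
        this.settingRepository = settingRepository;
        this.createRoomListener = createRoomListener;
    }

    // Queue-name builders matching the formats in the diagram
    static String poolQueue(long settingId) {
        return "queue-game-mode-type-pool-" + settingId;
    }

    static String pvpQueue(long settingId) {
        return "matching-pvp-" + settingId;
    }

    @PostConstruct
    public void init() {
        for (Long settingId : settingRepository.findAllPlayableSettingIds()) {
            register(poolQueue(settingId), createRoomListener);
            // matching-pvp-{settingId}, matching-pvp-multi-{settingId} and the
            // per-machine queues are registered the same way with their listeners
        }
    }

    private void register(String queueName, MessageListener listener) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId(queueName);            // container id == queue name
        endpoint.setDestination(queueName);
        endpoint.setMessageListener(listener);
        // startImmediately=true: create and start the container right away
        registry.registerListenerContainer(endpoint, containerFactory, true);
    }
}
```

Registering through the registry (rather than `@JmsListener`) is what allows one listener class to serve a queue-per-setting layout whose size is only known at startup.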
3. Listener recovery flow
sequenceDiagram
participant SCHED as JMSListenerRecoveryScheduler
participant REG as JmsListenerEndpointRegistry
participant CTX as MessageListenerContainer
loop Every N seconds
SCHED->>REG: getListenerContainers()
loop per container
REG-->>SCHED: container
SCHED->>CTX: isRunning()?
alt NOT running
SCHED->>CTX: start()
SCHED->>LOGGER: warn("Recovered listener: {id}")
end
end
end
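The recovery loop above maps almost directly onto the registry API; a minimal sketch (the poll interval property name is an assumption):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Sketch of JMSListenerRecoveryScheduler: periodically restart any
// listener container that has stopped (e.g. after a broker outage).
@Component
public class JMSListenerRecoveryScheduler {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(JMSListenerRecoveryScheduler.class);

    private final JmsListenerEndpointRegistry registry;

    public JMSListenerRecoveryScheduler(JmsListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @Scheduled(fixedDelayString = "${jobs.listener-recovery.fixedDelay:30000}")
    public void recoverStoppedListeners() {
        for (String id : registry.getListenerContainerIds()) {
            MessageListenerContainer container = registry.getListenerContainer(id);
            if (container != null && !container.isRunning()) {
                container.start();
                LOGGER.warn("Recovered listener: {}", id);
            }
        }
    }
}
```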
4. Job scheduling patterns
fixedDelay (sequential)
// The next run starts only after the previous run has finished
@Scheduled(fixedDelayString = "${jobs.gift-aggregation.fixedDelay:5000}")
public void processAggregatedGifts() { ... }
// Used by GiftAggregationJob and other jobs that must run strictly sequentially
fixedRate (parallel allowed)
// Runs at a fixed interval measured from each start; slow runs can pile up or overlap
@Scheduled(fixedRate = 60000)
public void runMatchmaking() { ... }
// Typically guarded with ShedLock to prevent overlapping runs across instances
cron
// Runs on a cron expression (default here: daily at 02:00)
@Scheduled(cron = "${batch.cron.archive-data:0 0 2 * * *}")
public void archiveData() { ... }
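The overlap guard mentioned under fixedRate can be sketched with ShedLock (a standalone distributed-lock library, not part of Spring Batch). Lock name and durations below are assumptions; it also requires `@EnableSchedulerLock` and a `LockProvider` bean elsewhere in the configuration.

```java
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// Sketch: guarding a fixedRate job with ShedLock so that at most one
// node executes it at a time across the cluster.
@Component
public class MatchMakingJob {

    @Scheduled(fixedRate = 60000)
    @SchedulerLock(name = "MatchMakingJob_runMatchmaking",
                   lockAtMostFor = "55s", lockAtLeastFor = "10s")
    public void runMatchmaking() {
        // ... matchmaking logic ...
    }
}
```

`lockAtMostFor` caps how long a crashed node can hold the lock; `lockAtLeastFor` keeps fast runs from executing twice in quick succession on clock-skewed nodes.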
5. Firebase listener pattern
@Service
public class FirebaseCommandListener {
    @PostConstruct
    public void startListening() {
        String path = "commands/" + envPrefix;
        FirebaseDatabase.getInstance()
            .getReference(path)
            .addChildEventListener(new ChildEventListener() {
                @Override
                public void onChildAdded(DataSnapshot snapshot, String prevKey) {
                    String commandType = snapshot.child("type").getValue(String.class);
                    String userId = snapshot.child("user_id").getValue(String.class);
                    String data = snapshot.child("data").getValue(String.class);
                    try {
                        commandRouter.route(commandType, userId, data);
                    } catch (RuntimeException e) {
                        // Swallow routing failures so one bad command cannot kill the listener
                        LOGGER.error("Command routing failed: {}", e.getMessage(), e);
                    } finally {
                        // Always clean up the node to prevent re-processing;
                        // note this also drops commands whose routing failed
                        snapshot.getRef().removeValue((error, ref) -> {
                            if (error != null) {
                                LOGGER.error("Failed to remove Firebase command: {}", error.getMessage());
                            }
                        });
                    }
                }

                // ChildEventListener is an interface, so the remaining callbacks
                // must be implemented even when unused
                @Override public void onChildChanged(DataSnapshot snapshot, String prevKey) { /* no-op */ }
                @Override public void onChildRemoved(DataSnapshot snapshot) { /* no-op */ }
                @Override public void onChildMoved(DataSnapshot snapshot, String prevKey) { /* no-op */ }
                @Override public void onCancelled(DatabaseError error) {
                    LOGGER.error("Firebase listener cancelled: {}", error.getMessage());
                }
            });
    }
}
6. Dead letter queue handling
@JmsListener(destination = "${queue.dlq:DLQ}", containerFactory = "jmsListenerContainerFactory")
public void handleDlqMessage(String messageJson,
                             @Header(JmsHeaders.DESTINATION) Destination destination) {
    // JMSHandleDlqMessage logic:
    LOGGER.error("[DLQ] Dead letter received: destination={}, message={}",
            destination, messageJson);
    // Alert the monitoring system
    // Optionally persist the message for manual review/replay
}
7. GiftAggregationJob detail
@Scheduled(fixedDelayString = "${jobs.gift-aggregation.fixedDelay:5000}")
public void processAggregatedGifts() {
    // 1. Scan Redis for all pending gift keys
    Set<String> keys = redisCounterService.scanKeys(PENDING_GIFT_KEY_PREFIX + "*");
    for (String key : keys) {
        // 2. Atomic GETSET (read current total and reset it to 0)
        Long totalBcoinValue = redisCounterService.getAndSet(key, 0L);
        if (totalBcoinValue != null && totalBcoinValue > 0) {
            // 3. Extract streamerId from the key
            String streamerId = key.replace(PENDING_GIFT_KEY_PREFIX, "");
            // 4. Send one aggregated message with JMSXGroupID = streamerId
            LivestreamGiftAggregateMessageDto msg = LivestreamGiftAggregateMessageDto.builder()
                    .streamerId(streamerId)
                    .totalBcoinValue(totalBcoinValue)
                    .build();
            try {
                jmsProducer.sendWithMessageGroup(destinationQueue, streamerId,
                        objectMapper.writeValueAsString(msg));
            } catch (JsonProcessingException e) {
                // Serialization failure: log and continue with the next streamer
                LOGGER.error("Failed to serialize gift aggregate for {}: {}",
                        streamerId, e.getMessage());
            }
        }
    }
}
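Step 4 relies on the producer helper; a minimal sketch of `sendWithMessageGroup` using Spring's `JmsTemplate` (the real implementation may differ):

```java
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

// Sketch of JMSProducer.sendWithMessageGroup. Setting JMSXGroupID makes
// Artemis pin all messages of one group (here: one streamer) to a single
// consumer, so per-streamer updates are processed in order.
@Component
public class JMSProducer {

    private final JmsTemplate jmsTemplate;

    public JMSProducer(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void sendWithMessageGroup(String destination, String groupId, String payload) {
        jmsTemplate.convertAndSend(destination, payload, message -> {
            // MessagePostProcessor: stamp the group id onto the outgoing message
            message.setStringProperty("JMSXGroupID", groupId);
            return message;
        });
    }
}
```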
8. Observability gaps
| Gap | Impact | Recommendation |
| --- | --- | --- |
| No queue lag monitoring | Backlog undetected | Add ActiveMQ metrics + alert |
| No job execution metrics | Can't measure job performance | Spring Batch Micrometer integration |
| No Firebase listener health check | Silent failures | Periodic connectivity check |
| No DLQ message count metric | Errors accumulate unnoticed | DLQ monitoring dashboard |
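One way to close the "no job execution metrics" gap is a small Micrometer wrapper that records duration and outcome per job; the metric and tag names below are assumptions.

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;

// Sketch: time a batch job and tag the outcome so dashboards can track
// duration and failure rate per job.
public class JobMetrics {

    private final MeterRegistry meterRegistry;

    public JobMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    public void timed(String jobName, Runnable job) {
        Timer.Sample sample = Timer.start(meterRegistry);
        String outcome = "success";
        try {
            job.run();
        } catch (RuntimeException e) {
            outcome = "failure";
            throw e; // still fail loudly after recording the metric
        } finally {
            sample.stop(meterRegistry.timer("batch.job.duration",
                    "job", jobName, "outcome", outcome));
        }
    }
}
```

A scheduled method would then wrap its body, e.g. `jobMetrics.timed("archiveData", this::doArchive)`, leaving the `@Scheduled` signature untouched.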