Skip to content

Thiết kế: Batch & Async Processing Feature

1. Tổng quan kiến trúc

flowchart TB
    subgraph Startup["Batch App Startup"]
        REG["ProgrammaticEndpointRegistration\n@PostConstruct"]
    end

    subgraph Schedulers["Spring @Scheduled Jobs"]
        GAME["Game loop:\nMatchMaking, MachineWaiting, OvertimePing"]
        PAYMENT["Payment:\nCardExpired, CryptoExpired, TokenExpired,\nCardConfirm, TokenConfirm"]
        NFT["NFT:\nRentalExpiry, LifeGauge, Revenue"]
        LEADERBOARD["Leaderboard:\nEvent, Tournament, Daily"]
        MAINTENANCE["Maintenance:\nSite, Cleanup, Archive, DeleteAccount"]
        GIFT["Gift:\nGiftAggregationJob (fixedDelay=5s)"]
    end

    subgraph JMSQueues["ActiveMQ Artemis"]
        Q_PA["Prize Allocation Queues (×9)"]
        Q_MATCH["Matching Queues (per setting)"]
        Q_MACHINE["Machine Queues (per machine)"]
        Q_INVITE["queue-invitation"]
        Q_GACHA["Gacha Queues (×5)"]
        Q_LIVE["livestream-viewer-gift"]
        Q_OTHER["Other queues"]
    end

    subgraph Listeners["JMS Listeners (batch module)"]
        L_PA["Prize Allocation Listeners (×9)"]
        L_MATCH["Matching Listeners (dynamic)"]
        L_MACHINE["Machine Listeners (dynamic)"]
        L_GACHA["Gacha Listeners (×5)"]
        L_LIVE["JMSLivestreamViewerGiftFirebaseUpdateListener"]
        L_RECOVERY["JMSListenerRecoveryScheduler"]
    end

    subgraph Firebase["Firebase RTDB"]
        FB_CMD["commands/"]
        FB_EVT["event_types/"]
        FB_LIVE["livestreams/"]
        FB_YELL["yell_points_history/"]
    end

    subgraph FBListeners["firebaselistener module"]
        FBL_CMD["FirebaseCommandListener"]
        FBL_EVT["FirebaseEventTypesListener"]
        FBL_LIVE["FirebaseLivestreamEventListener"]
        FBL_YELL["FirebaseLivestreamsYellPointListener"]
    end

    REG -->|"register listeners"| Q_MATCH & Q_MACHINE

    GIFT -->|"Redis → MQ"| Q_LIVE
    SCHEDULERS -->|"trigger"| Q_PA

    Q_PA --> L_PA
    Q_MATCH --> L_MATCH
    Q_MACHINE --> L_MACHINE
    Q_GACHA --> L_GACHA
    Q_LIVE --> L_LIVE

    L_RECOVERY -->|"monitor"| L_PA & L_MATCH & L_MACHINE

    FB_CMD --> FBL_CMD
    FB_EVT --> FBL_EVT
    FB_LIVE --> FBL_LIVE
    FB_YELL --> FBL_YELL

2. Dynamic listener registration

sequenceDiagram
    participant App as Batch App (@PostConstruct)
    participant REG as ProgrammaticEndpointRegistration
    participant DB as MySQL
    participant CTX as JmsListenerEndpointRegistry
    participant MQ as ActiveMQ

    App->>REG: init()
    REG->>DB: SELECT all playableGameBoothSettings
    loop per settingId
        REG->>CTX: registerListenerContainer(\n  queue-game-mode-type-pool-{settingId},\n  GameModeCreateRoomJMSListener\n)
        REG->>CTX: registerListenerContainer(\n  matching-pvp-{settingId},\n  MatchingGameModePvPJMSListener\n)
        REG->>CTX: registerListenerContainer(\n  matching-pvp-multi-{settingId},\n  MatchingGameModePvPMultiMatchJMSListener\n)
    end
    REG->>DB: SELECT all machines
    loop per machineId
        REG->>CTX: registerListenerContainer(\n  machine-start-{machineId},\n  CreateMachineStartRoomJMSListener\n)
    end
    CTX->>MQ: create/bind queues

3. Listener recovery flow

sequenceDiagram
    participant SCHED as JMSListenerRecoveryScheduler
    participant REG as JmsListenerEndpointRegistry
    participant CTX as MessageListenerContainer

    loop Every N seconds
        SCHED->>REG: getListenerContainers()
        loop per container
            REG-->>SCHED: container
            SCHED->>CTX: isRunning()?
            alt NOT running
                SCHED->>CTX: start()
                SCHED->>LOGGER: warn("Recovered listener: {id}")
            end
        end
    end

4. Job scheduling patterns

fixedDelay (sequential)

// Job chờ xong lần trước mới chạy lần sau
@Scheduled(fixedDelayString = "${jobs.gift-aggregation.fixedDelay:5000}")
public void processAggregatedGifts() { ... }

// Dùng cho: GiftAggregationJob, thường những job cần sequential

fixedRate (parallel allowed)

// Job chạy theo fixed interval, có thể overlap nếu chậm
@Scheduled(fixedRate = 60000)
public void runMatchmaking() { ... }
// Thường có Spring Batch ShedLock để tránh overlap

cron

// Chạy theo cron expression
@Scheduled(cron = "${batch.cron.archive-data:0 0 2 * * *}")
public void archiveData() { ... }

5. Firebase listener pattern

@Service
public class FirebaseCommandListener {

    @PostConstruct
    public void startListening() {
        String path = "commands/" + envPrefix;
        FirebaseDatabase.getInstance()
            .getReference(path)
            .addChildEventListener(new ChildEventListener() {
                @Override
                public void onChildAdded(DataSnapshot snapshot, String prevKey) {
                    String commandType = snapshot.child("type").getValue(String.class);
                    String userId = snapshot.child("user_id").getValue(String.class);
                    String data = snapshot.child("data").getValue(String.class);

                    try {
                        commandRouter.route(commandType, userId, data);
                    } finally {
                        // Always cleanup to prevent re-processing
                        snapshot.getRef().removeValue((error, ref) -> {
                            if (error != null) {
                                LOGGER.error("Failed to remove Firebase command: {}", error.getMessage());
                            }
                        });
                    }
                }
            });
    }
}

6. Dead letter queue handling

@JmsListener(destination = "${queue.dlq:DLQ}", containerFactory = "jmsListenerContainerFactory")
public void handleDlqMessage(String messageJson, @Header JmsHeaders headers) {
    // JMSHandleDlqMessage logic:
    LOGGER.error("[DLQ] Dead letter received: destination={}, message={}", 
        headers.getDestination(), messageJson);
    // Alert to monitoring system
    // Optionally store for manual review
}

7. GiftAggregationJob detail

@Scheduled(fixedDelayString = "${jobs.gift-aggregation.fixedDelay:5000}")
public void processAggregatedGifts() {
    // 1. Scan Redis for all pending gift keys
    Set<String> keys = redisCounterService.scanKeys(PENDING_GIFT_KEY_PREFIX + "*");

    for (String key : keys) {
        // 2. Atomic GETSET (read and reset to 0)
        Long totalBcoinValue = redisCounterService.getAndSet(key, 0L);

        if (totalBcoinValue != null && totalBcoinValue > 0) {
            // 3. Extract streamerId from key
            String streamerId = key.replace(PENDING_GIFT_KEY_PREFIX, "");

            // 4. Send aggregated message with JMSXGroupID = streamerId
            LivestreamGiftAggregateMessageDto msg = LivestreamGiftAggregateMessageDto.builder()
                .streamerId(streamerId)
                .totalBcoinValue(totalBcoinValue)
                .build();

            jmsProducer.sendWithMessageGroup(destinationQueue, streamerId, objectMapper.writeValueAsString(msg));
        }
    }
}

8. Observability gaps

Gap Impact Recommendation
No queue lag monitoring Backlog undetected Add ActiveMQ metrics + alert
No job execution metrics Can't measure job performance Spring Batch micrometer integration
Firebase listener health Silent failures Periodic connectivity check
DLQ message count Errors accumulate DLQ monitoring dashboard