CacheServerDataSource.java

package io.featureprobe.api.server;

import io.featureprobe.api.base.cache.ICache;
import io.featureprobe.api.base.enums.ChangeLogType;
import io.featureprobe.api.base.util.JsonMapper;
import io.featureprobe.api.dao.entity.PublishMessage;
import io.featureprobe.api.dao.repository.PublishMessageRepository;
import io.featureprobe.api.dto.SdkKeyResponse;
import io.featureprobe.api.dto.ServerResponse;
import io.featureprobe.api.event.ToggleChangeEvent;
import io.featureprobe.api.service.BaseServerService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@AllArgsConstructor
@Slf4j
public class CacheServerDataSource extends AbstractCacheServerDataSource {

    ICache<String, byte[]> cache;

    private PublishMessageRepository publishMessageRepository;

    private BaseServerService baseServerService;

    private ApplicationEventPublisher eventPublisher;

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder()
                    .setDaemon(true)
                    .setNameFormat("FeatureProbe-Cache-Updater-%d")
                    .setPriority(Thread.MIN_PRIORITY)
                    .build());

    @Override
    public SdkKeyResponse queryAllSdkKeys() throws Exception {
        byte[] bytes = cache.get(SDK_KEYS_CACHE_KEY);
        return Objects.isNull(bytes) ? null : JsonMapper.toObject(new String(bytes), SdkKeyResponse.class);
    }

    @Override
    public ServerResponse queryServerTogglesByServerSdkKey(String serverSdkKey) throws Exception {
        byte[] bytes = cache.get(serverSdkKey);
        return Objects.isNull(bytes) ? null : JsonMapper.toObject(new String(bytes), ServerResponse.class);
    }


    @PostConstruct
    public void init() {
        StopWatch watcher = new StopWatch();
        watcher.start();
        log.info("FeatureProbe API start initialization cache .");
        List<PublishMessage> publishMessages = publishMessageRepository.findAll(PageRequest.of(0, 1,
                Sort.Direction.DESC,
                "id")).getContent();
        PublishMessage maxId = CollectionUtils.isNotEmpty(publishMessages) ? publishMessages.get(0) :
                new PublishMessage(0L);
        cache.put(MAX_CHANGE_LOG_ID_CACHE_KEY, JsonMapper.toJSONString(maxId).getBytes());
        SdkKeyResponse sdkKeyResponse = baseServerService.queryAllSdkKeys();
        cache.put(SDK_KEYS_CACHE_KEY, JsonMapper.toJSONString(sdkKeyResponse).getBytes());
        Map<String, byte[]> allServerToggle = baseServerService.queryAllServerToggle();
        cache.putAll(allServerToggle);
        watcher.stop();
        log.info("FeatureProbe API initialization cache finished . Time : "
                + watcher.getTime(TimeUnit.SECONDS) + " s");
        scheduler.scheduleAtFixedRate(this::handleChangeLog, 0L, 200, TimeUnit.MILLISECONDS);
    }

    @VisibleForTesting
    private void handleChangeLog() {
        try {
            PublishMessage maxPublishMessage = JsonMapper.toObject(new String(
                    cache.get(MAX_CHANGE_LOG_ID_CACHE_KEY)), PublishMessage.class);
            List<PublishMessage> publishMessages = publishMessageRepository
                    .findAllByIdGreaterThanOrderByIdAsc(maxPublishMessage.getId());
            if (CollectionUtils.isNotEmpty(publishMessages)) {
                cache.put(MAX_CHANGE_LOG_ID_CACHE_KEY,
                        JsonMapper.toJSONString(publishMessages.get(publishMessages.size() - 1)).getBytes());
                Map<String, ChangeLogType> logGroup = new HashMap<>();
                boolean isUpdateSdkKey = false;
                for (PublishMessage publishMessage : publishMessages) {
                    logGroup.put(publishMessage.getServerSdkKey(), publishMessage.getType());
                    if (publishMessage.getType() == ChangeLogType.ADD ||
                            publishMessage.getType() == ChangeLogType.DELETE) {
                        isUpdateSdkKey = true;
                    }
                }
                if (isUpdateSdkKey) {
                    cache.put(SDK_KEYS_CACHE_KEY,
                            JsonMapper.toJSONString(baseServerService.queryAllSdkKeys()).getBytes());
                }
                for (String serverSdkKey : logGroup.keySet()) {
                    refreshCache(serverSdkKey, logGroup.get(serverSdkKey));
                }
            }
        } catch (Exception e) {
            log.error("Cache update error. ", e);
        }
    }

    private void refreshCache(String serverSdkKey, ChangeLogType type) {
        switch (type) {
            case ADD:
            case CHANGE:
                cache.put(serverSdkKey,
                        JsonMapper.toJSONString(baseServerService.queryServerTogglesByServerSdkKey(serverSdkKey))
                                .getBytes());
                eventPublisher.publishEvent(new ToggleChangeEvent(serverSdkKey, this));
                break;
            case DELETE:
                cache.invalidate(serverSdkKey);
                break;
            default:
                break;
        }
    }

}