TrafficService.java
package io.featureprobe.api.service;
import io.featureprobe.api.base.db.ExcludeTenant;
import io.featureprobe.api.base.enums.TrafficCacheTypeEnum;
import io.featureprobe.api.base.enums.ResourceType;
import io.featureprobe.api.base.util.JsonMapper;
import io.featureprobe.api.dao.entity.DebugEvent;
import io.featureprobe.api.dao.entity.Environment;
import io.featureprobe.api.dao.entity.Traffic;
import io.featureprobe.api.dao.entity.TrafficCache;
import io.featureprobe.api.dao.exception.ResourceNotFoundException;
import io.featureprobe.api.dao.repository.DebugEventRepository;
import io.featureprobe.api.dao.repository.EnvironmentRepository;
import io.featureprobe.api.dao.repository.TrafficRepository;
import io.featureprobe.api.dao.repository.TrafficCacheRepository;
import io.featureprobe.api.dto.TrafficCreateRequest;
import io.featureprobe.api.dto.VariationAccessCounterRequest;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@Slf4j
@Service
@AllArgsConstructor
@ExcludeTenant
public class TrafficService {
private TrafficRepository trafficRepository;
private EnvironmentRepository environmentRepository;
private TrafficCacheRepository trafficCacheRepository;
private DebugEventRepository debugEventRepository;
private final static ExecutorService executorService = Executors.newFixedThreadPool(5);
@Transactional(rollbackFor = Exception.class)
public void create(String serverSdkKey, String userAgent, List<TrafficCreateRequest> requests) {
Environment environment = environmentRepository.findByServerSdkKey(serverSdkKey)
.orElseThrow(() -> new ResourceNotFoundException(ResourceType.ENVIRONMENT, serverSdkKey));
requests.forEach(request -> {
if (request.getAccess() == null) {
return;
}
if (CollectionUtils.isNotEmpty(request.getEvents())) {
DebugEventStorageTask debugEventStorageTask = new DebugEventStorageTask(serverSdkKey,
debugEventRepository, request.getEvents(), userAgent);
executorService.submit(debugEventStorageTask);
}
List<Traffic> events = Optional.of(request.getAccess().getCounters())
.orElse(Collections.emptyMap())
.entrySet()
.stream()
.flatMap(entry -> createEventEntities(entry).stream())
.map(event -> wrapEvent(event, userAgent, environment, request))
.collect(Collectors.toList());
if (!events.isEmpty()) {
trafficRepository.saveAll(events);
saveAllEvaluation(events);
}
});
}
public void saveAllEvaluation(List<Traffic> events) {
Map<String, Traffic> eventMap = events.stream()
.collect(Collectors.toMap(Traffic::uniqueKey, u -> u, (k1, k2) -> k2));
for (String key : eventMap.keySet()) {
trafficCacheRepository.deleteBySdkKeyAndToggleKey(eventMap.get(key).getSdkKey(),
eventMap.get(key).getToggleKey());
}
List<Traffic> uniqueEvents = eventMap.entrySet().stream().map(Map.Entry::getValue).collect(Collectors.toList());
List<TrafficCache> trafficCaches = uniqueEvents.stream().map(e -> toMetricsCache(e))
.collect(Collectors.toList());
trafficCacheRepository.saveAll(trafficCaches);
}
private TrafficCache toMetricsCache(Traffic event) {
TrafficCache trafficCache = new TrafficCache();
trafficCache.setSdkKey(event.getSdkKey());
trafficCache.setToggleKey(event.getToggleKey());
trafficCache.setEndDate(event.getEndDate());
trafficCache.setStartDate(event.getStartDate());
trafficCache.setType(TrafficCacheTypeEnum.EVALUATION);
return trafficCache;
}
private List<Traffic> createEventEntities(Map.Entry<String,
List<VariationAccessCounterRequest>> toggleToAccessCounter) {
String toggleKey = toggleToAccessCounter.getKey();
return Optional.of(toggleToAccessCounter.getValue())
.orElse(Collections.emptyList())
.stream()
.map(accessEvent -> this.createEventEntity(toggleKey, accessEvent))
.collect(Collectors.toList());
}
private Traffic createEventEntity(String toggleKey, VariationAccessCounterRequest accessCounter) {
Traffic event = new Traffic();
event.setToggleKey(toggleKey);
event.setCount(accessCounter.getCount());
event.setValueIndex(accessCounter.getIndex());
event.setToggleVersion(accessCounter.getVersion());
event.setValue(JsonMapper.toJSONString(accessCounter.getValue()));
return event;
}
private Traffic wrapEvent(Traffic event, String userAgent, Environment environment, TrafficCreateRequest request) {
if (request.getAccess() == null) {
return event;
}
event.setSdkKey(environment.getServerSdkKey());
event.setProjectKey(environment.getProject().getKey());
event.setEnvironmentKey(environment.getKey());
event.setType("access");
event.setSdkType(getSdkType(userAgent));
event.setSdkVersion(getSdkVersion(userAgent));
event.setStartDate(new Date(request.getAccess().getStartTime()));
event.setEndDate(new Date(request.getAccess().getEndTime()));
return event;
}
private String getSdkType(String userAgent) {
return extractSdkField(userAgent, 0);
}
private String getSdkVersion(String userAgent) {
return extractSdkField(userAgent, 1);
}
private String extractSdkField(String userAgent, int index) {
if (StringUtils.isBlank(userAgent) || !userAgent.contains("/")) {
log.error("[Event] SDK user-agent format error. {} ", userAgent);
return "";
}
String[] parts = userAgent.split("/");
return parts.length > index ? parts[index] : null;
}
class DebugEventStorageTask implements Runnable {
private final String sdkKey;
private final DebugEventRepository debugEventRepository;
private final List<Map> events;
private final String userAgent;
public DebugEventStorageTask(String sdkKey, DebugEventRepository debugEventRepository, List<Map> events,
String userAgent) {
this.sdkKey = sdkKey;
this.debugEventRepository = debugEventRepository;
this.events = events;
this.userAgent = userAgent;
}
@Override
public void run() {
if (Objects.nonNull(events) && CollectionUtils.isNotEmpty(events)) {
List<DebugEvent> debugEvents = events.stream().filter(event -> "debug".equals(event.get("kind")))
.map(event -> buildDebugEvent(event, sdkKey, userAgent)).collect(Collectors.toList());
debugEventRepository.saveAll(debugEvents);
}
}
}
private DebugEvent buildDebugEvent(Map event, String sdkKey, String userAgent) {
DebugEvent debugEvent = new DebugEvent();
debugEvent.setSdkKey(sdkKey);
debugEvent.setKind(String.valueOf(event.get("kind")));
debugEvent.setTime((Long) event.get("time"));
debugEvent.setUserKey(String.valueOf(event.get("user")));
debugEvent.setUserDetail(JsonMapper.toJSONString(event.get("userDetail")));
debugEvent.setToggleKey(String.valueOf(event.get("key")));
debugEvent.setValue(JsonMapper.toJSONString(event.get("value")));
debugEvent.setVersion((Integer) event.get("version"));
debugEvent.setVariationIndex((Integer) event.get("variationIndex"));
debugEvent.setRuleIndex((Integer) event.get("ruleIndex"));
debugEvent.setReason(String.valueOf(event.get("reason")));
debugEvent.setSdkType(getSdkType(userAgent));
debugEvent.setSdkVersion(getSdkVersion(userAgent));
return debugEvent;
}
}