// HookProcessor.java
package io.featureprobe.api.hook;
import io.featureprobe.api.base.hook.HookSettingsStatus;
import io.featureprobe.api.base.hook.ICallback;
import io.featureprobe.api.base.hook.IHookQueue;
import io.featureprobe.api.base.hook.IHookRuleBuilder;
import io.featureprobe.api.base.model.HookContext;
import io.featureprobe.api.dao.entity.WebHookSettings;
import io.featureprobe.api.dao.repository.WebHookSettingsRepository;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@Slf4j
@Component
public class HookProcessor {
private final WebHookSettingsRepository webHookSettingsRepository;
private final ApplicationEventPublisher eventPublisher;
private final IHookRuleBuilder hookRuleBuilder;
private final AtomicBoolean closed = new AtomicBoolean(false);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("FeatureProbe-API-Hook-Processor-%d")
.setPriority(1)
.build();
public HookProcessor(IHookQueue hookQueue, WebHookSettingsRepository webHookSettingsRepository,
ApplicationEventPublisher eventPublisher, IHookRuleBuilder hookRuleBuilder) {
this.webHookSettingsRepository = webHookSettingsRepository;
this.eventPublisher = eventPublisher;
this.hookRuleBuilder = hookRuleBuilder;
Thread hookProcessorThread = threadFactory.newThread(() -> {
handleHook(hookQueue);
});
hookProcessorThread.setDaemon(true);
hookProcessorThread.start();
}
private void handleHook(IHookQueue hookQueue) {
while (!closed.get()) {
try {
HookContext hookContext = hookQueue.take();
List<WebHookSettings> webHookSettingsList = webHookSettingsRepository
.findAllByOrganizationIdAndStatus(hookContext.getOrganizationId(), HookSettingsStatus.ENABLE);
List<WebHook> webHooks = webHookSettingsList.stream()
.map(webHookSettings -> translateHookConfig(webHookSettings)).filter(x -> x!=null)
.collect(Collectors.toList());
for(WebHook webHook : webHooks) {
webHook.callback(hookContext, eventPublisher);
}
} catch (Exception e) {
log.error("FeatureProbe hook process error", e);
}
}
}
private WebHook translateHookConfig(WebHookSettings webHookSettings) {
WebHook webHook = new WebHook();
webHook.setName(webHookSettings.getName());
webHook.setUrl(webHookSettings.getUrl());
webHook.setOrganizationId(webHookSettings.getOrganizationId());
webHook.setRule(hookRuleBuilder.build(webHookSettings.getId()));
webHook.setSecretKey(webHookSettings.getSecretKey());
ICallback callback = CallbackAbilityContainer.get(webHookSettings.getType());
if (Objects.isNull(callback)) {
return null;
}
webHook.setHook(callback);
return webHook;
}
}