前言
微服务架构中,前置检查功能项对于服务的可靠性有重要意义,使用场景如:
1、如检查基础服务,如不正常需要熔断
2、如检查被依赖服务,不正常需要熔断
3、本服务有较长的初始化逻辑,需要完成后,才能通知提供正常REST功能
实践
import com.google.common.util.concurrent.ThreadFactoryBuilder;import lombok.extern.slf4j.Slf4j;import java.util.List;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.function.Consumer;@Slf4jpublic class ApplicationStartupChecker { private boolean checkSwitch = true; private ScheduledExecutorService scheduledExecutorService; private Consumer passConsumer; private AdvanceCheckRegister advanceCheckRegister = new AdvanceCheckRegisterImpl(); private String threadName; private int checkCycle; private TimeUnit timeUnit; private int checkCount = 0; public ApplicationStartupChecker(String threadName, int checkCycle, TimeUnit timeUnit) { scheduledExecutorService = new ScheduledThreadPoolExecutor( 1, new ThreadFactoryBuilder().setNameFormat(threadName + “-%d”).setDaemon(true).build()); this.threadName = threadName; this.checkCycle = checkCycle; this.timeUnit = timeUnit; } public ApplicationStartupChecker(String threadName) { this(threadName, 30, TimeUnit.SECONDS); } public void bootstrap() { if (scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated()) { scheduledExecutorService = new ScheduledThreadPoolExecutor( 1, new ThreadFactoryBuilder().setNameFormat(threadName + “-%d”).build()); } scheduledExecutorService.schedule(this::guidance, 0, TimeUnit.SECONDS); } private void guidance() { if (!checkSwitch) { scheduledExecutorService.shutdown(); log.info(“Not need check”); if (passConsumer != null) { passConsumer.accept(advanceCheckRegister.getCheckerNames()); } } checkCount++; log.info(“No.{} app startup checking”, checkCount); try { advanceCheckRegister.foreach(Checker::check); } catch (Throwable e) { log.error(“check with error.”, e); } boolean pass = advanceCheckRegister.pass(); if (pass) { scheduledExecutorService.shutdown(); log.info(“All check passed:{}”, String.join(“,”, advanceCheckRegister.getCheckerNames())); if (passConsumer != 
null) { passConsumer.accept(advanceCheckRegister.getCheckerNames()); } } else { scheduledExecutorService.schedule(this::guidance, this.checkCycle, this.timeUnit); } } public void setPassConsumer(Consumer passConsumer) { this.passConsumer = passConsumer; } public void addChecker(Checker kafkaHealthChecker) { advanceCheckRegister.registerEntity(kafkaHealthChecker); }}
配置用于保存检查项:
import java.util.List;
import java.util.function.Consumer;

/** Registry of the {@link Checker}s that must pass before the application is considered ready. */
public interface AdvanceCheckRegister {

    /**
     * Adds a checker to the registry.
     *
     * @param checkEntity the checker to register
     */
    void registerEntity(Checker checkEntity);

    /** @return {@code true} when every registered checker reports {@link Checker#isPass()} */
    boolean pass();

    /**
     * Runs one check round over the registered checkers.
     *
     * @param consumer action supplied by the caller for each checker
     */
    void foreach(Consumer<Checker> consumer);

    /** @return the names of all registered checkers, in registration order */
    List<String> getCheckerNames();
}

package com.zte.sdn.oscp.check;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/** List-backed {@link AdvanceCheckRegister}; checkers are evaluated in registration order. */
public class AdvanceCheckRegisterImpl implements AdvanceCheckRegister {

    private final List<Checker> checkEntityList = new ArrayList<>();

    /**
     * {@inheritDoc}
     *
     * @throws NullPointerException     if {@code entity} is {@code null}
     * @throws IllegalArgumentException if the checker's name is {@code null} or empty
     */
    @Override
    public void registerEntity(Checker entity) {
        Objects.requireNonNull(entity, "Register Checker can not be null.");
        if (entity.getName() == null || entity.getName().isEmpty()) {
            throw new IllegalArgumentException(entity.getClass().getSimpleName() + " must have valid name");
        }
        checkEntityList.add(entity);
    }

    /** {@inheritDoc} An empty registry passes, because {@code allMatch} is true on an empty stream. */
    @Override
    public boolean pass() {
        return this.checkEntityList.stream().allMatch(Checker::isPass);
    }

    /**
     * Calls {@link Checker#check()} on each checker in order, records the result through
     * {@link Checker#setPass(boolean)}, and stops at the first checker that fails.
     *
     * <p>NOTE(review): {@code consumer} is not used by this implementation; the check and
     * the result bookkeeping are done here directly. Confirm whether that is intended
     * before relying on the argument.
     */
    @Override
    public void foreach(Consumer<Checker> consumer) {
        for (Checker checkEntity : checkEntityList) {
            boolean check = checkEntity.check();
            checkEntity.setPass(check);
            if (!check) {
                // Later checkers are not evaluated in this round.
                return;
            }
        }
    }

    @Override
    public List<String> getCheckerNames() {
        return checkEntityList.stream().map(Checker::getName).collect(Collectors.toList());
    }
}
检测项接口
/** A single named pre-check together with the flag that holds its latest result. */
public interface Checker {

    /** @return the name identifying this checker */
    String getName();

    /** @return {@code true} if the check succeeded */
    boolean check();

    /** @return the result flag last stored through {@link #setPass(boolean)} */
    boolean isPass();

    /**
     * Stores the result flag.
     *
     * @param pass the value later returned by {@link #isPass()}
     */
    void setPass(boolean pass);
}

package com.zte.sdn.oscp.check;

/**
 * Base class that holds the pass flag, leaving {@link #getName()} and {@link #check()}
 * to concrete checkers. The flag starts out {@code false}.
 */
public abstract class AbstractChecker implements Checker {

    private boolean passed;

    @Override
    public boolean isPass() {
        return passed;
    }

    @Override
    public void setPass(boolean pass) {
        passed = pass;
    }
}
检查项
新增Checker项,只需要继承指定接口,实现检测逻辑,即可
public class DataBaseHealthChecker extends AbstractChecker { @Override public String getName() { return “DataBaseHealthChecker”; } @Getter @VisibleForTesting private int count; @Override public boolean check() { count++; if (count >= 3) { return true; } return false; }}
测试
循环检测直至通过,通过后回调对应方法,不过这是单独线程,不会阻塞主线程
/**
 * Tests for ApplicationStartupChecker using the sample checkers.
 * KafkaHealthChecker is not shown in this file; the expected count of 5 below
 * presumably reflects its pass threshold (to be confirmed against its source).
 */
public class CheckTest {
    /** Registers both checkers, waits for the pass callback, then checks how often each was called. */
    @Test
    public void checkTest() throws InterruptedException {
        ApplicationStartupChecker applicationStartupChecker = new ApplicationStartupChecker(“abc”);
        KafkaHealthChecker kafkaHealthChecker = new KafkaHealthChecker();
        DataBaseHealthChecker dataBaseHealthChecker = new DataBaseHealthChecker();
        applicationStartupChecker.addChecker(kafkaHealthChecker);
        applicationStartupChecker.addChecker(dataBaseHealthChecker);
        // The latch is released by the pass callback, which runs on the checker's own thread.
        CountDownLatch latch = new CountDownLatch(1);
        applicationStartupChecker.setPassConsumer(t -> { latch.countDown(); });
        applicationStartupChecker.bootstrap();
        latch.await();
        int kafkaCheck = kafkaHealthChecker.getCount();
        int dbCheck = dataBaseHealthChecker.getCount();
        Assert.assertEquals(5, kafkaCheck, 0.0);
        Assert.assertEquals(3, dbCheck, 0.0);
    }

    /** No checkers registered: the test only completes if the pass callback still fires. */
    @Test
    public void checkTestNOChecker() throws InterruptedException {
        ApplicationStartupChecker applicationStartupChecker = new ApplicationStartupChecker(“abc”);
        CountDownLatch latch = new CountDownLatch(1);
        applicationStartupChecker.setPassConsumer(t -> { latch.countDown(); });
        applicationStartupChecker.bootstrap();
        latch.await();
    }

    /** No pass callback registered: polls the database checker's call count instead of waiting on a latch. */
    @Test
    public void checkTestNoConsumer() throws InterruptedException {
        ApplicationStartupChecker applicationStartupChecker = new ApplicationStartupChecker(“abc”);
        KafkaHealthChecker kafkaHealthChecker = new KafkaHealthChecker();
        DataBaseHealthChecker dataBaseHealthChecker = new DataBaseHealthChecker();
        applicationStartupChecker.addChecker(kafkaHealthChecker);
        applicationStartupChecker.addChecker(dataBaseHealthChecker);
        applicationStartupChecker.bootstrap();
        // Re-reads the count every 5 seconds until the database checker has run three times.
        while (dataBaseHealthChecker.getCount() < 3) {
            TimeUnit.SECONDS.sleep(5);
        }
        Assert.assertEquals(5, kafkaHealthChecker.getCount(), 0.0);
        Assert.assertEquals(3, dataBaseHealthChecker.getCount(), 0.0);
    }

    /** Same setup as above, but with a 2-second check cycle instead of the default. */
    @Test
    public void checkTestNoConsumerTimeOut() throws InterruptedException {
        ApplicationStartupChecker applicationStartupChecker = new ApplicationStartupChecker("abc", 2, TimeUnit.SECONDS);
        KafkaHealthChecker kafkaHealthChecker = new KafkaHealthChecker();
        DataBaseHealthChecker dataBaseHealthChecker = new DataBaseHealthChecker();
        applicationStartupChecker.addChecker(kafkaHealthChecker);
        applicationStartupChecker.addChecker(dataBaseHealthChecker);
        applicationStartupChecker.bootstrap();
        // NOTE(review): the next statement is damaged. Text between the loop's comparison
        // operator and a later lambda arrow appears to have been lost, which merged the end
        // of this test with the start of another test that calls bootstrap() four times.
        // The missing part cannot be recovered from this file; restore it from the original.
        while (dataBaseHealthChecker.getCount() { latch.countDown(); });
        applicationStartupChecker.bootstrap();
        applicationStartupChecker.bootstrap();
        applicationStartupChecker.bootstrap();
        applicationStartupChecker.bootstrap();
        latch.await();
        int kafkaCheck = kafkaHealthChecker.getCount();
        int dbCheck = dataBaseHealthChecker.getCount();
        Assert.assertEquals(5, kafkaCheck, 0.0);
        Assert.assertEquals(3, dbCheck, 0.0);
    }
}
其它
如果想产生检查不通过,程序不进行下一步动作的强依赖,实际上也简单,只需要在ApplicationStartupChecker中增加
>private CountDownLatch wait=new CountDownLatch(1);
检查通过后减1,主程序中await即可。