
SpringCloud Source Code Analysis: the Hystrix Circuit Breaker

Updated: 2019-01-03 | Views: 2,949

> Author: Chen Gang (陈刚), senior lecturer at 叩丁狼. Original article; please credit the source when reposting.

### Recap

To keep a failing call between services from cascading through the system, SpringCloud provides the Hystrix component, which handles failed service calls and degrades services under high concurrency. A quick recap of how Hystrix is used:

1. Besides adding the Hystrix dependency itself, annotate the main application configuration class with @EnableHystrix to turn Hystrix on:

```
@EnableHystrix
@EnableEurekaClient
@SpringBootApplication
...
public class ConsumerApplication {
```

2. With Hystrix enabled, protect a method with a circuit breaker:

```
@Service
public class HelloService {

    @Autowired
    private RestTemplate restTemplate;

    // This annotation creates a circuit breaker for the method
    // and names the fallback method via fallbackMethod
    @HystrixCommand(fallbackMethod = "hiError")
    public String hiService(String name) {
        // Call the remote endpoint
        String result = restTemplate.getForObject("http://PRODUCER/hello?name=" + name, String.class);
        return result;
    }

    public String hiError(String name) {
        return "hi,"+name+"error!";
    }
}
```

When a call to hiService fails, the hiError method named by fallbackMethod runs and provides a fallback result.

Let's follow this usage to trace how Hystrix works. We start with the @EnableHystrix annotation. As the name says, it enables Hystrix. Here is its source:

```
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@EnableCircuitBreaker
public @interface EnableHystrix {

}
```

It carries one meta-annotation, @EnableCircuitBreaker, which literally means "enable the circuit breaker". So @EnableHystrix is in essence @EnableCircuitBreaker. Here is that annotation's source:

```
/**
 * Annotation to enable a CircuitBreaker implementation.
 * http://martinfowler.com/bliki/CircuitBreaker.html
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {

}
```

@EnableCircuitBreaker imports a class through @Import(EnableCircuitBreakerImportSelector.class). The class name reads as "import selector for enabling the circuit breaker". What does it import? Look at the source:

```
/**
 * Import a single circuit breaker implementation Configuration
 * @author Spencer Gibb
 */
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector extends
        SpringFactoryImportSelector<EnableCircuitBreaker> {

    @Override
    protected boolean isEnabled() {
        return getEnvironment().getProperty(
                "spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE);
    }

}
```

The class comment says "Import a single circuit breaker implementation Configuration": the job of EnableCircuitBreakerImportSelector is to import the circuit breaker's configuration. Spring has a loading mechanism similar to Java SPI, and it automatically loads the Hystrix-related auto-configuration classes listed in META-INF/spring.factories inside the spring-cloud-netflix-core jar.

Note: SPI (Service Provider Interface) is a mechanism that decouples a service interface from its implementations, which improves extensibility and makes implementations pluggable.

![image.png](https://upload-images.jianshu.io/upload_images/11046204-2cd19f8f3f490831.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

HystrixCircuitBreakerConfiguration is the configuration class for the Hystrix circuit breaker:

```
/**
 * @author Spencer Gibb
 * @author Christian Dupuis
 * @author Venil Noronha
 */
@Configuration
public class HystrixCircuitBreakerConfiguration {

    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }

    @Bean
    public HystrixShutdownHook hystrixShutdownHook() {
        return new HystrixShutdownHook();
    }

    @Bean
    public HasFeatures hystrixFeature() {
        return HasFeatures.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
    }
    ......
```

This configuration class creates a HystrixCommandAspect:

```
/**
 * AspectJ aspect to process methods which annotated with {@link HystrixCommand} annotation.
 */
@Aspect
public class HystrixCommandAspect {

    private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

    static {
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                .build();
    }

    // Pointcut: matches methods annotated with @HystrixCommand
    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }

    // Around advice for the @HystrixCommand and @HystrixCollapser pointcuts
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        // Resolve the target method
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        // A method must not carry both @HystrixCommand and @HystrixCollapser
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        // Wrap the method in a HystrixInvokable
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

        Object result;
        try {
            // Execute the method through CommandExecutor
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause() != null ? e.getCause() : e;
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
```

HystrixCommandAspect uses AOP to intercept methods annotated with @HystrixCommand. The advice wraps the target method in a HystrixInvokable object and runs it through the CommandExecutor utility.

What is HystrixInvokable for? The source shows it is an interface with no methods; it only marks an object as executable. To see how the instance is created, look at the create method called in HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);

```
public HystrixInvokable create(MetaHolder metaHolder) {
    HystrixInvokable executable;
    ...code omitted...
        executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    }
    return executable;
}
```

It creates a GenericCommand object, so GenericCommand must be an implementation of HystrixInvokable. The class diagram shows the relationship:

![image.png](https://upload-images.jianshu.io/upload_images/11046204-fe0461c7f09d0224.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

Here is the source of GenericCommand:

```
@ThreadSafe
public class GenericCommand extends AbstractHystrixCommand<Object> {
    private static final Logger LOGGER = LoggerFactory.getLogger(GenericCommand.class);

    public GenericCommand(HystrixCommandBuilder builder) {
        super(builder);
    }

    protected Object run() throws Exception {
        LOGGER.debug("execute command: {}", this.getCommandKey().name());
        return this.process(new AbstractHystrixCommand.Action() {
            Object execute() {
                return GenericCommand.this.getCommandAction().execute(GenericCommand.this.getExecutionType());
            }
        });
    }

    protected Object getFallback() {
        final CommandAction commandAction = this.getFallbackAction();
        if (commandAction != null) {
            try {
                return this.process(new AbstractHystrixCommand.Action() {
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable var3) {
                LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());
                throw new FallbackInvocationException(ExceptionUtils.unwrapCause(var3));
            }
        } else {
            return super.getFallback();
        }
    }
}
```

GenericCommand implements both the normal execution of the target method (run) and the execution of the fallback method (getFallback). GenericCommand.this.getCommandAction().execute(...) obtains the target method and executes it; underneath, the call is handed to MethodExecutionAction, which invokes the method by reflection.

Back in the methodsAnnotatedWithHystrixCommand method of HystrixCommandAspect, let's see how CommandExecutor.execute runs the command:

```
public class CommandExecutor {
    public CommandExecutor() {
    }

    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);
        switch(executionType) {
        // Synchronous
        case SYNCHRONOUS:
            return castToExecutable(invokable, executionType).execute();
        // Asynchronous
        case ASYNCHRONOUS:
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand() && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }

            return executable.queue();
        case OBSERVABLE:
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }

    private static HystrixExecutable castToExecutable(HystrixInvokable invokable, ExecutionType executionType) {
        if (invokable instanceof HystrixExecutable) {
            return (HystrixExecutable)invokable;
        } else {
            throw new RuntimeException("Command should implement " + HystrixExecutable.class.getCanonicalName() + " interface to execute in: " + executionType + " mode");
        }
    }
```

Besides OBSERVABLE, there are two execution types here: SYNCHRONOUS (synchronous) and ASYNCHRONOUS (asynchronous). We look at the synchronous one first: castToExecutable(invokable, executionType).execute(); This line casts the HystrixInvokable object to a HystrixExecutable and calls its execute method. Following execute leads into HystrixCommand.execute:

```
public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

// --------------

public Future<R> queue() {
    /*
     * The Future returned by Observable.toBlocking().toFuture() does not implement the
     * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
     * thus, to comply with the contract of Future, we must wrap around it.
     */
    final Future<R> delegate = toObservable().toBlocking().toFuture();

    final Future<R> f = new Future<R>() {

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (delegate.isCancelled()) {
                return false;
            }

            if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                /*
                 * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                 * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                 * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
                 * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                 * than that interruption request cannot be taken back.
                 */
                interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
            }

            final boolean res = delegate.cancel(interruptOnFutureCancel.get());

            if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                final Thread t = executionThread.get();
                if (t != null && !t.equals(Thread.currentThread())) {
                    t.interrupt();
                }
            }

            return res;
        }
        ....omitted...
```

So even the synchronous HystrixCommand.execute method is built on a Future: it calls queue(), which runs the command asynchronously (this is where GenericCommand performs the actual invocation), and then blocks on Future.get() until the result is available.
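
The relationship between execute(), queue(), run() and getFallback() traced above can be illustrated with a small stand-alone sketch. It is not Hystrix code: MiniCommand and MiniCommandDemo are hypothetical names invented for this illustration, the sketch uses only the JDK's CompletableFuture, and it leaves out everything else Hystrix does (circuit state, thread pools, timeouts, metrics). It only shows the shape of the pattern, namely a synchronous execute() that blocks on the Future returned by queue(), with the fallback invoked when the primary action throws.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical demo class; none of these names come from Hystrix.
final class MiniCommandDemo {

    // Builds a command whose primary action optionally fails, then runs it synchronously.
    static String call(final String name, final boolean fail) {
        MiniCommand<String> command = new MiniCommand<String>() {
            @Override
            protected String run() {
                if (fail) {
                    throw new IllegalStateException("remote call failed");
                }
                return "hello, " + name;
            }

            @Override
            protected String getFallback(Throwable cause) {
                return "hi, " + name + ", error!";
            }
        };
        return command.execute();
    }

    public static void main(String[] args) {
        System.out.println(call("tom", false));
        System.out.println(call("tom", true));
    }
}

// Hypothetical, heavily simplified stand-in for the command abstraction.
abstract class MiniCommand<R> {

    // Primary action, playing the role of the method annotated with @HystrixCommand.
    protected abstract R run() throws Exception;

    // Fallback action, playing the role of the method named by fallbackMethod.
    protected abstract R getFallback(Throwable cause);

    // Asynchronous entry point: starts the work and returns a Future immediately.
    Future<R> queue() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return run();
            } catch (Exception e) {
                // The primary action failed, so switch to the fallback.
                return getFallback(e);
            }
        });
    }

    // Synchronous entry point: built on queue(), it simply blocks on the Future.
    R execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

Running main prints the normal result for the first call and the fallback result for the second. The design point matches what the source showed: only the asynchronous path needs a real implementation, and the synchronous path is a thin wrapper that waits on it.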