avatar


26.Sentinel

Sentinel介绍

应用场景

  • 流量控制
    在系统运行期间,任意时间到来的请求往往是随机不可控的,而系统的处理能力是有限的。我们需要根据系统的处理能力对流量进行控制。
  • 熔断降级
    在调用系统的时候,如果调用链路中的某个资源出现了不稳定,最终会导致请求发生堆积。
    而熔断降级就可以解决这个问题。
    所谓的熔断降级就是当检测到调用链路中某个资源出现不稳定的表现,例如请求响应时间长或异常比例升高的时候,则对这个资源的调用进行限制,让请求快速失败,避免影响到其它的资源而导致级联故障。

竞品比较

  • Hystrix
    • https://github.com/Netflix/Hystrix
    • Hystrix源自Netflix公司的一个开源项目。
    • 注意,截止2024年3月14日,该项目的最后一个发行版是2018年11月的。
  • Resilience4j
  • Sentinel
    • https://github.com/alibaba/Sentinel
    • Sentinel是阿里巴巴开源的面向分布式服务架构的轻量级流量控制组件。主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护等多个维度来保障微服务的稳定性。
Sentinel Hystrix resilience4j
隔离策略 信号量隔离(并发线程数隔离) 线程池隔离/信号量隔离 信号量隔离
熔断降级策略 基于响应时间、异常比率、异常数 基于异常比率 基于异常比率、响应时间
实时统计实现 滑动窗口 滑动窗口 Ring Bit Buffer
动态规则配置 支持多种数据源 支持多种数据源 有限支持
扩展性 多个扩展点 插件的形式 接口的形式
基于注解的支持 支持 支持 支持
限流 基于 QPS,支持基于调用关系的限流 有限的支持 Rate Limiter
流量整形 支持预热模式、匀速器模式、预热排队模式 不支持 简单的 Rate Limiter 模式
系统自适应保护 支持 不支持 不支持
控制台 提供开箱即用的控制台,可配置规则、查看秒级监控、机器发现等 简单的监控查看 不提供控制台,可对接其它监控系统

Sentinel概述

Sentinel主要分为两个部分:

  • 核心库:主要指Java客户端,不依赖任何框架/库,能够运行于Java7及以上的版本的运行时环境,同时对Dubbo/SpringCloud等框架也有较好的支持。
  • 控制台:控制台主要负责管理推送规则、监控、集群限流分配管理、机器发现等。

相关概念

  • 资源
    资源是Sentinel的关键概念。它可以是Java应用程序中的任何内容,例如,由应用程序提供的服务,或由应用程序调用的其它应用提供的服务,甚至可以是一段代码。只要通过Sentinel API定义的代码,就是资源,能够被 Sentinel 保护起来。大部分情况下,可以使用方法签名,URL,甚至服务名称作为资源名来标示资源。
  • 规则
    规则指的是围绕资源的实时状态设定的规则,可以包括流量控制规则、熔断降级规则以及系统保护规则。所有规则可以动态实时调整。

优点

  • 友好的控制面板
  • 支持实时监控。
  • 支持多种限流。支持QPS限流,线程数限流以及多种限流策略。
  • 支持多种降级模式,支持按平均返回时间降级,按多种异常数降级,按异常比率降级等。
  • 方便扩展开发,支持SPI模式对chain进行扩展。
  • 支持链路的关联,可以实现按链路统计限流,系统保护,热门资源保护等等。

快速开始

接下来,我们讨论一个SpringBoot的项目如何接入Sentinel控制台。

创建应用

整体步骤

  1. 创建SpringBoot项目。
  2. 在项目的pom.xml文件中引入sentinel-core的依赖坐标。
  3. 创建TestController,定义和使用限流规则。

创建SpringBoot项目

关于如何创建SpringBoot项目,我们不讨论,可以参考《21.SpringBoot [1/3]》

sentinel-core

在项目的pom.xml文件中引入sentinel-core依赖的坐标。

1
2
3
4
5
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.7</version>
</dependency>

定义使用限流规则

我们创建TestController,定义使用限流规则。

这里我们定义OPS类型的限流规则,每秒可接受的请求最多为2个,超过则返回给页面"系统繁忙,请稍候",不超过则返回给页面"Hello Sentinel!"。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.kakawanyifan.controller;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;

@RestController
public class TestController {

@GetMapping("hello")
public String hello(){

// 进行限流控制
// 限流入口
try (Entry entry = SphU.entry("Hello")){
// 被保护的资源
return "Hello Sentinel!";
} catch (BlockException e) {
e.printStackTrace();
// 被限流或者降级的处理
return "系统繁忙,请稍候";
}

}

/**
* 定义限流规则
* @PostConstruct :在构造函数执行完毕后执行
*/
@PostConstruct
public void initFlowRules(){
// 创建存放限流规则的集合
List<FlowRule> rules = new ArrayList<FlowRule>();

// 创建限流规则
FlowRule rule = new FlowRule();
// 定义资源
rule.setResource("Hello");
// 定义限流规则类型,RuleConstant.FLOW_GRADE_QPS:OPS类型
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// 定义OPS每秒最多只能通过的请求个数
rule.setCount(2);
//将限流规则添加到集合中
rules.add(rule);

//3.加载限流规则
FlowRuleManager.loadRules(rules);
}

}
  • @PostConstruct,在当前类的构造函数执行完毕后执行。

在启动项目的时候,我们会注意到,还有这么一段,是Sentinel日志的地址。

1
2
3
4
5
6
7
8
9
10
11
12

【部分运行结果略】

2024-03-14 20:35:00.706 INFO 8025 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 789 ms
INFO: Sentinel log output type is: file
INFO: Sentinel log charset is: utf-8
INFO: Sentinel log base directory is: /Users/kaka/logs/csp/
INFO: Sentinel log name use pid is: false
INFO: Sentinel log level is: INFO
2024-03-14 20:35:00.869 INFO 8025 --- [ main] o.s.b.a.w.s.WelcomePageHandlerMapping : Adding welcome page: class path resource [static/index.html]

【部分运行结果略】

运行测试

我们可以写一段脚本代码,快速的不断请求http://localhost:8080/hello

最简单粗暴的方法,我们快速的刷新浏览器。

系统繁忙

Sentinel控制台

Sentinel提供一个轻量级的开源控制台,它提供机器发现以及健康情况管理、实时监控,规则管理和推送的功能。

通过如下的地址,下载Sentinel控制台jar包,sentinel-dashboard。

sentinel-dashboard

启动Sentinel控制台需要JDK版本为1.8及以上版本,使用如下命令启动控制台:

示例代码:

1
java -Dserver.port=9000 -jar sentinel-dashboard-1.8.7.jar

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
INFO: Sentinel log output type is: file
INFO: Sentinel log charset is: utf-8
INFO: Sentinel log base directory is: /Users/kaka/logs/csp/
INFO: Sentinel log name use pid is: false
INFO: Sentinel log level is: INFO

. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.5.12)

2024-03-14 21:05:25.929 INFO 8812 --- [ main] c.a.c.s.dashboard.DashboardApplication : Starting DashboardApplication using Java 1.8.0_371 on 192.168.0.102 with PID 8812 (/Users/kaka/Downloads/sentinel-dashboard-1.8.7.jar started by kaka in /Users/kaka/Downloads)
2024-03-14 21:05:25.932 INFO 8812 --- [ main] c.a.c.s.dashboard.DashboardApplication : No active profile set, falling back to 1 default profile: "default"
2024-03-14 21:05:26.686 INFO 8812 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9000 (http)
2024-03-14 21:05:26.695 INFO 8812 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2024-03-14 21:05:26.695 INFO 8812 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.60]
2024-03-14 21:05:26.787 INFO 8812 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2024-03-14 21:05:26.787 INFO 8812 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 814 ms
2024-03-14 21:05:26.853 INFO 8812 --- [ main] c.a.c.s.dashboard.config.WebConfig : Sentinel servlet CommonFilter registered
2024-03-14 21:05:27.431 INFO 8812 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9000 (http) with context path ''
2024-03-14 21:05:27.438 INFO 8812 --- [ main] c.a.c.s.dashboard.DashboardApplication : Started DashboardApplication in 1.828 seconds (JVM running for 2.21)

通过浏览器打开http://localhost:9000,即可访问Sentinel控制台,默认用户名和密码都是sentinel

但是此时本地应用还没有接入到Sentinel控制台进行管理,所以接下来就要将本地应用接入到Sentinel控制台。

sentinel-dashboard

应用接入控制台

sentinel-transport-simple-http

在本地应用的pom.xml文件中引入依赖。

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.alibaba.csp/sentinel-transport-simple-http -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>1.8.7</version>
</dependency>

添加JVM的启动参数

添加JVM的启动参数

添加的参数有两个:

  • -Dcsp.sentinel.dashboard.server=localhost:9000
    设置Sentinel控制台的主机地址和端口。
  • -Dproject.name=S
    设置本地应用在Sentinel控制台中的名称。

需要注意的是,在配置完成后,应用不会主动连接上控制台,需要触发一次应用的规则才会开始进行初始化,并向控制台发送心跳和应用规则等信息。

动态限流规则调整

在上文,我们的限流规则,是写死在代码中的。

我们可以将@PostConstruct部分的整段都注释掉,然后在Sentinel控台的左侧菜单中选择流控规则即可对限流规则进行调整。

动态限流规则调整

定义资源

Sentinel中定义资源的方式有:

  • 抛出异常的方式定义资源
  • 返回布尔值方式定义资源
  • 异步调用支持
  • 注解方式定义资源
  • 主流框架的默认适配

抛出异常的方式定义资源

上文的案例,就是"抛出异常的方式定义资源"。

Sentinel中的SphU包含了try-catch风格的API,用这种方式,当资源发生了限流之后会抛出BlockException。这个时候可以捕捉异常,进行限流之后的逻辑处理。

具体参考上文的代码。

返回布尔值方式定义资源

Sentinel中的SphO提供if-else风格的API。用这种方式,当资源发生了限流之后会返回false,这个时候可以根据返回值,进行限流之后的逻辑处理。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.kakawanyifan.controller;

import com.alibaba.csp.sentinel.SphO;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

@GetMapping("hello")
public String hello() {

// 进行限流控制
// 限流入口
if (SphO.entry("B")) {
try {
// 被保护的资源
System.out.println("访问成功");
return "Hello Sentinel!";
} finally {
// SphO.entry(xxx)需要与 SphO.exit()方法成对出现
// 否则会导致调用链记录异常,抛出ErrorEntryFreeException异常
// 限流出口,
SphO.exit();
}
} else {
// 被限流或者降级的处理
System.out.println("系统繁忙,请稍候");
return "系统繁忙,请稍候";
}

}

}

注意:SphO.entry(xxx)需要与SphO.exit()方法成对出现,否则会导致调用链记录异常,抛出ErrorEntryFreeException异常。

异步调用支持

Sentinel支持异步调用链路的统计。在异步调用中,需要通过SphU.asyncEntry(xxx)方法定义资源,并通常需要在异步的回调函数中调用exit方法。

首先,我们需要在本地应用的引导类中添加@EnableAsync,表示SpringBoot项目开始异步调用支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.kakawanyifan;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}

然后,创建AsyncService编写异步调用的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.kakawanyifan.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
public class AsyncService {

//@Async表示方法为异步调用方法
@Async
public void hello(){
System.out.println("异步开始-----");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("完成异步----");
}
}

创建TestAsyncController,实现异步调用限流控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.kakawanyifan.controller;

import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.kakawanyifan.service.AsyncService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestAsyncController {

@Autowired
private AsyncService asyncService;

@GetMapping("async")
public String hello() {

// 进行限流控制
AsyncEntry asyncEntry = null;
try {
// 限流入口
asyncEntry = SphU.asyncEntry("A");
// 异步方法调用
asyncService.hello();
return "Hello Sentinel!";
} catch (BlockException e) {
// 被限流或者降级的处理
System.out.println("系统繁忙,请稍候");
return "系统繁忙,请稍候";
}finally {
if (asyncEntry != null){
// 限流出口
asyncEntry.exit();
}
}
}
}
  • 该例子也可以改成try-with-resources的形式。

注解方式定义资源

Sentinel支持通过@SentinelResource注解定义资源并配置blockHandler函数来进行限流之后的处理。

在本地应用的pom.xml文件中引入依赖sentinel-annotation-aspectj。因为Sentinel中使用AspectJ的扩展用于自动定义资源、处理BlockException等。

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.alibaba.csp/sentinel-annotation-aspectj -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-annotation-aspectj</artifactId>
<version>1.8.7</version>
</dependency>

创建AspectJ的配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.kakawanyifan.config;

import com.alibaba.csp.sentinel.annotation.aspectj.SentinelResourceAspect;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SentinelAspectConfiguration {
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
}

创建TestAnnController,实现限流控制。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.kakawanyifan.controller;

import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestAnnController {

//定义限流资源和限流讲解回调函数
@SentinelResource(value = "Ann", blockHandler = "exceptionHandler")
@GetMapping("ann")
public String hello() {
return "Hello Sentinel!";
}

// blockHandler函数,原方法调用被限流/降级/系统保护的时候调用
public String exceptionHandler(BlockException ex) {
ex.printStackTrace();
return "系统繁忙,请稍候";
}
}
  • @SentinelResource注解用来标识资源是否被限流、降级。上述例子中该注解的属性'Ann'表示资源名。此外,@SentinelResource还提供了其它额外的属性如blockHandler来指定被限流后的操作。

主流框架的默认适配

为了减少开发的复杂程度,对大部分的主流框架,例如WebServlet、Dubbo、SpringCloud、gRPC、SpringWebFlux、Reactor等都做了适配。我们只需要引入对应的依赖即可方便地整合Sentinel。

在这里以和SpringCloud的整合为例。

我们可以通过引入Spring Cloud Alibaba Sentinel来更方便地整合Sentinel。

创建一个新的SpringBoot项目,在项目中引入spring-cloud-starter-alibaba-sentinel依赖。

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-alibaba-sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2021.0.5.0</version>
</dependency>
  • 只需要引入这一个依赖,不需要引入sentinel-core等包。

在项目中创建TestController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.kakawanyifan.controller;

import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

//定义限流资源和限流讲解回调函数
@SentinelResource(value = "SSC", blockHandler = "exceptionHandler")
@GetMapping("ssc")
public String hello() {
return "Hello Sentinel!";
}

// blockHandler函数,原方法调用被限流/降级/系统保护的时候调用
public String exceptionHandler(BlockException ex) {
ex.printStackTrace();
return "系统繁忙,请稍候";
}
}

在application.properties中配置本地项目接入本地控制台

1
2
3
4
# 设置应用的名称
spring.application.name=SpringCloudSentinel
# 设置Sentinel连接控制台的主机地址和端口
spring.cloud.sentinel.transport.dashboard=localhost:9000
  • 不需修改JVM的参数了。

规则功能

分类

Sentinel的所有规则都可以在内存态中动态地查询及修改,修改之后立即生效。同时Sentinel也提供相关API,我们可以定制自己的规则策略。

Sentinel支持以下几种规则:

  • 流量控制规则
  • 熔断降级规则
  • 系统保护规则
  • 授权控制规则
  • 动态规则扩展

其中授权控制规则,我们不讨论,一般不会用Sentinel进行权限控制。

流量控制

上文,我们讨论的都是流量控制。

流量控制,flow control,其原理是监控应用流量的QPS或并发线程数等指标,当达到指定的阈值时对流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。

流量控制有两种方式:

  • 并发线程数:并发线程数限流用于保护业务线程数不被耗尽。
  • QPS:当QPS超过某个阈值的时候,采取措施进行流量控制。

一条限流规则主要由下面几个因素组成,我们可以组合这些元素来实现不同的限流效果:

  • resource:资源名,即限流规则的作用对象。
  • count:限流阈值。
  • grade:限流阈值类型(QPS或并发线程数)。
  • limitApp:流控针对的调用来源,若为default则不区分调用来源
  • strategy:调用关系限流策略。
  • controlBehavior:流量控制效果(直接拒绝、Warm Up、匀速排队)
    • 直接拒绝,RuleConstant.CONTROL_BEHAVIOR_DEFAULT,是默认的流量控制方式。当QPS超过任意规则的阈值后,新的请求就会被立即拒绝,拒绝方式为抛出FlowException。这种方式适用于对系统处理能力确切已知的情况下,比如通过压测确定了系统的准确水位时。
    • Warm Up,RuleConstant.CONTROL_BEHAVIOR_WARM_UP,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。
    • 排队等待,RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER,方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法。

我们可以对同一个资源同时使用多个限流规则,会依次进行检查。

熔断降级

什么是熔断降级

熔断降级,在调用链路中某个资源出现不稳定状态时(例如调用超时或异常比例升高),对这个资源的调用进行限制,让请求快速失败,避免影响到其它的资源而导致级联错误。当资源被降级后,在接下来的降级时间窗口之内,对该资源的调用都自动熔断(默认行为是抛出 DegradeException)。

重要的属性

Field 说明 默认值
resource 资源名,即限流规则的作用对象
count 阈值
grade 熔断策略,支持秒级 RT/秒级异常比例/分钟级异常数 秒级平均RT
timeWindow 降级的时间,单位为s
rtSlowRequestAmount RT模式下1秒内连续多少个请求的平均RT超出阈值方可触发熔断(1.7.0 引入) 5
minRequestAmount 异常熔断的触发最小请求数,请求数小于该值时即使异常比率超出阈值也不会熔断(1.7.0 引入) 5

同一个资源可以同时有多个降级规则。

策略详解

  • 平均响应时间,DEGRADE_GRADE_RT:当1s内持续进入N个请求,对应时刻的平均响应时间(秒级)均超过阈值(count,以ms为单位),那么在接下的时间(DegradeRule中的timeWindow,以s为单位)之内,对这个方法的调用都会自动地熔断(抛出DegradeException)。
  • 异常比例,DEGRADE_GRADE_EXCEPTION_RATIO:当资源的每秒请求量>= N(可配置),并且每秒异常总数占通过量的比值超过阈值(DegradeRule中的 count)之后,资源进入降级状态,即在接下的时间(DegradeRule中的timeWindow,以s为单位)之内,对这个方法的调用都会自动地返回。异常比率的阈值范围是[0.0, 1.0],代表0% - 100%
  • 异常数,DEGRADE_GRADE_EXCEPTION_COUNT:当资源近1分钟的异常数目超过阈值之后会进行熔断。注意由于统计时间是分钟级别的,若timeWindow小于60s,则结束熔断状态后仍可能再进入熔断状态。

例子

本文以如何使用平均响应时间DEGRADE_GRADE_RT为例。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.kakawanyifan.controller;

import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;

@RestController
public class TestController {

//定义限流资源和限流讲解回调函数
@SentinelResource(value = "SR",fallback = "exceptionHandler")
@GetMapping("degrade")
public String hello() {
return "Hello Sentinel!";
}

// 降级方法
public String exceptionHandler() {
return "系统繁忙,请稍候";
}

@PostConstruct
private void initDegradeRule() {
// 创建存放熔断降级规则的集合
List<DegradeRule> rules = new ArrayList<>();

// 创建熔断降级规则
DegradeRule rule = new DegradeRule();
// 定义资源
rule.setResource("SR");
// 阈值
rule.setCount(0.01);
// 定义规则类型,RuleConstant.DEGRADE_GRADE_RT:熔断降级(秒级 RT)类型
/*
* 当资源的平均响应时间超过阈值(DegradeRule中的count,以ms为单位)之后,资源进入准降级状态。
* 接下来如果持续进入 5 个请求,它们的 RT 都持续超过这个阈值,
* 那么在接下的时间窗口(DegradeRule 中的 timeWindow,以 s 为单位)之内,
* 将会抛出 DegradeException。
*/
rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
// 降级的时间,单位为 s
rule.setTimeWindow(10);
// 将熔断降级规则添加到集合中
rules.add(rule);

// 加载熔断降级规则
DegradeRuleManager.loadRules(rules);
}

}

系统保护

什么是系统保护

Sentinel系统保护限流从整体维度对应用入口流量进行控制,结合应用的Load、CPU使用率、总体平均RT、入口QPS和并发线程数等几个维度的监控指标,通过系统保护的流控策略,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能保持最大吞吐量的同时保证系统整体的稳定性。

系统保护规则是应用整体维度的,而不是资源维度的,并且仅对入口流量生效。入口流量指的是进入应用的流量EntryType.IN,比如Web服务或Dubbo服务端接收的请求,都属于入口流量。

模式

系统保护支持以下的模式:

  • Load,仅对Linux/Unix-like机器生效,将系统的load1作为启发指标,进行系统保护。当系统load1超过设定的启发值,且系统当前的并发线程数超过估算的系统容量时才会触发系统保护(BBR阶段)。系统容量由系统的maxQps * minRt估算得出。设定参考值一般是CPU cores * 2.5
  • CPU usage,1.5.0+版本,当系统CPU使用率超过阈值即触发系统保护(取值范围0.0-1.0),比较灵敏。
  • 平均RT,当单台机器上所有入口流量的平均RT达到阈值即触发系统保护,单位是毫秒。
  • 并发线程数,当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。
  • 入口QPS,当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。

重要的属性

Field 说明 默认值
highestSystemLoad load1 触发值,用于触发保护控制阶段 -1 (不生效)
avgRt 所有入口流量的平均响应时间 -1 (不生效)
maxThread 入口流量的最大并发数 -1 (不生效)
qps 所有入口资源的 QPS -1 (不生效)
highestCpuUsage 当前系统的 CPU 使用率(0.0-1.0) -1 (不生效)

注意系统规则只针对入口资源(EntryType=IN)生效。

例子

本文以如何使用所有入口资源的QPS配置规则为例。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.kakawanyifan.controller;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.csp.sentinel.slots.system.SystemRule;
import com.alibaba.csp.sentinel.slots.system.SystemRuleManager;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;

@RestController
public class SysController {
//定义限流资源和限流讲解回调函数
@SentinelResource(entryType = EntryType.IN)
@GetMapping("sys")
public String hello() {

return "Hello Sentinel!";
}

/**
* 定义系统保护规则
* @PostConstruct :在构造函数执行完毕后执行
*/
@PostConstruct
private void initSystemRule() {
//1.创建系统保护规则的集合
List<SystemRule> rules = new ArrayList<>();
//2.创建系统保护规则
SystemRule rule = new SystemRule();
//设置根据入口QPS规则
rule.setQps(2);
//将系统保护规则添加到集合中
rules.add(rule);

//3.加载系统保护规则
SystemRuleManager.loadRules(rules);
}

}

动态规则

什么是动态规则

上文不管是通过Java代码还是通过Sentinel控制台的方式去设置限流规则,都属于手动方式,不够灵活。

更建议通过动态规则源的方式来动态管理限流规则。
也就是说,很多时候限流规则会被存储在文件、数据库或者配置中心当中。Sentinel的DataSource接口给我们提供了对接任意配置源的能力。

官方推荐通过控制台设置规则后将规则推送到统一的规则管理中心,客户端实现ReadableDataSource接口端监听规则中心实时获取变更,流程如下:

注册中心

两种实现方式

  • 拉取式
    客户端主动向某个规则管理中心定期轮询拉取规则,这个规则管理中心可以是文件,甚至是VCS等。这样做的方式是简单,缺点是无法及时获取变更;实现拉模式的数据源最简单的方式是继承AutoRefreshDataSource抽象类,然后实现readSource()方法,在该方法里从指定数据源读取字符串格式的配置数据。
  • 推送式
    规则管理中心统一推送,客户端通过注册监听器的方式时刻监听变化,比如使用Zookeeper、Apollo等作为规则管理中心。这种方式有更好的实时性和一致性保证。实现推模式的数据源最简单的方式是继承AbstractDataSource抽象类,在其构造方法中添加监听器,并实现readSource()从指定数据源读取字符串格式的配置数据。

例子

本文演示如何使用zookeeper配置规则。
Sentinel针对ZooKeeper作了相应适配,底层可以采用ZooKeeper作为规则配置数据源。使用时只需添加sentinel-datasource-zookeeper。

关于Zookeeper的安装,可以参考《20.Dubbo和Zookeeper》

新建工程,在pom.xml文件引入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
<!--Spring Cloud Alibaba Sentinel依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2021.0.5.0</version>
</dependency>

<!--sentinel适配zookeeper的依赖-->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-zookeeper</artifactId>
<version>1.8.7</version>
</dependency>

在application.properties中配置连接sentinel控制台

1
2
3
4
# 设置应用的名称
spring.application.name=SentinelZookeeper
# 设置Sentinel连接控制台的主机地址和端口
spring.cloud.sentinel.transport.dashboard=localhost:9000

创建ZookeeperSentinelConfig,设置客户端修改获取规则的地方为从zookeeper获取规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.kakawanyifan.config;

import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.datasource.zookeeper.ZookeeperDataSource;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.util.List;

@Configuration
public class ZookeeperSentinelConfig {

/**
* 连接zookeeper获取规则
*/
@PostConstruct
public void loadRules(){
//Zookeeper 服务端的连接地址
String remoteAddress = "127.0.0.1:2181";
//Zookeeper中的数据路径
String path = "/Sentinel/zookeeper";


//构建资源
//参数1:zookeeper服务端地址
//参数2:zookeeper数据路径
//参数3:设置存放数据类型
ReadableDataSource<String, List<FlowRule>> readableDataSource = new ZookeeperDataSource<>(
remoteAddress,
path,
source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})
);

//将数据资源注册到FlowRuleManager
FlowRuleManager.register2Property(readableDataSource.getProperty());
}
}

创建ZookeeperController,设置流控资源配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.kakawanyifan.controller;

import com.alibaba.csp.sentinel.annotation.SentinelResource;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ZookeeperController {
//定义限流资源和限流讲解回调函数
@SentinelResource(value = "SZ",blockHandler = "exceptionHandler")
@GetMapping("zookeeper")
public String hello() {

return "Hello Sentinel!";
}

// blockHandler函数,原方法调用被限流/降级/系统保护的时候调用
public String exceptionHandler(BlockException ex) {
ex.printStackTrace();
return "系统繁忙,请稍候";
}
}

创建单元测试,编写代码,模拟往zookeeper中传递规则,然后再从客户端获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.kakawanyifan;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
class SzApplicationTests {

@Test
public void contextLoads() throws Exception {
//zookeeper的服务端连接地址
final String remoteAddress = "127.0.0.1:2181";

//发送的规则
//resource : 资源名
//controlBehavior:流控效果
//count:阀值
//grade:规则类型
//limitApp:调用来源
//strategy:判断根据是资源自身,还是根据其他关联资源,还是根据链路入口
final String rule = "[\n"
+ " {\n"
+ " \"resource\": \"SZ\",\n"
+ " \"controlBehavior\": 0,\n"
+ " \"count\": 2.0,\n"
+ " \"grade\": 1,\n"
+ " \"limitApp\": \"default\",\n"
+ " \"strategy\": 0\n"
+ " }\n"
+ "]";
//创建连接zookeeper
CuratorFramework zkClient = CuratorFrameworkFactory.newClient(remoteAddress, new ExponentialBackoffRetry(1000, 3));
//开始连接
zkClient.start();
//配置zookeeper数据路径
String path = "/Sentinel/zookeeper";
Stat stat = zkClient.checkExists().forPath(path);
//发送数据给zookeeper
if (stat == null) {
zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, null);
}
zkClient.setData().forPath(path, rule.getBytes());

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}

//关闭连接
zkClient.close();

}


}

注意

注意,官方的Sentinel Dashboard不支持接入Zookeeper,我们需要额外进行改造。

限流算法(部分源码解读)

常见限流算法有:

  1. 计数器算法
  2. 滑动时间窗算法
  3. 漏桶算法
  4. 令牌桶算法

计数器限流算法

我们可以直接通过一个计数器,限制每一秒钟能够接收的请求数。比如说QPS定为1000,那么实现思路就是从第一个请求进来开始计时,在接下去的1s内,每来一个请求,就把计数加1,如果累加的数字达到了1000,那么后续的请求就会被全部拒绝。等到1s结束后,把计数恢复成0,重新开始计数。

计数器限流算法

优点:实现简单

缺点:如果1s内的前半秒,已经通过了1000个请求,那后面的半秒只能请求拒绝,我们把这种现象称为"突刺现象"。

实现代码案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Counter {
public long timeStamp = getNowTime();
public int reqCount = 0;
public final int limit = 100; // 时间窗口内最大请求数
public final long interval = 1000; // 时间窗口ms

public boolean limit() {
long now = getNowTime();
if (now < timeStamp + interval) {
// 在时间窗口内
reqCount++;
// 判断当前时间窗口内是否超过最大请求控制数
return reqCount <= limit;
} else {
timeStamp = now;
// 超时后重置
reqCount = 1;
return true;
}
}

public long getNowTime() {
return System.currentTimeMillis();
}
}

滑动时间窗算法

滑动窗口,又称Rolling Window。为了解决计数器算法的缺陷,我们引入了滑动窗口算法。下面这张图,很好地解释了滑动窗口算法:

滑动时间窗算法

如上图中,整个红色的矩形框表示一个时间窗口,在我们的例子中,一个时间窗口就是一分钟。然后我们将时间窗口进行划分,比如图中,我们就将滑动窗口划成了6格,所以每格代表的是10秒钟。每过10秒钟,我们的时间窗口就会往右滑动一格。每一个格子都有自己独立的计数器counter,比如当一个请求在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。

那么滑动窗口怎么解决刚才的临界问题的呢?
我们可以看上图,0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在橘黄色的格子中。当时间到达1:00时,我们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触发了限流。

我再来回顾一下刚才的计数器算法,我们可以发现,计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。
由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

实现代码案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class SlideWindow {

/** 队列id和队列的映射关系,队列里面存储的是每一次通过时候的时间戳,这样可以使得程序里有多个限流队列 */
private volatile static Map<String, List<Long>> MAP = new ConcurrentHashMap<>();

private SlideWindow() {}

public static void main(String[] args) throws InterruptedException {
while (true) {
// 任意10秒内,只允许2次通过
System.out.println(LocalTime.now().toString() + SlideWindow.isGo("ListId", 2, 10000L));
// 睡眠0-10秒
Thread.sleep(1000 * new Random().nextInt(10));
}
}

/**
* 滑动时间窗口限流算法
* 在指定时间窗口,指定限制次数内,是否允许通过
*
* @param listId 队列id
* @param count 限制次数
* @param timeWindow 时间窗口大小
* @return 是否允许通过
*/
public static synchronized boolean isGo(String listId, int count, long timeWindow) {
// 获取当前时间
long nowTime = System.currentTimeMillis();
// 根据队列id,取出对应的限流队列,若没有则创建
List<Long> list = MAP.computeIfAbsent(listId, k -> new LinkedList<>());
// 如果队列还没满,则允许通过,并添加当前时间戳到队列开始位置
if (list.size() < count) {
list.add(0, nowTime);
return true;
}

// 队列已满(达到限制次数),则获取队列中最早添加的时间戳
Long farTime = list.get(count - 1);
// 用当前时间戳 减去 最早添加的时间戳
if (nowTime - farTime <= timeWindow) {
// 若结果小于等于timeWindow,则说明在timeWindow内,通过的次数大于count
// 不允许通过
return false;
} else {
// 若结果大于timeWindow,则说明在timeWindow内,通过的次数小于等于count
// 允许通过,并删除最早添加的时间戳,将当前时间添加到队列开始位置
list.remove(count - 1);
list.add(0, nowTime);
return true;
}
}

}

在Sentinel中通过LeapArray结构来实现时间窗算法,它的核心代码如下(只列举获取时间窗方法):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
* 获取当前的时间窗
*
* Get bucket item at provided timestamp.
*
* @param timeMillis a valid timestamp in milliseconds
* @return current bucket item at provided timestamp if the time is valid; null if time is invalid
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}

int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
// 计算窗口的开始时间,计算每个格子的开始时间
long windowStart = calculateWindowStart(timeMillis);

/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
// 如果没有窗格,创建窗格
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
// 当前窗格存在,返回历史窗格
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
//
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
// 清空所有的窗格数据
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
// 如果时钟回拨,重新创建时间格
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}

漏桶算法

漏桶算法(Leaky Bucket)是网络中流量整形(Traffic Shaping)或速率限制(Rate Limiting)时经常使用的一种算法,它的主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。

漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量,执行过程如下图所示。

实现代码案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class LeakyBucket {
public long timeStamp = System.currentTimeMillis(); // 当前时间
public long capacity; // 桶的容量
public long rate; // 水漏出的速度
public long water; // 当前水量(当前累积请求数)

public boolean grant() {
long now = System.currentTimeMillis();
// 先执行漏水,计算剩余水量
water = Math.max(0, water - (now - timeStamp) * rate);

timeStamp = now;
if ((water + 1) < capacity) {
// 尝试加水,并且水还未满
water += 1;
return true;
} else {
// 水满,拒绝加水
return false;
}
}
}

解释说明:

  1. 未满加水:通过代码water+=1进行不停加水的动作。
  2. 漏水:通过时间差来计算漏水量。
  3. 剩余水量:总水量-漏水量。

在Sentine中RateLimiterController实现了了漏桶算法,核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}

long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
// 计算时间间隔
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

// Expected pass time of this request.
// 期望的执行时间
long expectedTime = costTime + latestPassedTime.get();

// 当前时间 > 期望时间
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
// 可以通过,并且设置最后通过时间
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
// 等待时间 = 期望时间 - 最后时间 - 当前时间
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 等待时间 > 最大排队时间
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 上次时间 + 间隔时间
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 等待时间
waitTime = oldTime - TimeUtil.currentTimeMillis();
// 等待时间 > 最大排队时间
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
// 休眠等待
if (waitTime > 0) {
Thread.sleep(waitTime);
}
// 等待完了,就放行
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}

令牌桶算法

令牌桶算法是网络中流量整形(Traffic Shaping)或速率限制(Rate Limiting)时经常使用的一种算法。

典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。如下图所示:

令牌桶算法

简单的说就是,一边请求时会消耗桶内的令牌,另一边会以固定速率往桶内放令牌。当消耗的请求大于放入的速率时,进行相应的措施,比如等待,或者拒绝等。

实现代码案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class TokenBucket {
public long timeStamp = System.currentTimeMillis(); // 当前时间
public long capacity; // 桶的容量
public long rate; // 令牌放入速度
public long tokens; // 当前令牌数量

public boolean grant() {
long now = System.currentTimeMillis();
// 先添加令牌
tokens = Math.min(capacity, tokens + (now - timeStamp) * rate);
timeStamp = now;
if (tokens < 1) {
// 若不到1个令牌,则拒绝
return false;
} else {
// 还有令牌,领取令牌
tokens -= 1;
return true;
}
}
}

Sentinel在WarmUpController中运用到了令牌桶算法,在这里可以实现对系统的预热,设定预热时间和水位线,对于预热期间多余的请求直接拒绝掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();

long previousQps = (long) node.previousPassQps();
syncToken(previousQps);

// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}

return false;
}

限流算法比较

计数器与时间窗

  • 时间窗算法的本质也是通过计数器算法实现的。
  • 时间窗算法格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确,但是也会占用更多的内存存储。

漏桶与令牌桶

漏桶算法和令牌桶算法本质上是为了做流量整形或速率限制,避免系统因为大流量而被打崩,但是两者的核心差异在于限流的方向是相反的。

  • 漏桶:限制的是流量的流出速率,是相对固定的。
  • 令牌桶:限制的是流量的平均流入速率,并且允许一定程度的突然性流量,最大速率为桶的容量和生成token的速率。

在某些场景中,漏桶算法并不能有效的使用网络资源,因为漏桶的漏出速率是相对固定的,所以在网络情况比较好并且没有拥塞的状态下,漏桶依然是会有限制的,并不能放开量,因此并不能有效的利用网络资源。而令牌桶算法则不同,其在限制平均速率的同时,支持一定程度的突发流量。

文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/10826
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板