抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

全链路压测架构设计

全链路压测的意义

​ 上图是 2012 年淘宝核心业务应用关系的拓扑图,还不包含了其他的非核心业务应用,所谓的核心业务就是和交易相关的,和钱相关的业务。这张图大家可能看不清楚,看不清楚才是正常的,因为当时的阿里应用数量之多、应用间关系之混乱靠人工确实已经无法理清楚了。

​ 在真实的业务场景种,每个系统的压力都比较大,而系统之间是有相互依赖关系的,单机压测没有考虑到依赖环节压力都比较大的情况,会引入一个不确定的误差。这就好比,我们要生产一个仪表,每一个零件都经过了严密的测试,最终把零件组装成一个仪表,仪表的工作状态会是什么样的并不清楚。

2链路压测方案刨析

线下压测

​ 顾名思义就是在测试环境进行压测,且是针对一些重点项目这种测试手段,因为测试环境硬件资源以及压测数据与线上差别太大并且服务间依赖关系错综复杂,测试环境很难模拟且不够稳定,压测出来的数据指标参考价值不大,难以用测试环境得出的结果推导生产真实容量。

预生产环境压测

​ 这个一般是将生成环境的硬件以及软件同步复制到与生产环境一份,然后对服务内部的外部调用接口进行拦截,然后进行压测这样可以评估出来生产环境的真实容量以及达到压测的目的,但是成本非常高,需要将生产环境的硬件完全的复制一份,并未维护成本非常高,部署的时候需要同步的在预生产环境进行部署,以及压测代码的更改。

引流压测

​ 随着业务量的不断增长,考虑到线下测试结果的准确性,开始尝试生产压测,这种压测手段,我们称之为引流压测。事实上没有真正的模拟放大压力进行测试,而是一种通过缩小在线服务集群数的方式来放大单机处理量。比如一个业务系统的集群有100个节点,将其中90个节点模拟下线或转发流量到剩余的10个节点上实施压测。

​ 引流压测的弊端在于,DB承受压力不变,上下游系统的压力不变。压测结果仅能代表单个应用的性能,但往往无法识别链路和架构级的隐患,而且在引流过程中倘若出现异常或突如其来的业务高峰,很容易造成生产故障。

全链路压测

​ 随着微服务架构的流行,服务按照不同的维度进行拆分,一次请求往往需要涉及到多个服务。互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发、可能使用不同的编程语言来实现、有可能布在了几千台服务器,横跨多个不同的数据中心。因此,就需要一些可以帮助理解系统行为、用于分析性能问题的工具,以便发生故障的时候,能够快速定位和解决问题,但是他的缺点也很明显就是需要的技术难度很高,需要克服流量染色数据隔离日志隔离风险熔断等技术难题,因位在生产环境压测,所以控制不好风险也是非常高的。

​ 所以,在复杂的微服务架构系统中,几乎每一个前端请求都会形成一个复杂的分布式服务调用链路。一个请求完整调用链可能如下图所示:

四种压测方案对比

压测效果 技术难度 机器成本 维护成本 风险
线下压测
预生产压测
引流压测
全链路压测

全链路压测概述

什么是全链路压测

​ 基于实际的生产业务场景、生产环境,模拟海量的用户请求和数据对整个业务链(通常是核心业务链)进行压力测试,并持续调优的过程。

解决什么问题

​ 解决在业务场景越发复杂化、海量数据冲击下系统整个业务链的可用性、服务能力的瓶颈,以及容量规划等问题。

精确的容量规划
为什么需要容量规划

什么时候增减机器、保障系统稳定性、节约成本

​ 容量规划的目的在于让每一个业务系统能够清晰地知道:什么时候该加机器、什么时候应该减机器?双11等大促场景需要准备多少机器,既能保障系统稳定性、又能节约成本

容量规划四步走
  1. 业务流量预估阶段:通过历史数据分析未来某一个时间点业务的访问量会有多大
  2. 系统容量评估阶段:初步计算每一个系统需要分配多少机器
  3. 容量的精调阶段:通过全链路压测来模拟大促时刻的用户行为,在验证站点能力的同时对整个站点的容量水位进行精细调整
  4. 流量控制阶段:对系统配置限流阈值等系统保护措施,防止实际的业务流量超过预估业务流量的情况下,系统无法提供正常服务流量控制阶段:对系统配置限流阈值等系统保护措施,防止实际的业务流量超过预估业务流量的情况下,系统无法提供正常服务
进行全链路的性能监控

全链路性能监控 从整体维度到局部维度展示各项指标,将跨应用的所有调用链性能信息集中展现,可方便度量整体和局部性能,并且方便找到故障产生的源头,生产上可极大缩短故障排除时间。

  • 保证系统稳定性:可能提前预估系统存在的各种问题,提前模拟高并发场景,有备无患。
  • 请求链路追踪,故障快速定位:可以通过调用链结合业务日志快速定位错误信息。
  • 精准的容量评估:能够定位到最需要扩容的服务,帮助公司用最低的成本满足业务的性能要求
  • 真实的性能验证:能够在生成环境以最真实的环境来验证系统的真实性能。
  • 数据分析,优化链路:可以得到用户的行为路径,汇总分析应用在很多业务场景。

如何展开全链路压测

业务模型梳理
  • 首先应该将核心业务和非核心业务进行拆分,确认流量高峰针对的是哪些业务场景和模块,针对性的进行扩容准备。
  • 梳理出对外的接口:使用MOCK(模拟)方式做挡板。
  • 千万不要污染正常数据:认真梳理数据处理的每一个环节,确保 mock 数据的处理结果不会写入到正常库里面
数据模型构建
  • 数据的真实性和可用性:可以从生产环境完全移植一份当量的数据包,作为压测的基础数据,然后基于基础数据,通过分析历史数据增长趋势,预估当前可能的数据量
  • 数据隔离:千万千万不要污染正常数据:认真梳理数据处理的每一个环节,可以考虑通过压测数据隔离处理,落入影子库,mock 对象等手段,来防止数据污染
压测工具选型

​ 使用分布式压测的手段来进行用户请求模拟,目前有很多的开源工具可以提供分布式压测的方式,比如JMeter、nGrinder、Locust等。

业务模块介绍

现在我们对整体的业务进行介绍以及演示

全链路整体架构

上面介绍了为什么需要全链路压测,下面来看下全链路压测的整体架构。

​ 整体架构如下主要是对压测客户端的压测数据染色,全链路中间件识别出染色数据,并将正常数据和压测数据区分开,进行数据隔离,这里主要涉及到mysql数据库,RabbitMQ,Redis,还需要处理因为hystrix线程池不能通过ThreadLocal传递染色表示的问题。

需要应对的问题

业务问题

如何开展全链路压测?在说这个问题前,我们先考虑下,全链路压测有哪些问题比较难解决。

  1. 涉及的系统太多,牵扯的开发人员太多

    ​ 在压测过程中,做一个全链路的压测一般会涉及到大量的系统,在整个压测过程中,光各个产品的人员协调就是一个比较大的工程,牵扯到太多的产品经理和开发人员,如果公司对全链路压测早期没有足够的重视,那么这个压测工作是非常难开展的。

  2. 模拟的测试数据和访问流量不真实

    ​ 在压测过程中经常会遇到压测后得到的数据不准确的问题,这就使得压测出的数据参考性不强,为什么会产生这样的问题?主要就是因为压测的环境可能和生成环境存在误差、参数存在不一样的地方、测试数据存在不一样的地方这些因素综合起来导致测试结果的不可信。

  3. 压测生产数据未隔离,影响生产环境

    ​ 在全链路压测过程中,压测数据可能会影响到生产环境的真实数据,举个例子,电商系统在生产环境进行全链路压测的时候可能会有很多压测模拟用户去下单,如果不做处理,直接下单的话会导致系统一下子会产生很多废订单,从而影响到库存和生产订单数据,影响到日常的正常运营。

技术问题
探针的性能消耗

APM组件服务的影响应该做到足够小。

服务调用埋点本身会带来性能损耗,这就需要调用跟踪的低损耗,实际中还会通过配置采样率的方式,选择一部分请求去分析请求路径。在一些高度优化过的服务,即使一点点损耗也会很容易察觉到,而且有可能迫使在线服务的部署团队不得不将跟踪系统关停。

代码的侵入性

即也作为业务组件,应当尽可能少入侵或者无入侵其他业务系统,对于使用方透明,减少开发人员的负担。

​ 对于应用的程序员来说,是不需要知道有跟踪系统这回事的。如果一个跟踪系统想生效,就必须需要依赖应用的开发者主动配合,那么这个跟踪系统也太脆弱了,往往由于跟踪系统在应用中植入代码的bug或疏忽导致应用出问题,这样才是无法满足对跟踪系统“无所不在的部署”这个需求。

可扩展性

​ 一个优秀的调用跟踪系统必须支持分布式部署,具备良好的可扩展性。能够支持的组件越多当然越好。或者提供便捷的插件开发API,对于一些没有监控到的组件,应用开发者也可以自行扩展。

数据的分析

​ 数据的分析要快 ,分析的维度尽可能多。跟踪系统能提供足够快的信息反馈,就可以对生产环境下的异常状况做出快速反应。分析的全面,能够避免二次开发。

全链路压测核心技术

上面从总体架构层面分析了全链路压测的核心,下面就分析下全链路压测用到的核心技术点

全链路流量染色

做到微服务和中间件的染色标志的穿透

​ 通过压测平台对输出的压力请求打上标识,在订单系统中提取压测标识,确保完整的程序上下文都持有该标识,并且能够穿透微服务以及各种中间件,比如 MQ,hystrix,Fegin等。

全链路服务监控

需要能够实时监控服务的运行状况以及分析服务的调用链,我们采用skywalking进行服务监控和压测分析

全链路日志隔离

做到日志隔离,防止污染生产日志

​ 当订单系统向磁盘或外设输出日志时,若流量是被标记的压测流量,则将日志隔离输出,避免影响生产日志。

全链路风险熔断

流量控制,防止流量超载,导致集群不可用

​ 当订单系统访问会员系统时,通过RPC协议延续压测标识到会员系统,两个系统之间服务通讯将会有白黑名单开关来控制流量流入许可。该方案设计可以一定程度上避免下游系统出现瓶颈或不支持压测所带来的风险,这里可以采用Sentinel来实现风险熔断。

全链路数据隔离

对各种存储服务以及中间件做到数据隔离,方式数据污染

数据库隔离

​ 当会员系统访问数据库时,在持久化层同样会根据压测标识进行路由访问压测数据表。数据隔离的手段有多种,比如影子库影子表,或者影子数据,三种方案的仿真度会有一定的差异,他们的对比如下。

隔离性 兼容性 安全级别 技术难度
影子库
影子表
影子数据
消息队列隔离

​ 当我们生产的消息扔到MQ之后,接着让消费者进行消费,这个没有问题,压测的数据不能够直接扔到MQ中的,因为它会被正常的消费者消费到的,要做好数据隔离,方案有队列隔离消息隔离,他们对比如下。

隔离性 兼容性 安全级别 技术难度
队列隔离
消息隔离
Redis 隔离

​ 通过 key 值来区分,压测流量的 key 值加统一后缀,通过改造RedisTemplate来实现key的路由。

框架实现

流量染色方案

上面分析了从整体分析了全链路压测用的的核心技术,下面就来实现第一个流量染色。

流量识别

​ 要想压测的流量和数据不影响线上真实的生产数据,就需要线上的集群能识别出压测的流量,只要能识别出压测请求的流量,那么流量触发的读写操作就很好统一去做隔离了。

全链路压测发起的都是Http的请求,只需要要请求头上添加统一的压测请求头。

​ 通过在请求协议中添加压测请求的标识,在不同服务的相互调用时,一路透传下去,这样每一个服务都能识别出压测的请求流量,这样做的好处是与业务完全的解耦,只需要应用框架进行感知,对业务方代码无侵入。

MVC接收数据

​ 客户端传递过来的数据可以通过获取Header的方式获取到,并将其设置进当前的ThreadLocal,交给后面的方法使用。

MVC拦截器实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 链路跟踪Request设置值
*/
public class MvcWormholeWebInterceptor implements WebRequestInterceptor {


@Override
public void preHandle(WebRequest webRequest) {
//失效上下文,解决Tomcat线程复用问题
WormholeContextHolder.invalidContext();
String wormholeValue = webRequest.getHeader(WormholeContextHolder.WORMHOLE_REQUEST_MARK);
if (StringUtils.isNotEmpty(wormholeValue)) {
WormholeContextHolder.setContext(new WormholeContext(wormholeValue));
}
}

@Override
public void postHandle(WebRequest webRequest, ModelMap modelMap) throws Exception {

}

@Override
public void afterCompletion(WebRequest webRequest, Exception e) throws Exception {

}
}

Tomcat线程复用问题

​ tomcat默认使用线程池来管理线程,一个请求过来,如果线程池里面有空闲的线程,那么会在线程池里面取一个线程来处理该请求,一旦该线程当前在处理请求,其他请求就不会被分配到该线程上,直到该请求处理完成。请求处理完成后,会将该线程重新加入线程池,因为是通过线程池复用线程,就会如果线程内部的ThreadLocal没有清除就会出现问题,需要新的请求进来的时候,清除ThreadLocal。

Fegin传递传递染色标识

​ 我们项目的微服务是使用Fegin来实现远程调用的,跨微服务传递染色标识是通过MVC拦截器获取到请求Header的染色标识,并放进ThreadLocal中,然后交给Fegin拦截器在发送请求之前从ThreadLocal中获取到染色标识,并放进Fegin构建请求的Header中,实现微服务之间的火炬传递。

代码实现
1
2
3
4
5
6
7
8
9
10
public class WormholeFeignRequestInterceptor implements RequestInterceptor {

@Override
public void apply(RequestTemplate requestTemplate) {
WormholeContext wormholeContext = WormholeContextHolder.getContext();
if (null != wormholeContext) {
requestTemplate.header(WormholeContextHolder.WORMHOLE_REQUEST_MARK, wormholeContext.toString());
}
}
}
Hystrix传递染色标识
Hystrix隔离技术

Hystrix 实现资源隔离,主要有两种技术:

信号量

​ 信号量的资源隔离只是起到一个开关的作用,比如,服务 A 的信号量大小为 10,那么就是说它同时只允许有 10 个 tomcat 线程来访问服务 A,其它的请求都会被拒绝,从而达到资源隔离和限流保护的作用。

线程池

​ 线程池隔离技术,是用 Hystrix 自己的线程去执行调用;而信号量隔离技术,是直接让 tomcat 线程去调用依赖服务。信号量隔离,只是一道关卡,信号量有多少,就允许多少个 tomcat 线程通过它,然后去执行。

Hystrix穿透

​ 如果使用线程池模式,那么存在一个ThreadLocal变量跨线程传递的问题,即在主线程的ThreadLocal变量,无法在线程池中使用,不过Hystrix内部提供了解决方案。

封装Callable任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final class DelegatingWormholeContextCallable<V> implements Callable<V> {
private final Callable<V> delegate;
// 用户信息上下文(根据项目实际情况定义ThreadLocal上下文)
private WormholeContext orginWormholeContext;

public DelegatingWormholeContextCallable(Callable<V> delegate,
WormholeContext wormholeContext) {
this.delegate = delegate;
this.orginWormholeContext = wormholeContext;
}

public V call() throws Exception {
//防止线程复用销毁ThreadLocal的数据
WormholeContextHolder.invalidContext();
// 将当前的用户上下文设置进Hystrix线程的TreadLocal中
WormholeContextHolder.setContext(orginWormholeContext);
try {
return delegate.call();
} finally {
// 执行完毕,记得清理ThreadLocal资源
WormholeContextHolder.invalidContext();
}
}

public static <V> Callable<V> create(Callable<V> delegate,
WormholeContext wormholeContext) {
return new DelegatingWormholeContextCallable<V>(delegate, wormholeContext);
}
}

实现Hystrix的并发策略类

因为Hystrix默认的并发策略不支持ThreadLocal传递,我们可以自定义并发策略类继承HystrixConcurrencyStrategy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ThreadLocalAwareStrategy extends HystrixConcurrencyStrategy {

// 最简单的方式就是引入现有的并发策略,进行功能扩展
private final HystrixConcurrencyStrategy existingConcurrencyStrategy;

public ThreadLocalAwareStrategy(
HystrixConcurrencyStrategy existingConcurrencyStrategy) {
this.existingConcurrencyStrategy = existingConcurrencyStrategy;
}

@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
: super.getBlockingQueue(maxQueueSize);
}

@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getRequestVariable(rv)
: super.getRequestVariable(rv);
}

@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue)
: super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}

@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy
.wrapCallable(new DelegatingWormholeContextCallable<>(callable, WormholeContextHolder.getContext()))
: super.wrapCallable(new DelegatingWormholeContextCallable<T>(callable, WormholeContextHolder.getContext()));
}
}

Hystrix注入新并发策略并进行刷新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class HystrixThreadLocalConfiguration {

@Autowired(required = false)
private HystrixConcurrencyStrategy existingConcurrencyStrategy;

@PostConstruct
public void init() {
// Keeps references of existing Hystrix plugins.
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance()
.getCommandExecutionHook();

HystrixPlugins.reset();

HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalAwareStrategy(existingConcurrencyStrategy));
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
}
}

数据隔离方案

JDBC数据源隔离

数据隔离需要对DB,Redis,RabbitMQ进行数据隔离

​ 通过实现Spring动态数据源AbstractRoutingDataSource,通过ThreadLocal识别出来压测数据,如果是压测数据就路由到影子库,如果是正常流量则路由到主库,通过流量识别的改造,各个服务都已经能够识别出压测的请求流量了。

代码实现

数据源路由Key持有对象

根据路由Key将选择将操作路由给那个数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 动态数据源上下文
*/
public class DynamicDataSourceContextHolder {
public static final String PRIMARY_DB = "primary";
public static final String SHADOW_DB = "shadow";

private static final ThreadLocal<String> contextHolder = new ThreadLocal<String>() {
/**
* 将 master 数据源的 key作为默认数据源的 key
*/
@Override
protected String initialValue() {
return PRIMARY_DB;
}
};


/**
* 数据源的 key集合,用于切换时判断数据源是否存在
*/
public static List<Object> dataSourceKeys = new ArrayList<>();

/**
* 切换数据源
*
* @param key
*/
public static void setDataSourceKey(String key) {
contextHolder.set(key);
}

/**
* 获取数据源
*
* @return
*/
public static String getDataSourceKey() {
return contextHolder.get();
}

/**
* 重置数据源
*/
public static void clearDataSourceKey() {
contextHolder.remove();
}

/**
* 判断是否包含数据源
*
* @param key 数据源key
* @return
*/
public static boolean containDataSourceKey(String key) {
return dataSourceKeys.contains(key);
}

/**
* 添加数据源keys
*
* @param keys
* @return
*/
public static boolean addDataSourceKeys(Collection<? extends Object> keys) {
return dataSourceKeys.addAll(keys);
}
}

动态数据源实现类

根据路由Key实现数据源的切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 动态数据源实现类
*/
public class DynamicDataSource extends AbstractRoutingDataSource {

/**
* 如果不希望数据源在启动配置时就加载好,可以定制这个方法,从任何你希望的地方读取并返回数据源
* 比如从数据库、文件、外部接口等读取数据源信息,并最终返回一个DataSource实现类对象即可
*/
@Override
protected DataSource determineTargetDataSource() {
//获取当前的上下文
WormholeContext wormholeContext = WormholeContextHolder.getContext();
//如果不为空使用影子库
if (null != wormholeContext) {
DynamicDataSourceContextHolder.setDataSourceKey(DynamicDataSourceContextHolder.SHADOW_DB);
} else {
//为空则使用主数据源
DynamicDataSourceContextHolder.setDataSourceKey(DynamicDataSourceContextHolder.PRIMARY_DB);
}
return super.determineTargetDataSource();
}

/**
* 如果希望所有数据源在启动配置时就加载好,这里通过设置数据源Key值来切换数据,定制这个方法
*/
@Override
protected Object determineCurrentLookupKey() {
return DynamicDataSourceContextHolder.getDataSourceKey();
}


}
Redis 数据源隔离

​ 同时通过ThreadLocal识别出来压测数据,自定义Redis的主键的序列化方式,如果是压测数据则在主键后面加上后缀,这样就可以通过不同主键将Redis数据进行隔离。

实现key序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
public class KeyStringRedisSerializer extends StringRedisSerializer {

@Resource
private WormholeIsolationConfiguration isolationConfiguration;

public byte[] serialize(@Nullable String redisKey) {
WormholeContext wormholeContext = WormholeContextHolder.getContext();
if (null != wormholeContext) {
redisKey = isolationConfiguration.generateIsolationKey(redisKey);
}
return super.serialize(redisKey);
}
}
配置序列化器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Redis 配置类
*/
@Configuration
@ConditionalOnClass({RedisTemplate.class, RedisOperations.class, RedisConnectionFactory.class})
public class WormholeRedisAutoConfiguration {


@Bean
public KeyStringRedisSerializer keyStringRedisSerializer() {
return new KeyStringRedisSerializer();
}

@Bean("redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate template = new RedisTemplate();
//使用fastjson序列化
FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
// value值的序列化采用fastJsonRedisSerializer
template.setValueSerializer(fastJsonRedisSerializer);
template.setHashValueSerializer(fastJsonRedisSerializer);
// key的序列化采用StringRedisSerializer
template.setKeySerializer(keyStringRedisSerializer());
template.setHashKeySerializer(keyStringRedisSerializer());
template.setConnectionFactory(factory);
return template;
}

@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setKeySerializer(keyStringRedisSerializer());
template.setHashKeySerializer(keyStringRedisSerializer());
template.setConnectionFactory(factory);
return template;
}
}
RabbitMQ 数据隔离

自动创建影子队列

因为SpringAMQP中的

中的关键方法是私有的,无法通过继承的方式进行实现对以配置好的队列进行扩展,所以需要自定义该类,来实现对自动创建影子队列,并和交换器进行绑定

代码实现

​ 改造RabbitListenerAnnotationBeanPostProcessor类来实现创建MQ影子队列以及将影子Key绑定到影子队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class WormholeRabbitListenerAnnotationBeanPostProcessor extends RabbitListenerAnnotationBeanPostProcessor {

@Resource
private WormholeIsolationConfiguration wormholeIsolationConfiguration;

/**
* routingKey 前置处理器
*
* @param queueName
* @param routingKey
* @return
*/
@Override
public String preProcessingRoutingKey(String queueName, String routingKey) {
//如果是影子队列就将routingKey转换为 影子routingKey
if (wormholeIsolationConfiguration.checkIsolation(queueName) && !wormholeIsolationConfiguration.checkIsolation(routingKey)) {
return wormholeIsolationConfiguration.generateIsolationKey(routingKey);
}
return routingKey;
}

/**
* 处理队列问题,如果来了一个队列就生成一个shadow的队列
*
* @param queues
* @return
*/
@Override
public List<String> handelQueues(List<String> queues) {
List<String> isolationQueues = new ArrayList<>();
if (null != queues && !queues.isEmpty()) {
for (String queue : queues) {
//添加shadow队列
isolationQueues.add(wormholeIsolationConfiguration.generateIsolationKey(queue));
}
queues.addAll(isolationQueues);
}
return queues;
}
}
传递染色标识

​ 因为MQ是异步通讯,为了传递染色标识,会在发送MQ的时候将染色标识传递过来,MQ接收到之后放进当前线程的ThreadLocal里面,这个需要扩展Spring的SimpleRabbitListenerContainerFactory来实现

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class WormholeSimpleRabbitListenerContainerFactory extends SimpleRabbitListenerContainerFactory {

@Override
protected SimpleMessageListenerContainer createContainerInstance() {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setAfterReceivePostProcessors(message -> {
//防止线程复用 销毁ThreadLocal
WormholeContextHolder.invalidContext();
//获取消息属性标识
String wormholeRequestContext = message.getMessageProperties().getHeader(WormholeContextHolder.WORMHOLE_REQUEST_MARK);
if (StringUtils.isNotEmpty(wormholeRequestContext)) {
WormholeContextHolder.setContext(wormholeRequestContext);
}
return message;
});
return simpleMessageListenerContainer;
}
}

发送MQ消息处理

​ 同上,需要传递染色标识,就通过继承RabbitTemplate重写convertAndSend方法来实现传递染色标识。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ShadowRabbitTemplate extends RabbitTemplate {
public ShadowRabbitTemplate(ConnectionFactory connectionFactory) {
super(connectionFactory);
}

@Autowired
private WormholeIsolationConfiguration isolationConfiguration;

@Override
public void send(final String exchange, final String routingKey,
final Message message, @Nullable final CorrelationData correlationData)
throws AmqpException {
WormholeContext wormholeContext = WormholeContextHolder.getContext();
if (null == wormholeContext) {
super.send(exchange, routingKey, message, correlationData);
} else {
message.getMessageProperties().setHeader(WormholeContextHolder.WORMHOLE_REQUEST_MARK, wormholeContext.toString());
//生成Rabbit 隔离Key
String wormholeRoutingKey = isolationConfiguration.generateIsolationKey(routingKey);
//调用父类进行发送
super.send(exchange, wormholeRoutingKey, message, correlationData);
}
}
}

接口隔离方法

Mock 第三方接口

​ 对于第三方数据接口需要进行隔离,比如短信接口,正常的数据需要发送短信,对于压测数据则不能直接调用接口发送短信,并且需要能够识别出来压测数据,并进行MOCK接口调用。

核心类实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Aspect
public class WormholeMockSection {

/**
* 切点 拦截@WormholeMock的注解
*/
@Pointcut("@annotation(com.baiyp.wormhole.component.mock.annotation.WormholeMock)")
public void pointCut() {
}

/**
* 环绕通知
*
* @param point
* @return
* @throws Throwable
*/
@Around("pointCut()")
public Object section(ProceedingJoinPoint point) throws Throwable {
WormholeContext wormholeContext = WormholeContextHolder.getContext();
Object[] parameter = point.getArgs();
//如果没有wormholeContext 就执行正常方法
if (null == wormholeContext) {
return point.proceed(parameter);
}
//如果存在就执行MOCK方法
WormholeMock wormholeMock = WormholeMockUtils.getMethodAnnotation(point, WormholeMock.class);
if (null != wormholeMock) {
//获取到 Mock 回调类
WormholeMockCallback wormholeMockCallback = WormholeMockUtils.getWormholeMockCallback(wormholeMock);
if (null != wormholeMockCallback) {
return wormholeMockCallback.handelMockData(parameter);
}
}
return null;
}

}
使用方式

在具体方法上面加上注解就可以使用了

1
2
3
4
5
6
7
8
9
10
11
12
@Override
//加入注解进行MOCK测试拦截 设置最大耗时
@WormholeMock(maxDelayTime = 10, minDelayTime = 2)
public boolean send(NotifyVO notifyVO) {
logger.info("开始发送短信通知.....");
try {
//模拟发送短信耗时
Thread.sleep(5);
} catch (InterruptedException e) {
}
return true;
}

零侵入方案

如果开发的中间件需要各个微服务大量改造,对开发人员来说就是一个灾难,所以这里采用零侵入的springboot starter 来解决

自动装配

使用微服务得@Conditional来完成配置得自动装配,这里用MVC得配置来演示自动装配,其他得都是类似

这样可以最大限度的优化代码并提高很高的可扩展性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* MVC 自动装配
*/
@Configuration
//当DispatcherServlet存在时该配置类才会被执行到
@ConditionalOnClass(org.springframework.web.servlet.DispatcherServlet.class)
public class WormholeMVCAutoConfiguration {

@ConditionalOnClass
@Bean
public WormholeMVCConfiguration wormholeMVCConfiguration() {
return new WormholeMVCConfiguration();
}
}

Conditional 简介

@Conditional表示仅当所有指定条件都匹配时,组件才有资格注册 。 该@Conditional注释可以在以下任一方式使用:

  • 作为任何@Bean方法的方法级注释
  • 作为任何类的直接或间接注释的类型级别注释 @Component,包括@Configuration类
  • 作为元注释,目的是组成自定义构造型注释
Conditional派生注解

@Conditional派生了很多注解,下面给个表格列举一下派生注解的用法

@Conditional派生注解 作用(都是判断是否符合指定的条件)
@ConditionalOnJava 系统的java版本是否符合要求
@ConditionalOnBean 有指定的Bean类
@ConditionalOnMissingBean 没有指定的bean类
@ConditionalOnExpression 符合指定的SpEL表达式
@ConditionalOnClass 有指定的类
@ConditionalOnMissingClass 没有指定的类
@ConditionalOnSingleCandidate 容器只有一个指定的bean,或者这个bean是首选bean
@ConditionalOnProperty 指定的property属性有指定的值
@ConditionalOnResource 路径下存在指定的资源
@ConditionalOnWebApplication 系统环境是web环境
@ConditionalOnNotWebApplication 系统环境不是web环境
@ConditionalOnjndi JNDI存在指定的项
SpringBoot starter

​ 和自动装配一样,Spring Boot Starter的目的也是简化配置,而Spring Boot Starter解决的是依赖管理配置复杂的问题,有了它,当我需要构建一个Web应用程序时,不必再遍历所有的依赖包,一个一个地添加到项目的依赖管理中,而是只需要一个配置spring-boot-starter-web

使用规范

​ 在 Spring Boot starter 开发规范中,项目中会有一个空的名为 xxx-spring-boot-starter 的项目,这个项目主要靠 pom.xml 将所有需要的依赖引入进来。同时项目还会有一个 xxx-spring-boot-autoconfigure 项目,这个项目主要写带 @Configuration 注解的配置类,在这个类或者类中带 @Bean 的方法上。

项目使用

xxx-spring-boot-starter的项目下的resources文件夹下面新建一个META-INF文件,并在下面创建spring.factories文件,将我们的自动配置类配置进去

1
2
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.baiyp.wormhole.autoconfiguration.WormholeAutoConfiguration

服务监控方案

skywalking简介

​ Skywalking 是一个APM系统,即应用性能监控系统,为微服务架构和云原生架构系统设计。它通过探针自动收集所需的指标,并进行分布式追踪。通过这些调用链路以及指标,Skywalking APM会感知应用间关系和服务间关系,并进行相应的指标统计。目前支持链路追踪和监控应用组件如下,基本涵盖主流框架和容器,如国产PRC Dubbo和motan等,国际化的spring boot,spring cloud都支持了

​ SkyWalking是分布式系统的应用程序性能监视工具,专为微服务、云原生架构和基于容器(Docker、K8S、Mesos)架构而设计

​ SkyWalking是观察性分析平台和应用性能管理系统。提供分布式追踪、服务网格遥测分析、度量聚合和可视化一体化解决方案

SkyWalking组件
  • Skywalking Agent: 采集tracing(调用链数据)和metric(指标)信息并上报,上报通过HTTP或者gRPC方式发送数据到Skywalking Collector

  • Skywalking Collector : 链路数据收集器,对agent传过来的tracingmetric数据进行整合分析通过Analysis Core模块处理并落入相关的数据存储中,同时会通过Query Core模块进行二次统计和监控告警

  • Storage: Skywalking的存储,支持以ElasticSearchMysqlTiDBH2等作为存储介质进行数据存储

  • UI: Web可视化平台,用来展示落地的数据,目前官方采纳了RocketBot作为SkyWalking的主UI

配置SkyWalking
下载SkyWalking

​ 下载SkyWalking的压缩包,解压后将压缩包里面的agent文件夹放进本地磁盘,探针包含整个目录,请不要改变目录结构。

Agent配置

​ 通过了解配置,可以对一个组件功能有一个大致的了解,解压开skywalking的压缩包,在agent/config文件夹中可以看到agent的配置文件,从skywalking支持环境变量配置加载,在启动的时候优先读取环境变量中的相关配置。

skywalking配置名称 描述
agent.namespace 跨进程链路中的header,不同的namespace会导致跨进程的链路中断
agent.service_name 一个服务(项目)的唯一标识,这个字段决定了在sw的UI上的关于service的展示名称
agent.sample_n_per_3_secs 客户端采样率,0或者负数标识禁用,默认-1
agent.authentication 与collector进行通信的安全认证,需要同collector中配置相同
agent.ignore_suffix 忽略特定请求后缀的trace
collecttor.backend_service agent需要同collector进行数据传输的IP和端口
logging.level agent记录日志级别

skywalking agent使用javaagent无侵入式的配合collector实现对分布式系统的追踪和相关数据的上下文传递。

配置探针

配置SpringBoot启动参数,需要填写如下的运行参数,代码放在后面,需要的自己粘贴。

1
2
3
-javaagent:D:/data/skywalking/agent/skywalking-agent.jar
-Dskywalking.agent.service_name=storage-server
-Dskywalking.collector.backend_service=172.18.0.50:11800

  • javaagent:复制的agent目录下探针的jar包路径
  • skywalking.agent.service_name:需要在skywalking显示的服务名称
  • skywalking.collector.backend_service:skywalking服务端地址默认是11800

环境准备

环境服务列表

需要在虚拟机或者linux服务器启动运行环境

服务 ip 端口 备注
mysql 172.18.0.10 3306 数据库服务
rabbitMQ 172.18.0.20 5672,5672 RabbitMQ消息服务
redis 172.18.0.30 6379 Redis缓存服务
nacos 172.18.0.40 8848 微服务注册中心
skywalking 172.18.0.50 1234,11800,12800 链路追踪APM服务端
skywalking-ui 172.18.0.60 8080 链路追踪APM服务UI端

应用服务列表

应用服务可以单独部署或者在idea中启动

服务 ip 端口 备注
order-service 127.0.0.1 8001 订单服务
account-service 127.0.0.1 8002 账户服务
storage-service 127.0.0.1 8003 数据存储服务
notice-service 127.0.0.1 8004 通知服务

docker-compose 编排环境

我们的docker-compose只对环境进行了搭建,具体微服务在本地运行或者在容器运行都可以。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
version: '2'
services:
mysql:
image: mysql:5.7
hostname: mysql
container_name: mysql
networks:
docker-network:
ipv4_address: 172.18.0.10
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: root
volumes:
- "/tmp/etc/mysql:/etc/mysql/conf.d"
- "/tmp/data/mysql:/var/lib/mysql"
rabbitMQ:
image: rabbitmq:management
hostname: rabbitMQ
container_name: rabbitMQ
networks:
docker-network:
ipv4_address: 172.18.0.20
ports:
- "5672:5672"
- "15672:15672"
redis:
image: redis
hostname: redis
container_name: redis
networks:
docker-network:
ipv4_address: 172.18.0.30
ports:
- "6379:6379"
volumes:
- "/tmp/etc/redis/redis.conf:/etc/redis/redis.conf"
- "/tmp/data/redis:/data"
command:
redis-server /etc/redis/redis.conf
nacos:
image: nacos/nacos-server
hostname: nacos
container_name: nacos
depends_on:
- mysql
networks:
docker-network:
ipv4_address: 172.18.0.40
ports:
- "8848:8848"
environment:
MODE: standalone
volumes:
- "/tmp/etc/nacos/application.properties:/home/nacos/conf/application.properties"
skywalking:
image: apache/skywalking-oap-server
hostname: skywalking
container_name: skywalking
networks:
docker-network:
ipv4_address: 172.18.0.50
ports:
- "1234:1234"
- "11800:11800"
- "12800:12800"
skywalkingui:
image: apache/skywalking-ui
hostname: skywalkingui
container_name: skywalkingui
depends_on:
- skywalking
networks:
docker-network:
ipv4_address: 172.18.0.60
environment:
SW_OAP_ADDRESS: 172.18.0.50:12800
ports:
- "8080:8080"
networks:
docker-network:
ipam:
config:
- subnet: 172.18.0.0/16
gateway: 172.18.0.1

初始化数据

  1. 初始化用户数据以及产品数据

  2. 将feign,hystrix,ribbon等统一配置配置到nacos

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    # 配置超时时间
    feign:
    hystrix:
    enabled: true #开启熔断
    httpclient:
    enabled: true
    hystrix:
    threadpool:
    default:
    coreSize: 50
    maxQueueSize: 1500
    queueSizeRejectionThreshold: 1000
    command:
    default:
    execution:
    timeout:
    enabled: true
    isolation:
    thread:
    timeoutInMilliseconds: 60000
    ribbon:
    ConnectTimeout: 10000
    ReadTimeout: 50000

全链路压测测试

jmeter配置

配置好压测数据,并且配置压测线程数1000 进行10轮压测

第一轮压测

链路分析优化

我们找到一个调用时长1S左右的链路,分析发现在存储服务调用时,耗时较长,但是数据库调用耗时并不长,基本说明是存储服务的连接池耗尽导致调用过长。

数据库连接池优化

调整存储服务的连接池,由原来的最大10 改为100

1
2
3
initialSize: 10
minIdle: 20
maxActive: 100

第二轮压测

结果已经由原来的服务内部的耗时 变为了fegin的耗时,这种情况下可以考虑使用fegin的连接池优化或者新增节点

观察消费节点

发现消费速度很慢,产生了大量消息堆积

检查storage-serviceactualPlaceOrder端点信息

发现平均响应时间在200ms左右

检查断点链路/storage/order/actualPlaceOrder

发现是事务提交慢造成的,这个时候就需要优化mysql服务器了

Skywalking 使用

Skywalking 模块栏目

Skywalking web UI 主要包括如下几个大的功能模块:

  • 仪表盘:查看被监控服务的运行状态
  • 拓扑图:以拓扑图的方式展现服务直接的关系,并以此为入口查看相关信息
  • 追踪:以接口列表的方式展现,追踪接口内部调用过程
  • 性能剖析:单独端点进行采样分析,并可查看堆栈信息
  • 告警:触发告警的告警列表,包括实例,请求超时等。
  • 自动刷新:刷新当前数据内容。

9.2 仪表盘

  • 第一栏:不同内容主题的监控面板,应用/数据库/容器等
  • 第二栏:操作,包括编辑/导出当前数据/倒入展示数据/不同服务端点筛选展示
  • 第三栏:不同纬度展示,服务/实例/端点

展示栏

Global全局维度

  • 第一栏:Global、Server、Instance、Endpoint不同展示面板,可以调整内部内容
  • Services load:服务每分钟请求数
  • Slow Services:慢响应服务,单位ms
  • Un-Health services(Apdex):Apdex性能指标,1为满分。
  • Global Response Latency:百分比响应延时,不同百分比的延时时间,单位ms
  • Global Heatmap:服务响应时间热力分布图,根据时间段内不同响应时间的数量显示颜色深度
  • 底部栏:展示数据的时间区间,点击可以调整。
Service服务维度

  • Service Apdex(数字):当前服务的评分
  • Service Apdex(折线图):不同时间的Apdex评分
  • Successful Rate(数字):请求成功率
  • Successful Rate(折线图):不同时间的请求成功率
  • Servce Load(数字):每分钟请求数
  • Servce Load(折线图):不同时间的每分钟请求数
  • Service Avg Response Times:平均响应延时,单位ms
  • Global Response Time Percentile:百分比响应延时
  • Servce Instances Load:每个服务实例的每分钟请求数
  • Show Service Instance:每个服务实例的最大延时
  • Service Instance Successful Rate:每个服务实例的请求成功率
Instance实例维度

  • Service Instance Load:当前实例的每分钟请求数
  • Service Instance Successful Rate:当前实例的请求成功率
  • Service Instance Latency:当前实例的响应延时
  • JVM CPU:jvm占用CPU的百分比
  • JVM Memory:JVM内存占用大小,单位m
  • JVM GC Time:JVM垃圾回收时间,包含YGC和OGC
  • JVM GC Count:JVM垃圾回收次数,包含YGC和OGC
  • CLR XX:类似JVM虚拟机,这里用不上就不做解释了
Endpoint端点(API)维度

  • Endpoint Load in Current Service:每个端点的每分钟请求数
  • Slow Endpoints in Current Service:每个端点的最慢请求时间,单位ms
  • Successful Rate in Current Service:每个端点的请求成功率
  • Endpoint Load:当前端点每个时间段的请求数据
  • Endpoint Avg Response Time:当前端点每个时间段的请求行响应时间
  • Endpoint Response Time Percentile:当前端点每个时间段的响应时间占比
  • Endpoint Successful Rate:当前端点每个时间段的请求成功率

拓扑图

  • 1:选择不同的服务关联拓扑
  • 2:查看单个服务相关内容
  • 3:服务间连接情况
  • 4:分组展示服务拓扑

追踪

  • 左侧:api接口列表,红色-异常请求,蓝色-正常请求
  • 右侧:api追踪列表,api请求连接各端点的先后顺序和时间

性能剖析

  • 服务:需要分析的服务
  • 端点:链路监控中端点的名称,可以再链路追踪中查看端点名称
  • 监控时间:采集数据的开始时间
  • 监控持续时间:监控采集多长时间
  • 起始监控时间:多少秒后进行采集
  • 监控间隔:多少秒采集一次
  • 最大采集数:最大采集多少样本

查看监控结果

评论