抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

服务幂等性架构设计

img

防重表实现幂等

对于防止数据重复提交,还有一种解决方案就是通过防重表实现。

​ 防重表的实现思路也非常简单,首先创建一张表作为防重表,同时在该表中建立一个或多个字段的唯一索引作为防重字段,用于保证并发情况下,数据只有一条。在向业务表中插入数据之前先向防重表插入,如果插入失败则表示是重复数据。

image-20200615094631834

为什么不用悲观锁

对于防重表的解决方案,可能有人会说为什么不使用悲观锁,悲观锁在使用的过程中也是会发生死锁的。

​ 悲观锁是通过锁表的方式实现的,假设现在一个用户A访问表A(锁住了表A),然后试图访问表B;

​ 另一个用户B访问表B(锁住了表B),然后试图访问表A。 这时对于用户A来说,由于表B已经被用户B锁住了,所以用户A必须等到用户B释放表B才能访问。

​ 同时对于用户B来说,由于表A已经被用户A锁住了,所以用户B必须等到用户A释放表A才能访问。此时死锁就已经产生了。

唯一主键实现幂等

​ 数据库唯一主键的实现主要是利用数据库中主键唯一约束的特性,一般来说唯一主键比较适用于“插入”时的幂等性,其能保证一张表中只能存在一条带该唯一主键的记录。

​ 使用数据库唯一主键完成幂等性时需要注意的是,该主键一般来说并不是使用数据库中自增主键,而是使用分布式 ID 充当主键,这样才能能保证在分布式环境下 ID 的全局唯一性。

​ 对于一些后台系统,并发量并不高的情况下,对于幂等的实现非常简单,通过这种思想即可完成幂等控制。

适用场景

  • 插入操作
  • 删除操作

使用限制

  • 需要生成全局唯一主键 ID;

主要流程

img

主要流程如下:

  1. 客户端执行创建请求,调用服务端接口。
  2. 服务端执行业务逻辑,生成一个分布式 ID,将该 ID 充当待插入数据的主键,然 后执数据插入操作,运行对应的 SQL 语句。
  3. 服务端将该条数据插入数据库中,如果插入成功则表示没有重复调用接口。如果抛出主键重复异常,则表示数据库中已经存在该条记录,返回错误信息到客户端。

​ 在业务执行前,先判断是否已经操作过,如果没有则执行,否则判断为重复操作。

image-20200612151946430

效果演示

在并发下访问时,因为是基于id进行判断,那id值就必须要保证在多次提交时,需要唯一。访问流程如下:

image-20200612164810882

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
@Transactional(rollbackFor = Exception.class)
public String addOrder(Order order) {

order.setCreateTime(new Date());
order.setUpdateTime(new Date());

//查询
Order orderResult = orderMapper.selectByPrimaryKey(order.getId());

Optional<Order> orderOptional = Optional.ofNullable(orderResult);
if (orderOptional.isPresent()){

return "repeat request";
}

int result = orderMapper.insert(order);
if (result != 1){
return "fail";
}

return "success";
}

​ 对于上述功能实现,在并发下,并不能完成幂等性控制。通过jemeter测试,模拟50个并发,可以发现,插入了重复数据。产生了脏数据。

​ 要解决这个问题,非常简单,在数据库层面添加唯一索引即可,将id设置为唯一索引,也是最容易想到的方式,一旦id出现重复,就会出现异常,避免了脏数据的发生也可以解决永久性幂等。但该方案无法用于分库分表情况,其只适用于单表情况。

乐观锁实现幂等性

数据库乐观锁方案一般只能适用于执行更新操作的过程,我们可以提前在对应的数据表中多添加一个字段,充当当前数据的版本标识。

​ 这样每次对该数据库该表的这条数据执行更新时,都会将该版本标识作为一个条件,值为上次待更新数据中的版本标识的值。

适用操作

  • 更新操作

使用限制

  • 需要数据库对应业务表中添加额外字段

问题抛出

扣减库存数据错误

通过jemeter进行测试,可以发现。当模拟一万并发时,最终的库存数量是错误的。这主要是因为当多线程访问时,一个线程读取到了另外线程未提交的数据造成。

image-20200613155604255

image-20200613155538553

image-20200613155550364

image-20200613155847600

synchronized失效问题

对于现在的问题,暂不考虑秒杀设计、队列请求串行化等,只考虑如何通过锁进行解决,要通过锁解决的话,那最先想到的可能是synchronized

​ 根据synchronized定义,当多线程并发访问时,会对当前加锁的方法产生阻塞,从而保证线程安全,避免脏数据。但是,真的能如预期的一样吗?

1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class StockServiceImpl implements StockService {

@Autowired
private StockMapper stockMapper;

@Override
@Transactional(rollbackFor = Exception.class)
public synchronized int lessInventory(String goodsId, int num) {
return stockMapper.lessInventory(goodsId, num);
}
}

​ 当前已经在在方法上添加了synchronized,对当前方法对象进行了锁定。 通过Jemeter,模拟一万并发对其进行访问。可以发现,仍然出现了脏数据。

image-20200613160145046

事务导致锁失效

该问题的产生原因,就在于在方法上synchronized搭配使用了**@Transactional**。

​ 首先synchronized锁定的是当前方法对象,而**@Transactional**会对当前方法进行AOP增强,动态代理出一个代理对象,在方法执行前开启事务,执行后提交事务。

​ 所以synchronized和**@Transactional其实操作的是两个不同的对象,换句话说就是@Transactional的事务操作并不在synchronized**锁定范围之内。

​ 假设A线程执行完扣减库存方法,会释放锁并提交事务。但A线程释放锁但还没提交事务前,B线程执行扣减库存方法,B线程执行后,和A线程一起提交事务,就出现了线程安全问题,造成脏数据的出现。

乐观锁保证幂等

基于版本号实现

MySQL乐观锁是基于数据库完成分布式锁的一种实现,实现的方式有两种:

  • 基于版本号
  • 基于条件

但是实现思想都是基于MySQL的行锁思想来实现的。

image-20200613161433426

  1. 修改数据表,添加version字段,默认值为0

  2. 修改StockMapper添加基于版本修改数据方法

1
2
@Update("update tb_stock set amount=amount-#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")
int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);
  1. 测试模拟一万并发进行数据修改,此时可以发现当前版本号从0变为1,且库存量正确。

image-20200613163451667

image-20200613163524374

基于条件实现

通过版本号控制是一种非常常见的方式,适合于大多数场景。

​ 但现在库存扣减的场景来说,通过版本号控制就是多人并发访问购买时,查询时显示可以购买,但最终只有一个人能成功,这也是不可以的。其实最终只要商品库存不发生超卖就可以。那此时就可以通过条件来进行控制。

  1. 修改StockMapper
1
2
@Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId} and amount-#{num}>=0")
int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);
  1. 修改StockController
1
2
3
4
5
6
7
8
9
10
11
12
@PutMapping("/lessInventoryByVersionOut/{goodsId}/{num}")
public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,@PathVariable("num") int num){

int result = stockService.lessInventoryByVersionOut(goodsId, num);
if (result == 1){
System.out.println("购买成功");
return "success";
}

System.out.println("购买失败");
return "fail";
}
  1. 通过jemeter进行测试,可以发现当多人并发扣减库存时,控制住了商品超卖的问题。

乐观锁实现幂等性

在系统中,不光要保证客户端访问的幂等性,同时还要保证服务间幂等。

​ 比较常见的情况,当服务间进行调用时,因为网络抖动等原因出现超时,则很有可能出现数据错误。此时在分布式环境下,就需要通过分布式事务或分布式锁来保证数据的一致性。分布式锁的解决方案中MySQL乐观锁就是其中一种实现。

image-20200615104333586

feign超时重试效果演示

​ 以上图为例,当客户端要生成订单时,可以基于token机制保证生成订单的幂等性,接着订单生成成功后,还会基于feign调用库存服务进行库存扣减,此时则很有可能出现,库存服务执行扣减库存成功,但是当结果返回时,出现网络抖动超时了,那么上游的订单服务则很有可能会发起重试,此时如果不进行扣减库存的幂等性保证的话,则出现扣减库存执行多次。

那可以先来演示当下游服务出现延迟,上游服务基于feign进行重试的效果。

  1. 当前是order调用feign,所以在order中会存在feignConfigure配置类,用于配置超时时间与重试次数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 自定义feign超时时间、重试次数
* 默认超时为10秒,不会进行重试。
*/
@Configuration
public class FeignConfigure {

//超时时间,时间单位毫秒
public static int connectTimeOutMillis = 5000;
public static int readTimeOutMillis = 5000;

@Bean
public Request.Options options() {
return new Request.Options(connectTimeOutMillis, readTimeOutMillis);
}

//自定义重试次数
@Bean
public Retryer feignRetryer(){
Retryer retryer = new Retryer.Default(100, 1000, 4);
return retryer;
}
}
  1. stock服务的StockController中demo方法会延迟六秒。

    通过这种方式模拟超时效果。此时在order中调用stock服务,可以发现,order服务会对stock服务调用四次。

image-20200615182205003

image-20200615182210401

这里就演示了服务间调用超时的效果,当下游服务超时,上游服务会进行重试。

服务调用超时库存多次扣减

根据上述演示,当下游服务超时,上游服务就会进行重试。

​ 那么结合当前的业务场景,当用户下单成功去调用库存服务扣减库存时, 如果库存服务执行扣减库存成功但返回结果超时,则上游订单服务就会重试,再次进行扣减库存,此时就会出现同一订单商品库存被多次扣减。

  1. 在订单服务中生成订单,并调用库存服务扣减库存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Idemptent
@PostMapping("/genOrder")
public String genOrder(@RequestBody Order order){

String orderId = String.valueOf(idWorker.nextId());
order.setId(orderId);
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
int result = orderService.addOrder(order);

if (result != 1){
System.out.println("fail");
return "fail";
}

//生成订单详情信息
List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class);

goodsIdArray.stream().forEach(goodsId->{
//插入订单详情
OrderDetail orderDetail = new OrderDetail();
orderDetail.setId(String.valueOf(idWorker.nextId()));
orderDetail.setGoodsId(goodsId);
orderDetail.setOrderId(orderId);
orderDetail.setGoodsName("heima");
orderDetail.setGoodsNum(1);
orderDetail.setGoodsPrice(1);
orderDetailService.addOrderDetail(orderDetail);

//扣减库存(不考虑锁)
stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum());

});


return "success";
}
  1. 库存服务直接基于商品信息进行库存扣减
1
2
@Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId}")
int reduceStockNoLock(@Param("goodsId") String goodsId,@Param("num") Integer num);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@PutMapping("/reduceStockNoLock/{goodsId}/{num}")
public String reduceStockNoLock(@PathVariable("goodsId") String goodsId,
@PathVariable("num") Integer num) throws InterruptedException {

System.out.println("reduce stock");
int result = stockService.reduceStockNoLock(goodsId, num);

if (result != 1){
return "reduce stock fail";
}

//延迟
TimeUnit.SECONDS.sleep(6000);
return "reduce stock success";
}
  1. 执行生成订单扣减库存,此时可以发现扣减库存方法被执行多次,并且库存数量也被扣减了多次
1
{"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}

image-20200615183028169

image-20200615183038434

乐观锁解决服务间重试保证幂等
  1. 修改StockMapper,添加乐观锁控制控制库存
1
2
@Update("update tb_stock set version=version+1,amount=amount-#{num} where goods_id=#{goodsId} and version=#{version} and amount-#{num}>=0")
int reduceStock(@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version);
  1. 修改StockController,添加乐观锁扣减库存方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 乐观锁扣减库存
* @param goodsId
* @param num
* @param version
* @return
*/
@PutMapping("/reduceStock/{goodsId}/{num}/{version}")
public int reduceStock(@PathVariable("goodsId") String goodsId,
@PathVariable("num") Integer num,
@PathVariable("version") Integer version) throws InterruptedException {

System.out.println("exec reduce stock");
int result = stockService.reduceStock(goodsId, num, version);
if (result != 1){
//扣减失败
return result;
}
//延迟
TimeUnit.SECONDS.sleep(6000);
return result;
}
  1. 测试,可以发现虽然发生多次重试,但是库存只会被扣减成功一次。保证了服务间的幂等性。

ps:order服务出现异常,是因为order服务会超时重试四次,但stock服务的延迟每一次都是超过超时时间的,所以最终在order服务才会出现read timeout异常提示。

消息幂等

​ 在系统中当使用消息队列时,无论做哪种技术选型,有很多问题是无论如何也不能忽视的,如:消息必达、消息幂等等。本章节以典型的RabbitMQ为例,讲解如何保证消息幂等的可实施解决方案,其他MQ选型均可参考。

消息重试演示

消息队列的消息幂等性,主要是由MQ重试机制引起的。

​ 因为消息生产者将消息发送到MQ-Server后,MQ-Server会将消息推送到具体的消息消费者。假设由于网络抖动或出现异常时,MQ-Server根据重试机制就会将消息重新向消息消费者推送,造成消息消费者多次收到相同消息,造成数据不一致。

image-20200623180025636

​ 在RabbitMQ中,消息重试机制是默认开启的,但只会在consumer出现异常时,才会重复推送。在使用中,异常的出现有可能是由于消费方又去调用第三方接口,由于网络抖动而造成异常,但是这个异常有可能是暂时的。所以当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。

​ 数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入死信队列,最终进入到数据库中,接着设置定时job查询这些数据,进行手动补偿。

​ 本节中以consumer消费异常为演示主体,因此需要修改RabbitMQ配置文件。

修改配置文件

修改consumer一方的配置文件

1
2
3
4
5
6
7
8
9
10
# 消费者监听相关配置
listener:
simple:
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 3000
设置消费异常

​ 当consumer消息监听类中添加异常,最终接受消息时,可以发现,消息在接收五次后,最终出现异常。

消息幂等解决

要保证消息幂等性的话,其实最终要解决的就是保证多次操作,造成的影响是相同的。那么其解决方案的思路与服务间幂等的思路其实基本都是一致的。

  1. 消息防重表,解决思路与服务间幂等的防重表一致。

  2. redis:利用redis防重。

这两种方案是最常见的解决方案。其实现思路其实都是一致的。

image-20200624115545723

代码实现

修改OrderController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 此处为了方便演示,不做基础添加数据库操作
* @return
*/
@PostMapping("/addOrder")
public String addOrder(){

String uniqueKey = String.valueOf(idWorker.nextId());

MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(uniqueKey);
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
Message message = new Message("1271700536000909313".getBytes(),messageProperties);
rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);

return "success";
}
修改stockApplication
1
2
3
4
@Bean
public JedisPool jedisPool(){
return new JedisPool("192.168.200.150",6379);
}
新增消息监听类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Component
public class ReduceStockListener {

@Autowired
private StockService stockService;

@Autowired
private JedisPool jedisPool;

@Autowired
private StockFlowService stockFlowService;

@RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)
@Transactional
public void receiveMessage(Message message){

//获取消息id
String messageId = message.getMessageProperties().getMessageId();

Jedis jedis = jedisPool.getResource();

System.out.println(messageId);
try {

//redis锁去重校验
if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))){
System.out.println("重复请求");
return;
}

//mysql状态校验
if (!(stockFlowService.findByFlag(messageId).size() == 0)){
System.out.println("数据已处理");
return;
}

String goodsId = null;
try {
//获取消息体中goodsId
goodsId = new String(message.getBody(),"utf-8");
stockService.reduceStock(goodsId,messageId);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}

int nextInt = new Random().nextInt(100);
System.out.println("随机数:"+nextInt);
if (nextInt%2 ==0){
int i= 1/0;
}


} catch (RuntimeException e) {
//解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));
System.out.println("出现异常了");
System.out.println(messageId+":释放锁");
throw e;
}
}
}

消息缓冲区

​ 对于RabbitMQ的使用,默认情况下,每条消息都会进行分别的ack通知,消费完一条后,再来消费下一条。但是这样就会造成大量消息的阻塞情况。所以为了提升消费者对于消息的消费速度,可以增加consumer数据或者对消息进行批量消费。MQ接收到producer发送的消息后,不会直接推送给consumer。而是积攒到一定数量后,再进行消息的发送。 这种方式的实现,可以理解为是一种缓冲区的实现,提升了消息的消费速度,但是会在一定程度上舍弃结果返回的实时性。

​ 对于批量消费来说,也是需要考虑幂等的。对于幂等性的解决方案,沿用刚才的思路即可解决。

评论