服务幂等性架构设计
防重表实现幂等
对于防止数据重复提交,还有一种解决方案就是通过防重表实现。
防重表的实现思路也非常简单,首先创建一张表作为防重表,同时在该表中建立一个或多个字段的唯一索引作为防重字段,用于保证并发情况下,数据只有一条。在向业务表中插入数据之前先向防重表插入,如果插入失败则表示是重复数据。
为什么不用悲观锁
对于防重表的解决方案,可能有人会说为什么不使用悲观锁,悲观锁在使用的过程中也是会发生死锁的。
悲观锁是通过锁表的方式实现的,假设现在一个用户A访问表A(锁住了表A),然后试图访问表B;
另一个用户B访问表B(锁住了表B),然后试图访问表A。 这时对于用户A来说,由于表B已经被用户B锁住了,所以用户A必须等到用户B释放表B才能访问。
同时对于用户B来说,由于表A已经被用户A锁住了,所以用户B必须等到用户A释放表A才能访问。此时死锁就已经产生了。
唯一主键实现幂等 数据库唯一主键 的实现主要是利用数据库中主键唯一约束的特性,一般来说唯一主键比较适用于“插入”时的幂等性,其能保证一张表中只能存在一条带该唯一主键的记录。
使用数据库唯一主键完成幂等性时需要注意的是,该主键一般来说并不是使用数据库中自增主键,而是使用分布式 ID 充当主键,这样才能能保证在分布式环境下 ID 的全局唯一性。
对于一些后台系统,并发量并不高的情况下,对于幂等的实现非常简单,通过这种思想即可完成幂等控制。
适用场景
使用限制
主要流程
主要流程如下:
客户端执行创建请求,调用服务端接口。
服务端执行业务逻辑,生成一个分布式 ID
,将该 ID 充当待插入数据的主键,然 后执数据插入操作,运行对应的 SQL
语句。
服务端将该条数据插入数据库中,如果插入成功则表示没有重复调用接口。如果抛出主键重复异常,则表示数据库中已经存在该条记录,返回错误信息到客户端。
在业务执行前,先判断是否已经操作过,如果没有则执行,否则判断为重复操作。
效果演示
在并发下访问时,因为是基于id进行判断,那id值就必须要保证在多次提交时,需要唯一。访问流程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override @Transactional(rollbackFor = Exception.class) public String addOrder (Order order) { order.setCreateTime(new Date ()); order.setUpdateTime(new Date ()); Order orderResult = orderMapper.selectByPrimaryKey(order.getId()); Optional<Order> orderOptional = Optional.ofNullable(orderResult); if (orderOptional.isPresent()){ return "repeat request" ; } int result = orderMapper.insert(order); if (result != 1 ){ return "fail" ; } return "success" ; }
对于上述功能实现,在并发下,并不能完成幂等性控制。通过jemeter测试,模拟50个并发,可以发现,插入了重复数据。产生了脏数据。
要解决这个问题,非常简单,在数据库层面添加唯一索引 即可,将id设置为唯一索引,也是最容易想到的方式,一旦id出现重复,就会出现异常,避免了脏数据的发生也可以解决永久性幂等。但该方案无法用于分库分表情况,其只适用于单表情况。
乐观锁实现幂等性
数据库乐观锁方案一般只能适用于执行更新操作 的过程,我们可以提前在对应的数据表中多添加一个字段,充当当前数据的版本标识。
这样每次对该数据库该表的这条数据执行更新时,都会将该版本标识作为一个条件,值为上次待更新数据中的版本标识的值。
适用操作
使用限制
问题抛出 扣减库存数据错误
通过jemeter进行测试,可以发现。当模拟一万并发时,最终的库存数量是错误的。这主要是因为当多线程访问时,一个线程读取到了另外线程未提交的数据造成。
synchronized失效问题
对于现在的问题,暂不考虑秒杀设计、队列请求串行化等,只考虑如何通过锁进行解决,要通过锁解决的话,那最先想到的可能是synchronized 。
根据synchronized 定义,当多线程并发访问时,会对当前加锁的方法产生阻塞,从而保证线程安全,避免脏数据。但是,真的能如预期的一样吗?
1 2 3 4 5 6 7 8 9 10 11 12 @Service public class StockServiceImpl implements StockService { @Autowired private StockMapper stockMapper; @Override @Transactional(rollbackFor = Exception.class) public synchronized int lessInventory (String goodsId, int num) { return stockMapper.lessInventory(goodsId, num); } }
当前已经在在方法上添加了synchronized,对当前方法对象进行了锁定。 通过Jemeter,模拟一万并发对其进行访问。可以发现,仍然出现了脏数据。
事务导致锁失效
该问题的产生原因,就在于在方法上synchronized 搭配使用了**@Transactional**。
首先synchronized 锁定的是当前方法对象,而**@Transactional**会对当前方法进行AOP增强,动态代理出一个代理对象,在方法执行前开启事务,执行后提交事务。
所以synchronized 和**@Transactional其实操作的是两个不同的对象,换句话说就是 @Transactional的事务操作并不在 synchronized**锁定范围之内。
假设A线程执行完扣减库存方法,会释放锁并提交事务。但A线程释放锁但还没提交事务前,B线程执行扣减库存方法,B线程执行后,和A线程一起提交事务,就出现了线程安全问题,造成脏数据的出现。
乐观锁保证幂等 基于版本号实现
MySQL乐观锁是基于数据库完成分布式锁的一种实现,实现的方式有两种:
但是实现思想都是基于MySQL的行锁思想来实现的。
修改数据表,添加version字段,默认值为0
修改StockMapper添加基于版本修改数据方法
1 2 @Update("update tb_stock set amount=amount-#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}") int lessInventoryByVersion (@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version) ;
测试模拟一万并发进行数据修改,此时可以发现当前版本号从0变为1,且库存量正确。
基于条件实现
通过版本号控制是一种非常常见的方式,适合于大多数场景。
但现在库存扣减的场景来说,通过版本号控制就是多人并发访问购买时,查询时显示可以购买,但最终只有一个人能成功,这也是不可以的。其实最终只要商品库存不发生超卖就可以。那此时就可以通过条件来进行控制。
修改StockMapper :
1 2 @Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId} and amount-#{num}>=0") int lessInventoryByVersionOut (@Param("goodsId") String goodsId,@Param("num") int num) ;
修改StockController :
1 2 3 4 5 6 7 8 9 10 11 12 @PutMapping("/lessInventoryByVersionOut/{goodsId}/{num}") public String lessInventoryByVersionOut (@PathVariable("goodsId") String goodsId,@PathVariable("num") int num) { int result = stockService.lessInventoryByVersionOut(goodsId, num); if (result == 1 ){ System.out.println("购买成功" ); return "success" ; } System.out.println("购买失败" ); return "fail" ; }
通过jemeter进行测试,可以发现当多人并发扣减库存时,控制住了商品超卖的问题。
乐观锁实现幂等性
在系统中,不光要保证客户端访问的幂等性,同时还要保证服务间幂等。
比较常见的情况,当服务间进行调用时,因为网络抖动等原因出现超时,则很有可能出现数据错误。此时在分布式环境下,就需要通过分布式事务或分布式锁来保证数据的一致性。分布式锁的解决方案中MySQL乐观锁就是其中一种实现。
feign超时重试效果演示 以上图为例,当客户端要生成订单时,可以基于token机制保证生成订单的幂等性,接着订单生成成功后,还会基于feign调用库存服务进行库存扣减,此时则很有可能出现,库存服务执行扣减库存成功,但是当结果返回时,出现网络抖动超时了,那么上游的订单服务则很有可能会发起重试,此时如果不进行扣减库存的幂等性保证的话,则出现扣减库存执行多次。
那可以先来演示当下游服务出现延迟,上游服务基于feign进行重试的效果。
当前是order调用feign,所以在order中会存在feignConfigure配置类,用于配置超时时间与重试次数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Configuration public class FeignConfigure { public static int connectTimeOutMillis = 5000 ; public static int readTimeOutMillis = 5000 ; @Bean public Request.Options options () { return new Request .Options(connectTimeOutMillis, readTimeOutMillis); } @Bean public Retryer feignRetryer () { Retryer retryer = new Retryer .Default(100 , 1000 , 4 ); return retryer; } }
stock服务的StockController中demo方法会延迟六秒。
通过这种方式模拟超时效果。此时在order中调用stock服务,可以发现,order服务会对stock服务调用四次。
这里就演示了服务间调用超时的效果,当下游服务超时,上游服务会进行重试。
服务调用超时库存多次扣减
根据上述演示,当下游服务超时,上游服务就会进行重试。
那么结合当前的业务场景,当用户下单成功去调用库存服务扣减库存时, 如果库存服务执行扣减库存成功但返回结果超时,则上游订单服务就会重试,再次进行扣减库存,此时就会出现同一订单商品库存被多次扣减。
在订单服务中生成订单,并调用库存服务扣减库存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Idemptent @PostMapping("/genOrder") public String genOrder (@RequestBody Order order) { String orderId = String.valueOf(idWorker.nextId()); order.setId(orderId); order.setCreateTime(new Date ()); order.setUpdateTime(new Date ()); int result = orderService.addOrder(order); if (result != 1 ){ System.out.println("fail" ); return "fail" ; } List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class); goodsIdArray.stream().forEach(goodsId->{ OrderDetail orderDetail = new OrderDetail (); orderDetail.setId(String.valueOf(idWorker.nextId())); orderDetail.setGoodsId(goodsId); orderDetail.setOrderId(orderId); orderDetail.setGoodsName("heima" ); orderDetail.setGoodsNum(1 ); orderDetail.setGoodsPrice(1 ); orderDetailService.addOrderDetail(orderDetail); stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum()); }); return "success" ; }
库存服务直接基于商品信息进行库存扣减
1 2 @Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId}") int reduceStockNoLock (@Param("goodsId") String goodsId,@Param("num") Integer num) ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @PutMapping("/reduceStockNoLock/{goodsId}/{num}") public String reduceStockNoLock (@PathVariable("goodsId") String goodsId, @PathVariable("num") Integer num) throws InterruptedException { System.out.println("reduce stock" ); int result = stockService.reduceStockNoLock(goodsId, num); if (result != 1 ){ return "reduce stock fail" ; } TimeUnit.SECONDS.sleep(6000 ); return "reduce stock success" ; }
执行生成订单扣减库存,此时可以发现扣减库存方法被执行多次,并且库存数量也被扣减了多次
1 {"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}
乐观锁解决服务间重试保证幂等
修改StockMapper,添加乐观锁控制控制库存
1 2 @Update("update tb_stock set version=version+1,amount=amount-#{num} where goods_id=#{goodsId} and version=#{version} and amount-#{num}>=0") int reduceStock (@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version) ;
修改StockController,添加乐观锁扣减库存方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @PutMapping("/reduceStock/{goodsId}/{num}/{version}") public int reduceStock (@PathVariable("goodsId") String goodsId, @PathVariable("num") Integer num, @PathVariable("version") Integer version) throws InterruptedException { System.out.println("exec reduce stock" ); int result = stockService.reduceStock(goodsId, num, version); if (result != 1 ){ return result; } TimeUnit.SECONDS.sleep(6000 ); return result; }
测试,可以发现虽然发生多次重试,但是库存只会被扣减成功一次。保证了服务间的幂等性。
ps:order服务出现异常,是因为order服务会超时重试四次,但stock服务的延迟每一次都是超过超时时间的,所以最终在order服务才会出现read timeout异常提示。
消息幂等 在系统中当使用消息队列时,无论做哪种技术选型,有很多问题是无论如何也不能忽视的,如:消息必达、消息幂等等。本章节以典型的RabbitMQ为例,讲解如何保证消息幂等的可实施解决方案,其他MQ选型均可参考。
消息重试演示
消息队列的消息幂等性,主要是由MQ重试机制引起的。
因为消息生产者将消息发送到MQ-Server后,MQ-Server会将消息推送到具体的消息消费者。假设由于网络抖动或出现异常时,MQ-Server根据重试机制就会将消息重新向消息消费者推送,造成消息消费者多次收到相同消息,造成数据不一致。
在RabbitMQ中,消息重试机制是默认开启的,但只会在consumer出现异常时,才会重复推送。在使用中,异常的出现有可能是由于消费方又去调用第三方接口,由于网络抖动而造成异常,但是这个异常有可能是暂时的。所以当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。
数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入死信队列,最终进入到数据库中,接着设置定时job查询这些数据,进行手动补偿。
本节中以consumer消费异常为演示主体,因此需要修改RabbitMQ配置文件。
修改配置文件
修改consumer一方的配置文件
1 2 3 4 5 6 7 8 9 10 listener: simple: retry: enabled: true max-attempts: 5 initial-interval: 3000
设置消费异常 当consumer消息监听类中添加异常,最终接受消息时,可以发现,消息在接收五次后,最终出现异常。
消息幂等解决
要保证消息幂等性的话,其实最终要解决的就是保证多次操作,造成的影响是相同的。那么其解决方案的思路与服务间幂等的思路其实基本都是一致的。
消息防重表,解决思路与服务间幂等的防重表一致。
redis:利用redis防重。
这两种方案是最常见的解决方案。其实现思路其实都是一致的。
代码实现 修改OrderController 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @PostMapping("/addOrder") public String addOrder () { String uniqueKey = String.valueOf(idWorker.nextId()); MessageProperties messageProperties = new MessageProperties (); messageProperties.setMessageId(uniqueKey); messageProperties.setContentType("text/plain" ); messageProperties.setContentEncoding("utf-8" ); Message message = new Message ("1271700536000909313" .getBytes(),messageProperties); rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message); return "success" ; }
修改stockApplication 1 2 3 4 @Bean public JedisPool jedisPool () { return new JedisPool ("192.168.200.150" ,6379 ); }
新增消息监听类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 @Component public class ReduceStockListener { @Autowired private StockService stockService; @Autowired private JedisPool jedisPool; @Autowired private StockFlowService stockFlowService; @RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE) @Transactional public void receiveMessage (Message message) { String messageId = message.getMessageProperties().getMessageId(); Jedis jedis = jedisPool.getResource(); System.out.println(messageId); try { if (!"OK" .equals(jedis.set(messageId, messageId, "NX" , "PX" , 300000 ))){ System.out.println("重复请求" ); return ; } if (!(stockFlowService.findByFlag(messageId).size() == 0 )){ System.out.println("数据已处理" ); return ; } String goodsId = null ; try { goodsId = new String (message.getBody(),"utf-8" ); stockService.reduceStock(goodsId,messageId); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } int nextInt = new Random ().nextInt(100 ); System.out.println("随机数:" +nextInt); if (nextInt%2 ==0 ){ int i= 1 /0 ; } } catch (RuntimeException e) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end" ; jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId)); System.out.println("出现异常了" ); System.out.println(messageId+":释放锁" ); throw e; } } }
消息缓冲区 对于RabbitMQ的使用,默认情况下,每条消息都会进行分别的ack通知,消费完一条后,再来消费下一条。但是这样就会造成大量消息的阻塞情况。所以为了提升消费者对于消息的消费速度,可以增加consumer数据或者对消息进行批量消费。MQ接收到producer发送的消息后,不会直接推送给consumer。而是积攒到一定数量后,再进行消息的发送。 这种方式的实现,可以理解为是一种缓冲区的实现,提升了消息的消费速度,但是会在一定程度上舍弃结果返回的实时性。
对于批量消费来说,也是需要考虑幂等的。对于幂等性的解决方案,沿用刚才的思路即可解决。