Zookeeper分布式锁架构设计
对于分布式锁的实现,zookeeper天然携带的一些特性能够很完美的实现分布式锁。其内部主要是利用znode节点特性和watch机制完成。
znode节点
在zookeeper中节点会分为四类,分别是:
- 持久节点:一旦创建,则永久存在于zookeeper中,除非手动删除。
- 持久有序节点:一旦创建,则永久存在于zookeeper中,除非手动删除。同时每个节点都会默认存在节点序号,每个节点的序号都是有序递增的。如demo000001、demo000002…..demo00000N。
- 临时节点:当节点创建后,一旦服务器重启或宕机,则被自动删除。
- 临时有序节点:当节点创建后,一旦服务器重启或宕机,则被自动删除。同时每个节点都会默认存在节点序号,每个节点的序号都是有序递增的。如demo000001、demo000002…..demo00000N。
watch监听机制
watch监听机制主要用于监听节点状态变更,用于后续事件触发,假设当B节点监听A节点时,一旦A节点发生修改、删除、子节点列表发生变更等事件,B节点则会收到A节点改变的通知,接着完成其他额外事情。
实现原理
其实现思想是当某个线程要对方法加锁时,首先会在zookeeper中创建一个与当前方法对应的父节点,接着每个要获取当前方法的锁的线程,都会在父节点下创建一个临时有序节点,因为节点序号是递增的,所以后续要获取锁的线程在zookeeper中的序号也是逐次递增的。
根据这个特性,当前序号最小的节点一定是首先要获取锁的线程,因此可以规定序号最小的节点获得锁。
单线程流程
每个线程再要获取锁时,可以判断自己的节点序号是否是最小的,如果是则获取到锁。当释放锁时,只需将自己的临时有序节点删除即可。
并发流程
根据上图,在并发下,每个线程都会在对应方法节点下创建属于自己的临时节点,且每个节点都是临时且有序的。
那么zookeeper又是如何有序的将锁分配给不同线程呢? 这里就应用到了watch监听机制。每当添加一个新的临时节点时,其都会基于watcher机制监听着它本身的前一个节点等待前一个节点的通知,当前一个节点删除时,就轮到它来持有锁了。然后依次类推。
优点
- zookeeper是基于cp模式,能够保证数据强一致性。
- 基于watch机制实现锁释放的自动监听,锁操作性能较好。
- 频繁创建节点,对于zk服务器压力较大,吞吐量没有redis强。
原理剖析
低效锁思想
在通过zookeeper实现分布式锁时,有另外一种实现的写法,这种也是非常常见的,但是它的效率并不高,此处可以先对这种实现方式进行探讨。
此种实现方式,只会存在一个锁节点。当创建锁节点时,如果锁节点不存在,则创建成功,代表当前线程获取到锁,如果创建锁节点失败,代表已经有其他线程获取到锁,则该线程会监听锁节点的释放。当锁节点释放后,则继续尝试创建锁节点加锁。
低效锁实现
创建抽象类
在zookeeper_common中创建抽象类AbstractLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public abstract class AbstractLock {
public static final String ZK_SERVER_ADDR="192.168.200.131:2181";
public static final int CONNECTION_TIME_OUT=30000; public static final int SESSION_TIME_OUT=30000;
protected ZkClient zkClient = new ZkClient(ZK_SERVER_ADDR,SESSION_TIME_OUT,CONNECTION_TIME_OUT);
public abstract boolean tryLock();
public abstract void waitLock();
public abstract void releaseLock();
public void getLock() {
String threadName = Thread.currentThread().getName();
if (tryLock()) { System.out.println(threadName+": 获取锁成功"); }else { System.out.println(threadName+": 获取锁失败,等待中"); waitLock(); getLock(); } } }
|
创建LowLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| public class LowLock extends AbstractLock{
private static final String LOCK_NODE_NAME = "/lock_node";
private CountDownLatch countDownLatch;
@Override public boolean tryLock() { if (zkClient == null){ return false; } try { zkClient.createEphemeral(LOCK_NODE_NAME); return true; } catch (Exception e) { return false; }
}
@Override public void waitLock() {
IZkDataListener zkDataListener = new IZkDataListener() {
@Override public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override public void handleDataDeleted(String dataPath) throws Exception { if (countDownLatch != null){ countDownLatch.countDown(); } } };
zkClient.subscribeDataChanges(LOCK_NODE_NAME,zkDataListener);
if (zkClient.exists(LOCK_NODE_NAME)){
countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); System.out.println(Thread.currentThread().getName()+": 等待获取锁"); } catch (InterruptedException e) { } }
zkClient.unsubscribeDataChanges(LOCK_NODE_NAME,zkDataListener); }
@Override public void releaseLock() {
zkClient.delete(LOCK_NODE_NAME); zkClient.close(); System.out.println(Thread.currentThread().getName()+": 释放锁"); }
}
|
创建测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class LockTest {
public static void main(String[] args) {
for (int i=0;i<10;i++) { Thread thread = new Thread(new LockRunnable()); thread.start(); } }
private static class LockRunnable implements Runnable { @Override public void run() {
AbstractLock abstractLock = new LowLock();
abstractLock.getLock();
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
abstractLock.releaseLock(); } } }
|
测试
经过测试可以发现,当一个线程获取到锁之后,其他线程都会监听这把锁进入到等待状态,一旦持有锁的线程释放锁后,其他线程则都会监听到,并竞争这把锁。
羊群效应
这种方案的低效点就在于,只有一个锁节点,其他线程都会监听同一个锁节点,一旦锁节点释放后,其他线程都会收到通知,然后竞争获取锁节点。
这种大量的通知操作会严重降低zookeeper性能,对于这种由于一个被watch的znode节点的变化,而造成大量的通知操作,叫做羊群效应。
高效锁思想
为了避免羊群效应的出现,业界内普遍的解决方案就是,让获取锁的线程产生排队,后一个监听前一个,依次排序。推荐使用这种方式实现分布式锁
按照上述流程会在根节点下为每一个等待获取锁的线程创建一个对应的临时有序节点,序号最小的节点会持有锁,并且后一个节点只监听其前面的一个节点,从而可以让获取锁的过程有序且高效。
实现高效锁
定义HighLock类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| public class HighLock extends AbstractLock{
private static final String PARENT_NODE_PATH="/high_lock";
private String currentNodePath;
private String preNodePath;
private CountDownLatch countDownLatch;
@Override public boolean tryLock() {
if (!zkClient.exists(PARENT_NODE_PATH)){ zkClient.createPersistent(PARENT_NODE_PATH); }
if (currentNodePath == null || "".equals(currentNodePath)){
currentNodePath = zkClient.createEphemeralSequential(PARENT_NODE_PATH+"/","lock"); }
List<String> childrenNodeList = zkClient.getChildren(PARENT_NODE_PATH);
Collections.sort(childrenNodeList);
if (currentNodePath.equals(PARENT_NODE_PATH+"/"+childrenNodeList.get(0))){ return true; }else { int length = PARENT_NODE_PATH.length(); int currentNodeNumber = Collections.binarySearch(childrenNodeList, currentNodePath.substring(length + 1)); preNodePath = PARENT_NODE_PATH+"/"+childrenNodeList.get(currentNodeNumber-1); } return false; }
@Override public void waitLock() {
IZkDataListener zkDataListener = new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override public void handleDataDeleted(String dataPath) throws Exception {
if (countDownLatch != null){ countDownLatch.countDown(); } } };
zkClient.subscribeDataChanges(preNodePath,zkDataListener);
if (zkClient.exists(preNodePath)){ countDownLatch = new CountDownLatch(1);
try { countDownLatch.await(); } catch (InterruptedException e) {
} }
zkClient.unsubscribeDataChanges(preNodePath,zkDataListener); }
@Override public void releaseLock() { zkClient.delete(currentNodePath); zkClient.close(); } }
|
测试
根据结果可以看到,每一个线程都会有自己的节点信息,并且都会有对应的序号。序号最小的节点首先获取到锁,然后依次类推。
案例演示
案例描述
实现使用zk高效锁来进行扣减库存业务
细节说明
生成操作标识
生成操作标识是为了防止feign调用超时出现重试,如果没有操作标识的话,库存服务无法判定是一次操作还是多次操作,通过标识可以用于区分重试时当前是哪次操作。从而避免多次扣减库存情况的出现。
库存检查
库存服务先检查redis再检查Mysql,出于两点考虑:
- 避免服务间重试时,库存服务无法区分是否为同一个操作,导致相同操作被执行多次。同时缓存结合关系型数据库,可以起到减轻数据库压力的作用。
- 库存流水表不仅用于区分操作,同时每一次扣减库存时信息都会被记录,可以用于后期的库存信息统计等操作。
总的来说,就是通过操作标识结合zookeeper分布式锁,完成mysql乐观锁的操作,思想上都是相同的。
代码实现
订单控制器
OrderController修改生成订单方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
|
@Idemptent @PostMapping("/genOrder") public String genOrder(@RequestBody Order order) throws InterruptedException {
String orderId = String.valueOf(idWorker.nextId()); order.setId(orderId); order.setCreateTime(new Date()); order.setUpdateTime(new Date()); orderService.addOrder(order);
List<String> goodsIds = JSON.parseArray(order.getGoodsIds(), String.class); List<OrderDetail> orderDetailList = new ArrayList<>();
for (String goodsId : goodsIds) { OrderDetail orderDetail = new OrderDetail(); orderDetail.setId(String.valueOf(idWorker.nextId())); orderDetail.setOrderId(orderId); orderDetail.setGoodsId(goodsId); orderDetail.setGoodsPrice(1); orderDetail.setGoodsNum(1); orderDetailService.addOrderDetail(orderDetail);
orderDetailList.add(orderDetail); }
redisTemplate.opsForValue().set(orderId,"orderId",30,TimeUnit.MINUTES);
stockFeign.reduceStock(JSON.toJSONString(orderDetailList),orderId);
if (stockFlowFeign.findByFlag(orderId).size() >0){ return "success"; }else { TimeUnit.SECONDS.sleep(3); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{ return stockFlowFeign.findByFlag(orderId).size(); },executor); try { if (future1.get()>0){ return "success"; } }catch (Exception e){ throw new RuntimeException("执行有误"); }
TimeUnit.SECONDS.sleep(5); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{ return stockFlowFeign.findByFlag(orderId).size(); },executor); try { if (future2.get()>0){ return "success"; } }catch (Exception e){ throw new RuntimeException("执行有误"); }
TimeUnit.SECONDS.sleep(10); CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(()->{ return stockFlowFeign.findByFlag(orderId).size(); },executor); try { if (future3.get()>0){ return "success"; } }catch (Exception e){ throw new RuntimeException("执行有误"); } return "false"; } }
|
扣减库存
StockController修改扣减库存方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
@PutMapping("/reduceStock/{flag}") public void reduceStock(@RequestParam String orderListValue, @PathVariable("flag") String flag) throws InterruptedException {
System.out.println("reduce stock");
if (!redisTemplate.delete(flag)){ System.out.println("redis验重 重复操作"); return; }
int dbResult = stockFlowService.findByFlag(flag).size(); if (dbResult >0){ System.out.println("mysql验重 重复操作"); return; }
List<OrderDetail> orderDetailList = JSON.parseArray(orderListValue, OrderDetail.class); stockService.reduceStock(orderDetailList,flag);
}
|
扣减库存实现
StockServiceImpl中实现扣减库存方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| @Autowired private StockMapper stockMapper;
@Autowired private StockFlowMapper stockFlowMapper;
@Autowired private IdWorker idWorker;
@Override @Transactional(rollbackFor = Exception.class) public Boolean reduceStock(List<OrderDetail> orderDetailList, String flag) {
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
AbstractLock zkLock = new HighLock("/"+methodName);
try {
zkLock.getLock();
orderDetailList.stream().forEach(orderDetail -> {
int reduceStockResult = stockMapper.reduceStock(orderDetail.getGoodsId(), orderDetail.getGoodsNum()); if (reduceStockResult != 1){ throw new RuntimeException("扣减库存失败"); }
StockFlow stockFlow = new StockFlow(); stockFlow.setId(String.valueOf(idWorker.nextId())); stockFlow.setFlag(flag); stockFlow.setGoodsId(orderDetail.getGoodsId()); stockFlow.setNum(orderDetail.getGoodsNum()); stockFlowMapper.insert(stockFlow); });
return true; } catch (Exception e) { e.printStackTrace(); }finally { zkLock.releaseLock(); }
return false; }
|