抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

Zookeeper分布式锁架构设计

查看源图像

对于分布式锁的实现,zookeeper天然携带的一些特性能够很完美的实现分布式锁。其内部主要是利用znode节点特性和watch机制完成。

znode节点

在zookeeper中节点会分为四类,分别是:

  • 持久节点:一旦创建,则永久存在于zookeeper中,除非手动删除。
  • 持久有序节点:一旦创建,则永久存在于zookeeper中,除非手动删除。同时每个节点都会默认存在节点序号,每个节点的序号都是有序递增的。如demo000001、demo000002…..demo00000N。
  • 临时节点:当节点创建后,一旦服务器重启或宕机,则被自动删除。
  • 临时有序节点:当节点创建后,一旦服务器重启或宕机,则被自动删除。同时每个节点都会默认存在节点序号,每个节点的序号都是有序递增的。如demo000001、demo000002…..demo00000N。

watch监听机制

watch监听机制主要用于监听节点状态变更,用于后续事件触发,假设当B节点监听A节点时,一旦A节点发生修改、删除、子节点列表发生变更等事件,B节点则会收到A节点改变的通知,接着完成其他额外事情。

image-20200616145317638

实现原理

​ 其实现思想是当某个线程要对方法加锁时,首先会在zookeeper中创建一个与当前方法对应的父节点,接着每个要获取当前方法的锁的线程,都会在父节点下创建一个临时有序节点,因为节点序号是递增的,所以后续要获取锁的线程在zookeeper中的序号也是逐次递增的。

​ 根据这个特性,当前序号最小的节点一定是首先要获取锁的线程,因此可以规定序号最小的节点获得锁

单线程流程

每个线程再要获取锁时,可以判断自己的节点序号是否是最小的,如果是则获取到锁。当释放锁时,只需将自己的临时有序节点删除即可。

image-20200616153822312

并发流程

根据上图,在并发下,每个线程都会在对应方法节点下创建属于自己的临时节点,且每个节点都是临时且有序的。

​ 那么zookeeper又是如何有序的将锁分配给不同线程呢? 这里就应用到了watch监听机制。每当添加一个新的临时节点时,其都会基于watcher机制监听着它本身的前一个节点等待前一个节点的通知,当前一个节点删除时,就轮到它来持有锁了。然后依次类推。

image-20200616154613434

优点

  1. zookeeper是基于cp模式,能够保证数据强一致性。
  2. 基于watch机制实现锁释放的自动监听,锁操作性能较好。
  3. 频繁创建节点,对于zk服务器压力较大,吞吐量没有redis强。

原理剖析

低效锁思想

在通过zookeeper实现分布式锁时,有另外一种实现的写法,这种也是非常常见的,但是它的效率并不高,此处可以先对这种实现方式进行探讨。

image-20200617101651487

​ 此种实现方式,只会存在一个锁节点。当创建锁节点时,如果锁节点不存在,则创建成功,代表当前线程获取到锁,如果创建锁节点失败,代表已经有其他线程获取到锁,则该线程会监听锁节点的释放。当锁节点释放后,则继续尝试创建锁节点加锁。

低效锁实现
创建抽象类

在zookeeper_common中创建抽象类AbstractLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public abstract class AbstractLock {

//zookeeper服务器地址
public static final String ZK_SERVER_ADDR="192.168.200.131:2181";

//zookeeper超时时间
public static final int CONNECTION_TIME_OUT=30000;
public static final int SESSION_TIME_OUT=30000;

//创建zk客户端
protected ZkClient zkClient = new ZkClient(ZK_SERVER_ADDR,SESSION_TIME_OUT,CONNECTION_TIME_OUT);

/**
* 获取锁
* @return
*/
public abstract boolean tryLock();

/**
* 等待加锁
*/
public abstract void waitLock();

/**
* 释放锁
*/
public abstract void releaseLock();

public void getLock() {

String threadName = Thread.currentThread().getName();

if (tryLock()) {
System.out.println(threadName+": 获取锁成功");
}else {
System.out.println(threadName+": 获取锁失败,等待中");
//等待锁
waitLock();
getLock();
}
}
}
创建LowLock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class LowLock extends AbstractLock{

private static final String LOCK_NODE_NAME = "/lock_node";

private CountDownLatch countDownLatch;

@Override
public boolean tryLock() {
if (zkClient == null){
return false;
}
try {
zkClient.createEphemeral(LOCK_NODE_NAME);
return true;
} catch (Exception e) {
return false;
}

}

@Override
public void waitLock() {

IZkDataListener zkDataListener = new IZkDataListener() {

//节点被改变时触发
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {

}

//节点被删除时触发
@Override
public void handleDataDeleted(String dataPath) throws Exception {
if (countDownLatch != null){
countDownLatch.countDown();
}
}
};

//注册监听器
zkClient.subscribeDataChanges(LOCK_NODE_NAME,zkDataListener);

//如果锁节点存在,阻塞当前线程
if (zkClient.exists(LOCK_NODE_NAME)){

countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+": 等待获取锁");
} catch (InterruptedException e) {
}
}

//删除监听
zkClient.unsubscribeDataChanges(LOCK_NODE_NAME,zkDataListener);
}

@Override
public void releaseLock() {

zkClient.delete(LOCK_NODE_NAME);
zkClient.close();
System.out.println(Thread.currentThread().getName()+": 释放锁");
}

}
创建测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class LockTest {

public static void main(String[] args) {

//模拟多个10个客户端
for (int i=0;i<10;i++) {
Thread thread = new Thread(new LockRunnable());
thread.start();
}
}

private static class LockRunnable implements Runnable {
@Override
public void run() {

AbstractLock abstractLock = new LowLock();

abstractLock.getLock();

try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}

abstractLock.releaseLock();
}
}
}
测试

经过测试可以发现,当一个线程获取到锁之后,其他线程都会监听这把锁进入到等待状态,一旦持有锁的线程释放锁后,其他线程则都会监听到,并竞争这把锁。

image-20200617111459901

image-20200617111513427

image-20200617111527547

image-20200617111535566

羊群效应

这种方案的低效点就在于,只有一个锁节点,其他线程都会监听同一个锁节点,一旦锁节点释放后,其他线程都会收到通知,然后竞争获取锁节点。

​ 这种大量的通知操作会严重降低zookeeper性能,对于这种由于一个被watch的znode节点的变化,而造成大量的通知操作,叫做羊群效应

image-20200617140946750

高效锁思想

为了避免羊群效应的出现,业界内普遍的解决方案就是,让获取锁的线程产生排队,后一个监听前一个,依次排序。推荐使用这种方式实现分布式锁

image-20200617114530599

按照上述流程会在根节点下为每一个等待获取锁的线程创建一个对应的临时有序节点,序号最小的节点会持有锁,并且后一个节点只监听其前面的一个节点,从而可以让获取锁的过程有序且高效。

image-20200617141413504

实现高效锁
定义HighLock类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class HighLock extends AbstractLock{

private static final String PARENT_NODE_PATH="/high_lock";

//当前节点路径
private String currentNodePath;

//前一个节点的路径
private String preNodePath;

private CountDownLatch countDownLatch;

@Override
public boolean tryLock() {

//判断父节点是否存在
if (!zkClient.exists(PARENT_NODE_PATH)){
//不存在
zkClient.createPersistent(PARENT_NODE_PATH);
}

//创建第一个临时有序子节点
if (currentNodePath == null || "".equals(currentNodePath)){

//根节点下没有节点信息,将当前节点作为第一个子节点,类型:临时有序
currentNodePath = zkClient.createEphemeralSequential(PARENT_NODE_PATH+"/","lock");
}

//不是第一个子节点,获取父节点下所有子节点
List<String> childrenNodeList = zkClient.getChildren(PARENT_NODE_PATH);

//子节点升序排序
Collections.sort(childrenNodeList);

//判断是否加锁成功
if (currentNodePath.equals(PARENT_NODE_PATH+"/"+childrenNodeList.get(0))){
//当前节点是序号最小的节点
return true;
}else {
//当前节点不是序号最小的节点,获取其前面的节点名称,并赋值
int length = PARENT_NODE_PATH.length();
int currentNodeNumber = Collections.binarySearch(childrenNodeList, currentNodePath.substring(length + 1));
preNodePath = PARENT_NODE_PATH+"/"+childrenNodeList.get(currentNodeNumber-1);
}
return false;
}

@Override
public void waitLock() {

IZkDataListener zkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {

}

@Override
public void handleDataDeleted(String dataPath) throws Exception {

if (countDownLatch != null){
countDownLatch.countDown();
}
}
};

//监听前一个节点的改变
zkClient.subscribeDataChanges(preNodePath,zkDataListener);

if (zkClient.exists(preNodePath)){
countDownLatch = new CountDownLatch(1);

try {
countDownLatch.await();
} catch (InterruptedException e) {

}
}

zkClient.unsubscribeDataChanges(preNodePath,zkDataListener);
}

@Override
public void releaseLock() {
zkClient.delete(currentNodePath);
zkClient.close();
}
}
测试

根据结果可以看到,每一个线程都会有自己的节点信息,并且都会有对应的序号。序号最小的节点首先获取到锁,然后依次类推。

案例演示

案例描述

实现使用zk高效锁来进行扣减库存业务

image-20200618144550662

细节说明
生成操作标识

​ 生成操作标识是为了防止feign调用超时出现重试,如果没有操作标识的话,库存服务无法判定是一次操作还是多次操作,通过标识可以用于区分重试时当前是哪次操作。从而避免多次扣减库存情况的出现。

库存检查

库存服务先检查redis再检查Mysql,出于两点考虑:

  • 避免服务间重试时,库存服务无法区分是否为同一个操作,导致相同操作被执行多次。同时缓存结合关系型数据库,可以起到减轻数据库压力的作用。
  • 库存流水表不仅用于区分操作,同时每一次扣减库存时信息都会被记录,可以用于后期的库存信息统计等操作。

总的来说,就是通过操作标识结合zookeeper分布式锁,完成mysql乐观锁的操作,思想上都是相同的。

代码实现

订单控制器

OrderController修改生成订单方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
* 生成订单
* @param order
* @return
*/
@Idemptent
@PostMapping("/genOrder")
public String genOrder(@RequestBody Order order) throws InterruptedException {

//新增订单
String orderId = String.valueOf(idWorker.nextId());
order.setId(orderId);
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
orderService.addOrder(order);

//新增订单明细
List<String> goodsIds = JSON.parseArray(order.getGoodsIds(), String.class);
List<OrderDetail> orderDetailList = new ArrayList<>();

for (String goodsId : goodsIds) {
OrderDetail orderDetail = new OrderDetail();
orderDetail.setId(String.valueOf(idWorker.nextId()));
orderDetail.setOrderId(orderId);
orderDetail.setGoodsId(goodsId);
orderDetail.setGoodsPrice(1);
orderDetail.setGoodsNum(1);
orderDetailService.addOrderDetail(orderDetail);

orderDetailList.add(orderDetail);
}

//生成操作标识并存入redis
redisTemplate.opsForValue().set(orderId,"orderId",30,TimeUnit.MINUTES);

//扣减库存
stockFeign.reduceStock(JSON.toJSONString(orderDetailList),orderId);

//数据回查
if (stockFlowFeign.findByFlag(orderId).size() >0){
//操作成功
return "success";
}else {
TimeUnit.SECONDS.sleep(3);
//异步查询处理结果
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
return stockFlowFeign.findByFlag(orderId).size();
},executor);
try {
if (future1.get()>0){
return "success";
}
}catch (Exception e){
throw new RuntimeException("执行有误");
}

TimeUnit.SECONDS.sleep(5);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->{
return stockFlowFeign.findByFlag(orderId).size();
},executor);
try {
if (future2.get()>0){
return "success";
}
}catch (Exception e){
throw new RuntimeException("执行有误");
}

TimeUnit.SECONDS.sleep(10);
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(()->{
return stockFlowFeign.findByFlag(orderId).size();
},executor);
try {
if (future3.get()>0){
return "success";
}
}catch (Exception e){
throw new RuntimeException("执行有误");
}
return "false";
}
}
扣减库存

StockController修改扣减库存方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 扣减库存
* @param orderListValue
* @param flag
* @throws InterruptedException
*/
@PutMapping("/reduceStock/{flag}")
public void reduceStock(@RequestParam String orderListValue, @PathVariable("flag") String flag) throws InterruptedException {

System.out.println("reduce stock");

//redis验重
if (!redisTemplate.delete(flag)){
System.out.println("redis验重 重复操作");
return;
}

//MYSQL验重
int dbResult = stockFlowService.findByFlag(flag).size();
if (dbResult >0){
System.out.println("mysql验重 重复操作");
return;
}

//扣减库存
List<OrderDetail> orderDetailList = JSON.parseArray(orderListValue, OrderDetail.class);
stockService.reduceStock(orderDetailList,flag);

//用于模拟测试服务间重试
//TimeUnit.SECONDS.sleep(6);

}
扣减库存实现

StockServiceImpl中实现扣减库存方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Autowired
private StockMapper stockMapper;

@Autowired
private StockFlowMapper stockFlowMapper;

@Autowired
private IdWorker idWorker;

@Override
@Transactional(rollbackFor = Exception.class)
public Boolean reduceStock(List<OrderDetail> orderDetailList, String flag) {

String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();

AbstractLock zkLock = new HighLock("/"+methodName);

//加锁
try {

zkLock.getLock();

orderDetailList.stream().forEach(orderDetail -> {

//扣减库存
int reduceStockResult = stockMapper.reduceStock(orderDetail.getGoodsId(), orderDetail.getGoodsNum());
if (reduceStockResult != 1){
//扣减库存失败
throw new RuntimeException("扣减库存失败");
}

//新增库存流水
StockFlow stockFlow = new StockFlow();
stockFlow.setId(String.valueOf(idWorker.nextId()));
stockFlow.setFlag(flag);
stockFlow.setGoodsId(orderDetail.getGoodsId());
stockFlow.setNum(orderDetail.getGoodsNum());
stockFlowMapper.insert(stockFlow);
});

return true;
} catch (Exception e) {
e.printStackTrace();
}finally {
//释放锁
zkLock.releaseLock();
}

return false;
}

评论