java原子操作CAS
无锁的概念 在谈论无锁概念时,总会关联起乐观派与悲观派,对于乐观派而言,他们认为事情总会往好的方向发展,总是认为坏的情况发生的概率特别小,可以无所顾忌地做事,但对于悲观派而已,他们总会认为发展事态如果不及时控制,以后就无法挽回了,即使无法挽回的局面几乎不可能发生。这两种派系映射到并发编程中就如同加锁与无锁的策略,即加锁是一种悲观策略,无锁是一种乐观策略,因为对于加锁的并发程序来说,它们总是认为每次访问共享资源时总会发生冲突,因此必须对每一次数据操作实施加锁策略。而无锁则总是假设对共享资源的访问没有冲突,线程可以不停执行,无需加锁,无需等待,一旦发现冲突,无锁策略则采用一种称为CAS的技术来保证线程执行的安全性,这项CAS技术就是无锁策略实现的关键,下面我们进一步了解CAS技术的奇妙之处。
什么是CAS
CAS(Compare and Swap),即比较并替换,是用于实现多线程同步的原子指令。
执行函数:CAS(V,E,N)
其包含3个参数
假定有两个操作A和B,如果从执行A的线程来看,当另一个线程执行B时,要么将B全部执行完,要么完全不执行B,那么A和B对彼此来说是原子的。
实现原子操作可以使用锁,锁机制,满足基本的需求是没有问题的了,但是有的时候我们的需求并非这么简单,我们需要更有效,更加灵活的机制,synchronized 关键字是基于阻塞的锁机制,也就是说当一个线程拥有锁的时候,访问同一资源的其它线程需要等待,直到该线程释放锁,
这里会有些问题:首先,如果被阻塞的线程优先级很高很重要怎么办?其次,如果获得锁的线程一直不释放锁怎么办?(这种情况是非常糟糕的)。还有一种情况,如果有大量的线程来竞争资源,那CPU将会花费大量的时间和资源来处理这些竞争,同时,还有可能出现一些例如死锁之类的情况,最后,其实锁机制是一种比较粗糙,粒度比较大的机制,相对于像计数器这样的需求有点儿过于笨重。
实现原子操作还可以使用当前的处理器 基本都支持CAS的指令,只不过每个厂家所实现的算法并不一样,每一个CAS操作过程都包含三个运算符:一个内存地址V,一个期望的值A和一个新值B,操作的时候如果这个地址上存放的值等于这个期望的值A,则将地址上的值赋为新值B,否则不做任何操作。
CAS的基本思路就是,如果这个地址上的值和期望的值相等,则给其赋予新值,否则不做任何事儿,但是要返回原值是多少。循环CAS就是在一个循环里不断的做cas操作,直到成功为止。
CAS是怎么实现线程的安全呢?语言层面不做处理,我们将其交给硬件—CPU和内存,利用CPU的多处理能力,实现硬件层面的阻塞,再加上volatile变量的特性即可实现基于原子操作的线程安全。
CPU指令对CAS的支持 或许我们可能会有这样的疑问,假设存在多个线程执行CAS操作并且CAS的步骤很多,有没有可能在判断V和E相同后,正要赋值时,切换了线程,更改了值。造成了数据不一致呢?答案是否定的,因为CAS是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说CAS是一条CPU的原子指令,不会造成所谓的数据不一致问题。
悲观锁,乐观锁 说到CAS,不得不提到两个专业词语:悲观锁,乐观锁。我们先来看看什么是悲观锁,什么是乐观锁。
悲观锁 顾名思义,就是比较悲观的锁,总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中synchronized 和ReentrantLock 等独占锁就是悲观锁思想的实现。
乐观锁 反之,总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。我们今天讲的CAS就是乐观锁。
由于CAS操作属于乐观派,它总认为自己可以成功完成操作,当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败,但失败的线程并不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作,这点从图中也可以看出来。基于这样的原理,CAS操作即使没有锁,同样知道其他线程对共享资源操作影响,并执行相应的处理措施。同时从这点也可以看出,由于无锁操作中没有锁的存在,因此不可能出现死锁的情况,也就是说无锁操作天生免疫死锁。
CAS 的优点 非阻塞的轻量级乐观锁, 通过CPU指令实现, 在资源竞争不激烈的情况下性能高, 相比synchronize重量级悲观锁, synchronize有复杂的加锁, 解锁和唤醒线程操作.。
三大问题 ABA问题
假设这样一种场景,当第一个线程执行CAS(V,E,U)操作,在获取到当前变量V,准备修改为新值U前,另外两个线程已连续修改了两次变量V的值,使得该值又恢复为旧值,这样的话,我们就无法正确判断这个变量是否已被修改过,如下图
因为CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。
就像上图描述的一样,线程A原来的值是10,线程B修改为了20,但是线程C又将值修改为了10,这个时候线程A来读取了,与旧值做判断,发现还是10,没有修改过,就做了更新操作,但是我们知道,值有过变更。
这就是典型的CAS的ABA问题,一般情况这种情况发现的概率比较小,可能发生了也不会造成什么问题,比如说我们对某个做加减法,不关心数字的过程,那么发生ABA问题也没啥关系。但是在某些情况下还是需要防止的,那么该如何解决呢?在Java中解决ABA问题,我们可以使用以下两个原子类
AtomicStampedReference
AtomicStampedReference原子类是一个带有时间戳的对象引用,在每次修改后,AtomicStampedReference不仅会设置新值而且还会记录更改的时间。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值才能写入成功,这也就解决了反复读写时,无法预知值是否已被修改的窘境,测试demo如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 public class ABADemo { static AtomicInteger atIn = new AtomicInteger (100 ); static AtomicStampedReference<Integer> atomicStampedR = new AtomicStampedReference <Integer>(200 ,0 ); static Thread t1 = new Thread (new Runnable () { @Override public void run () { atIn.compareAndSet(100 , 200 ); atIn.compareAndSet(200 , 100 ); } }); static Thread t2 = new Thread (new Runnable () { @Override public void run () { try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } boolean flag=atIn.compareAndSet(100 ,500 ); System.out.println("flag:" +flag+",newValue:" +atIn); } }); static Thread t3 = new Thread (new Runnable () { @Override public void run () { int time=atomicStampedR.getStamp(); atomicStampedR.compareAndSet(100 , 200 ,time,time+1 ); int time2=atomicStampedR.getStamp(); atomicStampedR.compareAndSet(200 , 100 ,time2,time2+1 ); } }); static Thread t4 = new Thread (new Runnable () { @Override public void run () { int time = atomicStampedR.getStamp(); System.out.println("sleep 前 t4 time:" +time); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } boolean flag=atomicStampedR.compareAndSet(100 ,500 ,time,time+1 ); System.out.println("flag:" +flag+",newValue:" +atomicStampedR.getReference()); } }); public static void main (String[] args) throws InterruptedException { t1.start(); t2.start(); t1.join(); t2.join(); t3.start(); t4.start(); } }
对比输出结果可知,AtomicStampedReference类确实解决了ABA的问题,下面我们简单看看其内部实现原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class AtomicStampedReference <V> { private static class Pair <T> { final T reference; final int stamp; private Pair (T reference, int stamp) { this .reference = reference; this .stamp = stamp; } static <T> Pair<T> of (T reference, int stamp) { return new Pair <T>(reference, stamp); } } private volatile Pair<V> pair; public AtomicStampedReference (V initialRef, int initialStamp) { pair = Pair.of(initialRef, initialStamp); } }
接着看看其compareAndSet方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 public boolean compareAndSet (V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference = = current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }
同时对当前数据和当前时间进行比较,只有两者都相等是才会执行casPair()方法,单从该方法的名称就可知是一个CAS方法,最终调用的还是Unsafe类中的compareAndSwapObject方法
1 2 3 private boolean casPair (Pair<V> cmp, Pair<V> val) { return UNSAFE.compareAndSwapObject(this , pairOffset, cmp, val); }
到这我们就很清晰AtomicStampedReference的内部实现思想了,通过一个键值对Pair存储数据和时间戳,在更新时对数据和时间戳进行比较,只有两者都符合预期才会调用Unsafe的compareAndSwapObject方法执行数值和时间戳替换,也就避免了ABA的问题。
AtomicMarkableReference
AtomicMarkableReference与AtomicStampedReference不同的是,AtomicMarkableReference维护的是一个boolean值的标识,也就是说至于true和false两种切换状态,经过测试,这种方式并不能完全防止ABA问题的发生,只能减少ABA问题发生的概率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class ABADemo { static AtomicMarkableReference<Integer> atMarkRef = new AtomicMarkableReference <Integer>(100 ,false ); static Thread t5 = new Thread (new Runnable () { @Override public void run () { boolean mark=atMarkRef.isMarked(); System.out.println("mark:" +mark); System.out.println("t5 result:" +atMarkRef.compareAndSet(atMarkRef.getReference(), 200 ,mark,!mark)); } }); static Thread t6 = new Thread (new Runnable () { @Override public void run () { boolean mark2=atMarkRef.isMarked(); System.out.println("mark2:" +mark2); System.out.println("t6 result:" +atMarkRef.compareAndSet(atMarkRef.getReference(), 100 ,mark2,!mark2)); } }); static Thread t7 = new Thread (new Runnable () { @Override public void run () { boolean mark=atMarkRef.isMarked(); System.out.println("sleep 前 t7 mark:" +mark); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } boolean flag=atMarkRef.compareAndSet(100 ,500 ,mark,!mark); System.out.println("flag:" +flag+",newValue:" +atMarkRef.getReference()); } }); public static void main (String[] args) throws InterruptedException { t5.start();t5.join(); t6.start();t6.join(); t7.start(); } }
AtomicMarkableReference的实现原理与AtomicStampedReference类似,这里不再介绍。到此,我们也明白了如果要完全杜绝ABA问题的发生,我们应该使用AtomicStampedReference原子类更新对象,而对于AtomicMarkableReference来说只能减少ABA问题的发生概率,并不能杜绝。
循环时间长开销大 自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
只能保证一个共享变量的原子操作 当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。
还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java 1.5开始,JDK提供了AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。
Unsafe类
Unsafe类存在于sun.misc包中,其内部方法操作可以像C的指针一样直接操作内存,单从名称看来就可以知道该类是非安全的,毕竟Unsafe拥有着类似于C的指针操作,因此总是不应该首先使用Unsafe类,Java官方也不建议直接使用的Unsafe类,据说Oracle正在计划从Java 9中去掉Unsafe类,但我们还是很有必要了解该类,因为Java中CAS操作的执行依赖于Unsafe类的方法,注意Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都直接调用操作系统底层资源执行相应任务,关于Unsafe类的主要功能点如下:
内存管理
Unsafe类中存在直接操作内存的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public native long allocateMemory (long bytes) ;public native long reallocateMemory (long address, long bytes) ;public native void freeMemory (long address) ;public native void setMemory (Object o, long offset, long bytes, byte value) ;public native void putAddress (long address, long x) ;public native long getAddress (long address) ;public native void putLong (long address, long x) ;public native long getLong (long address) ;public native byte getByte (long address) ;public native void putByte (long address, byte x) ;public native int pageSize () ;
提供实例对象新途径 1 2 public native Object allocateInstance (Class cls) throws InstantiationException;
类和实例对象以及变量的操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public native long objectFieldOffset (Field f) ;public native long staticFieldOffset (Field f) ;public native Object staticFieldBase (Field f) ;public native int getInt (Object o, long offset) ;public native void putInt (Object o, long offset, int x) ;public native Object getObject (Object o, long offset) ;public native void putObject (Object o, long offset, Object x) ;public native void putIntVolatile (Object o, long offset, int x) ;public native int getIntVolatile (Object o, long offset) ;public native void putOrderedInt (Object o,long offset,int x) ;
下面通过一个简单的Demo来演示上述的一些方法以便加深对Unsafe类的理解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 public class UnSafeDemo { public static void main (String[] args) throws NoSuchFieldException, IllegalAccessException, InstantiationException { Field field = Unsafe.class.getDeclaredField("theUnsafe" ); field.setAccessible(true ); Unsafe unsafe = (Unsafe) field.get(null ); System.out.println(unsafe); User user = (User) unsafe.allocateInstance(User.class); Class userClass = user.getClass(); Field name = userClass.getDeclaredField("name" ); Field age = userClass.getDeclaredField("age" ); Field id = userClass.getDeclaredField("id" ); unsafe.putInt(user,unsafe.objectFieldOffset(age),18 ); unsafe.putObject(user,unsafe.objectFieldOffset(name),"android TV" ); Object staticBase = unsafe.staticFieldBase(id); System.out.println("staticBase:" +staticBase); long staticOffset = unsafe.staticFieldOffset(userClass.getDeclaredField("id" )); System.out.println("设置前的ID:" +unsafe.getObject(staticBase,staticOffset)); unsafe.putObject(staticBase,staticOffset,"SSSSSSSS" ); System.out.println("设置前的ID:" +unsafe.getObject(staticBase,staticOffset)); System.out.println("输出USER:" +user.toString()); long data = 1000 ; byte size = 1 ; long memoryAddress = unsafe.allocateMemory(size); unsafe.putAddress(memoryAddress, data); long addrData=unsafe.getAddress(memoryAddress); System.out.println("addrData:" +addrData); } } class User { public User () { System.out.println("user 构造方法被调用" ); } private String name; private int age; private static String id="USER_ID" ; @Override public String toString () { return "User{" + "name='" + name + '\'' + ", age=" + age +'\'' + ", id=" + id +'\'' + '}' ; } }
虽然在Unsafe类中存在getUnsafe()方法,但该方法只提供给高级的Bootstrap类加载器使用,普通用户调用将抛出异常,所以我们在Demo中使用了反射技术获取了Unsafe实例对象并进行相关操作。
1 2 3 4 5 6 public static Unsafe getUnsafe () { Class cc = sun.reflect.Reflection.getCallerClass(2 ); if (cc.getClassLoader() != null ) throw new SecurityException ("Unsafe" ); return theUnsafe; }
数组操作 1 2 3 4 public native int arrayBaseOffset (Class arrayClass) ;public native int arrayIndexScale (Class arrayClass) ;
CAS 操作相关
CAS是一些CPU直接支持的指令,也就是我们前面分析的无锁操作,在Java中无锁操作CAS基于以下3个方法实现,在稍后讲解Atomic系列内部方法是基于下述方法的实现的。
1 2 3 4 5 6 7 public final native boolean compareAndSwapObject (Object o, long offset,Object expected, Object x) ; public final native boolean compareAndSwapInt (Object o, long offset,int expected,int x) ;public final native boolean compareAndSwapLong (Object o, long offset,long expected,long x) ;
这里还需介绍Unsafe类中JDK 1.8新增的几个方法,它们的实现是基于上述的CAS方法,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public final int getAndAddInt (Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; } public final long getAndAddLong (Object o, long offset, long delta) { long v; do { v = getLongVolatile(o, offset); } while (!compareAndSwapLong(o, offset, v, v + delta)); return v; } public final int getAndSetInt (Object o, long offset, int newValue) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, newValue)); return v; } public final long getAndSetLong (Object o, long offset, long newValue) { long v; do { v = getLongVolatile(o, offset); } while (!compareAndSwapLong(o, offset, v, newValue)); return v; } public final Object getAndSetObject (Object o, long offset, Object newValue) { Object v; do { v = getObjectVolatile(o, offset); } while (!compareAndSwapObject(o, offset, v, newValue)); return v; }
上述的方法我们在稍后的Atomic系列分析中还会见到它们的身影。
挂起与恢复
将一个线程进行挂起是通过park方法实现的,调用 park后,线程将一直阻塞直到超时或者中断等条件出现。unpark可以终止一个挂起的线程,使其恢复正常。Java对线程的挂起操作被封装在 LockSupport类中,LockSupport类中有各种版本pack方法,其底层实现最终还是使用Unsafe.park()方法和Unsafe.unpark()方法
1 2 3 4 5 public native void park (boolean isAbsolute, long time) ; public native void unpark (Object thread) ;
内存屏障
这里主要包括了loadFence、storeFence、fullFence等方法,这些方法是在Java 8新引入的,用于定义内存屏障,避免代码重排序。
1 2 3 4 5 6 public native void loadFence () ;public native void storeFence () ;public native void fullFence () ;
其他操作 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Deprecated public native void monitorEnter (Object var1) ;@Deprecated public native void monitorExit (Object var1) ;@Deprecated public native boolean tryMonitorEnter (Object var1) ;public native int pageSize () ; public native Class defineClass (String name, byte [] b, int off, int len, ClassLoader loader, ProtectionDomain protectionDomain) ; public native Class defineAnonymousClass (Class hostClass, byte [] data, Object[] cpPatches) ;public native boolean shouldBeInitialized (Class<?> c) ;public native void ensureClassInitialized (Class<?> c)
原子操作类使用
通过前面的分析我们已基本理解了无锁CAS的原理并对Java中的指针类Unsafe类有了比较全面的认识,下面进一步分析CAS在Java中的应用,即并发包中的原子操作类(Atomic系列),从JDK 1.5开始提供了java.util.concurrent.atomic包,在该包中提供了许多基于CAS实现的原子操作类,用法方便,性能高效,主要分以下4种类型。
原子更新基本类型
原子更新基本类型主要包括3个类:
AtomicBoolean:原子更新布尔类型
AtomicInteger:原子更新整型
AtomicLong:原子更新长整型
原理 这3个类的实现原理和使用方式几乎是一样的,这里我们以AtomicInteger为例进行分析,AtomicInteger主要是针对int类型的数据执行原子操作,它提供了原子自增方法、原子自减方法以及原子赋值方法等,鉴于AtomicInteger的源码不多,我们直接看源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 public class AtomicInteger extends Number implements java .io.Serializable { private static final long serialVersionUID = 6214790243416807050L ; private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value" )); } catch (Exception ex) { throw new Error (ex); } } private volatile int value; public AtomicInteger (int initialValue) { value = initialValue; } public AtomicInteger () { } public final int get () { return value; } public final void set (int newValue) { value = newValue; } public final void lazySet (int newValue) { unsafe.putOrderedInt(this , valueOffset, newValue); } public final int getAndSet (int newValue) { return unsafe.getAndSetInt(this , valueOffset, newValue); } public final boolean compareAndSet (int expect, int update) { return unsafe.compareAndSwapInt(this , valueOffset, expect, update); } public final int getAndIncrement () { return unsafe.getAndAddInt(this , valueOffset, 1 ); } public final int getAndDecrement () { return unsafe.getAndAddInt(this , valueOffset, -1 ); } public final int getAndAdd (int delta) { return unsafe.getAndAddInt(this , valueOffset, delta); } public final int incrementAndGet () { return unsafe.getAndAddInt(this , valueOffset, 1 ) + 1 ; } public final int decrementAndGet () { return unsafe.getAndAddInt(this , valueOffset, -1 ) - 1 ; } public final int addAndGet (int delta) { return unsafe.getAndAddInt(this , valueOffset, delta) + delta; } }
通过上述的分析,可以发现AtomicInteger原子类的内部几乎是基于前面分析过Unsafe类中的CAS相关操作的方法实现的,这也同时证明AtomicInteger是基于无锁实现的,这里重点分析自增操作实现过程,其他方法自增实现原理一样。
1 2 3 4 public final int incrementAndGet () { return unsafe.getAndAddInt(this , valueOffset, 1 ) + 1 ; }
我们发现AtomicInteger类中所有自增或自减的方法都间接调用Unsafe类中的getAndAddInt()方法实现了CAS操作,从而保证了线程安全,关于getAndAddInt其实前面已分析过,它是Unsafe类中1.8新增的方法,源码如下
1 2 3 4 5 6 7 8 public final int getAndAddInt (Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; }
可看出getAndAddInt通过一个while循环不断的重试更新要设置的值,直到成功为止,调用的是Unsafe类中的compareAndSwapInt方法,是一个CAS操作方法。这里需要注意的是,上述源码分析是基于JDK1.8的,如果是1.8之前的方法,AtomicInteger源码实现有所不同,是基于for死循环的,如下
1 2 3 4 5 6 7 8 9 10 public final int incrementAndGet () { for (;;) { int current = get(); int next = current + 1 ; if (compareAndSet(current, next)) return next; } }
使用
下面简单看个Demo,感受一下AtomicInteger使用方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class AtomicIntegerDemo { static AtomicInteger i=new AtomicInteger (); public static class AddThread implements Runnable { public void run () { for (int k=0 ;k<10000 ;k++) i.incrementAndGet(); } } public static void main (String[] args) throws InterruptedException { Thread[] ts=new Thread [10 ]; for (int k=0 ;k<10 ;k++){ ts[k]=new Thread (new AddThread ()); } for (int k=0 ;k<10 ;k++){ts[k].start();} for (int k=0 ;k<10 ;k++){ts[k].join();} System.out.println(i); } }
在Demo中,使用原子类型AtomicInteger替换普通int类型执行自增的原子操作,保证了线程安全。至于AtomicBoolean和AtomicLong的使用方式以及实现原理是一样,大家可以自行查阅源码。
原子更新引用 使用
原子更新引用类型可以同时更新引用类型,这里主要分析一下AtomicReference原子类,即原子更新引用类型。先看看其使用方式,如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class AtomicReferenceDemo2 { public static AtomicReference<User> atomicUserRef = new AtomicReference <User>(); public static void main (String[] args) { User user = new User ("zejian" , 18 ); atomicUserRef.set(user); User updateUser = new User ("Shine" , 25 ); atomicUserRef.compareAndSet(user, updateUser); System.out.println(atomicUserRef.get().toString()); } static class User { public String name; private int age; public User (String name, int age) { this .name = name; this .age = age; } public String getName () { return name; } @Override public String toString () { return "User{" + "name='" + name + '\'' + ", age=" + age + '}' ; } } }
原理
那么AtomicReference原子类内部是如何实现CAS操作的呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class AtomicReference <V> implements java .io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicReference.class.getDeclaredField("value" )); } catch (Exception ex) { throw new Error (ex); } } private volatile V value; public final boolean compareAndSet (V expect, V update) { return unsafe.compareAndSwapObject(this , valueOffset, expect, update); } public final V getAndSet (V newValue) { return (V)unsafe.getAndSetObject(this , valueOffset, newValue); } } public final Object getAndSetObject (Object o, long offset, Object newValue) { Object v; do { v = getObjectVolatile(o, offset); } while (!compareAndSwapObject(o, offset, v, newValue)); return v; }
从源码看来,AtomicReference与AtomicInteger的实现原理基本是一样的,最终执行的还是Unsafe类,关于AtomicReference的其他方法也是一样的,如下
红框内的方法是Java8新增的,可以基于Lambda表达式对传递进来的期望值或要更新的值进行其他操作后再进行CAS操作,说白了就是对期望值或要更新的值进行额外修改后再执行CAS更新,在所有的Atomic原子类中几乎都存在这几个方法。
原子更新数组
原子更新数组指的是通过原子的方式更新数组里的某个元素 ,主要有以下3个类
AtomicIntegerArray:原子更新整数数组里的元素
AtomicLongArray:原子更新长整数数组里的元素
AtomicReferenceArray:原子更新引用类型数组里的元素
使用
这里以AtomicIntegerArray为例进行分析,其余两个使用方式和实现原理基本一样,简单案例如下,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class AtomicIntegerArrayDemo { static AtomicIntegerArray arr = new AtomicIntegerArray (10 ); public static class AddThread implements Runnable { public void run () { for (int k=0 ;k<10000 ;k++) arr.getAndIncrement(k%arr.length()); } } public static void main (String[] args) throws InterruptedException { Thread[] ts=new Thread [10 ]; for (int k=0 ;k<10 ;k++){ ts[k]=new Thread (new AddThread ()); } for (int k=0 ;k<10 ;k++){ts[k].start();} for (int k=0 ;k<10 ;k++){ts[k].join();} System.out.println(arr); } }
原理
启动10条线程对数组中的元素进行自增操作,执行结果符合预期。使用方式比较简单,接着看看AtomicIntegerArray内部是如何实现,先看看部分源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class AtomicIntegerArray implements java .io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final int base = unsafe.arrayBaseOffset(int [].class); private static final int shift; private final int [] array; static { int scale = unsafe.arrayIndexScale(int [].class); if ((scale & (scale - 1 )) != 0 ) throw new Error ("data type scale not a power of two" ); shift = 31 - Integer.numberOfLeadingZeros(scale); } private long checkedByteOffset (int i) { if (i < 0 || i >= array.length) throw new IndexOutOfBoundsException ("index " + i); return byteOffset(i); } private static long byteOffset (int i) { return ((long ) i << shift) + base; } }
通过前面对Unsafe类的分析,我们知道arrayBaseOffset方法可以获取数组的第一个元素起始地址,而arrayIndexScale方法可以获取每个数组元素占用的内存空间,由于这里是Int类型,而Java中一个int类型占用4个字节,也就是scale的值为4,那么如何根据数组下标值计算每个元素的内存地址呢?显然应该是
每个数组元素的内存地址=起始地址+元素下标 * 每个元素所占用的内存空间
与该方法原理相同
1 2 3 4 private static long byteOffset (int i) { return ((long ) i << shift) + base; }
这是为什么,首先来计算出shift的值
1 shift = 31 - Integer.numberOfLeadingZeros(scale);
其中Integer.numberOfLeadingZeros(scale)是计算出scale的前导零个数(必须是连续的),scale=4,转成二进制为 00000000 00000000 00000000 00000100
即前导零数为29,也就是shift=2,然后利用shift来定位数组中的内存位置,在数组不越界时,计算出前3个数组元素内存地址
1 2 3 4 5 6 7 address = base + 0 * 4 即address= base + 0 << 2 address = base + 1 * 4 即address= base + 1 << 2 address = base + 2 * 4 即address= base + 2 << 2
显然shift=2,替换去就是
1 address= base + i << shift
这就是 byteOffset(int i) 方法的计算原理。因此byteOffset(int)方法可以根据数组下标计算出每个元素的内存地址。至于其他方法就比较简单了,都是间接调用Unsafe类的CAS原子操作方法,如下简单看其中几个常用方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public final int getAndIncrement (int i) { return getAndAdd(i, 1 ); } public final int incrementAndGet (int i) { return getAndAdd(i, 1 ) + 1 ; } public final int decrementAndGet (int i) { return getAndAdd(i, -1 ) - 1 ; } public final int getAndAdd (int i, int delta) { return unsafe.getAndAddInt(array, checkedByteOffset(i), delta); } public final int getAndAddInt (Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; }
至于AtomicLongArray和AtomicReferenceArray原子类,使用方式和实现原理基本一样。
原子更新属性
如果我们只需要某个类里的某个字段,也就是说让普通的变量也享受原子操作,可以使用原子更新字段类,如在某些时候由于项目前期考虑不周全,项目需求又发生变化,使得某个类中的变量需要执行多线程操作,由于该变量多处使用,改动起来比较麻烦,而且原来使用的地方无需使用线程安全,只要求新场景需要使用时,可以借助原子更新器处理这种场景。
Atomic并发包提供了以下三个类:
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
AtomicLongFieldUpdater:原子更新长整型字段的更新器。
AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
请注意原子更新器的使用存在比较苛刻的条件如下
操作的字段不能是static类型。
操作的字段不能是final类型的,因为final根本没法修改。
字段必须是volatile修饰的,也就是数据本身是读一致的。
属性必须对当前的Updater所在的区域是可见的,如果不是当前类内部进行原子更新器操作不能使用private,protected子类操作父类时修饰符必须是protect权限及以上,如果在同一个package下则必须是default权限及以上,也就是说无论何时都应该保证操作类与被操作类间的可见性。
使用
下面看看AtomicIntegerFieldUpdater和AtomicReferenceFieldUpdater的简单使用方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class AtomicIntegerFieldUpdaterDemo { public static class Candidate { int id; volatile int score; } public static class Game { int id; volatile String name; public Game (int id, String name) { this .id = id; this .name = name; } @Override public String toString () { return "Game{" + "id=" + id + ", name='" + name + '\'' + '}' ; } } static AtomicIntegerFieldUpdater<Candidate> atIntegerUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score" ); static AtomicReferenceFieldUpdater<Game,String> atRefUpdate = AtomicReferenceFieldUpdater.newUpdater(Game.class,String.class,"name" ); public static AtomicInteger allScore=new AtomicInteger (0 ); public static void main (String[] args) throws InterruptedException { final Candidate stu=new Candidate (); Thread[] t=new Thread [10000 ]; for (int i = 0 ; i < 10000 ; i++) { t[i]=new Thread () { public void run () { if (Math.random()>0.4 ){ atIntegerUpdater.incrementAndGet(stu); allScore.incrementAndGet(); } } }; t[i].start(); } for (int i = 0 ; i < 10000 ; i++) { t[i].join();} System.out.println("最终分数score=" +stu.score); System.out.println("校验分数allScore=" +allScore); Game game = new Game (2 ,"zh" ); atRefUpdate.compareAndSet(game,game.name,"JAVA-HHH" ); System.out.println(game.toString()); } }
我们使用AtomicIntegerFieldUpdater更新候选人(Candidate)的分数score,开启了10000条线程投票,当随机值大于0.4时算一票,分数自增一次,其中allScore用于验证分数是否正确(其实用于验证AtomicIntegerFieldUpdater更新的字段是否线程安全),当allScore与score相同时,则说明投票结果无误,也代表AtomicIntegerFieldUpdater能正确更新字段score的值,是线程安全的。对于AtomicReferenceFieldUpdater,我们在代码中简单演示了其使用方式,注意在AtomicReferenceFieldUpdater注明泛型时需要两个泛型参数,一个是修改的类类型,一个修改字段的类型。至于AtomicLongFieldUpdater则与AtomicIntegerFieldUpdater类似,不再介绍。
原理
接着简单了解一下AtomicIntegerFieldUpdater的实现原理,实际就是反射和Unsafe类结合,AtomicIntegerFieldUpdater是个抽象类,实际实现类为AtomicIntegerFieldUpdaterImpl
1 2 3 4 5 6 7 8 9 public abstract class AtomicIntegerFieldUpdater <T> { public static <U> AtomicIntegerFieldUpdater<U> newUpdater (Class<U> tclass, String fieldName) { return new AtomicIntegerFieldUpdaterImpl <U> (tclass, fieldName, Reflection.getCallerClass()); } }
看看AtomicIntegerFieldUpdaterImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 private static class AtomicIntegerFieldUpdaterImpl <T> extends AtomicIntegerFieldUpdater <T> { private static final Unsafe unsafe = Unsafe.getUnsafe(); private final long offset; private final Class<T> tclass; private final Class<?> cclass; AtomicIntegerFieldUpdaterImpl(final Class<T> tclass, final String fieldName, final Class<?> caller) { final Field field; final int modifiers; try { field = AccessController.doPrivileged( new PrivilegedExceptionAction <Field>() { public Field run () throws NoSuchFieldException { return tclass.getDeclaredField(fieldName); } }); modifiers = field.getModifiers(); sun.reflect.misc.ReflectUtil.ensureMemberAccess( caller, tclass, null , modifiers); ClassLoader cl = tclass.getClassLoader(); ClassLoader ccl = caller.getClassLoader(); if ((ccl != null ) && (ccl != cl) && ((cl == null ) || !isAncestor(cl, ccl))) { sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass); } } catch (PrivilegedActionException pae) { throw new RuntimeException (pae.getException()); } catch (Exception ex) { throw new RuntimeException (ex); } Class<?> fieldt = field.getType(); if (fieldt != int .class) throw new IllegalArgumentException ("Must be integer type" ); if (!Modifier.isVolatile(modifiers)) throw new IllegalArgumentException ("Must be volatile type" ); this .cclass = (Modifier.isProtected(modifiers) && caller != tclass) ? caller : null ; this .tclass = tclass; offset = unsafe.objectFieldOffset(field); } }
从AtomicIntegerFieldUpdaterImpl的构造器也可以看出更新器为什么会有这么多限制条件了,当然最终其CAS操作肯定是通过unsafe完成的,简单看一个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public int incrementAndGet (T obj) { int prev, next; do { prev = get(obj); next = prev + 1 ; } while (!compareAndSet(obj, prev, next)); return next; } public boolean compareAndSet (T obj, int expect, int update) { if (obj == null || obj.getClass() != tclass || cclass != null ) fullCheck(obj); return unsafe.compareAndSwapInt(obj, offset, expect, update); }
再谈自旋锁
自旋锁是一种假设在不久将来,当前的线程可以获得锁,因此虚拟机会让当前想要获取锁的线程做几个空循环(这也是称为自旋的原因),在经过若干次循环后,如果得到锁,就顺利进入临界区。如果还不能获得锁,那就会将线程在操作系统层面挂起,这种方式确实也是可以提升效率的。但问题是当线程越来越多竞争很激烈时,占用CPU的时间变长会导致性能急剧下降,因此Java虚拟机内部一般对于自旋锁有一定的次数限制,可能是50或者100次循环后就放弃,直接挂起线程,让出CPU资源。如下通过AtomicReference可实现简单的自旋锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class SpinLock { private AtomicReference<Thread> sign =new AtomicReference <>(); public void lock () { Thread current = Thread.currentThread(); while (!sign .compareAndSet(null , current)){ } } public void unlock () { Thread current = Thread.currentThread(); sign .compareAndSet(current, null ); } }
使用CAS原子操作作为底层实现,lock()方法将要更新的值设置为当前线程,并将预期值设置为null。unlock()函数将要更新的值设置为null,并预期值设置为当前线程。然后我们通过lock()和unlock来控制自旋锁的开启与关闭,注意这是一种非公平锁。事实上AtomicInteger(或者AtomicLong)原子类内部的CAS操作也是通过不断的自循环(while循环)实现,不过这种循环的结束条件是线程成功更新对于的值,但也是自旋锁的一种。