CAS&Atomic
CAS
概述
CAS的全称为Compare-And-Swap ,它是一条CPU并发原语
,比较 工作内存值(预期值) 和 主物理内存 的共享值是否相同,相同则执行规定操作,否则继续比较直到主内存和工作内存的值一致为止。这个过程是原子的
CAS并发原语是现在 Java 语言中就是 sun.misc包下的 UnSafe 类中的各个方法。调用 UnSafe 类中的 CAS 方法, JVM 会帮我实现 CAS 汇编指令。这是一种完全依赖于硬件功能
,通过它实现了原子操作。
就像我们在使用使用乐观锁操作数据库时,也会 通过版本号来保证多线程情况下只有一个线程能执行成功。
操作
atomicInteger.getAndIncrement()
1 2 3
| public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
|
调用了 unsafe.getAndAddInt();
1 2 3 4 5 6 7
| public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
public class AtomicIntDemo { public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(); int i = atomicInteger.incrementAndGet(); System.out.println("i = " + i);
int andIncrement = atomicInteger.getAndIncrement(); System.out.println("andIncrement = " + andIncrement); System.out.println("atomicInteger.compareAndSet(2,3) = " + atomicInteger.compareAndSet(2, 3)); System.out.println("atomicInteger.compareAndSet(2,3) = " + atomicInteger.compareAndSet(2, 3)); } }
|
CAS 问题
CAS 方式为乐观锁,synchronized 为悲观锁。使用 CAS 解决并发问题通常情况下性能更优。
但使用 CAS 方式也会有几个问题:
ABA问题
因为CAS需要在操作值的时候,检查值有没有发生变化,比如没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。
ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A->B->A就会变成1A->2B->3A。
从Java 1.5开始,JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的 compareAndSet 方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
循环时间长开销大
自旋CAS如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令,那么效率会有一定的提升。
pause 指令有两个作用:
- 第一,它可以延迟流水线执行命令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零;
- 第二,它可以避免在退出循环的时候因内存顺序冲突(Memory Order Violation)而引起 CPU 流水线被清空(CPU Pipeline Flush),从而提高 CPU 的执行效率。
只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS 就无法保证操作的原子性,这个时候就可以用锁。
还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i = 2,j = a,合并一下ij = 2a,然后用CAS来操作ij。
从Java 1.5开始,JDK提供了 AtomicReference 类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。
Unsafe
UnSafe类中所有的方法都是native修饰的,也就是说UnSafe类中的方法都是直接调用操作底层资源执行响应的任务
但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。
这个类尽管里面的方法都是 public 的,但是并没有办法使用它们,JDK API 文档也没有提供任何关于这个类的方法的解释。总而言之,对于 Unsafe 类的使用都是受限制的,只有授信的代码才能获得该类的实例,当然 JDK 库里面的类是可以随意使用的。
Unsafe与CAS
AtomicInteger 中使用 unsafe。
从源码中发现,内部使用自旋的方式进行CAS更新(while循环进行CAS更新,如果更新失败,则循环再次重试)。
又从Unsafe类中发现,原子操作其实只支持下面三个方法。
1 2 3 4 5 6
| public final native boolean compareAndSwapObject(Object paramObject1, long paramLong, Object paramObject2, Object paramObject3);
public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);
public final native boolean compareAndSwapLong(Object paramObject, long paramLong1, long paramLong2, long paramLong3);
|
发现Unsafe只提供了3种CAS方法:compareAndSwapObject、compareAndSwapInt和compareAndSwapLong。都是native方法。
原子类
基本类型
常用API
方法 |
解释 |
public final int get() |
获取当前的值 |
public final int getAndSet(int newValue) |
获取到当前的值,并设置新的值 |
public final int getAndIncrement() |
获取当前的值,并自增 |
public final int getAndDecrement() |
获取到当前的值,并自减 |
public final int getAndAdd(int delta) |
获取到当前的值,并加上预期的值 |
public final int incrementAndGet( ) |
返回的是加1后的值 |
boolean compareAndSet(int expect,int update) |
如果输入的数值等于预期值,返回true |
AtomicInteger 解决 i++
CountDownLatch 如何在程序中使用
在没使用 CountDownLatch 时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package tech.chen.juccode.a14;
import java.util.concurrent.atomic.AtomicInteger;
public class CountDownDemo { private static AtomicInteger atomicInteger = new AtomicInteger(0); public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(()->{ for (int j = 0; j < 100; j++) { atomicInteger.getAndIncrement(); } }).start(); } System.out.println("atomicInteger.get() = " + atomicInteger.get()); } }
|
结果:
atomicInteger.get() = 4900
发现使用 AtomicInteger 居然没有得到理想值 5000,是 AtomicInteger 的问题吗?显然不可能,实际原因是,累加还没结束,主线程就打印出了结果,不信你可以等两秒。但是,实际操作我们不可能设置一个固定的睡眠时间。CountDownLatch 出现的意义也在此。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package tech.chen.juccode.a14;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger;
public class CountDownDemo { private static AtomicInteger atomicInteger = new AtomicInteger(0); private static CountDownLatch countDownLatch = new CountDownLatch(50); public static void main(String[] args) { for (int i = 0; i <50; i++) { new Thread(()->{ for (int j = 0; j < 100; j++) { atomicInteger.getAndIncrement(); } countDownLatch.countDown(); }).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("atomicInteger.get() = " + atomicInteger.get()); } }
|
atomicInteger.get() = 5000
AtomicBoolean中断标识
AtomicBoolean可以作为中断标识停止线程的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class AtomicBooleanDemo { public static void main(String[] args) { AtomicBoolean atomicBoolean=new AtomicBoolean(false);
new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t"+"coming....."); while(!atomicBoolean.get()){ System.out.println("=========="); } System.out.println(Thread.currentThread().getName()+"\t"+"over....."); },"A").start();
new Thread(()->{ atomicBoolean.set(true); },"B").start(); } }
|
Atomic Long
AtomicLong 的底层是 CAS+自旋锁 的思想,适用于低并发的全局计算,高并发后性能急剧下降,原因如下:N个线程CAS操作修改线程的值,每次只有一个成功过,其他N-1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子 cpu 就打高了(AtomicLong的自旋会成为瓶颈)
在高并发的情况下,我们使用LoadAdder
数组类型
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package tech.chen.juccode.a14;
import java.util.concurrent.atomic.AtomicIntegerArray;
public class AtomicArray { public static void main(String[] args) { AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5); for (int i = 0; i < atomicIntegerArray.length(); i++) { int i1 = atomicIntegerArray.get(i); System.out.println("i1 = " + i1); }
int i = atomicIntegerArray.incrementAndGet(0); System.out.println("i = " + i); int andAdd = atomicIntegerArray.addAndGet(1, 4); System.out.println("andAdd = " + andAdd); } }
|
引用类型
AtomicReference、AtomicStampedReference、AtomicMarkableReference
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package tech.chen.juccode.a14; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceForLock {
private static AtomicReference<Thread> atomicReference = new AtomicReference<>();
private static void lock(){ Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "\t" + "coming "); while (!atomicReference.compareAndSet(null, thread)) {} }
private static void unlock(){ Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "\t" + "take over "); atomicReference.compareAndSet(thread,null); }
public static void main(String[] args) { new Thread(()->{ lock(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } unlock(); },"t1").start(); new Thread(()->{ lock(); unlock(); },"t2").start(); } }
|
AtomicStampedReference 这个带有戳记的 引用原子类 在上一篇已经讲过,这里就不赘述了
AtomicMarkableReference
- 原子更新带有标志位的引用类型对象
- 解决是否修改(它的定义就是将状态戳简化位 true|false ) ,类似一次性筷子
- 状态戳 (true/false) 原子引用
- 不建议用它解决 ABA 问题
- Markable – true、false 是否修改过,跟 AtomicStampedReference 的区别就是 它只负责监督一次改变。
对象属性修改类型
目的 : 以一种线程安全的方式操作非线程安全对象内的某些字段 。判断是否可以不要锁定整个对象,减少锁定的范围,只关注长期、敏感性变化的某一个字段,而不是整个对象,达到精确加锁+节约内存的目的
条件:
- 更新的对象属性必须使用 public volatile 修饰符
- 因为对象的属性修改类型原子类都是抽象类 ,所以每次使用都必须使用静态方法 newUpdater( ) 创建一个更新器,并且需要设置想要更新的类和属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package tech.chen.juccode.a14;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class AtomicFieldUpdaterDemo { public static void main(String[] args) { User user = new User(); AtomicIntegerFieldUpdater<User> atomicIntegerFieldUpdaterAge = AtomicIntegerFieldUpdater.newUpdater(User.class,"age"); AtomicReferenceFieldUpdater atomicReferenceFieldUpdaterName = AtomicReferenceFieldUpdater.newUpdater(User.class,String.class,"name"); int i = atomicIntegerFieldUpdaterAge.get(user); System.out.println("i = " + i); Object o = atomicReferenceFieldUpdaterName.get(user); System.out.println("o = " + o); int newAge = atomicIntegerFieldUpdaterAge.accumulateAndGet(user, 18, (x, y) -> x + y); System.out.println("newAge = " + newAge); boolean czk = atomicReferenceFieldUpdaterName.compareAndSet(user, "czk", "0205"); System.out.println("czk = " + czk); } } class User{ public volatile int age = 0; public volatile String name = "czk"; }
|
运行结果
i = 0
o = czk
newAge = 18
原子操作增强类
DoubleAccumulator 、DoubleAdder 、LongAccumulator 、LongAdder
常用API
方法 |
解释 |
void add(long x) |
将当前的value加1 |
void increment( ) |
将当前的value加1 |
void decrement( ) |
将当前value减1 |
long sum( ) |
返回当前的值,特别注意,在没有并发更新value的情况下sum会返回一个精确值,在存在并发的情况下,sum不保证返回精确值 |
long longvale |
等价于long sum( )。将value重置为0,可用于替换重新new一个LongAdder,但此方法只可以在没有并发更新的情况下使用 |
long sumThenReset() |
获取当前value,并将value重置为0 |
- LongAdder只能用来计算加法、减法,且从零开始计算
- LongAccumulator提供了自定义的函数操作

| package tech.chen.juccode.a14;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder;
public class LongAdderCalcDemo { public static final int SIZE_THREAD=50; public static final int _1w=10000;
public static void main(String[] args) { synchronizedTest(); atomicIntegerTest(); atomicLongTest(); longAdderTest(); longAccumulatorTest(); }
private static void longAdderTest() { CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD); ClickNumber clickNumber=new ClickNumber(); long startTime = System.currentTimeMillis(); for (int i = 1 ; i <=SIZE_THREAD ; i++) { new Thread(()->{ try{ for (int j = 1; j <=100*_1w; j++) { clickNumber.add_longAdder(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.print("-----add_longAdder:"+(endTime-startTime)+"毫秒"+"\t"); System.out.println(clickNumber.adder.longValue()); } private static void synchronizedTest() { CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD); ClickNumber clickNumber=new ClickNumber(); long startTime = System.currentTimeMillis(); for (int i = 1 ; i <=SIZE_THREAD ; i++) { new Thread(()->{ try{ for (int j = 1; j <=100*_1w; j++) { clickNumber.add_synchronized(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.print("-----add_synchronized:"+(endTime-startTime)+"毫秒"+"\t"); System.out.println(clickNumber.number); } private static void atomicIntegerTest() { CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD); ClickNumber clickNumber=new ClickNumber(); long startTime = System.currentTimeMillis(); for (int i = 1 ; i <=SIZE_THREAD ; i++) { new Thread(()->{ try{ for (int j = 1; j <=100*_1w; j++) { clickNumber.add_atomicInteger(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.print("-----add_atomicInteger:"+(endTime-startTime)+"毫秒"+"\t"); System.out.println(clickNumber.atomicInteger.get()); } private static void atomicLongTest() { CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD); ClickNumber clickNumber=new ClickNumber(); long startTime = System.currentTimeMillis(); for (int i = 1 ; i <=SIZE_THREAD ; i++) { new Thread(()->{ try{ for (int j = 1; j <=100*_1w; j++) { clickNumber.add_atomicLong(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.print("-----add_atomicLong:"+(endTime-startTime)+"毫秒"+"\t"); System.out.println(clickNumber.atomicLong.get()); } private static void longAccumulatorTest() { CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD); ClickNumber clickNumber=new ClickNumber(); long startTime = System.currentTimeMillis(); for (int i = 1 ; i <=SIZE_THREAD ; i++) { new Thread(()->{ try{ for (int j = 1; j <=100*_1w; j++) { clickNumber.add_longAccumulater(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.print("-----add_longAccumulater:"+(endTime-startTime)+"毫秒"+"\t"); System.out.println(clickNumber.accumulator.get()); } } class ClickNumber{ int number=0;
public synchronized void add_synchronized(){ number++; } AtomicInteger atomicInteger=new AtomicInteger(); public void add_atomicInteger(){ atomicInteger.incrementAndGet(); } AtomicLong atomicLong=new AtomicLong(); public void add_atomicLong(){ atomicLong.incrementAndGet(); } LongAdder adder=new LongAdder(); public void add_longAdder(){ adder.increment(); } LongAccumulator accumulator=new LongAccumulator((x, y)->x+y,0); public void add_longAccumulater(){ accumulator.accumulate(1); } }
|
运行结果:
—–add_synchronized:4089毫秒 50000000
—–add_atomicInteger:822毫秒 50000000
—–add_atomicLong:831毫秒 50000000
—–add_longAdder:187毫秒 50000000
—–add_longAccumulater:123毫秒 50000000
ABA 问题的解决方案
- ABA问题解决方案是使用 AtomicStampedReference 每修改一次都会有一个版本号
- AtomicStampedReference 用来解决 AtomicInteger 中的 ABA 问题,该 demo 企图将 integer 的值从0一直增长到1000,但当integer 的值增长到128后,将停止增长。出现该现象有两点原因:
- 使用int类型而非 Integer 保存当前值
- Interger对-128~127的缓存[这个范围才有效,不在这个范围comareAndSet会一直返回false
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package tech.chen.juccode.a13;
import java.sql.Time; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicStampedReference;
public class ABAResolveDemo { private static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference(100,1); public static void main(String[] args) { int startStamp = stampedReference.getStamp(); Integer startReference = stampedReference.getReference(); new Thread(()->{ int stamp = stampedReference.getStamp(); Integer reference = stampedReference.getReference(); stampedReference.compareAndSet(reference,101,stamp,stamp+1); },"t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ int stamp = stampedReference.getStamp(); Integer reference = stampedReference.getReference(); stampedReference.compareAndSet(reference,100,stamp,stamp+1); },"t2").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
boolean b = stampedReference.compareAndSet(startReference, startReference + 1, startStamp, startStamp + 1); System.out.println(b + "\t" + stampedReference.getReference()); } }
|
运行结果:
false 100
特殊的 AtomicMarkableReference,它不是维护一个版本号,而是维护一个boolean类型的标记,标记值有修改