CAS&Atomic
CAS
概述
CAS的全称为Compare-And-Swap ,它是一条CPU并发原语
,比较 工作内存值(预期值) 和 主物理内存 的共享值是否相同,相同则执行规定操作,否则继续比较直到主内存和工作内存的值一致为止。这个过程是原子的
CAS并发原语是现在 Java 语言中就是 sun.misc包下的 UnSafe 类中的各个方法。调用 UnSafe 类中的 CAS 方法, JVM 会帮我实现 CAS 汇编指令。这是一种完全依赖于硬件功能
,通过它实现了原子操作。
就像我们在使用使用乐观锁操作数据库时,也会 通过版本号来保证多线程情况下只有一个线程能执行成功。
操作
atomicInteger.getAndIncrement()
1 2 3
| public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
|
调用了 unsafe.getAndAddInt();
1 2 3 4 5 6 7
| public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
public class AtomicIntDemo { public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(); int i = atomicInteger.incrementAndGet(); System.out.println("i = " + i);
int andIncrement = atomicInteger.getAndIncrement(); System.out.println("andIncrement = " + andIncrement); System.out.println("atomicInteger.compareAndSet(2,3) = " + atomicInteger.compareAndSet(2, 3)); System.out.println("atomicInteger.compareAndSet(2,3) = " + atomicInteger.compareAndSet(2, 3)); } }
|
CAS 问题
CAS 方式为乐观锁,synchronized 为悲观锁。使用 CAS 解决并发问题通常情况下性能更优。
但使用 CAS 方式也会有几个问题:
ABA问题
因为CAS需要在操作值的时候,检查值有没有发生变化,比如没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。
ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A->B->A就会变成1A->2B->3A。
从Java 1.5开始,JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的 compareAndSet 方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
循环时间长开销大
自旋CAS如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令,那么效率会有一定的提升。
pause 指令有两个作用:
- 第一,它可以延迟流水线执行命令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零;
- 第二,它可以避免在退出循环的时候因内存顺序冲突(Memory Order Violation)而引起 CPU 流水线被清空(CPU Pipeline Flush),从而提高 CPU 的执行效率。
只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS 就无法保证操作的原子性,这个时候就可以用锁。
还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i = 2,j = a,合并一下ij = 2a,然后用CAS来操作ij。
从Java 1.5开始,JDK提供了 AtomicReference 类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。
Unsafe
UnSafe类中所有的方法都是native修饰的,也就是说UnSafe类中的方法都是直接调用操作底层资源执行响应的任务
但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。
这个类尽管里面的方法都是 public 的,但是并没有办法使用它们,JDK API 文档也没有提供任何关于这个类的方法的解释。总而言之,对于 Unsafe 类的使用都是受限制的,只有授信的代码才能获得该类的实例,当然 JDK 库里面的类是可以随意使用的。
Unsafe与CAS
AtomicInteger 中使用 unsafe。
从源码中发现,内部使用自旋的方式进行CAS更新(while循环进行CAS更新,如果更新失败,则循环再次重试)。
又从Unsafe类中发现,原子操作其实只支持下面三个方法。
1 2 3 4 5 6
| public final native boolean compareAndSwapObject(Object paramObject1, long paramLong, Object paramObject2, Object paramObject3);
public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);
public final native boolean compareAndSwapLong(Object paramObject, long paramLong1, long paramLong2, long paramLong3);
|
发现Unsafe只提供了3种CAS方法:compareAndSwapObject、compareAndSwapInt和compareAndSwapLong。都是native方法。
原子类
基本类型
常用API
方法 |
解释 |
public final int get() |
获取当前的值 |
public final int getAndSet(int newValue) |
获取到当前的值,并设置新的值 |
public final int getAndIncrement() |
获取当前的值,并自增 |
public final int getAndDecrement() |
获取到当前的值,并自减 |
public final int getAndAdd(int delta) |
获取到当前的值,并加上预期的值 |
public final int incrementAndGet( ) |
返回的是加1后的值 |
boolean compareAndSet(int expect,int update) |
如果输入的数值等于预期值,返回true |
AtomicInteger 解决 i++
CountDownLatch 如何在程序中使用
在没使用 CountDownLatch 时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package tech.chen.juccode.a14;
import java.util.concurrent.atomic.AtomicInteger;
public class CountDownDemo { private static AtomicInteger atomicInteger = new AtomicInteger(0); public static void main(String[] args) { for (int i = 0; i < 50; i++) { new Thread(()->{ for (int j = 0; j < 100; j++) { atomicInteger.getAndIncrement(); } }).start(); } System.out.println("atomicInteger.get() = " + atomicInteger.get()); } }
|
结果:
atomicInteger.get() = 4900
发现使用 AtomicInteger 居然没有得到理想值 5000,是 AtomicInteger 的问题吗?显然不可能,实际原因是,累加还没结束,主线程就打印出了结果,不信你可以等两秒。但是,实际操作我们不可能设置一个固定的睡眠时间。CountDownLatch 出现的意义也在此。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package tech.chen.juccode.a14;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger;
public class CountDownDemo { private static AtomicInteger atomicInteger = new AtomicInteger(0); private static CountDownLatch countDownLatch = new CountDownLatch(50); public static void main(String[] args) { for (int i = 0; i <50; i++) { new Thread(()->{ for (int j = 0; j < 100; j++) { atomicInteger.getAndIncrement(); } countDownLatch.countDown(); }).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("atomicInteger.get() = " + atomicInteger.get()); } }
|
atomicInteger.get() = 5000
AtomicBoolean中断标识
AtomicBoolean可以作为中断标识停止线程的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class AtomicBooleanDemo { public static void main(String[] args) { AtomicBoolean atomicBoolean=new AtomicBoolean(false);
new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t"+"coming....."); while(!atomicBoolean.get()){ System.out.println("=========="); } System.out.println(Thread.currentThread().getName()+"\t"+"over....."); },"A").start();
new Thread(()->{ atomicBoolean.set(true); },"B").start(); } }
|
Atomic Long
AtomicLong 的底层是 CAS+自旋锁 的思想,适用于低并发的全局计算,高并发后性能急剧下降,原因如下:N个线程CAS操作修改线程的值,每次只有一个成功过,其他N-1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子 cpu 就打高了(AtomicLong的自旋会成为瓶颈)
在高并发的情况下,我们使用LoadAdder
数组类型
AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package tech.chen.juccode.a14;
import java.util.concurrent.atomic.AtomicIntegerArray;
public class AtomicArray { public static void main(String[] args) { AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5); for (int i = 0; i < atomicIntegerArray.length(); i++) { int i1 = atomicIntegerArray.get(i); System.out.println("i1 = " + i1); }
int i = atomicIntegerArray.incrementAndGet(0); System.out.println("i = " + i); int andAdd = atomicIntegerArray.addAndGet(1, 4); System.out.println("andAdd = " + andAdd); } }
|
引用类型
AtomicReference、AtomicStampedReference、AtomicMarkableReference
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package tech.chen.juccode.a14; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceForLock {
private static AtomicReference<Thread> atomicReference = new AtomicReference<>();
private static void lock(){ Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "\t" + "coming "); while (!atomicReference.compareAndSet(null, thread)) {} }
private static void unlock(){ Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + "\t" + "take over "); atomicReference.compareAndSet(thread,null); }
public static void main(String[] args) { new Thread(()->{ lock(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } unlock(); },"t1").start(); new Thread(()->{ lock(); unlock(); },"t2").start(); } }
|
AtomicStampedReference 这个带有戳记的 引用原子类 在上一篇已经讲过,这里就不赘述了
AtomicMarkableReference
- 原子更新带有标志位的引用类型对象
- 解决是否修改(它的定义就是将状态戳简化位 true|false ) ,类似一次性筷子
- 状态戳 (true/false) 原子引用
- 不建议用它解决 ABA 问题
- Markable – true、false 是否修改过,跟 AtomicStampedReference 的区别就是 它只负责监督一次改变。
对象属性修改类型
目的 : 以一种线程安全的方式操作非线程安全对象内的某些字段 。判断是否可以不要锁定整个对象,减少锁定的范围,只关注长期、敏感性变化的某一个字段,而不是整个对象,达到精确加锁+节约内存的目的
条件:
- 更新的对象属性必须使用 public volatile 修饰符
- 因为对象的属性修改类型原子类都是抽象类 ,所以每次使用都必须使用静态方法 newUpdater( ) 创建一个更新器,并且需要设置想要更新的类和属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package tech.chen.juccode.a14;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class AtomicFieldUpdaterDemo { public static void main(String[] args) { User user = new User(); AtomicIntegerFieldUpdater<User> atomicIntegerFieldUpdaterAge = AtomicIntegerFieldUpdater.newUpdater(User.class,"age"); AtomicReferenceFieldUpdater atomicReferenceFieldUpdaterName = AtomicReferenceFieldUpdater.newUpdater(User.class,String.class,"name"); int i = atomicIntegerFieldUpdaterAge.get(user); System.out.println("i = " + i); Object o = atomicReferenceFieldUpdaterName.get(user); System.out.println("o = " + o); int newAge = atomicIntegerFieldUpdaterAge.accumulateAndGet(user, 18, (x, y) -> x + y); System.out.println("newAge = " + newAge); boolean czk = atomicReferenceFieldUpdaterName.compareAndSet(user, "czk", "0205"); System.out.println("czk = " + czk); } } class User{ public volatile int age = 0; public volatile String name = "czk"; }
|
运行结果
i = 0
o = czk
newAge = 18
原子操作增强类
DoubleAccumulator 、DoubleAdder 、LongAccumulator 、LongAdder
常用API
方法 |
解释 |
void add(long x) |
将当前的value加1 |
void increment( ) |
将当前的value加1 |
void decrement( ) |
将当前value减1 |
long sum( ) |
返回当前的值,特别注意,在没有并发更新value的情况下sum会返回一个精确值,在存在并发的情况下,sum不保证返回精确值 |
long longvale |
等价于long sum( )。将value重置为0,可用于替换重新new一个LongAdder,但此方法只可以在没有并发更新的情况下使用 |
long sumThenReset() |
获取当前value,并将value重置为0 |
- LongAdder只能用来计算加法、减法,且从零开始计算
- LongAccumulator提供了自定义的函数操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
| package tech.chen.juccode.a14;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder;
public class LongAdderCalcDemo { public static final int SIZE_THREAD=50; public static final int _1w=10000;
public static void main(String[] args) { synchronizedTest(); atomicIntegerTest(); atomicLongTest(); longAdderTest(); longAccumulatorTest(); }
private static void longAdderTest() { CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD); ClickNumber clickNumber=new ClickNumber(); long startTime = System.currentTimeMillis(); for (int i = 1 ; i <=SIZE_THREAD ; i++) { new Thread(()->{ try{ for (int j = 1; j <=100*_1w; j++) { clickNumber.add_longAdder(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.print("-----add_longAdder:"+(endTime-startTime)+"毫秒"+"\t"); System.out.println(clickNumber.adder.longValue()); } private static void synchronizedTest() { CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD); ClickNumber clickNumber=new ClickNumber(); long startTime = System.currentTimeMillis(); for (int i = 1 ; i <=SIZE_THREAD ; i++) { new Thread(()->{ try{ for (int j = 1; j <=100*_1w; j++) { clickNumber.add_synchronized(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.print("-----add_synchronized:"+(endTime-startTime)+"毫秒"+"\t"); System.out.println(clickNumber.number); } private static void atomicIntegerTest() { CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD); ClickNumber clickNumber=new ClickNumber(); long startTime = System.currentTimeMillis(); for (int i = 1 ; i <=SIZE_THREAD ; i++) { new Thread(()->{ try{ for (int j = 1; j <=100*_1w; j++) { clickNumber.add_atomicInteger(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.print("-----add_atomicInteger:"+(endTime-startTime)+"毫秒"+"\t"); System.out.println(clickNumber.atomicInteger.get()); } private static void atomicLongTest() { CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD); ClickNumber clickNumber=new ClickNumber(); long startTime = System.currentTimeMillis(); for (int i = 1 ; i <=SIZE_THREAD ; i++) { new Thread(()->{ try{ for (int j = 1; j <=100*_1w; j++) { clickNumber.add_atomicLong(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.print("-----add_atomicLong:"+(endTime-startTime)+"毫秒"+"\t"); System.out.println(clickNumber.atomicLong.get()); } private static void longAccumulatorTest() { CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD); ClickNumber clickNumber=new ClickNumber(); long startTime = System.currentTimeMillis(); for (int i = 1 ; i <=SIZE_THREAD ; i++) { new Thread(()->{ try{ for (int j = 1; j <=100*_1w; j++) { clickNumber.add_longAccumulater(); } }finally { countDownLatch.countDown(); } },String.valueOf(i)).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.print("-----add_longAccumulater:"+(endTime-startTime)+"毫秒"+"\t"); System.out.println(clickNumber.accumulator.get()); } } class ClickNumber{ int number=0;
public synchronized void add_synchronized(){ number++; } AtomicInteger atomicInteger=new AtomicInteger(); public void add_atomicInteger(){ atomicInteger.incrementAndGet(); } AtomicLong atomicLong=new AtomicLong(); public void add_atomicLong(){ atomicLong.incrementAndGet(); } LongAdder adder=new LongAdder(); public void add_longAdder(){ adder.increment(); } LongAccumulator accumulator=new LongAccumulator((x, y)->x+y,0); public void add_longAccumulater(){ accumulator.accumulate(1); } }
|
运行结果:
—–add_synchronized:4089毫秒 50000000
—–add_atomicInteger:822毫秒 50000000
—–add_atomicLong:831毫秒 50000000
—–add_longAdder:187毫秒 50000000
—–add_longAccumulater:123毫秒 50000000
ABA 问题的解决方案
- ABA问题解决方案是使用 AtomicStampedReference 每修改一次都会有一个版本号
- AtomicStampedReference 用来解决 AtomicInteger 中的 ABA 问题,该 demo 企图将 integer 的值从0一直增长到1000,但当integer 的值增长到128后,将停止增长。出现该现象有两点原因:
- 使用int类型而非 Integer 保存当前值
- Interger对-128~127的缓存[这个范围才有效,不在这个范围comareAndSet会一直返回false
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package tech.chen.juccode.a13;
import java.sql.Time; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicStampedReference;
public class ABAResolveDemo { private static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference(100,1); public static void main(String[] args) { int startStamp = stampedReference.getStamp(); Integer startReference = stampedReference.getReference(); new Thread(()->{ int stamp = stampedReference.getStamp(); Integer reference = stampedReference.getReference(); stampedReference.compareAndSet(reference,101,stamp,stamp+1); },"t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()->{ int stamp = stampedReference.getStamp(); Integer reference = stampedReference.getReference(); stampedReference.compareAndSet(reference,100,stamp,stamp+1); },"t2").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
boolean b = stampedReference.compareAndSet(startReference, startReference + 1, startStamp, startStamp + 1); System.out.println(b + "\t" + stampedReference.getReference()); } }
|
运行结果:
false 100
特殊的 AtomicMarkableReference,它不是维护一个版本号,而是维护一个boolean类型的标记,标记值有修改