CAS&Atomic

CAS

概述

CAS的全称为Compare-And-Swap ,它是一条CPU并发原语 ,比较 工作内存值(预期值) 和 主物理内存 的共享值是否相同,相同则执行规定操作,否则继续比较直到主内存和工作内存的值一致为止。这个过程是原子的

CAS并发原语是现在 Java 语言中就是 sun.misc包下的 UnSafe 类中的各个方法。调用 UnSafe 类中的 CAS 方法, JVM 会帮我实现 CAS 汇编指令。这是一种完全依赖于硬件功能 ,通过它实现了原子操作。

就像我们在使用使用乐观锁操作数据库时,也会 通过版本号来保证多线程情况下只有一个线程能执行成功。

操作

atomicInteger.getAndIncrement()

1
2
3
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

调用了 unsafe.getAndAddInt();

1
2
3
4
5
6
7
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* @Date 2022/8/28 15:22
* @Author c-z-k
*/
public class AtomicIntDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger();
int i = atomicInteger.incrementAndGet();
System.out.println("i = " + i);

int andIncrement = atomicInteger.getAndIncrement();
System.out.println("andIncrement = " + andIncrement);
System.out.println("atomicInteger.compareAndSet(2,3) = " + atomicInteger.compareAndSet(2, 3));
System.out.println("atomicInteger.compareAndSet(2,3) = " + atomicInteger.compareAndSet(2, 3));
}
}

CAS 问题

CAS 方式为乐观锁,synchronized 为悲观锁。使用 CAS 解决并发问题通常情况下性能更优。

但使用 CAS 方式也会有几个问题:

ABA问题

因为CAS需要在操作值的时候,检查值有没有发生变化,比如没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时则会发现它的值没有发生变化,但是实际上却变化了。

ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A->B->A就会变成1A->2B->3A。

从Java 1.5开始,JDK的Atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的 compareAndSet 方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

循环时间长开销大

自旋CAS如果长时间不成功,会给 CPU 带来非常大的执行开销。如果 JVM 能支持处理器提供的 pause 指令,那么效率会有一定的提升。

pause 指令有两个作用:

  • 第一,它可以延迟流水线执行命令(de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零;
  • 第二,它可以避免在退出循环的时候因内存顺序冲突(Memory Order Violation)而引起 CPU 流水线被清空(CPU Pipeline Flush),从而提高 CPU 的执行效率。

只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS 就无法保证操作的原子性,这个时候就可以用锁。

还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i = 2,j = a,合并一下ij = 2a,然后用CAS来操作ij。

从Java 1.5开始,JDK提供了 AtomicReference 类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。

Unsafe

UnSafe类中所有的方法都是native修饰的,也就是说UnSafe类中的方法都是直接调用操作底层资源执行响应的任务

但由于Unsafe类使Java语言拥有了类似C语言指针一样操作内存空间的能力,这无疑也增加了程序发生相关指针问题的风险。在程序中过度、不正确使用Unsafe类会使得程序出错的概率变大,使得Java这种安全的语言变得不再“安全”,因此对Unsafe的使用一定要慎重。

这个类尽管里面的方法都是 public 的,但是并没有办法使用它们,JDK API 文档也没有提供任何关于这个类的方法的解释。总而言之,对于 Unsafe 类的使用都是受限制的,只有授信的代码才能获得该类的实例,当然 JDK 库里面的类是可以随意使用的。

1664524643776

Unsafe与CAS

AtomicInteger 中使用 unsafe。

1661675223957

从源码中发现,内部使用自旋的方式进行CAS更新(while循环进行CAS更新,如果更新失败,则循环再次重试)。

又从Unsafe类中发现,原子操作其实只支持下面三个方法。

1
2
3
4
5
6
public final native boolean compareAndSwapObject(Object paramObject1, long paramLong, Object paramObject2, Object paramObject3);

public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);

public final native boolean compareAndSwapLong(Object paramObject, long paramLong1, long paramLong2, long paramLong3);

发现Unsafe只提供了3种CAS方法:compareAndSwapObject、compareAndSwapInt和compareAndSwapLong。都是native方法。

原子类

基本类型

常用API

方法 解释
public final int get() 获取当前的值
public final int getAndSet(int newValue) 获取到当前的值,并设置新的值
public final int getAndIncrement() 获取当前的值,并自增
public final int getAndDecrement() 获取到当前的值,并自减
public final int getAndAdd(int delta) 获取到当前的值,并加上预期的值
public final int incrementAndGet( ) 返回的是加1后的值
boolean compareAndSet(int expect,int update) 如果输入的数值等于预期值,返回true

AtomicInteger 解决 i++

CountDownLatch 如何在程序中使用

在没使用 CountDownLatch 时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package tech.chen.juccode.a14;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @Date 2022/8/28 17:04
* @Author c-z-k
*/
public class CountDownDemo {
private static AtomicInteger atomicInteger = new AtomicInteger(0);
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
new Thread(()->{
for (int j = 0; j < 100; j++) {
atomicInteger.getAndIncrement();
}
}).start();
}
System.out.println("atomicInteger.get() = " + atomicInteger.get());
}
}

结果:

atomicInteger.get() = 4900

发现使用 AtomicInteger 居然没有得到理想值 5000,是 AtomicInteger 的问题吗?显然不可能,实际原因是,累加还没结束,主线程就打印出了结果,不信你可以等两秒。但是,实际操作我们不可能设置一个固定的睡眠时间。CountDownLatch 出现的意义也在此。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package tech.chen.juccode.a14;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @Date 2022/8/28 17:04
* @Author c-z-k
*/
public class CountDownDemo {
private static AtomicInteger atomicInteger = new AtomicInteger(0);
private static CountDownLatch countDownLatch = new CountDownLatch(50);
public static void main(String[] args) {
for (int i = 0; i <50; i++) {
new Thread(()->{
for (int j = 0; j < 100; j++) {
atomicInteger.getAndIncrement();
}
countDownLatch.countDown();
}).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("atomicInteger.get() = " + atomicInteger.get());
}
}

atomicInteger.get() = 5000

AtomicBoolean中断标识

AtomicBoolean可以作为中断标识停止线程的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//线程中断机制的实现方法
public class AtomicBooleanDemo {
public static void main(String[] args) {
AtomicBoolean atomicBoolean=new AtomicBoolean(false);

new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t"+"coming.....");
while(!atomicBoolean.get()){
System.out.println("==========");
}
System.out.println(Thread.currentThread().getName()+"\t"+"over.....");
},"A").start();

new Thread(()->{
atomicBoolean.set(true);
},"B").start();
}
}

Atomic Long

AtomicLong 的底层是 CAS+自旋锁 的思想,适用于低并发的全局计算,高并发后性能急剧下降,原因如下:N个线程CAS操作修改线程的值,每次只有一个成功过,其他N-1失败,失败的不停的自旋直到成功,这样大量失败自旋的情况,一下子 cpu 就打高了(AtomicLong的自旋会成为瓶颈)

在高并发的情况下,我们使用LoadAdder

数组类型

AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

代码演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package tech.chen.juccode.a14;

import java.util.concurrent.atomic.AtomicIntegerArray;

/**
* @Date 2022/8/28 18:30
* @Author c-z-k
*/
public class AtomicArray {
public static void main(String[] args) {
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
for (int i = 0; i < atomicIntegerArray.length(); i++) {
int i1 = atomicIntegerArray.get(i);
System.out.println("i1 = " + i1);
}

int i = atomicIntegerArray.incrementAndGet(0);
System.out.println("i = " + i);
int andAdd = atomicIntegerArray.addAndGet(1, 4);
System.out.println("andAdd = " + andAdd);
}
}

引用类型

AtomicReference、AtomicStampedReference、AtomicMarkableReference

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package tech.chen.juccode.a14;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* @Date 2022/8/28 18:39
* @Author c-z-k
*/
public class AtomicReferenceForLock {

private static AtomicReference<Thread> atomicReference = new AtomicReference<>();

private static void lock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t" + "coming ");
while (!atomicReference.compareAndSet(null, thread)) {}
}

private static void unlock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t" + "take over ");
atomicReference.compareAndSet(thread,null);
}

public static void main(String[] args) {
new Thread(()->{
lock();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
unlock();
},"t1").start();
new Thread(()->{
lock();
unlock();
},"t2").start();
}
}

AtomicStampedReference 这个带有戳记的 引用原子类 在上一篇已经讲过,这里就不赘述了

AtomicMarkableReference

  1. 原子更新带有标志位的引用类型对象
  2. 解决是否修改(它的定义就是将状态戳简化位 true|false ) ,类似一次性筷子
  3. 状态戳 (true/false) 原子引用
  4. 不建议用它解决 ABA 问题
  5. Markable – true、false 是否修改过,跟 AtomicStampedReference 的区别就是 它只负责监督一次改变。

对象属性修改类型

目的 : 以一种线程安全的方式操作非线程安全对象内的某些字段 。判断是否可以不要锁定整个对象,减少锁定的范围,只关注长期、敏感性变化的某一个字段,而不是整个对象,达到精确加锁+节约内存的目的

条件:

  • 更新的对象属性必须使用 public volatile 修饰符
  • 因为对象的属性修改类型原子类都是抽象类 ,所以每次使用都必须使用静态方法 newUpdater( ) 创建一个更新器,并且需要设置想要更新的类和属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package tech.chen.juccode.a14;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* @Date 2022/8/29 8:22
* @Author c-z-k
*/
public class AtomicFieldUpdaterDemo {
public static void main(String[] args) {
User user = new User();
AtomicIntegerFieldUpdater<User> atomicIntegerFieldUpdaterAge =
AtomicIntegerFieldUpdater.newUpdater(User.class,"age");
AtomicReferenceFieldUpdater atomicReferenceFieldUpdaterName =
AtomicReferenceFieldUpdater.newUpdater(User.class,String.class,"name");
int i = atomicIntegerFieldUpdaterAge.get(user);
System.out.println("i = " + i);//i = 0
Object o = atomicReferenceFieldUpdaterName.get(user);
System.out.println("o = " + o);//o = czk
int newAge = atomicIntegerFieldUpdaterAge.accumulateAndGet(user, 18, (x, y) -> x + y);
System.out.println("newAge = " + newAge);//newAge = 18
boolean czk = atomicReferenceFieldUpdaterName.compareAndSet(user, "czk", "0205");
System.out.println("czk = " + czk);//czk = true
}
}
class User{
public volatile int age = 0;
public volatile String name = "czk";
}

运行结果

i = 0
o = czk
newAge = 18

原子操作增强类

DoubleAccumulator 、DoubleAdder 、LongAccumulator 、LongAdder

常用API

方法 解释
void add(long x) 将当前的value加1
void increment( ) 将当前的value加1
void decrement( ) 将当前value减1
long sum( ) 返回当前的值,特别注意,在没有并发更新value的情况下sum会返回一个精确值,在存在并发的情况下,sum不保证返回精确值
long longvale 等价于long sum( )。将value重置为0,可用于替换重新new一个LongAdder,但此方法只可以在没有并发更新的情况下使用
long sumThenReset() 获取当前value,并将value重置为0
  1. LongAdder只能用来计算加法、减法,且从零开始计算
  2. LongAccumulator提供了自定义的函数操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package tech.chen.juccode.a14;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

/**
* @Date 2022/8/29 8:53
* @Author c-z-k
*/
public class LongAdderCalcDemo {
public static final int SIZE_THREAD=50;
public static final int _1w=10000;

public static void main(String[] args) {
synchronizedTest();
atomicIntegerTest();
atomicLongTest();
longAdderTest();
longAccumulatorTest();
}

private static void longAdderTest() {
// 50个线程和每个线程点在100w次
CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD);
ClickNumber clickNumber=new ClickNumber();
long startTime = System.currentTimeMillis();
for (int i = 1 ; i <=SIZE_THREAD ; i++) {
new Thread(()->{
try{
for (int j = 1; j <=100*_1w; j++) {
//clickNumber.add_synchronized();
clickNumber.add_longAdder();
}
}finally {
countDownLatch.countDown();
}
},String.valueOf(i)).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.print("-----add_longAdder:"+(endTime-startTime)+"毫秒"+"\t");
System.out.println(clickNumber.adder.longValue());
}
private static void synchronizedTest() {
// 50个线程和每个线程点在100w次
CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD);
ClickNumber clickNumber=new ClickNumber();
long startTime = System.currentTimeMillis();
for (int i = 1 ; i <=SIZE_THREAD ; i++) {
new Thread(()->{
try{
for (int j = 1; j <=100*_1w; j++) {
//clickNumber.add_synchronized();
clickNumber.add_synchronized();
}
}finally {
countDownLatch.countDown();
}
},String.valueOf(i)).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.print("-----add_synchronized:"+(endTime-startTime)+"毫秒"+"\t");
System.out.println(clickNumber.number);
}
private static void atomicIntegerTest() {
// 50个线程和每个线程点在100w次
CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD);
ClickNumber clickNumber=new ClickNumber();
long startTime = System.currentTimeMillis();
for (int i = 1 ; i <=SIZE_THREAD ; i++) {
new Thread(()->{
try{
for (int j = 1; j <=100*_1w; j++) {
//clickNumber.add_atomicInteger();
clickNumber.add_atomicInteger();
}
}finally {
countDownLatch.countDown();
}
},String.valueOf(i)).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.print("-----add_atomicInteger:"+(endTime-startTime)+"毫秒"+"\t");
System.out.println(clickNumber.atomicInteger.get());
}
private static void atomicLongTest() {
// 50个线程和每个线程点在100w次
CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD);
ClickNumber clickNumber=new ClickNumber();
long startTime = System.currentTimeMillis();
for (int i = 1 ; i <=SIZE_THREAD ; i++) {
new Thread(()->{
try{
for (int j = 1; j <=100*_1w; j++) {
//clickNumber.add_atomicInteger();
clickNumber.add_atomicLong();
}
}finally {
countDownLatch.countDown();
}
},String.valueOf(i)).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.print("-----add_atomicLong:"+(endTime-startTime)+"毫秒"+"\t");
System.out.println(clickNumber.atomicLong.get());
}
private static void longAccumulatorTest() {
// 50个线程和每个线程点在100w次
CountDownLatch countDownLatch=new CountDownLatch(SIZE_THREAD);
ClickNumber clickNumber=new ClickNumber();
long startTime = System.currentTimeMillis();
for (int i = 1 ; i <=SIZE_THREAD ; i++) {
new Thread(()->{
try{
for (int j = 1; j <=100*_1w; j++) {
//clickNumber.add_longAccumulater();
clickNumber.add_longAccumulater();
}
}finally {
countDownLatch.countDown();
}
},String.valueOf(i)).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.print("-----add_longAccumulater:"+(endTime-startTime)+"毫秒"+"\t");
System.out.println(clickNumber.accumulator.get());
}
}
class ClickNumber{
int number=0;

//(1). 使用synchronized实现number++
public synchronized void add_synchronized(){
number++;
}
//(2). 使用AtomicInteger
AtomicInteger atomicInteger=new AtomicInteger();
public void add_atomicInteger(){
atomicInteger.incrementAndGet();
}
//(3). 使用AtomicLong
AtomicLong atomicLong=new AtomicLong();
public void add_atomicLong(){
atomicLong.incrementAndGet();
}
//(4). 使用LongAdder
LongAdder adder=new LongAdder();
public void add_longAdder(){
adder.increment();
}
//(5). 使用LongAccumulater
LongAccumulator accumulator=new LongAccumulator((x, y)->x+y,0);
public void add_longAccumulater(){
accumulator.accumulate(1);
}
}

运行结果:

—–add_synchronized:4089毫秒 50000000
—–add_atomicInteger:822毫秒 50000000
—–add_atomicLong:831毫秒 50000000
—–add_longAdder:187毫秒 50000000
—–add_longAccumulater:123毫秒 50000000

ABA 问题的解决方案

  1. ABA问题解决方案是使用 AtomicStampedReference 每修改一次都会有一个版本号
  2. AtomicStampedReference 用来解决 AtomicInteger 中的 ABA 问题,该 demo 企图将 integer 的值从0一直增长到1000,但当integer 的值增长到128后,将停止增长。出现该现象有两点原因:
    1. 使用int类型而非 Integer 保存当前值
    2. Interger对-128~127的缓存[这个范围才有效,不在这个范围comareAndSet会一直返回false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package tech.chen.juccode.a13;

import java.sql.Time;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicStampedReference;

/**
* @Date 2022/8/28 16:33
* @Author c-z-k
*/
public class ABAResolveDemo {
private static AtomicStampedReference<Integer> stampedReference = new AtomicStampedReference(100,1);
public static void main(String[] args) {
int startStamp = stampedReference.getStamp();
Integer startReference = stampedReference.getReference();
//模拟主线程运行到一半被别的线程插入操作 1
new Thread(()->{
int stamp = stampedReference.getStamp();
Integer reference = stampedReference.getReference();
stampedReference.compareAndSet(reference,101,stamp,stamp+1);
},"t1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//模拟主线程运行到一半被别的线程插入操作 2
new Thread(()->{
int stamp = stampedReference.getStamp();
Integer reference = stampedReference.getReference();
stampedReference.compareAndSet(reference,100,stamp,stamp+1);
},"t2").start();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}

boolean b = stampedReference.compareAndSet(startReference, startReference + 1, startStamp, startStamp + 1);
System.out.println(b + "\t" + stampedReference.getReference());
}
}

运行结果:

false 100

特殊的 AtomicMarkableReference,它不是维护一个版本号,而是维护一个boolean类型的标记,标记值有修改