读写锁的改进:stampedlock

概述

StampedLock是Java8引入的一种新的所机制,简单的理解,可以认为它是读写锁的一个改进版本,读写锁虽然分离了读和写的功能,使得读与读之间可以完全并发,但是读和写之间依然是冲突的,读锁会完全阻塞写锁,它使用的依然是悲观的锁策略.如果有大量的读线程,他也有可能引起写线程的饥饿。

而StampedLock则提供了一种乐观的读策略,这种乐观策略的锁非常类似于无锁的操作,使得乐观锁完全不会阻塞写线程

StampedLock的使用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Point {
private double x, y;//内部定义表示坐标点
private final StampedLock s1 = new StampedLock();//定义了StampedLock锁,

void move(double deltaX, double deltaY) {
long stamp = s1.writeLock();//这里的含义和distanceFormOrigin方法中 s1.readLock()是类似的
try {
x += deltaX;
y += deltaY;
} finally {
s1.unlockWrite(stamp);//退出临界区,释放写锁
}
}

double distanceFormOrigin() {//只读方法
long stamp = s1.tryOptimisticRead(); //试图尝试一次乐观读 返回一个类似于时间戳的邮戳整数stamp 这个stamp就可以作为这一个所获取的凭证
double currentX = x, currentY = y;//读取x和y的值,这时候我们并不确定x和y是否是一致的
if (!s1.validate(stamp)) {//判断这个stamp是否在读过程发生期间被修改过,如果stamp没有被修改过,则认为这次读取时有效的,因此就可以直接return了,反之,如果stamp是不可用的,则意味着在读取的过程中,可能被其他线程改写了数据,因此,有可能出现脏读,如果如果出现这种情况,我们可以像CAS操作那样在一个死循环中一直使用乐观锁,知道成功为止
stamp = s1.readLock();//也可以升级锁的级别,这里我们升级乐观锁的级别,将乐观锁变为悲观锁, 如果当前对象正在被修改,则读锁的申请可能导致线程挂起.
try {
currentX = x;
currentY = y;
} finally {
s1.unlockRead(stamp);//退出临界区,释放读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}

StampedLock的小陷阱

StampedLock内部实现时,使用类似于CAS操作的死循环反复尝试的策略。在它挂起线程时,使用的是Unsafe.park()函数,而park()函数在遇到线程中断时,会直接返回(不同于Thread.sleep(),它不会抛出异常)。而在StampedLock的死循环逻辑中,没有处理有关中断的逻辑。因此,这就会导致阻塞在park()上的线程被中断后,会再次进入循环。而当退出条件得不到满足时,就会发生疯狂占用CPU的情况。下面演示了这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class StampedLockCUPDemo {
static Thread[] holdCpuThreads = new Thread[3];
static final StampedLock lock = new StampedLock();
public static void main(String[] args) throws InterruptedException {
new Thread() {
public void run(){
long readLong = lock.writeLock();
LockSupport.parkNanos(6100000000L);
lock.unlockWrite(readLong);
}
}.start();
Thread.sleep(100);
for( int i = 0; i < 3; ++i) {
holdCpuThreads [i] = new Thread(new HoldCPUReadThread());
holdCpuThreads [i].start();
}
Thread.sleep(10000);
for(int i=0; i<3; i++) {
holdCpuThreads [i].interrupt();
}
}
private static class HoldCPUReadThread implements Runnable {
public void run() {
long lockr = lock.readLock();
System.out.println(Thread.currentThread().getName() + " get read lock");
lock.unlockRead(lockr);
}
}
}

在上述代码中,首先开启线程占用写锁(第7行),为了演示效果,这里使用写线程不释放锁而一直等待。

接着,开启3个读线程,让它们请求读锁。此时,由于写锁的存在,所有读线程都会被最终挂起。读线程因为park()的操作进入了等待状态,这种情况是正常的。

而在10秒钟以后(代码在17行执行了10秒等待),系统中断了这3个读线程,之后,就会发现,CPU占用率极有可能会飙升。这是因为中断导致park()函数返回,使线程再次进入运行状态。

此时,这个线程的状态是RUNNABLE,这是我们不愿意看到的,它会一直存在并耗尽CPU资源,直到自己抢占到了锁。

【重要】所以使用StampedLock一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁readLockInterruptibly()和写锁writeLockInterruptibly()

有关StampedLock的实现思想

StampedLock的内部实现是基于CLH锁的,CLH锁是一种自旋锁,它保证没有饥饿的发生,并且可以保证FIFO(先进先出)的服务顺序.

CLH锁的基本思想如下:锁维护一个等待线程队列,所有申请锁,但是没有成功的线程都记录在这个队列中,每一个节点代表一个线程,保存一个标记位(locked).用与判断当前线程是否已经释放锁;locked=true 没有获取到锁,false 已经成功释放了锁

当一个线程视图获得锁时,取得等待队列的尾部节点作为其前序节点.并使用类似如下代码判断前序节点是否已经成功释放锁:

1
2
3
while (pred.locked) {

}

只要前序节点(pred)没有释放锁,则表示当前线程还不能继续执行,因此会自旋等待,

反之,如果前序线程已经释放锁,则当前线程可以继续执行.

释放锁时,也遵循这个逻辑,线程会将自身节点的locked位置标记位false,那么后续等待的线程就能继续执行了

原子类的增强

无锁的原子类操作使用系统的CAS指令,有着远远超越锁的性能。在Java 8中引入了LongAddr类,这个类也在java.util.concurrent.atomic包下,因此,它也是使用了CAS指令。

更快的原子类:LongAddr

数据分离(减小锁粒度)

AtomicInteger的基本实现机制,它们都是在一个死循环内,不断尝试修改目标值,知道修改成功。如果竞争不激烈,那么修改成功的概率就很高,否则,修改失败的概率就很高。在大量修改失败时,这些原子操作就会进行多次循环尝试,因此性能会受到影响。

当竞争激烈的时候,为了进一步提高系统的性能,一种基本方案就是可以使用热点分离,将竞争的数据进行分解,基于这个思路,可以想到一种对传统AtomicInteger等原子类的改进方法。虽然在CAS操作中没有锁,但是像减小锁粒度这种分离热点的思想依然可以使用。一种可行的方案就是仿造ConcurrentHashMap,将热点数据分离。比如,可以将AtomicInteger的内部核心数据value分离成一个数组,每个线程访问时,通过哈希等算法映射到其中一个数字进行计算,而最终的计算结果,则为这个数组的求和累加。热点value被分离成多个单元cell,每个cell独自维护内部的值,当前对象的实际值由所有的cell累计合成,这样,热点就进行了有效的分离,提高了并行度。LongAddr正是使用了这种思想。

在实际的操作中,LongAddr并不会一开始就动用数组进行处理,而是将所有数据都先记录在一个称为base的变量中。如果在多线程条件下,大家修改base都没有冲突,那么也没有必要扩展为cell数组。但是,一旦base修改发生冲突,就会初始化cell数组,使用新的策略。如果使用cell数组更新后,发现在某一个cell上的更新依然发生冲突,那么系统就会尝试创建新的cell,或者将cell的数量加倍,以减少冲突的可能。

下面简单分析一下increment()方法(该方法会将LongAddr自增1)的内部实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void increment() {
add(1L);
}
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//如果cell表为null,会尝试将x累加到base上。
if ((as = cells) != null || !casBase(b = base, b + x)) {
/*
* 如果cell表不为null或者尝试将x累加到base上失败,执行以下操作。
* 如果cell表不为null且通过当前线程的probe值定位到的cell表中的Cell不为null。
* 那么尝试累加x到对应的Cell上。
*/
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//或者cell表为null,或者定位到的cell为null,或者尝试失败,都会调用下面的Striped64中定义的longAccumulate方法。
longAccumulate(x, null, uncontended);
}
}

它的核心是addd()方法。最开始cells为null,因此数据会向base增加。但是如果对base的操作冲突,则会设置冲突标记uncontended 为true。接着,如果判断cells数组不可用,或者当前线程对应的cell为null,则直接进入longAccumulate()方法。否则会尝试使用CAS方法更新对应的cell数据,如果成功,则退出,失败则进入longAccumulate()方法。

由于longAccumulate()方法比较复杂,这里不再展开讨论,其大致内容是,根据需要创建新的cell或者对cell数组进行扩容,以减少冲突。

避免缓存伪共享

LongAddr的另外一个优化手段是避免了伪共存。LongAddr中并不是直接使用padding这种看起来比较碍眼的做法,而是引入了一种新的注释“@sun.misc.Contended“。

对于LongAddr中的每一个Cell,它的定义如下:

1
2
3
4
5
6
7
8
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value=x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
}

可以看到,在上述代码第1行申明了Cell类为sun.misc.Contended。这将会使得Java虚拟机自动为Cell解决伪共享问题。

当然,在我们自己的代码中也可以使用sun.misc.Contended来解决伪共享问题,但是需要额外使用虚拟机参数-XX:-RestrictContended,否则,这个注释将被忽略。

LongAddr的功能增强版: LongAccumulator

LongAccumulator是LongAddr的亲兄弟,它们有公共的父类Striped64。因此,LongAccumulator内部的优化方式和LongAddr是一样的。它们都将一个long型整数进行分割,存储在不同的变量中,以防止多线程竞争。两者的主要逻辑类似,但是LongAccumulator是LongAddr的功能扩展,对于LongAddr来说,它只是每次对给定的整数执行一次加法,而LongAccumulator则可以实现任意函数惭怍。

可以使用下面的构造函数创建一个LongAccumulator实例:

public LongAccumulator(LongBinaryOperator accumulatorFunction, long identify)

第一个参数accumulatorFunction就是需要执行的二元函数(接收两个long形参数并返回long),第2个参数是初始值。

下面这个例子展示了LongAccurator的使用,它将通过多线程访问若干个整数,并返回遇到的最大的那个数字。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws Exception {
LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);
Thread[] ts = new Thread[1000];

for(int i=0; i<1000; i++) {
ts[i] = new Thread(()->{
Random random = new Random();
long value = random.nextLong();
accumulator.accumulate(value);
});
ts[i].start();
}
for(int i=0; i<1000; i++) {
ts[1000].join();
}
System.out.println(accumulator .longValue);
}

上述代码中,构造了LongAccumulator实例。因为要过滤最大值,因此传入Long::max函数句柄。当有数据通过accumulate()方法传入LongAccumulator后,LongAccumulator会通过Long::max识别最大值并且保存在内部(很可能是cell数组,也可能是base)。通过longValue()函数对所有的cell进行Long::max操作,得到最大值。

ConcurrentHashMap的增强

批量处理数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
	/*
* java8为并发哈希映射提供了批量操作数据操作,即使在其他线程同时操作映射时也可以安全的执行。
* 批量数据操作会遍历映射并对匹配的元素进行操作。在批量操作过程中,不需要冻结映射的一个快照。除非你恰好知道在这段时间
* 内映射没有被修改,否则你应该将结果看作是映射状态的一个近似值。批量操作有三类
* a,search会对每个键或值应用一个函数,直到函数返回一个null的结果。然后search会终止并返回该函数的结果。
* b,reduce会通过提供的积累函数,将所有的键或指结合起来。
* c,forEach会对所有键或值对应一个函数。
* 使用这几种操作时,需要指定一个并行阈值,如果映射包含的元素数目超过了这个阈值,批量操作以并行的方式执行。如果希望批量操作数 据在一个线程执行,
* 请使用Long.MAX_VALUE作为阈值。如果希望批量操作尽可能使用更多的线程,则应该使用1作为阈值。
*/


/*
* search
* 比如希望找到第一个出现超过1000次的的单词,我们需要搜索键和值。 返回结果为第一个匹配的元素,或者没有找到任何元素则返回null
*/
ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
for (int i = 0; i < 1000; i++) {
map.merge("a", 2L, Long::sum);//2000
map.merge("b", 1L, Long::sum);//1000
}
String rs = map.search(1, (k, v) -> v > 1000 ? k : null);
System.out.println(rs);
/*
* foreach
* foreach方法有两种,第一种只是对每个映射数据项简单的用一个消费者函数
* 第二种是额外接受一个转换器函数,首先会应用此转换器函数,然后再将结果传递给消费者函数、
* 转换器函数可以被用作一个过滤器。当转换器函数返回null时,值会被自动跳过
*/
map.forEach(1, (k, v) -> System.out.println(k + "->" + v));
map.forEach(1, (k, v) -> k + "->" + v, System.out::println);
map.forEach(1, (k, v) -> v > 1000 ? k + "->" + v : null, System.out::println);
/*
* reduce
* reduce操作将其输入与一个累加函数结合起来。例如下面是计算所有值的总和
*/
Long sum = map.reduceValues(1, Long::sum);
/*
* 同forEach一样,你也可以提供一个转换器函数。比如计算长度最长的键
*/
Integer maxLength = map.reduceKeys(1,
String::length, //转换器
Integer::max); //累加器
System.out.println(maxLength);
/*
* 转换器函数可以作为一个过滤器,通过返回null来排除不想要的输入。
* tips:如果映射是空的,或者所有的数据项都被过滤掉了,则reduce操作会返回null。如果只有一个元素,
* 那么会返回它转换后的值,并且不会应用累加函数。
*/
Long count = map.reduceValues(1,
v->v>1000?1L:null, //转换器
Long::sum); //累加器
System.out.println(count);
/*
* 对于int,long,double类型的输出,reduce操作提供了专门的方法,分别以ToInt,ToLong和ToDouble结尾。
* 你需要将输入转换为原始类型值,并指定一个默认值和累加器函数。当映射为空时返回默认值。
* tips:这几个专门的方法与用于对象的方法行为不同,它们只会考虑一个元素。与返回转换后的元素不同
* ,他们会对默认值进行累加,因此。默认值必须是累加器的中立元素。
*/
long sum2 = map.reduceValuesToLong(1,
Long::longValue, //转换为原始类型
0, //空映射的默认值
Long::sum);//原始类型累加器
}

computeIfabsent

可以将原始代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
Client esClient = map.get("key");
if (esClient == null) {
//防止启动时高并发导致创建多个client,但会极大地降低方法的性能
synchronized (ClientUtil.class) {
esClient = map.get("key");
if (esClient == null) {
//初始化操作
esClient = new Client();
map.put("key", esClient);
}
}
}

使用computeIfAbsent 代替:

1
2
3
4
5
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.computeIfAbsent("key",()->{
//初始化操作
return new Client();
});

其他新方法

  1. mappingCount() 方法

    返回Map中的条目综述。有别于size() 方法, 该方法返回的是long型数据。硬刺, 当元素总数超过整数最大值时,应该使用这个方法。同时,该方法并不返回精确值,如果在执行该方法时, 同时存在并发的插入或者删除操作,则结果是不准确的。

  2. newKeySet() 方法

    在JDK中, Set的实现依附于Map,实际上,Set时Map的一种特殊情况。如果需要一个线程安全的搞笑并发HashSet,那么基于ConcurrentHashMap的实现是最好的选择。该方法是一个静态工厂方法,返回一个线程安全的Set。