ReentrantLock(重入锁, synchronized的功能扩展)

可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。

构造函数

public ReentrantLock(boolean fair): fair为true时表示创建的锁是公平的(锁先到先得)

重要方法

  • lock(): 获得锁,如果锁已经被占用,则等待。
  • lockInterruptibly(): 获得锁,但优先响应中断。
  • tryLock(): 尝试获得锁,如果成功,则返回true, 失败则返回false。该方法不等待,立即返回。
  • tryLock(Long time, TimeUnit unit): 在给定时间内尝试获得锁。
  • unLock(): 释放锁。

重入锁的实现

  • 原子状态。原子状态使用CAS操作来存储当前锁的状态,判断锁是否已经被别的线程持有了。
  • 等待队列。所有没有请求到锁的线程,会进入等待队列进行等待。待有线程释放锁后,系统就能从等待队列中唤醒一个线程,继续工作。
  • 阻塞原语park()和unpark(),用来挂起和恢复线程。

重入锁的搭档:Condition

基本方法:

  1. await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
  2. boolean await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态—》是否超时,超时异常
  3. awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
  4. awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
  5. awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
  6. signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
  7. signalAll() :唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* condition使用示例:
* 1、condition的使用必须要配合锁使用,调用方法时必须要获取锁
* 2、condition的创建依赖于Lock lock.newCondition();
*/
public class ConditionUseCase {
/**
* 创建锁
*/
public Lock readLock = new ReentrantLock();
/**
* 创建条件
*/
public Condition condition = readLock.newCondition();

public static void main(String[] args) {
ConditionUseCase useCase = new ConditionUseCase();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
//获取锁进行等待
useCase.conditionWait();
});
executorService.execute(() -> {
//获取锁进行唤起读锁
useCase.conditionSignal();
});
}

/**
* 等待线程
*/
public void conditionWait() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "拿到锁了");
System.out.println(Thread.currentThread().getName() + "等待信号");
condition.await();
System.out.println(Thread.currentThread().getName() + "拿到信号");
} catch (Exception e) {

} finally {
readLock.unlock();
}
}

/**
* 唤起线程
*/
public void conditionSignal() {
readLock.lock();
try {
//睡眠5s 线程1启动
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "拿到锁了");
condition.signal();
System.out.println(Thread.currentThread().getName() + "发出信号");
} catch (Exception e) {

} finally {
//释放锁
readLock.unlock();
}
}

}

执行结果:

1
2
3
4
5
1 pool-1-thread-1拿到锁了
2 pool-1-thread-1等待信号 ---释放锁-线程等待 t1
3 pool-1-thread-2拿到锁了
4 pool-1-thread-2发出信号 --- 唤起线程t2释放锁
5 pool-1-thread-1拿到信号---t1继续执行

如示例所示,
一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

允许多个线程同时访问:信号量(Semaphere)

构造函数

  • public Semaphore(int permits):创建具有给定的许可数和非公平的公平设置的 Semaphore。
    • permits - 初始的可用许可数目。此值可能为负数,在这种情况下,必须在授予任何获取前进行释放。
  • public Semaphore(int permits, boolean fair):创建具有给定的许可数和给定的公平设置的 Semaphore。
    • permits - 初始的可用许可数目。此值可能为负数,在这种情况下,必须在授予任何获取前进行释放。
    • fair - 如果此信号量保证在争用时按先进先出的顺序授予许可,则为 true;否则为 false。

基本方法

  • void acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
  • void release():释放一个许可,将其返回给信号量。
  • int availablePermits():返回此信号量中当前可用的许可数。
  • boolean hasQueuedThreads():查询是否有线程正在等待获取。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3);
for(int i=0;i<10;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"进入,当前已有" + (3-sp.availablePermits()) + "个并发");
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"即将离开");
sp.release();
//下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
System.out.println("线程" + Thread.currentThread().getName() +
"已离开,当前已有" + (3-sp.availablePermits()) + "个并发");
}
};
service.execute(runnable);
}
}
}

执行结果:

1
2
3
4
5
6
7
8
9
10
线程pool-1-thread-1进入,当前已有3个并发
线程pool-1-thread-3进入,当前已有3个并发
线程pool-1-thread-2进入,当前已有3个并发
线程pool-1-thread-1即将离开
线程pool-1-thread-1已离开,当前已有2个并发
线程pool-1-thread-4进入,当前已有3个并发
线程pool-1-thread-3即将离开
线程pool-1-thread-3已离开,当前已有2个并发
线程pool-1-thread-5进入,当前已有3个并发
...(之后的输出省略)

ReadWriteLock读写锁

接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}

实现类ReetrantReadWriteLock

支持的功能

  1. 支持公平和非公平的获取锁的方式;
  2. 支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
  3. 还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
  4. 读取锁和写入锁都支持锁获取期间的中断;
  5. Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class CachedData {
Object data;
volatile boolean cacheValid; //缓存是否有效
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock(); //获取读锁
//如果缓存无效,更新cache;否则直接使用data
if (!cacheValid) {
// Must release read lock before acquiring write lock
//获取写锁前须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
// Recheck state because another thread might have acquired
// write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
//锁降级,在释放写锁前获取读锁
rwl.readLock().lock();
rwl.writeLock().unlock(); // Unlock write, still hold read
}

use(data);
rwl.readLock().unlock(); //释放读锁
}
}

倒计数器:CountDownLatch

使用场景

想要做当前事情之前,必须具备前期的若干事件都准备完毕,当前事件才能进行。用程序来说,就是:当前线程处于阻塞状态,必须等到其它的若干线程都运行完毕之后,当前线程才被唤醒得以执行。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class PrepareEvent implements Runnable
{
private CountDownLatch c ;
public PrepareEvent(CountDownLatch c){
this.c = c ;
}

public void run(){
try{
long time = (long)(Math.random() * 5000); //模拟前期准备事件耗时
Thread.sleep(time);
String tName = Thread.currentThread().getName();
System.out.println(tName+"' working time is: "+time);
}catch(InterruptedException e){}

c.countDown();
}
}

public class CountDownLatchTest
{
public static void main(String[] args)
{
int count = 5 ;//计数5
CountDownLatch c = new CountDownLatch(count);

Thread[] ts = new Thread[count];

System.out.println("CountDownLatch init, now count is: "+c.getCount()+"\n\r");
System.out.println("because Of await(), main blocking...\n\r");

for(int i=count-1; i>=0; i--){
ts[i] = new Thread(new PrepareEvent(c),"T"+i);
ts[i].start();
}

try{
c.await();//这里阻塞,直至计数降为零
}catch(InterruptedException e){}

System.out.println(" \n\rafter await(), main continue...\n\r");
System.out.println("CountDownLatch used, now count is: "+c.getCount()+"\n\r");

}
}

循环栅栏:CyclicBarrier

概述:

CyclicBarrier和CountDownLatch很相似,都可以使线程先等待然后再执行。不过CountDownLatch是使一批线程等待另一批线程执行完后再执行;而CyclicBarrier只是使等待的线程达到一定数目后再让它们继续执行

故而CyclicBarrier内部也有一个计数器,计数器的初始值在创建对象时通过构造参数指定,如下所示:

1
2
3
public CyclicBarrier(int parties) {
this(parties, null);
}

每调用一次await()方法都将使阻塞的线程数+1,只有阻塞的线程数达到设定值时屏障才会打开,允许阻塞的所有线程继续执行。除此之外,CyclicBarrier还有几点需要注意的地方:

  • CyclicBarrier的计数器可以重置而CountDownLatch不行,这意味着CyclicBarrier实例可以被重复使用而CountDownLatch只能被使用一次。而这也是循环屏障循环二字的语义所在。

  • CyclicBarrier允许用户自定义barrierAction操作,这是个可选操作,可以在创建CyclicBarrier对象时指定

1
2
3
4
5
6
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties &lt;= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

一旦用户在创建CyclicBarrier对象时设置了barrierAction参数,则在阻塞线程数达到设定值屏障打开前,会调用barrierAction的run()方法完成用户自定义的操作。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Demo1 {
    public static CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

    public static class T extends Thread {
        int sleep;

        public T(String name, int sleep) {
            super(name);
            this.sleep = sleep;
        }

        @Override
        public void run() {
            try {
                //模拟休眠
                TimeUnit.SECONDS.sleep(sleep);
                long starTime = System.currentTimeMillis();
                //调用await()的时候,当前线程将会被阻塞,需要等待其他员工都到达await了才能继续
                cyclicBarrier.await();
                long endTime = System.currentTimeMillis();
                System.out.println(this.getName() + ",sleep:" + this.sleep + " 等待了" + (endTime - starTime) + "(ms),开始吃饭了!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i &lt;= 10; i++) {
            new T("员工" + i, i).start();
        }
    }
}

输出:

1
2
3
4
5
6
7
8
9
10
员工1,sleep:1 等待了9000(ms),开始吃饭了!
员工9,sleep:9 等待了1000(ms),开始吃饭了!
员工8,sleep:8 等待了2001(ms),开始吃饭了!
员工7,sleep:7 等待了3001(ms),开始吃饭了!
员工6,sleep:6 等待了4001(ms),开始吃饭了!
员工4,sleep:4 等待了6000(ms),开始吃饭了!
员工5,sleep:5 等待了5000(ms),开始吃饭了!
员工10,sleep:10 等待了0(ms),开始吃饭了!
员工2,sleep:2 等待了7999(ms),开始吃饭了!
员工3,sleep:3 等待了7000(ms),开始吃饭了!

代码中模拟了10个员工上桌吃饭的场景,等待所有员工都到齐了才能开发,可以看到第10个员工最慢,前面的都在等待第10个员工,员工1等待了9秒,上面代码中调用cyclicBarrier.await();会让当前线程等待。当10个员工都调用了cyclicBarrier.await();之后,所有处于等待中的员工都会被唤醒,然后继续运行。

线程阻塞工具类:LockSupport

基本方法

基于Unsafe类中的park和unpark方法

1
2
3
4
5
6
7
8
9
// park()方法,调用native方法阻塞当前线程
// unpark()方法,唤醒处于阻塞状态的线程Thrread
public static void park() {
UNSAFE.park(false, 0L);
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ThreadParkTest {
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.setName("mt");
mt.start();
try {
Thread.currentThread().sleep(10);
mt.park();
Thread.currentThread().sleep(10000);
mt.unPark();
Thread.currentThread().sleep(10000);
mt.park();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class MyThread extends Thread {

private boolean isPark = false;
public void run() {
System.out.println(" Enter Thread running.....");
while (true) {
if (isPark) {
System.out.println(Thread.currentThread().getName()+"Thread is Park.....");
LockSupport.park();
}
//do something
System.out.println(Thread.currentThread().getName()+">> is running");
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void park() {
isPark = true;
}
public void unPark() {
isPark = false;
LockSupport.unpark(this);
System.out.println("Thread is unpark.....");
}
}
}

执行结果:

1
2
3
4
5
6
7
8
9
10
Enter Thread running.....
mt>> is running
...(省略部分)
mt>> is running
mtThread is Park.....
Thread is unpark.....
mt>> is running
...(省略部分)
mt>> is running
mtThread is Park

Guava和Ratelimiter限流

常见的限流算法有:

  • 桶漏算法:漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

    img

  • 令牌桶算法:以规定的速率往令牌桶中存入Token,用户请求必须获取到令牌中的Token才可以处理请求,如果没有从令牌桶中获取到令牌则丢失该请求或让该请求等待。桶的容量是有限的,比如,当令牌没有被消耗掉时,只能累积有限单位时间内的令牌数量。

    img

两者的区别:

漏桶算法与令牌桶算法在表面看起来类似,很容易将两者混淆。但事实上,这两者具有截然不同的特性,且为不同的目的而使用。漏桶算法与令牌桶算法的区别在于:漏桶算法能够强行限制数据的传输速率。令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。

需要说明的是:在某些情况下,漏桶算法不能够有效地使用网络资源。因为漏桶的漏出速率是固定的,所以即使网络中没有发生拥塞,漏桶算法也不能使某一个单独的数据流达到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率。而令牌桶算法则能够满足这些具有突发特性的流量。通常,漏桶算法与令牌桶算法结合起来为网络流量提供更高效的控制

基于谷歌RateLimiter实现限流

Google的Guava工具包中就提供了一个限流工具类——RateLimiter,本文也是通过使用该工具类来实现限流功能。RateLimiter是基于“令牌通算法”来实现限流的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>


// 表示每秒往令牌桶中存入1个token, 具体根据服务器配置设置
private RateLimiter rateLimiter = RateLimiter.create(1);

// 每秒中限制1个请求 0:表示等待超时时间,设置0表示不等待,直接拒绝请求
boolean tryAcquire = rateLimiter.tryAcquire(0, TimeUnit.SECONDS);
if (!tryAcquire) { // 如果tryAcquire:false表示没有获取到token
return "现在抢购的人数过多,请稍等一下下哦!";