单例模式

定义: 一个类在系统中只产生一个实例

优点: 对于频繁使用的对象,可以省略new的时间,对于重量级对象来说是一比客观的系统性能提升 内存使用频率低,减少GC次数,缩短GC停顿时间

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//饿汉式 
//优点:这种写法比较简单,就是在类装载的时候就完成实例化。避免了线程同步问题。
//缺点:在类加载的时候就完成实例化,没有达到Lazy Loading的效果。如果从始至终从未使用过这个实例,则会造成内存的浪费
public class Singleton {
public static int STATUS=1;
private Singleton(){
System.out.println("Singleton is create");
}
private static Singleton instance = new Singleton();
public static Singleton getInstance() {
return instance;
}
}
// 懒汉式
// 优点:起到了Lazy Loading的效果
// 缺点:效率太低了,每个线程在想获得类的实例时候,执行getInstance()方法都要进行同步。而其实这个方法只执行一次实例化代码就够了,后面的想获得该类实例,直接return就行了。方法进行同步效率太低要改进。
public class LazySingleton {
private LazySingleton() {
System.out.println("LazySingleton is create");
}
private static LazySingleton instance = null;
public static synchronized LazySingleton getInstance() {
if (instance == null)
instance = new LazySingleton();
return instance;
}
}
// 静态内部类式,集合上述两种优势
// 静态内部类方式在Singleton类被装载时并不会立即实例化,而是在需要实例化时,调用getInstance方法,才会装载SingletonInstance类,而类的静态属性只会在第一次加载类的时候初始化,从而完成Singleton的实例化。
public class StaticSingleton {
public static int STATUS;
private StaticSingleton(){
System.out.println("StaticSingleton is create");
}
private static class SingletonHolder {
private static StaticSingleton instance = new StaticSingleton();
}
public static StaticSingleton getInstance() {
return SingletonHolder.instance;
}
}
// 注意:另外还有一种双重检查模式来创建单例,这种模式丑陋且复杂,甚至在低版本中不能保证正确性,不推荐使用

不变模式

定义: 一个对象一旦被创建,内部状态永远不发生改变,故永远为线程安全的
使用场景: 对象创建后,内部状态和数据不再发生变化,对象需要被共享,被多线程频繁访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final class PCData { //父类不变,子类也必须不变,但无法保证这一点,故使用final
private final int intData; //仅被赋值一次
public PCData(int d){
intData=d;
}
public PCData(String d){
intData=Integer.valueOf(d);
}
public int getData(){
return intData;
}
@Override
public String toString(){
return "data:"+intData;
}
}

注意“:String 类也是不变模式,保证了在多线程下的性能

生产者消费者模式

定义: 生产者负责提交用户请求,消费者负责具体处理生产者提交的任务.生产者和消费者之间通过共享内存缓冲区进行通信.
特点: 对生产者线程和消费者线程进行解耦,优化系统整体结构,环节性能瓶颈对系统性能的影响

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
//共享数据模型
public final class PCData { // 任务相关的数据
private final int intData;
public PCData(int d){
intData=d;
}
public PCData(String d){
intData=Integer.valueOf(d);
}
public int getData(){
return intData;
}
@Override
public String toString(){
return "data:"+intData;
}
}
//生产者
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue; // 内存缓冲区
private static AtomicInteger count = new AtomicInteger(); // 总数,原子操作
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
public void run() {
PCData data = null;
Random r = new Random();
System.out.println("start producer id="+Thread.currentThread().getId());
try {
while (isRunning) {
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet()); // 构造任务数据
System.out.println(data+" is put into queue");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) { // 提交数据到缓冲区中
System.err.println("failed to put data:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
//消费者
public class Consumer implements Runnable {
private BlockingQueue<PCData> queue; // 缓冲区
private static final int SLEEPTIME = 1000;
public Consumer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
public void run() {
System.out.println("start Consumer id=" + Thread.currentThread().getId());
Random r = new Random(); // 随机等待时间
try {
while(true){
PCData data = queue.take(); // 提取任务
if (null != data) {
int re = data.getData() * data.getData(); // 计算平方
System.out.println(MessageFormat.format("{0}*{1}={2}",
data.getData(), data.getData(), re));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
//main方法
public class PCMain {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
Producer producer1 = new Producer(queue); // 建立生产者
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer1 = new Consumer(queue); // 建立消费者
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool(); // 建立线程池
service.execute(producer1); // 运行生产者
service.execute(producer2);
service.execute(producer3);
service.execute(consumer1); // 运行消费者
service.execute(consumer2);
service.execute(consumer3);
Thread.sleep(10 * 1000);
producer1.stop(); // 停止生产者
producer2.stop();
producer3.stop();
Thread.sleep(3000);
service.shutdown();
}
}

生产者消费模式:无锁实现

上述BlockingQueue基于锁和阻塞等待来实现线程同步,在高并发环境下性能有限.
ConcurrentLinkedQueue是一个高性能的队列,基于CAS来实现生产者消费者模式.
CAS编程相对比较困难,我们可以使用Disruptor框架来实现.

无锁缓存框架Disruptor

特点: 环形队列,队列大小需要事先指定,无法动态扩展,数组大小需设置为2的整数次方.内存复用度高,减少GC等消耗

结构: RingBuffer的结构在写入和读取的操作时,均使用CAS进行数据保护.

在这里插入图片描述

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//该示例代码使用的版本是 disruptor-3.3.2
//数据模型
public class PCData {
private long value;
public void set(long value)
{
this.value = value;
}
public long get(){
return value;
}
}
//消费者 需要事先WorkHandler接口
public class Consumer implements WorkHandler<PCData> {
@Override
public void onEvent(PCData event) throws Exception { //框架的回调方法
System.out.println(Thread.currentThread().getId() + ":Event: --"
+ event.get() * event.get() + "--");
}
}
//数据对象工厂类,Disruptor框架初始化时构造所有对象实例
public class PCDataFactory implements EventFactory<PCData> {
public PCData newInstance(){
return new PCData();
}
}
//生产者
public class Producer {
private final RingBuffer<PCData> ringBuffer;
public Producer(RingBuffer<PCData> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void pushData(ByteBuffer bb) { //提取bytebuffer中的数据,装载到环形数组中
long sequence = ringBuffer.next(); // 获取下一个序列号
try {
PCData event = ringBuffer.get(sequence); // 取得环形数组中的PCdata
event.set(bb.getLong(0)); // 设置pcdata
} finally {
ringBuffer.publish(sequence); //必须发布,只有发布后的数据才能被消费者看见
}
}
}
//main函数
public class PCMain {
public static void main(String[] args) throws Exception {
Executor executor = Executors.newCachedThreadPool();
PCDataFactory factory = new PCDataFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024; //缓冲区设置 为2^10
Disruptor<PCData> disruptor = new Disruptor<PCData>(factory, //创建disruptor对象
bufferSize,
executor,
ProducerType.MULTI,
new BlockingWaitStrategy() //消费者监控缓存区的策略
);
// Connect the handler
// disruptor.handleEventsWith(new LongEventHandler());
disruptor.handleEventsWithWorkerPool( //用于消费者,设置4个消费者,并把每个消费者映射到一个线程中
new Consumer(),
new Consumer(),
new Consumer(),
new Consumer());
disruptor.start(); //启动并初始化disruptor系统
RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
Producer producer = new Producer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) { //生产者不断地写入缓冲区
bb.putLong(0, l);
producer.pushData(bb);
Thread.sleep(100);
System.out.println("add data " + l);
}
}
}

Disruptor中消费者监控缓存区的策略

消费者如何监控缓冲区中的信息呢,以下给出了4种策略,这些策略由WaitStrategy接口进行封装

  1. BlcokingWaitStrategy: 默认策略,类似BlockingQueue,利用锁和条件(Condition)进行数据的监控和线程的唤醒, 最节省CPU,但高并发下性能最差
  2. SleepingWaitStrategy: 与上述类似.它会在循环中不断等待,先进行自旋等待,若不成功,则会使用Thread.yield()方法让出cpu,并最终使用LockSupport.parkNanos进行线程休眠.因此这个策略对数据处理可能会产生较高的平均延迟,适合对延时要求不高的场合,且对生产者线程的影响最小.典型场景为异步日志.
  3. YieldingWaitingStrategy: 这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区的变化,在循环内部,它会使用Thread.yield方法让出CPU给别的线程执行时间。如果你需要一个高性能的系统,并且对延时有较为严格的要求,则可以考虑这种策略。使用这种策略时,相当于消费者线程变成了一个内部执行了Thread.yield方法的死循环。因此,你最好有多于消费者线程数量的逻辑CPU数量(这里的逻辑CPU指的是“双核四线程”中的那个四线程,否则,整个应用程序恐怕都会受到影响).
  4. BusySpinWaitStrategy : 这个是最疯狂的等待策略了。它就是一个死循环!消费者线程会尽最大努力疯狂监控缓冲区的变化。因此,它会吃掉所有的CPU资源。只有对延迟非常苛刻的场合可以考虑使用它(或者说,你的系统真的非常繁忙)。因为使用它等于开启了一个死循环监控,所以你的物理CPU数量必须要大于消费者的线程数。注意,我这里说的是物理CPU,如果你在一个物理核上使用超线程技术模拟两个逻辑核,另外一个逻辑核显然会受到这种超密集计算的影响而不能正常工作。

Disruptor中cpu cache的优化: 解决伪共享问题

什么是伪共享问题呢?我们知道,为了提高CPU的速度,CPU有一个高速缓存Cache。在高速缓存中,读写数据的最小单位为缓存行(Cache Line),它是从主存(Memory)复制到缓存(Cache)的最小单位,一般为32字节到128字节。

当两个变量存放在一个缓存行时,在多线程访问中,可能会影响彼此的性能。在图5.4中,假设变量X和Y在同一个缓存行,运行在CPU1上的线程更新了变量X,那么CPU2上的缓存行就会失效,同一行的变量Y即使没有修改也会变成无效,导致Cache无法命中。接着,如果在CPU2上的线程更新了变量Y,则导致CPU1上的缓存行失效(此时,同一行的变量X变得无法访问)。这种情况反复发生,无疑是一个潜在的性能杀手。如果CPU经常不能命中缓存,那么系统的吞吐量就会急剧下降。

为了避免这种情况发生,一种可行的做法就是在变量X的前后空间都先占据一定的位置(把它叫作padding,用来填充用的)。这样,当内存被读入缓存时,这个缓存行中,只有变量X一个变量实际是有效的,因此就不会发生多个线程同时修改缓存行中不同变量而导致变量全体失效的情况,如图5.5所示。(即x占一个缓存行,Y占一个缓存行)。

img

img

Future模式

概述

Future模式是多线程开发中非常常见的一种设计模式,它的核心思想是异步调用。当我们需要调用一个函数方法时,如果这个函数执行得很慢,那么我们就要进行等待。但有时候,我们可能并不急着要结果。因此,我们可以让被调者立即返回,让它在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获得需要的数据。

对于Future模式来说,虽然它无法立即给出你需要的数据,但是它会返回一个契约给你,将来你可以凭借这个契约去重新获取你需要的信息。

Future的简单实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
//核心接口Data: 客户端希望获得的数据.
//RealData : 真实数据,最终希望获得的数据
//FutureData : 提取RealData的凭证,可立刻返回
public interface Data {
public String getResult();
}

public class FutureData implements Data {
protected RealData realdata = null;
protected boolean isReady = false;
public synchronized void setRealData(RealData realdata) {
if (isReady) {
return;
}
this.realdata = realdata;
isReady = true;
notifyAll(); //等realdata注入完后,通知getresult()方法
}
public synchronized String getResult() { //等待realdata构造完成
while (!isReady) {
try {
wait(); //当调用result时,为准备好数据时阻塞住线程
} catch (InterruptedException e) {
}
}
return realdata.result;
}
}

public class RealData implements Data {
protected final String result;
public RealData(String para) {
//RealData的构造可能很慢,需要用户等待很久
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append(para);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
result = sb.toString();
}
public String getResult() {
return result;
}
}

public class Client {
public Data request(final String queryStr) {
final FutureData future = new FutureData();
// RealData的构建很慢
new Thread() {
public void run() {
RealData realdata = new RealData(queryStr);
future.setRealData(realdata);
}
}.start();
return future;
}
}

public class Main {
public static void main(String[] args) {
Client client = new Client();
Data data = client.request("a");
System.out.println("请求完毕");
try {
//这里可以用一个sleep代替了对其它业务逻辑的处理
Thread.sleep(2000);
} catch (InterruptedException e) {
}
//使用真实的数据
System.out.println("数据 = " + data.getResult());
}
}

JDK中的Future

1
2
3
4
5
6
7
8
9
10
11
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<?> future = executorService.submit(() -> {
try {
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
});
future.get(); //需要等待任务完成,get()会阻塞住
System.out.println("处理完毕");

Guava对Future的支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class FutrueDemo2 {
public static void main(String[] args) throws InterruptedException {
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<String> task = service.submit(new RealData("x"));
Futures.addCallback(task, new FutureCallback<String>() {
public void onSuccess(String o) {
System.out.println("异步处理成功,result=" + o);
}
public void onFailure(Throwable throwable) { //对异常的处理
System.out.println("异步处理失败,e=" + throwable);
}
}, MoreExecutors.newDirectExecutorService());
System.out.println("main task done.....");
Thread.sleep(3000);
}
}