您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
经典多线程设计模式(重要)
 
作者:wive
   次浏览      
 2019-10-9
 
编辑推荐:
本文主要介绍了多线程设计的future模式,Master-Woker模式等,希望对您能有所帮助。
本文来自于csdn,由火龙果软件刘琛编辑推荐

一、Future模式

思想:

当service(Main方法模拟)请求一个数据的时候,可以先给他返回一个包装类(空壳,代理对象,未来data,FutureData)

然后开一个线程去异步加载真实数据,这样当service收到FutrueData,就可以做其他业务逻辑,

当要用的时候,再从FutureData中的方法去加载真实数据。(类似ajax的思想)

启动程序:模拟一个请求

/**
* 模拟一个service请求
*/
public class Main {
public static void main(String[] args) throws InterruptedException {

/**
* 未来模式的执行类,提供处理请求的方法handleRequest
*/
FutureExcutor fc = new FutureExcutor();

/**
* 1、Data是一个接口,提供实现类FuctureData和RealData
* 2、未来模式执行类,先给我们返回一个FuctureData,然后开了一个线程去加载真实数据了
*
*/
Data data = fc.handleRequest("请求参数");
/**
* 收到FutureData后,可以去处理其他业务逻辑
*/
System.out.println("做其他的事情...");

/**
* 1、处理完其他,要调用未来数据FutureData的方法加载真实数据了
* 1.1、这个getResultData肯定的是阻塞的,因为不确定真实数据是否加载 成功,只有当setResultData调用了,才能把阻塞打开
* 所以这两个方法getResultData、setResultData可以用阻塞队列来实现SynchronousQueue
* 1.2、这里采用的是wait,notify,synchronized来实现的
*/
String result = data.getResultData();
System.out.println(result);

}
}

-

/**
* Future模式执行器
*/
public class FutureExcutor {
public Data handleRequest(final String queryStr){
//1 我想要一个代理对象(Data接口的实现类) 先返回给发送请求的客户端, 告诉他请求已经接收到,可以做其他的事情
final FutureData futureData = new FutureData();
//2 启动一个新的线程,去加载真实的数据,传递给这个代理对象
new Thread(new Runnable() {
@Override
public void run() {
//3 这个新的线程可以去慢慢的加载真实对象, 然后传递给代理对象
RealData realData = new RealData(queryStr);
futureData.setRealData(realData);
}
}).start();

return futureData;
}
}

-

/**
* 数据接口
*/
public interface Data {

/**
* 提供结果的接口
* @return
*/
String getResultData();

}

-

/**
* future数据实现Data
*/
public class FutureData implements Data{

private RealData realData ;

private boolean isReady = false;

public synchronized void setRealData(RealData realData) {
//如果已经装载完毕了,就直接返回
if(isReady){
return;
}
//如果没装载,进行装载真实对象
this.realData = realData;
isReady = true;
//进行通知
notify();
}

@Override
public synchronized String getResultData() {
//如果没装载好 程序就一直处于阻塞状态
while(!isReady){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//装载好直接获取数据即可
return this.realData.getResultData();
}
}

-

/**
* 真实数据
*/
public class RealData implements Data{

private String result ;

public RealData (String queryStr){
try {
System.out.println("根据" + queryStr + "进行查询,这是一个很耗时的操作");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
result = "20000";
}

@Override
public String getResultData() {
return result;
}
}

JDK已经提供了基于Fucture模式的类(重要):

FutureTask(类似上面的FutureData),Callable(类似RealData的封装,真实加载数据要实现Callable接口)

Fucure(接收执行状态)

想到的future模式应用场景:在service中要执行,比较耗时的sql或者存储过程,可以先开一个线程池单独执行,

可以让他们实现Callable接口,然后继续执行service方法,要用的时候再从FutureTask中获取

用户登录注册-->把记录用户登录明细表的操作,推送等操作(如果不用返回结果可以直接丢到线程池,不需要future模式)

统计功能-->从多个表查询,然后汇总的功能

网络数据获取 -->要调用远程接口得到数据的功能

案例(重要):

public class UseFuture implements Callable<String>{
private String para;

public UseFuture(String para){
this.para = para;
}

/**
* 这里是真实的业务逻辑,其执行可能很慢
*/
@Override
public String call() throws Exception {
//模拟执行耗时
Thread.sleep(5000);
String result = this.para + "真实数据加载";
return result;
}

//主控制函数
public static void main(String[] args) throws Exception {
String queryStr = "query";
//构造FutureTask,并且传入需要真正进行业务逻辑处理的类, 该类 一定是实现了Callable接口的类
FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));

//创建一个固定线程的线程池且线程数为1,
ExecutorService executor = Executors.newFixedThreadPool(2);

//这里提交任务future,则开启线程执行RealData的call()方法执行
//submit和execute的区别: 第一点是submit可以传入 实现Callable接口 的实例对象, 第二点是submit方法有返回值
Future<?> f1 = executor.submit(future); //单独启动一个线程去执行的

while(true){
if(f1.get()==null){
System.out.println("callable已经执行完了");
break;
}
}

try {
//这里可以做额外的数据操作,也就是主程序执行其他业务逻辑
System.out.println("处理实际的业务逻辑...");
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
//调用获取数据方法,如果Callable()方法没有执行完成, 则依然会 进行等待
System.out.println("future模式获得的数据:" + future.get());

executor.shutdown();
}

}

二、Master-Woker模式

思想:

Master-Worker模式是常用的并行计算模式。

它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。

Master负责接收和分配任务,Worker负责处理子任务。

当各个Worker子进程处理完成后,会将结果返回给Master,由Master做归纳和总结。

其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。

public class Main {
public static void main(String[] args) {
//线程池的个数按照机器性能来定,可以用:
int workerCount = Runtime.getRuntime().
availableProcessors();
//System.out.println(Runtime.getRuntime().
availableProcessors());
/**
* 创建Master,并指定创建N个worker
*/
Master master = new Master(workerCount);

/**
* 添加100个任务
*/
Random r = new Random();
for(int i = 1; i <= 100; i++){
Task t = new Task(i, r.nextInt(1000));
//添加到任务队列,非阻塞的,可以采用高性能的
ConcurrentLinkedQueue 来存放
master.submit(t);
}
/**
* 执行所有的worker
*/
master.execute();
long start = System.currentTimeMillis();

while(true){
/**
* 如果所有的woker都停止了,那么结束
*/
if(master.isComplete()){
long end = System.currentTimeMillis() - start;
int priceResult = master.getResult();
System.out.println("最终结果:" + priceResult + ",
执行时间:" + end);
break;
}
}

}
}

Master:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {

//1 有一个盛放任务的容器
private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();

//2 需要有一个盛放worker的集合,可以用线程池Excutor
private HashMap<String, Thread> workers = new HashMap <String, Thread>();

//3 需要有一个盛放每一个worker执行任务 的结果集合
private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

//4 构造方法,创建master和指定个数的worker
public Master(int workerCount){
Worker worker = new Worker();
worker.setWorkQueue(this.workQueue);
worker.setResultMap(this.resultMap);

for(int i = 0; i < workerCount; i ++){
this.workers.put(Integer.toString(i), new Thread (worker));
}

}

//5 需要一个提交任务的方法
public void submit(Task task){
this.workQueue.add(task);
}

//6 需要有一个执行的方法,启动所有的worker方法去执行任务
public void execute(){
for(Map.Entry<String, Thread> me : workers.entrySet()){
me.getValue().start();
}
}

//7 判断是否运行结束的方法
public boolean isComplete() {
for(Map.Entry<String, Thread> me : workers.entrySet()){
if(me.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
}

//8 计算结果方法
public int getResult() {
int priceResult = 0;
for(Map.Entry<String, Object> me : resultMap.entrySet()){
priceResult += (Integer)me.getValue();
}
return priceResult;
}
}

Woker:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable {

private ConcurrentLinkedQueue<Task> work Queue;
private ConcurrentHashMap<String, Object> resultMap;

public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
this.workQueue = workQueue;
}

public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
}

@Override
public void run() {
while(true){
//取任务
Task input = this.workQueue.poll();
if(input == null) break;
//处理任务,可把这个方法变成抽象方法,具体的业务,
继承Worker 来实现
Object output = handle(input);
//把处理结果放到Master的结果处理集里面
this.resultMap.put(Integer.toString(input.getId()), output);
}
}

private Object handle(Task input) {
Object output = null;
try {
//处理任务的耗时。。 比如说进行操作数据库。。。
Thread.sleep(500);
output = input.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
}
}

-

public class Task {
private int id;
private int price ;

public Task(int id, int price) {
super();
this.id = id;
this.price = price;
}

public Task(){}

//set ,get
}

三、生产者-消费者

生产者和消费者也是一个非常经典的多线程模式,

我们在实际开发中应用非常广泛的思想理念。

在生产-消费模式中:通常由两类线程,即若干个生产者的线程和若干个消费者的线程。

生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务,

在生产者和消费者之间通过共享内存缓存区进行通信。

角色:生产者、消费者、内存缓存区、任务数据(或者叫协议数据)

生产-消费模型中的内存缓存区的主要功能是数据在多线程间共享。

此外,其最重要的是通过该缓存区可以缓解生产者和消费者之间的性能差异。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

public static void main(String[] args) throws Exception {
//内存缓冲区,用于缓存任务
BlockingQueue<Data> queue = new LinkedBlockingQueue <Data>(10);
//生产者
Provider p1 = new Provider(queue);
Provider p2 = new Provider(queue);
Provider p3 = new Provider(queue);
//消费者
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);

//创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,
没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(p1);
cachePool.execute(p2);
cachePool.execute(p3);

cachePool.execute(c1);
cachePool.execute(c2);
cachePool.execute(c3);

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
p1.stop();
p2.stop();
p3.stop();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// cachePool.shutdown();
// cachePool.shutdownNow();
}
}

-

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Provider implements Runnable{
//共享缓存区
private BlockingQueue<Data> queue;
//多线程间是否启动变量,有强制从主内存中刷新的功能。
即时返回线程 的状态
private volatile boolean isRunning = true;
//id生成器
private static AtomicInteger count = new AtomicInteger();
//随机对象
private static Random r = new Random();

public Provider(BlockingQueue<Data> queue){
this.queue = queue;
}

@Override
public void run() {
while(isRunning){
try {
//随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
Thread.sleep(r.nextInt(1000));
//获取的数据进行累计...
int id = count.incrementAndGet();
//比如通过一个getData方法获取了
Data data = new Data(Integer.toString(id), "数据" + id);
System.out.println("当前线程:" + Thread.current Thread(). getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
System.out.println("提交缓冲区数据失败....");
//do something... 比如重新提交
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public void stop(){
this.isRunning = false;
}

}

-

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{

private BlockingQueue<Data> queue;

public Consumer(BlockingQueue<Data> queue){
this.queue = queue;
}

//随机对象
private static Random r = new Random();

@Override
public void run() {
while(true){
try {
//获取数据
Data data = this.queue.take();
//进行数据处理。休眠0 - 1000毫秒模拟耗时
Thread.sleep(r.nextInt(1000));
System.out.println("当前消费线程:" + Thread. currentThread(). getName() + ", 消费成功,消费数据为id: " + data.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

-

public final class Data {

private String id;
private String name;

public Data(String id, String name){
this.id = id;
this.name = name;
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

@Override
public String toString(){
return "{id: " + id + ", name: " + name + "}";
}

}
 
   
次浏览       
相关文章

Java微服务新生代之Nacos
深入理解Java中的容器
Java容器详解
Java代码质量检查工具及使用案例
相关文档

Java性能优化
Spring框架
SSM框架简单简绍
从零开始学java编程经典
相关课程

高性能Java编程与系统性能优化
JavaEE架构、 设计模式及性能调优
Java编程基础到应用开发
JAVA虚拟机原理剖析