Editor's note:
This article, originally from CSDN, explains the problem the Pipeline pattern addresses and its solution approach, and walks through a Pipeline-based service with example code. We hope it helps your study.
Pattern Name
Pipeline
Problem Addressed
Some tasks involve a long sequence of steps, and because each stage's result determines the next stage's execution, the steps cannot simply be separated.
Approach
Decompose the task's processing into several stages, with each stage handing its result to the next. The threads running the different stages then work in parallel, making full use of resources and improving computational efficiency.
Classes used by the pattern (a minimal interface sketch follows this list):
Pipe: the abstraction of a processing stage. It processes its input and passes its output to the next stage as that stage's input. process() receives the previous stage's result as this stage's input; init() initializes the service this stage exposes; shutdown() stops that service; setNextPipe() sets this stage's successor.
PipeContext: an abstraction of the environment the stages run in, used mainly for error handling. handleError() handles exceptions thrown by a processing stage.
AbstractPipe: an abstract implementation of the Pipe interface. process() receives the previous stage's input and calls doProcess(), implemented by subclasses, to process it; init() stores a reference to the PipeContext instance given in its parameter, and subclasses may override it to initialize their service; shutdown()'s default implementation does nothing, and subclasses may override it to stop their service; setNextPipe() sets the next processing stage; doProcess() is the abstract method left for subclasses to implement the stage's actual processing.
WorkerThreadPipeDecorator: a worker-thread-based Pipe implementation. It stores received input elements in a queue from which a specified number of worker threads take them for processing; the class itself mainly manages the worker threads' life cycle. process() receives the previous stage's input and puts it on the queue for the worker threads to take and process; init() starts the worker threads and calls the delegate Pipe instance's init(); shutdown() stops the worker threads and calls the delegate Pipe instance's shutdown(); setNextPipe() calls the delegate Pipe instance's setNextPipe(); dispatch() takes input elements from the queue and calls the delegate Pipe instance's process() on them.
ThreadPoolPipeDecorator: a thread-pool-based Pipe implementation. process() receives the previous stage's input and submits to the thread pool a task that processes it accordingly; init() calls the delegate Pipe instance's init(); shutdown() closes the service this Pipe instance exposes and calls the delegate Pipe instance's shutdown(); setNextPipe() calls the delegate Pipe instance's setNextPipe().
AbstractParallelPipe: a subclass of AbstractPipe that supports parallel processing. For each of its input elements (an original task) it builds a corresponding set of subtasks, executes those subtasks in parallel, and merges their results into the output for the original task. buildTasks() is an abstract method, left to subclasses, that builds a set of subtasks from the specified input; combineResults() is an abstract method, left to subclasses, that merges the parallel subtasks' results into the output for the input element; invokeParallel() executes a set of tasks in parallel; doProcess() implements this class's processing logic for its input.
ConcreteParallelPipe: an application-defined subclass of AbstractParallelPipe. buildTasks() builds a set of subtasks from the specified input; combineResults() merges the parallel subtasks' results into the output for the input element.
Pipeline: the abstraction of a composite Pipe. addPipe() adds a Pipe instance to the Pipeline instance.
SimplePipeline: a simple implementation of the Pipeline interface based on AbstractPipe. addPipe() adds a Pipe instance to the Pipeline instance; addAsWorkerThreadBasedPipe() wraps the specified Pipe instance in a WorkerThreadPipeDecorator instance before adding it to the Pipeline instance; addAsThreadPoolBasedPipe() wraps the specified Pipe instance in a ThreadPoolPipeDecorator instance before adding it to the Pipeline instance.
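Since the article does not list the interfaces themselves, here is a minimal sketch of what they might look like, inferred from the role descriptions above. The generics, the InterruptedException on process(), and the fields of PipeException are assumptions, and the decorator and abstract classes are omitted; the original source may differ.

import java.util.concurrent.TimeUnit;

// Minimal sketch inferred from the descriptions above; actual signatures may differ.
interface Pipe<IN, OUT> {
    void init(PipeContext pipeCtx);                     // initialize this stage's service
    void process(IN input) throws InterruptedException; // accept the previous stage's result
    void shutdown(long timeout, TimeUnit unit);         // stop this stage's service
    void setNextPipe(Pipe<?, ?> nextPipe);              // set the stage that follows this one
}

interface PipeContext {
    void handleError(PipeException exp);                // handle an exception thrown by a stage
}

interface Pipeline<IN, OUT> extends Pipe<IN, OUT> {
    void addPipe(Pipe<?, ?> pipe);                      // append a stage to this pipeline
}

// Carries the failing stage and its input so handleError can act on them (assumed shape).
class PipeException extends Exception {
    public final Pipe<?, ?> sourcePipe;
    public final Object input;
    public PipeException(Pipe<?, ?> sourcePipe, Object input,
            String message, Throwable cause) {
        super(message, cause);
        this.sourcePipe = sourcePipe;
        this.input = input;
    }
}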
Sequence diagram of service initialization in the Pipeline pattern (diagram not reproduced here)
Example Code
A system needs a scheduled data-synchronization task. The task exports database records that match specified criteria to files and transfers (synchronizes) them via FTP to specified hosts. It must satisfy the following requirements:
1. Each data file contains at most N records (e.g. 10,000; the exact value is configurable); when a data file is full, the remaining records are written to a new data file.
2. Each data file may need to be transferred to multiple hosts.
3. A local backup of every synchronized data file must be kept.
The task therefore does three things, each of them time-consuming, and each later operation depends on the result of the one before it, so the work is hard to split apart. Merely using multiple threads, each still processing the steps serially, would not fit either: during the second step the threads would contend with one another for resources and waste time, making the job even harder. This is exactly the situation the Pipeline pattern handles.
The scheduled data-synchronization task
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DataSyncTask implements Runnable {
    public void run() {
        ResultSet rs = null;
        SimplePipeline<RecordSaveTask, String> pipeline = buildPipeline();
        pipeline.init(pipeline.newDefaultPipeContext());
        Connection dbConn = null;
        try {
            dbConn = getConnection();
            rs = qryRecords(dbConn);
            processRecords(rs, pipeline);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != dbConn) {
                try {
                    dbConn.close();
                } catch (SQLException e) {
                    // ignore
                }
            }
            pipeline.shutdown(360, TimeUnit.SECONDS);
        }
    }

    private ResultSet qryRecords(Connection dbConn) throws Exception {
        dbConn.setReadOnly(true);
        PreparedStatement ps = dbConn.prepareStatement(
                "select id,productId,packageId,msisdn,operationTime,operationType,"
                        + "effectiveDate,dueDate from subscriptions order by operationTime",
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ResultSet rs = ps.executeQuery();
        return rs;
    }

    private static Connection getConnection() throws Exception {
        Connection dbConn = null;
        Class.forName("org.hsqldb.jdbc.JDBCDriver");
        dbConn = DriverManager.getConnection(
                "jdbc:hsqldb:hsql://192.168.1.105:9001", "SA", "");
        return dbConn;
    }

    private static Record makeRecordFrom(ResultSet rs) throws SQLException {
        Record record = new Record();
        record.setId(rs.getInt("id"));
        record.setProductId(rs.getString("productId"));
        record.setPackageId(rs.getString("packageId"));
        record.setMsisdn(rs.getString("msisdn"));
        record.setOperationTime(rs.getTimestamp("operationTime"));
        record.setOperationType(rs.getInt("operationType"));
        record.setEffectiveDate(rs.getTimestamp("effectiveDate"));
        record.setDueDate(rs.getTimestamp("dueDate"));
        return record;
    }

    private static class RecordSaveTask {
        public final Record[] records;
        public final int targetFileIndex;
        public final String recordDay;

        public RecordSaveTask(Record[] records, int targetFileIndex) {
            this.records = records;
            this.targetFileIndex = targetFileIndex;
            this.recordDay = null;
        }

        public RecordSaveTask(String recordDay, int targetFileIndex) {
            this.records = null;
            this.targetFileIndex = targetFileIndex;
            this.recordDay = recordDay;
        }
    }

    private SimplePipeline<RecordSaveTask, String> buildPipeline() {
        /*
         * The essence of a thread pool is reusing a fixed number of threads rather
         * than dedicating a worker thread to every task.
         * Here, each Pipe can be initialized after its upstream Pipe has finished
         * initializing; the Pipes need not all be initialized at the same time.
         * The initialization can therefore be handled by a single thread which,
         * once the Pipes are initialized, can go on to handle any tasks produced
         * afterwards, such as error handling.
         * So all of these tasks, produced one after another, can be executed from
         * start to finish by a single worker thread in a pool.
         */
        final ExecutorService helperExecutor = Executors.newSingleThreadExecutor();
        final SimplePipeline<RecordSaveTask, String> pipeline =
                new SimplePipeline<RecordSaveTask, String>(helperExecutor);

        Pipe<RecordSaveTask, File> stageSaveFile =
                new AbstractPipe<RecordSaveTask, File>() {
                    @Override
                    protected File doProcess(RecordSaveTask task) throws PipeException {
                        final RecordWriter recordWriter = RecordWriter.getInstance();
                        final Record[] records = task.records;
                        File file;
                        if (null == records) {
                            // No records: close out the current file for the given day
                            file = recordWriter.finishRecords(task.recordDay,
                                    task.targetFileIndex);
                        } else {
                            try {
                                file = recordWriter.write(records, task.targetFileIndex);
                            } catch (IOException e) {
                                throw new PipeException(this, task,
                                        "Failed to save records", e);
                            }
                        }
                        return file;
                    }
                };
        /*
         * The Pipes here all perform I/O. To avoid locking (and thus unnecessary
         * context switching) while remaining thread-safe, each Pipe uses a single
         * worker thread. If a Pipe were switched to a thread pool, watch out for:
         * 1) thread safety, 2) deadlock.
         */
        pipeline.addAsWorkerThreadBasedPipe(stageSaveFile, 1);

        final String[][] ftpServerConfigs = retrieveFTPServConf();
        final ThreadPoolExecutor ftpExecutorService = new ThreadPoolExecutor(1,
                ftpServerConfigs.length, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(100),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r,
                            ThreadPoolExecutor executor) {
                        if (!executor.isShutdown()) {
                            try {
                                executor.getQueue().put(r);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                        }
                    }
                });

        Pipe<File, File> stageTransferFile =
                new AbstractParallelPipe<File, File, File>(
                        new SynchronousQueue<File>(), ftpExecutorService) {
                    @SuppressWarnings("unchecked")
                    final Future<FTPClientUtil>[] ftpClientUtilHolders =
                            new Future[ftpServerConfigs.length];

                    @Override
                    public void init(PipeContext pipeCtx) {
                        super.init(pipeCtx);
                        String[] ftpServerConfig;
                        for (int i = 0; i < ftpServerConfigs.length; i++) {
                            ftpServerConfig = ftpServerConfigs[i];
                            ftpClientUtilHolders[i] = FTPClientUtil.newInstance(
                                    ftpServerConfig[0], ftpServerConfig[1],
                                    ftpServerConfig[2]);
                        }
                    }

                    @Override
                    protected List<Callable<File>> buildTasks(final File file) {
                        List<Callable<File>> tasks = new LinkedList<Callable<File>>();
                        for (Future<FTPClientUtil> ftpClientUtilHolder : ftpClientUtilHolders) {
                            tasks.add(new ParallelTask(ftpClientUtilHolder, file));
                        }
                        return tasks;
                    }

                    @Override
                    protected File combineResults(List<Future<File>> subTaskResults)
                            throws Exception {
                        if (0 == subTaskResults.size()) {
                            return null;
                        }
                        File file = null;
                        file = subTaskResults.get(0).get();
                        return file;
                    }

                    @Override
                    public void shutdown(long timeout, TimeUnit unit) {
                        super.shutdown(timeout, unit);
                        ftpExecutorService.shutdown();
                        try {
                            ftpExecutorService.awaitTermination(timeout, unit);
                        } catch (InterruptedException e) {
                            // ignore
                        }
                        for (Future<FTPClientUtil> ftpClientUtilHolder : ftpClientUtilHolders) {
                            try {
                                ftpClientUtilHolder.get().disconnect();
                            } catch (Exception e) {
                                // ignore
                            }
                        }
                    }

                    class ParallelTask implements Callable<File> {
                        public final Future<FTPClientUtil> ftpUtilHolder;
                        public final File file2Transfer;

                        public ParallelTask(Future<FTPClientUtil> ftpUtilHolder,
                                File file2Transfer) {
                            this.ftpUtilHolder = ftpUtilHolder;
                            this.file2Transfer = file2Transfer;
                        }

                        @Override
                        public File call() throws Exception {
                            File transferedFile = null;
                            ftpUtilHolder.get().upload(file2Transfer);
                            transferedFile = file2Transfer;
                            return transferedFile;
                        }
                    }
                };
        pipeline.addAsWorkerThreadBasedPipe(stageTransferFile, 1);

        // Back up the data files that have been transferred
        Pipe<File, Void> stageBackupFile = new AbstractPipe<File, Void>() {
            @Override
            protected Void doProcess(File transferedFile) throws PipeException {
                RecordWriter.backupFile(transferedFile);
                return null;
            }

            @Override
            public void shutdown(long timeout, TimeUnit unit) {
                // After all files are backed up, remove the empty directories
                RecordWriter.purgeDir();
            }
        };
        pipeline.addAsWorkerThreadBasedPipe(stageBackupFile, 1);
        return pipeline;
    }

    private String[][] retrieveFTPServConf() {
        String[][] ftpServerConfigs = new String[][] {
                { "192.168.1.105", "datacenter", "abc123" },
                { "192.168.1.105", "datacenter", "abc123" },
                { "192.168.1.105", "datacenter", "abc123" } };
        return ftpServerConfigs;
    }

    private void processRecords(ResultSet rs,
            Pipeline<RecordSaveTask, String> pipeline) throws Exception {
        Record record;
        Record[] records = new Record[Config.RECORD_SAVE_CHUNK_SIZE];
        int targetFileIndex = 0;
        int nextTargetFileIndex = 0;
        int recordCountInTheDay = 0;
        int recordCountInTheFile = 0;
        String recordDay = null;
        String lastRecordDay = null;
        SimpleDateFormat sdf = new SimpleDateFormat("yyMMdd");
        while (rs.next()) {
            record = makeRecordFrom(rs);
            lastRecordDay = recordDay;
            recordDay = sdf.format(record.getOperationTime());
            if (recordDay.equals(lastRecordDay)) {
                records[recordCountInTheFile] = record;
                recordCountInTheDay++;
            } else {
                // A switch to a different day's file has actually happened
                if (null != lastRecordDay) {
                    // Before switching, write the contents of records to a file
                    pipeline.process(new RecordSaveTask(
                            Arrays.copyOf(records, recordCountInTheFile),
                            targetFileIndex));
                } else {
                    pipeline.process(new RecordSaveTask(lastRecordDay,
                            targetFileIndex));
                }
                records[0] = record;
                recordCountInTheFile = 0;
                recordCountInTheDay = 1;
            }
            if (nextTargetFileIndex == targetFileIndex) {
                recordCountInTheFile++;
                if (0 == (recordCountInTheFile % Config.RECORD_SAVE_CHUNK_SIZE)) {
                    pipeline.process(new RecordSaveTask(
                            Arrays.copyOf(records, recordCountInTheFile),
                            targetFileIndex));
                    recordCountInTheFile = 0;
                }
            }
            nextTargetFileIndex = recordCountInTheDay / Config.MAX_RECORDS_PER_FILE;
            if (nextTargetFileIndex > targetFileIndex) {
                // A switch to a new same-day file is anticipated
                if (recordCountInTheFile > 1) {
                    pipeline.process(new RecordSaveTask(
                            Arrays.copyOf(records, recordCountInTheFile),
                            targetFileIndex));
                } else {
                    pipeline.process(new RecordSaveTask(recordDay, targetFileIndex));
                }
                recordCountInTheFile = 0;
                targetFileIndex = nextTargetFileIndex;
            } else if (nextTargetFileIndex < targetFileIndex) {
                // A switch to a different day's file has actually happened;
                // recordCountInTheFile keeps its current value
                targetFileIndex = nextTargetFileIndex;
            }
        }
        if (recordCountInTheFile > 0) {
            pipeline.process(new RecordSaveTask(
                    Arrays.copyOf(records, recordCountInTheFile), targetFileIndex));
        }
    }
}
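The example above also relies on a few supporting types the article never shows: Record, Config, and the FTPClientUtil and RecordWriter classes listed next. Below are plausible minimal sketches of Record and Config; the field names are inferred from the SQL query and from RecordWriter, and the constant values are placeholders, not values from the original source.

import java.sql.Timestamp;

// Plausible minimal versions of the supporting types used above.
class Record {
    private int id;
    private String productId;
    private String packageId;
    private String msisdn;
    private Timestamp operationTime;
    private int operationType;
    private Timestamp effectiveDate;
    private Timestamp dueDate;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getProductId() { return productId; }
    public void setProductId(String productId) { this.productId = productId; }
    public String getPackageId() { return packageId; }
    public void setPackageId(String packageId) { this.packageId = packageId; }
    public String getMsisdn() { return msisdn; }
    public void setMsisdn(String msisdn) { this.msisdn = msisdn; }
    public Timestamp getOperationTime() { return operationTime; }
    public void setOperationTime(Timestamp operationTime) { this.operationTime = operationTime; }
    public int getOperationType() { return operationType; }
    public void setOperationType(int operationType) { this.operationType = operationType; }
    public Timestamp getEffectiveDate() { return effectiveDate; }
    public void setEffectiveDate(Timestamp effectiveDate) { this.effectiveDate = effectiveDate; }
    public Timestamp getDueDate() { return dueDate; }
    public void setDueDate(Timestamp dueDate) { this.dueDate = dueDate; }
}

class Config {
    public static final int RECORD_SAVE_CHUNK_SIZE = 1000; // placeholder value
    public static final int MAX_RECORDS_PER_FILE = 10000;  // "N" from requirement 1
    public static final int WRITER_BUFFER_SIZE = 8192;     // placeholder value
    public static final int RECORD_JOIN_SIZE = 100;        // placeholder value
}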
The FTPClientUtil class

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPClientConfig;
import org.apache.commons.net.ftp.FTPReply;

public class FTPClientUtil {
    private final FTPClient ftp = new FTPClient();
    private final Map<String, Boolean> dirCreateMap = new HashMap<String, Boolean>();
    /*
     * helperExecutor is a static field, which lets newInstance share the same
     * thread pool while creating different FTPClientUtil instances.
     * Pattern role: Promise.TaskExecutor
     */
    private static volatile ExecutorService helperExecutor;

    static {
        helperExecutor = new ThreadPoolExecutor(1,
                Runtime.getRuntime().availableProcessors() * 2,
                60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(10),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        return t;
                    }
                }, new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Private constructor
    private FTPClientUtil() {
    }

    // Pattern role: Promise.Promisor.compute
    public static Future<FTPClientUtil> newInstance(final String ftpServer,
            final String userName, final String password) {
        Callable<FTPClientUtil> callable = new Callable<FTPClientUtil>() {
            @Override
            public FTPClientUtil call() throws Exception {
                FTPClientUtil self = new FTPClientUtil();
                self.init(ftpServer, userName, password);
                return self;
            }
        };
        // task corresponds to the pattern role Promise.Promise
        final FutureTask<FTPClientUtil> task = new FutureTask<FTPClientUtil>(callable);
        helperExecutor.execute(task);
        return task;
    }

    private void init(String ftpServer, String userName, String password)
            throws Exception {
        FTPClientConfig config = new FTPClientConfig();
        ftp.configure(config);
        int reply;
        ftp.connect(ftpServer);
        System.out.print(ftp.getReplyString());
        reply = ftp.getReplyCode();
        if (!FTPReply.isPositiveCompletion(reply)) {
            ftp.disconnect();
            throw new RuntimeException("FTP server refused connection.");
        }
        boolean isOK = ftp.login(userName, password);
        if (isOK) {
            System.out.println(ftp.getReplyString());
        } else {
            throw new RuntimeException("Failed to login."
                    + ftp.getReplyString());
        }
        reply = ftp.cwd("~/subspsync");
        if (!FTPReply.isPositiveCompletion(reply)) {
            ftp.disconnect();
            throw new RuntimeException("Failed to change working directory. reply:"
                    + reply);
        } else {
            System.out.println(ftp.getReplyString());
        }
        ftp.setFileType(FTP.ASCII_FILE_TYPE);
    }

    public void upload(File file) throws Exception {
        InputStream dataIn = new BufferedInputStream(
                new FileInputStream(file), 1024 * 8);
        boolean isOK;
        String dirName = file.getParent();
        String fileName = dirName + '/' + file.getName();
        ByteArrayInputStream checkFileInputStream =
                new ByteArrayInputStream("".getBytes());
        try {
            if (!dirCreateMap.containsKey(dirName)) {
                ftp.makeDirectory(dirName);
                dirCreateMap.put(dirName, null);
            }
            try {
                isOK = ftp.storeFile(fileName, dataIn);
            } catch (IOException e) {
                throw new RuntimeException("Failed to upload " + file, e);
            }
            if (isOK) {
                ftp.storeFile(fileName + ".c", checkFileInputStream);
            } else {
                throw new RuntimeException("Failed to upload " + file + ",reply:"
                        + ftp.getReplyString());
            }
        } finally {
            dataIn.close();
        }
    }

    public void disconnect() {
        if (ftp.isConnected()) {
            try {
                ftp.disconnect();
            } catch (IOException ioe) {
                // do nothing
            }
        }
    }
}
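Because newInstance returns a Future (the Promise role), a caller can start the connection and login in the background and block only when the connected client is actually needed. A usage sketch; the host, credentials, and file path are placeholders:

import java.io.File;
import java.util.concurrent.Future;

public class FtpUploadDemo {
    public static void main(String[] args) throws Exception {
        // Connection setup runs on the shared helper pool
        Future<FTPClientUtil> holder =
                FTPClientUtil.newInstance("192.168.1.105", "datacenter", "abc123");
        // ... other work can proceed while the FTP session is being set up ...
        FTPClientUtil client = holder.get(); // blocks until init() completes
        client.upload(new File("/tmp/subspsync/123456/subspsync-gw-0000.dat"));
        client.disconnect();
    }
}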
The RecordWriter class

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.DateFormat;
import java.text.DecimalFormat;
import java.text.FieldPosition;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

public class RecordWriter {
    private static final RecordWriter INSTANCE = new RecordWriter();
    // HashMap is not thread-safe, but each RecordWriter instance is used from a
    // single thread, so this causes no problem
    private static Map<String, PrintWriter> printWriterMap =
            new HashMap<String, PrintWriter>();
    private static String baseDir;
    private static final char FIELD_SEPARATOR = '|';
    // SimpleDateFormat is not thread-safe, but each RecordWriter instance is
    // used from a single thread, so this causes no problem either
    private static final SimpleDateFormat DIRECTORY_NAME_FORMATTER =
            new SimpleDateFormat("yyMMdd");
    // Assumed timestamp format for record fields (not shown in the source)
    private static final SimpleDateFormat sdf =
            new SimpleDateFormat("yyyyMMddHHmmss");
    private static final DecimalFormat FILE_INDEX_FORMATTER =
            new DecimalFormat("0000");
    private static final int RECORD_JOIN_SIZE = Config.RECORD_JOIN_SIZE;
    private static final FieldPosition FIELD_POS =
            new FieldPosition(DateFormat.Field.DAY_OF_MONTH);

    // Private constructor
    private RecordWriter() {
        baseDir = System.getProperty("user.home") + "/tmp/subspsync/";
    }

    public static RecordWriter getInstance() {
        return INSTANCE;
    }

    public File write(Record[] records, int targetFileIndex) throws IOException {
        if (null == records || 0 == records.length) {
            throw new IllegalArgumentException("records is null or empty");
        }
        int recordCount = records.length;
        String recordDay;
        recordDay = DIRECTORY_NAME_FORMATTER.format(records[0].getOperationTime());
        String fileKey = recordDay + '-' + targetFileIndex;
        PrintWriter pwr = printWriterMap.get(fileKey);
        if (null == pwr) {
            File file = new File(baseDir + '/' + recordDay + "/subspsync-gw-"
                    + FILE_INDEX_FORMATTER.format(targetFileIndex) + ".dat");
            File dir = file.getParentFile();
            if (!dir.exists() && !dir.mkdirs()) {
                throw new IOException("No such directory:" + dir);
            }
            pwr = new PrintWriter(new BufferedWriter(new FileWriter(file, true),
                    Config.WRITER_BUFFER_SIZE));
            printWriterMap.put(fileKey, pwr);
        }
        StringBuffer strBuf = new StringBuffer(40);
        int i = 0;
        for (Record record : records) {
            i++;
            pwr.print(String.valueOf(record.getId()));
            pwr.print(FIELD_SEPARATOR);
            pwr.print(record.getMsisdn());
            pwr.print(FIELD_SEPARATOR);
            pwr.print(record.getProductId());
            pwr.print(FIELD_SEPARATOR);
            pwr.print(record.getPackageId());
            pwr.print(FIELD_SEPARATOR);
            pwr.print(String.valueOf(record.getOperationType()));
            pwr.print(FIELD_SEPARATOR);
            strBuf.delete(0, 40);
            pwr.print(sdf.format(record.getOperationTime(), strBuf, FIELD_POS));
            pwr.print(FIELD_SEPARATOR);
            strBuf.delete(0, 40);
            pwr.print(sdf.format(record.getEffectiveDate(), strBuf, FIELD_POS));
            pwr.print(FIELD_SEPARATOR);
            strBuf.delete(0, 40);
            pwr.print(sdf.format(record.getDueDate(), strBuf, FIELD_POS));
            pwr.print('\n');
            if (0 == (i % RECORD_JOIN_SIZE)) {
                pwr.flush();
                i = 0;
                // Thread.yield();
            }
        }
        if (i > 0) {
            pwr.flush();
        }
        File file = null;
        // Handle the last batch of records for the current file
        if (recordCount < Config.RECORD_SAVE_CHUNK_SIZE) {
            pwr.close();
            file = new File(baseDir + '/' + recordDay + "/subspsync-gw-"
                    + FILE_INDEX_FORMATTER.format(targetFileIndex) + ".dat");
            printWriterMap.remove(fileKey);
        }
        return file;
    }

    public File finishRecords(String recordDay, int targetFileIndex) {
        String fileKey = recordDay + '-' + targetFileIndex;
        PrintWriter pwr = printWriterMap.get(fileKey);
        File file = null;
        if (null != pwr) {
            pwr.flush();
            pwr.close();
            file = new File(baseDir + '/' + recordDay + "/subspsync-gw-"
                    + FILE_INDEX_FORMATTER.format(targetFileIndex) + ".dat");
            printWriterMap.remove(fileKey);
        }
        return file;
    }

    public static void backupFile(final File file) {
        String recordDay = file.getParentFile().getName();
        File destDir = new File(baseDir + "/backup/" + recordDay);
        if (!destDir.exists()) {
            destDir.mkdirs();
        }
        // Move the transferred file into the per-day backup directory
        if (!file.renameTo(new File(destDir, file.getName()))) {
            throw new RuntimeException("Failed to backup file " + file);
        }
    }

    public static void purgeDir() {
        File[] dirs = new File(baseDir).listFiles();
        for (File dir : dirs) {
            if (dir.isDirectory() && 0 == dir.list().length) {
                dir.delete();
            }
        }
    }
}
Pattern Considerations
The Pipeline pattern enables parallel processing of tasks that have dependencies between them. In parallel and concurrent programming, improving concurrency usually means decomposing a large task into several smaller subtasks, and those subtasks usually have no dependencies on one another. The Pipeline pattern, by contrast, achieves parallel computation even when the subtasks do depend on each other.
The Pipeline pattern also brings the convenience of single-threaded programming. Multithreaded programming is complicated on the whole: the code is harder to write, and problems are harder to pin down; when a multithreaded program produces unexpected results, developers must consider not only whether the algorithm is correct but also whether the anomaly is caused by a threading issue. Single-threaded programming, by comparison, is relatively simple, and the Pipeline pattern makes it easy to implement each subtask's processing in a single-threaded style.
In the Pipeline pattern, every Pipeline instance is itself a Pipe instance, so a Pipeline can be added into another Pipeline as one of its stages, which increases the pattern's extensibility and flexibility.
The Pipeline pattern also carries risks: the worker threads or thread pools used by each processing stage, and the creation and movement (into and out of queues) of the objects representing each stage's input and output, all have their own time and space overhead, so weigh this price when applying the pattern. It is advisable to use it for reasonably large tasks; otherwise the cost may outweigh the gain.
Points to Note
1. Pipeline depth: the number of Pipes in a Pipeline is called the Pipeline's depth, so consider how the depth relates to the number of CPUs (Ncpu) of the JVM's host machine. If the tasks a Pipeline instance processes are mostly CPU-intensive, the depth had best not exceed Ncpu; if they are mostly I/O-intensive, the depth had best not exceed 2 * Ncpu (the first sketch below illustrates this rule of thumb).
2. Thread-pool-based Pipes: if Pipe instances use thread pools, thread deadlock becomes more likely because multiple Pipe instances are involved; this requires careful consideration.
3. Error handling: an exception thrown while a Pipe instance processes its task may need to be handled outside that Pipe instance. Two approaches are common: each Pipe instance catches the exception and calls the PipeContext instance's handleError for error handling, or a dedicated error-handling Pipe instance is created and the other Pipe instances submit the relevant data to it after catching an exception (the second sketch below shows the first approach).
4. Configurable Pipeline: Pipe instances can be assembled into a Pipeline in code, or they can be added dynamically through a configuration file (the third sketch below).
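As a quick illustration of the depth rule of thumb in point 1, the snippet below derives the suggested limits from the host's processor count; these are heuristics, not hard constraints.

public class PipelineDepthHint {
    public static void main(String[] args) {
        // Ncpu as seen by the JVM on this host
        int nCpu = Runtime.getRuntime().availableProcessors();
        System.out.println("Suggested max depth for CPU-intensive stages: " + nCpu);
        System.out.println("Suggested max depth for I/O-intensive stages: " + (2 * nCpu));
    }
}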
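For point 3, the first approach can be as simple as a PipeContext whose handleError logs the failure. The sketch below builds on the hypothetical PipeContext/PipeException interfaces sketched earlier; a real implementation might also stop the pipeline or route the failed input to a recovery queue.

// Minimal sketch of approach one: every stage reports caught exceptions to
// handleError, which here only logs them (interfaces as sketched earlier).
class LoggingPipeContext implements PipeContext {
    @Override
    public void handleError(PipeException exp) {
        System.err.println("Pipe " + exp.sourcePipe + " failed on input "
                + exp.input + ": " + exp.getMessage());
        exp.printStackTrace();
    }
}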
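For point 4, a configuration-driven Pipeline might read Pipe class names from a properties or XML file and instantiate them reflectively. This is only a sketch under the assumption that each configured class implements Pipe and has a public no-arg constructor; the Pipe and Pipeline types are the hypothetical interfaces sketched earlier.

import java.util.List;

class ConfigurablePipelineBuilder {
    // pipeClassNames would be read from a configuration file
    static void addConfiguredPipes(Pipeline<?, ?> pipeline,
            List<String> pipeClassNames) throws Exception {
        for (String className : pipeClassNames) {
            // Reflectively create each configured stage and append it
            Pipe<?, ?> pipe = (Pipe<?, ?>) Class.forName(className)
                    .getDeclaredConstructor().newInstance();
            pipeline.addPipe(pipe);
        }
    }
}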