随着信息系统的结构的日益复杂和规模的不断扩大,交易中间件在复杂系统的应用也越来越广。交易中间件作为一个中间层的系统,在接收客户端请求时,通常需要做一些负载控制和用户缓存的一些功能。对于软件测试人员来说,测试交易中间件时,避免不了模拟客户端在高负载情况下的一些有规律或随机的行为。这些测试有时是功能性验证测试
(Functional Verification Test),有时也涉及到性能测试 (Performance
Test)。
本文将介绍如何使用 Java 语言编写多线程的自动化测试脚本,并且使用线程池模拟一些特殊的有规律的测试场景。
本文首先会简单的介绍交易中间件及 Java 多线程编程的概念。接着提出项目中遇到的问题。然后就碰到的问题,使用
Java 多线程技术模拟测试场景。解决这个问题后,就类似的问题提出推广的思路。
本示例的必备条件
Java 的多线程应用范围很广,交易中间件的种类也有许多。本文 JDK
的版本是 JDK5,原因是 JDK5 中加入了比较丰富的多线程并发工具。目前,JDK 的最新版本是 JDK7,其中又增加了许多工具包括
Phaser、ThreadLocalRandom、ForkJoinPool、以及 TransferQueue
等,但是如果掌握了 JDK5 的多线程工具,对 JDK7 的工具也一定不会陌生了。
本文的交易中间件是以 IBM Information Management
System(IMS)的 TM 为示例,这个示例是为您创建一个学习的场景,当然 Java 的多线程应用的范围非常广泛,不限于这一种交易中间件。在本文的推广部分,也另外假设了一个场景,并加以实现。
如果您也需要以 IMS TM 来测试,请确保提供以下的测试环境。
需要在 Windows 上安装的软件:
- JDK 1.5(或者更新的版本)
- Rational Functional Tester V7.0(或者更新的版本)
需要在 IBM z/OS 上安装的环境:
- IMS Version 9 (或者更新的版本)
- IMS Connect Version 9 (或者更新的版本)
- OTMA
- TCP/IP
概念介绍
交易中间件
中间件的产品种类很多,根据中间件在系统中所起的作用和采用的技术不同,大致划分为五大类:数据库中间件
(Database Middleware,DM)、远程过程调用中间件 (Remote Procedure
Call, RPC)、基于对象请求代理 (Object Request Broker, ORB)、中间件与交易中间件
( Transaction Processing Monitor, TPM, 也称事务处理中间件 )。
交易中间件是一种复杂的中间件产品,通常是在负载的环境下运用分布式应用的速度和可靠性来实现的。交易中间件向用户提供一系列的服务,包括通信服务、日志服务、系统服务和交易服务等。
交易中间件的通信主要是基于 TCP/IP 的 socket 技术和基于消息传递与排队机制实现的,其通信的过程如图
1 所示:
图1. 交易中间件通信过程
交易中间件端通常有侦听方法在监听客户端的连接请求,返回一个连接后并生成一个相应的客户服务进程。在接收到客户端的数据后,对数据进行分割、加密、封装成消息包。然后做分发、入队、发送等操作。
Java 多线程编程
从交易中间件的概念中可以看出,如果要模拟客户端使用交易中间件,首先要模拟建立
Socket 连接。接下来,如果需要模拟多个用户的连接,就需运用用 Java 的多线程机制了。
Java 的多线程机制使应用程序能够并发的执行,并且 Java 能够运用同步机制使并发的线程能够互斥的使用共享资源,从而达到线程安全的作用。Java
的多线程的实现方法有两种:1) 通过 Thread 继承。为创建一个线程,最简单的方法就是从 Thread
类继承。2) 通过 Runnable 接口。该方式主要是用来实现特殊功能,如复杂的多继承功能等。Runnable
接口的应用虽然提高了软件开发的灵活度,但是同时也是造成 BUG 的根源之一,所以应根据不同的需求,合适的选取两种方法,两种实现的方法的代码可见代码清单一。
清单 1.Java 多线程实现的两种方式
//方式一
publicclassMyThreadextendsThread{
publicvoidrun(){
System.out.println("MyThread.run()");
}
}
//在合适的地方启动线程如下:
MyThreadmyThread1=newMyThread();
MyThreadmyThread2=newMyThread();
myThread1.start();
myThread2.start();
//方式二
publicclassMyThreadextendsOtherClassimplementsRunnable{
publicvoidrun(){
System.out.println("MyThread.run()");
}
}
//为了启动MyThread,需要首先实例化一个Thread,并传入自己的MyThread实例
MyThreadmyThread=newMyThread();
Threadthread=newThread(myThread);
thread.start();
|
在 Java 中,要实现多线程流程控制方法有以下几个:
- 通过 sleep() 来实现 Java 多线程流程控制,可以用于多线程流程控制演示。
- 通过 interrupt() 函数对 Java 多线程进行流程控制。这种控制方法比 sleep()
函数要精确得多,但在用法上与 sleep() 函数不太相同。
- 通过 wait() 和 notify() 来进行流程控制。wait() 方式与 sleep()
方式有相同之处,都是从线程内部对线程进行中断的。不同的是,它没有提供中断时间,其唤醒方式就是 notify()
和 notifyA ll()。
- 通过 join() 来进行流程控制,join()方式结合了 interrupt 和 sleep
方式的优点。在实际设计当中应注意使用 join 方式,因为不恰当的使用可能会打乱线程的流程。
项目面临的问题
项目背景
IBM 的 IMS 和 CICS(1968)是在 IBM 大型机 mainframe
时代最早的交易中间件,它们都采用请求队列管理、检查点机制和批处理的启动机制。本文所要测试的交易中间件正是
IBM 公司的大型数据库产品 IMS 的交易中间件。它提供了交易处理的通信、并发访问控制、事务控制、资源管理和必要的事务监听功能。IMS
的控制及数据流程可见图 2
图2. IMS控制及数据流程
项目需求
在 IMS TM 中,负责 TCP/IP 通信的模块是 IMS Connect,这也是本文测试的核心连接模块。该模块的新增功能包括
TCPIPQ 和 Health Report 功能。TCPIPQ 是一个 socket 连接的缓冲队列,那些暂时没有被接受的连接请求会进入这个队列,它的大小是可控的。Health
Report 功能是定时的检查目前的 socket 连接是否达到最大连接数,并且将已连接的比例汇报给相应的控制模块,客户也可以通过命令查看当前已建立
socket 的占有率。
面临的问题
- 在测试这两个新增功能时,由于 IMS Connect 能够建立的最大 Socket 连接数可以达到
65535 个,而最少的数目也有 100。这么多的 socket 数目,如果一个一个的开启客户端程序来发起交易是不现实的。
- 在测试 Health Report 时,当需要测试占有率达到百分之百,即模拟当前的 socket
连接已经达到上限的场景,需要同时建立并发的请求,并且数目要足够多,达到连接上限。
- 在测试 TCPIPQ 时,只有让现有的 socket 连接达到最大连接数,新的连接才能进入 TCPIPQ
队列。如果需要测试队列的出队和入队操作是否正确,需要多个线程有规律的简历 socket 连接。
解决方法及实现细节
为了解决项目面临的测试难题,本文采用 Java 的多线程机制来模拟这三个问题所涉及的场景。由于在本项目中客户端发出的请求需要复杂的处理,所以本文采用实现
Runnable 接口的方法来实现多线程,这样发送请求的类还可以继承其它的工具类来实现复杂的处理。
由于 IMS Connect 所支持的 socket 的数目 (MAXSOC)
可以从 100 到 65535,而 TCPIPQ 队列的大小可以从 50 到 65535,本文在实现客户端多线程请求前,设置
IMS Connect 的 MAXSOC=100 并且设置 TCPIPQ=50。
在这个前提下,本文实现的线程类在演示时,如果创建 100 个线程的客户端连接,就会达到
socket 连接的上限,使 IMS Connect 的 Health 为 0,意味着不能再监听多余的
socket 请求,如果继续发送 50 个请求,这些请求会进入 TCPIPQ 的队列,使这个缓存队列也达到上限。再继续发送请求就会被拒绝,并得到拒绝的提示信息。
如果使用最简单的实现方法,可以参考如下的代码清单 2。
清单 2. 多客户端并发连接的简单实现
classUtility{//工具类
publicbyte[]getHeadData(String...parms){
.....
....
}
publicbyte[]getBodyData(String...parms){
.....
....
}
publicbyte[]getFootData(String...parms){
.....
....
}
}
classSocketClientextendsUtilityimplementsRunnable,Test{//实现Runnable接口
StringhostName;
intport;
publicSocketClient(StringhostName,intport){
this.hostName=hostName;
this.port=port;
}
//线程启动后的连接,并发送数据的
publicvoidrun(){
try{
InetAddressinetAddress=InetAddress.getByName(hostName);
Socketsocket=newSocket(inetAddress,port);
OutputStreamos=socket.getOutputStream();
BufferedOutputStreambos=newBufferedOutputStream(os);
byte[]sendHeadData=getHeadData("aaa","bbb","ccc");
byte[]sendBodyData=getBodyData("ddd","eee","fff");
byte[]sendFootData=getFootData("ggg","hhh","iii");
bos.write(sendHeadData);
bos.write(sendBodyData);
bos.write(sendFootData);
bos.flush();
}catch(IOExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}
}
}
//测试的主线程,演示测试的一些场景
publicclassSocketConnectionDemo{
publicintconnectionNumb;//连接数
publicStringhostName;//连接的IMSConnect的域名
publicintport;//连接的端口
publicSocketConnectionDemo(intconnectionNumb,StringhostName,intport){
this.connectionNumb=connectionNumb;
this.hostName=hostName;
this.port=port;
}
publicstaticvoidmain(String[]args){
intconnectionNumb=50;
StringhostName="ec32181.vmec.svl.com";
intport=9999;
SocketConnectionDemoscd=newSocketConnectionDemo(connectionNumb,hostName,port);
}
//最简单的测试场景
publicvoidtestScenario1(){
for(inti=1;i<=connectionNumb;i++){
SocketClientsc=newSocketClient(hostName,port);
ThreadclientT=newThread(sc,"Client"+i);
clientT.start();
}
//测试的具体逻辑
…..
….
}
}
|
从这个代码清单可以看出,testScenario1 是具体的测试场景调用的方法。这个测试场景使用最简单的方法实现线程的启动,并根据具体的
connectionNumb 建立相应的连接数,在这些连接都建立之后,可以执行具体的测试逻辑,测试在峰值情况下的
IMS Connect 的表现,并查看具体的 Health Value。
这个实现也存在一些问题,这些多线程的客户端虽然是并行的执行,但是它们并不是在同一起跑线开始的,因为在主线程中,它们都是一个一个建立的线程,一旦建立就
start 了。为了实现所有线程同时开始,快速使得 IMS Connect 达到峰值,就需要使用线程池技术,这就是
JDK1.5 中新增并发工具类。在此使用了 CountDownLatch 来实现连接数的控制,并且通过一个值为
1 的 CountDownLatch 对象来实现同一起跑线开始的闸门功能。详细代码请查看代码清单 3
清单 3. 使用 CountDownLatch 实现并发连接
classThreadTestimplementsRunnable{
privatestaticCountDownLatchstartCdl;//用于启动所有连接线程的闸门
privatestaticCountDownLatchdoneCdl;//所有连接工作都结束的控制器
publicThreadTest(CountDownLatchstartCdl,CountDownLatchdoneCdl){
this.startCdl=startCdl;
this.doneCdl=doneCdl;
}
publicvoidrun(){
try{
startCdl.await();
System.out.println(
Thread.currentThread().getName()+"hasbeenworking!!!!");
//此处需要代码清单一的那些连接操作
……..
……
…..
doneCdl.countDown();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
publicclassCountDownLatchDemo1{
publicstaticvoidmain(String[]args){
CountDownLatchstartCdl=newCountDownLatch(1);//启动的闸门值为1
CountDownLatchdoneCdl=newCountDownLatch(100);//连接的总数为100
for(inti=1;i<=100;i++){
ThreadTesttt=newThreadTest(startCdl,doneCdl);
newThread(tt,"Thread"+i).start();
}
//记录所有连接线程的开始时间
longstart=System.nanoTime();
//所有线程虽然都已建立,并start。但只有等闸门打开才都开始运行。
startCdl.countDown();
try{
doneCdl.await();//主线程等待所有连接结束
//连接达到峰值后,执行一些测试逻辑代码
......
......
......
}catch(InterruptedExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}
//记录所有连接线程的结束时间
longend=System.nanoTime();
System.out.println("Thetasktakestime(ms):"+(end-start)/100000);
}
}
|
应用及验证
在上一节,介绍了整个多线程的实现细节。在本节中,将运行这个多线程的应用,并结合项目的
IMS Connect 服务器,演示连接数是否能像预期一样达到上限,进行峰值的测试。
IMS Connect 目前的版本是 V12,正在开发的版本是 V13。在
V13 中,可以通过一些命令来查看当前的 Health 值,即连接数与上限的比例情况。本文中,设置 IMS
Connect 的上限连接数为 100,开放端口为 5,那么在 IMS Connect 开启时,可以通过命令查看得到
Health 值为 95,这意味着目前连接数占有率为 95%。
接下来,开启多线程的连接,并发的连接 95 个连接。IMS Connect
后台会显示连接在不断的增长,当达到峰值时会出现相应的提示信息。
此时,再查看 Health 的值,会发现已经变成了 000,这意味着 IMS Connect 已经达到了峰值。
推广
本文为了实现多个连接线程同时开启并发拼抢连接交易中间件服务器的过程,使用了
JDK1.5 的并发类 CountDownLatch。但是,JDK1.5 以后的版本引入了许多高级的并发特性,充分利用了现代多处理器和多核心系统的功能以编写大规模并发应用程序。主要包含原子量、并发集合、同步器、可重入锁,并对线程池的构造提供了强力的支持。
除了本文使用的 CountDownLatch,还有许多同步器。例如,Semaphore、Barrier、Exchanger、Future
等。为了实现项目后期的连接缓存队列 TCPIPQ 的测试,Exchanger 可能会被使用。Exchanger
可以实现两组线程互相交换一些共享资源的功能。
为了说明 Exchanger 的使用,本文假设一种场景。假设有一个连接缓存队列,有一个连接器负责创建连接,创建的连接会存储在队列里。另外一个释放连接器可以释放连接,从队列里移除连接。连接器每次会随机地创建
1 或 2 个连接。释放连接器只能每次释放 1 个连接。最后肯定会有连接队列满的时候,那时就可以进行连接队列的测试了
要实现这一种场景,可以使用 Exchanger 来实现。可以查看代码清单
4。
清单 4. 使用 Exchanger 实现连接缓存队列的测试
// 假设建立连接的类是 Connection class Connection{ private String connName; private String ipAddress; public Connection(String connName,String ipAddString) { this.connName=connName; this.ipAddress=ipAddString; } ..... ... }
public class ConnectionQueueDemo {
// 使用交换器实现连接器与释放连接器之间资源的共享
private static Exchanger<LinkedList<Connection>>
exconn =
new Exchanger<LinkedList<Connection>>();
// 连接器
public class Connector implements Runnable{
private LinkedList<Connection> connQueue;
private String ipAddress;
public Connector(LinkedList<Connection>
connQueue,String ipAddress) {
this.connQueue=connQueue;
this.ipAddress=ipAddress;
}
public void run() {
boolean flag=true;
while(flag){
// 每次连接随机的 1~2 个连接。
Random random = new Random();
int connNumb = (random.nextInt())%2 + 1; // 得到随机的
1~2 个连接数
if(connNumb > 1){
System.out.println("Connector creates 2 connection!");
}else{
System.out.println("Connector creates 1 connection!");
}
for(int i=0; i<connNumb; i++){
Connection conn = new Connection("Connector",
getIpAddress());
connQueue.add(conn);
}
// 休息 1 秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
// 交换给释放连接器,让释放连接器工作!
connQueue =(LinkedList<Connection>) exconn.exchange(connQueue);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(connQueue.size()==10){
System.out.println("The connection queue
is full!! The programme is end!");
// 当队列满时,可以加入一些测试逻辑代码
....
...
flag=false;
System.exit(0);
}else{
System.out.println("After Disconnector, the
size of the queue is "+connQueue.size());
}
}
}
public String getIpAddress(){
return ipAddress;
}
}
// 释放连接器
public class Disconnector implements Runnable
{
private LinkedList<Connection> connQueue;
public Disconnector(LinkedList<Connection>
connQueue) {
this.connQueue=connQueue;
}
public void run() {
boolean flag=true;
while(flag){
System.out.println("Disconnector disconnects
1 connection!");
if(!connQueue.isEmpty())
connQueue.remove(0);
// 休息 1 秒
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
// 交换给连接器,让连接器工作!
connQueue =(LinkedList<Connection>) exconn.exchange(connQueue);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(connQueue.size()==0){
System.out.println("There is no connection
in the queue!");
}else{
System.out.println(
"After Connector, the size of the queue is
"+connQueue.size());
}
}
}
}
public static void main(String[] args) {
LinkedList<Connection> connQueue = new LinkedList<Connection>();
ConnectionQueueDemo connectionQueueDemo = new
ConnectionQueueDemoEx();
new Thread(connectionQueueDemo.new Connector(connQueue,"192.168.1.1")).start();
new Thread(connectionQueueDemo.new Disconnector(connQueue)).start();
}
}
|
在 main 函数里是具体的 Demo 实现。新建了连接器和释放连接器两个线程,它们共享一个连接缓存队列。由于,连接器每次随机的连接的连接数要大于释放连机器释放的连接数,所以最后,连接队列会满。后台打印的输出截屏如下:
图3.Exchanger 实现的多线程 Demo
的后台输出
总结
在交易中间件的测试中,伴随着网络连接的复杂情况,程序员需要模拟不同的场景来测试不同的边界情况。不仅如此,还需要模拟大量客户端连接来测试达到峰值的情况。本文以
IMS 的交易管理器的一个组件 IMS Connect 为例,阐述了如何使用 Java 并发编程的技术来模拟多个客户端发出连接请求的方法。
本文也对 Java 多线程编程做出了一些推广应用。Java 的多线程技术在
JDK1.5 之后有了许多新的特性和扩展,在推广的章节,本文利用了 Exchanger 类来实现了客户端连接和释放的测试场景。
最后,希望通过本文的介绍能对会面临同样问题或类似问题的程序员们提供一些思路,能有利于测试人员更好的测试交易中间件,达到峰值测试的效果。
|