本文内容包括:
了解如何使用 Web 服务来通过 HTTP 进行异步消息传递。本文将分析这样一个场景:在此场景中,触发器将调用 Java? 存储过程,后者又将调用
Web 服务,而 Web 服务会将消息放入面向消息的中间件中。本文专为在 B2B 环境中开发 Web 服务的读者撰写。读者需要具备
Web 服务、DB2? 和 WebSphere? Application Server 方面的知识。
引言
在分布式计算环境中,有各种用于消息传递的模型。这些模型背后的基础概念因两个系统的同步与否而不同。一方面,系统发送消息并等待响应的情况下使用的是同步模型。另一方面,系统发送消息并继续处理(“Fire-and-Forget”消息交换)的情况下则使用的是异步模型。发送消息并不总是要求两个系统均已启动且就绪。异步消息传递系统中的参与者并不必等待接收方的响应,因为它们可以依赖消息传递基础设施来确保成功交付。异步消息传递是松散耦合的面向服务的体系结构(Service-Oriented
Architecture,SOA)的策略之选,因为它克服了远程通信固有的一些限制,如延迟和不可靠等。
异步消息传递的核心是面向消息的中间件(Message-Oriented Middleware,MOM),如 IBM MQSeries?。
下面让我们了解一下其工作方式:为了发送消息,System A 使用 MOM 公开的 API 将消息放入 MOM 中。希望接收消息的
System B 可以使用 MOM 公开的 API 从 MOM 获取消息。如果您没有使用 MOM API 所需的 MOM 库,则可以使用一般
Web 服务,以公开用于将消息放入 MOM 的方法。希望发送消息的系统将需要一个 Web 服务客户机。系统希望发送消息时将调用此客户机,此客户机将调用
Web 服务,此 Web 服务会将消息放入到 MOM 中。
回页首
配置 WebSphere Application Server Version 6 进行缺省消息传递
为了说明本文中的概念,我们需要配置 WebSphere Application Server V6(以下称为 WAS)。要配置
WAS,请执行以下步骤:
- 创建总线
- 创建总线成员
- 创建目的地
- 创建 JMS 连接工厂
- 创建 JMS 队列
- 创建 JMS 激活规范
有关如何进行这些步骤的详细信息,请参考参考资料部分中列出的文章“将消息驱动 Bean 和 JMS 应用程序部署到服务集成总线中”。
回页首
使用 Web 服务将消息放入 MOM
Web 服务将提供一个特定方法,当调用此方法时,会将一条消息放入队列中。出于演示目的,我们将使用文本消息,不过可以针对任何 JMS
消息类型编写 Web 服务。有关如何编写 Java Web 服务的信息,请参见参考资料部分中列出的红皮书“WebSphere version
6 Web Services Handbook Development and Deployment”。清单 1 显示了用于测试本文代码的示例
Web 服务的 putMessage 方法。
清单 1. putMessage 方法
public void putMessage(String message){
try {
Properties prop = new Properties();
prop.put(InitialContext.INITIAL_CONTEXT_FACTORY,
"com.ibm.websphere.naming.WsnInitialContextFactory");
prop.put(InitialContext.PROVIDER_URL,
"corbaloc:iiop:localhost:2809");
InitialContext context = new InitialContext(prop);
//Do the lookup for Queue connection factory.
QueueConnectionFactory
factory =(QueueConnectionFactory)context.lookup("JMS/MyQueue_CF");
//Do the lookup for Queue.
Destination queue = (Destination) context.lookup("JMS/queue");
//Create connection and start it.
Connection connection = factory.createConnection();
connection.start();
System.out.println("connection created");
//Create session and message producer.
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageProducer producer=session.createProducer(queue);
System.out.println("producer created");
System.out.println(producer.getDestination().toString());
//Prepare the text message.
TextMessage msg=session.createTextMessage();
msg.setText(message);
//Send the message.
producer.send( msg);
System.out.println("sent message : "+msg.getText());
} catch (Exception e) {
e.printStackTrace();
}
}
|
putMessage 方法将通过 JNDI 查找连接工厂、JMS 目的地对象。然后,它将创建连接、会话以及 JMS 目的地的消息生成器,并将文本消息放入队列中。
回页首
配置数据库
为了自动调用 Web 服务客户机,需要进行数据库配置。可以采用多种方式完成此任务。例如,可以编写触发器,而让触发器调用用户定义函数;或编写触发器,而让触发器调用
Java 存储过程。出于演示目的,我们使用触发器与 Java 存储过程组合和 DB2 数据库。请执行以下步骤来配置数据库。
创建示例表
运行以下 SQL 命令来创建表:
清单 2. 用于创建示例表的 SQL
CREATE TABLE customer
(
pkey INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START
WITH 1, INCREMENT BY 1, NO CACHE ) PRIMARY KEY,
fname VARCHAR(20),
lname VARCHAR(20),
ccode VARCHAR(10)
);
|
创建 Java SP
此 Java 存储过程(Java Stored Procedure,Java SP)是本文概念中的关键。调用此 Java SP
时将调用的 Java 类中将包含调用 Web 服务的代码。请运行以下 SQL 命令来创建 Java SP:
清单 3. 用于创建 Java SP 的 SQL
CREATE PROCEDURE MyPROCEDURE ( IN var0 INTEGER,IN var1 VARCHAR(20) )
SPECIFIC SQL060622110835012
DYNAMIC RESULT SETS 1
NOT DETERMINISTIC
LANGUAGE Java
EXTERNAL NAME 'MyPROCEDURE.invokeWS'
FENCED
THREADSAFE
PARAMETER STYLE JAVA
|
下面的代码列表显示的是 MyPROCEDURE Java 类。此类具有一个名为 invokeWS 的方法。此方法具有三个参数,主键、对表执行的操作(创建、更新、删除)和结果集。我使用了
Axis API 来编写此方法中的 Web 服务客户机代码。为了编译和运行此类,请确保您的类路径中包含各个 Axis jar 文件。
清单 4. Java SP 的 Java 类
import java.sql.*;
import java.io.FileOutputStream;
import java.io.PrintStream;
import javax.xml.rpc.ServiceFactory;
import javax.xml.rpc.Service;
import javax.xml.rpc.Call;
import javax.xml.rpc.ParameterMode;
import javax.xml.namespace.QName;
public class MyPROCEDURE
{
public static void invokeWS ( int var0, String var1, ResultSet[] rs1 )
throws SQLException, Exception
{
try{
//create service factory
ServiceFactory factory = ServiceFactory.newInstance();
// define qnames
String targetNamespace = "http://asyncmessage.ibm.com";
QName serviceName = new QName(targetNamespace,"AsynchronousMessageWSService");
QName portName = new QName(targetNamespace,"AsynchronousMessageWS");
QName operationName = new QName(targetNamespace,"putMessage");
// create service
Service service = factory.createService(serviceName);
// create call
Call call = service.createCall();
// set port and operation name
call.setPortTypeName(portName);
call.setOperationName(operationName);
// add parameters
call.addParameter("message",
new QName(targetNamespace,"string"),ParameterMode.IN);
call.setReturnType(new QName("",""));
// set end point address
call.setTargetEndpointAddress("http://localhost:9080/AsyncMessagingSamples
/service/AsynchronousMessageWS");
// invoke the remote web service
call.invoke(new Object[]
{ "Message from DB2 : a row with primary key"+var0+" is "+var1+"d"});
}catch(Exception e)
{
try{
FileOutputStream fout=new FileOutputStream("errors.txt");
e.printStackTrace(new PrintStream(fout));
}catch(Exception e1){}
}
}
}
|
在示例表上创建触发器
在示例代码中,我们在 customer 表上创建了三个触发器,分别用于 Insert、Update 和 Delete 操作。例如,只要在
customer 表中插入、更新或删除行,就会使用这些触发器。当发生这三个事件中的一个时,就会执行触发器。这些触发器将随后调用
Java SP,而后者又会调用 invokeWS 方法。请运行以下 SQL 命令来创建触发器:接下来让我们看看如何进行此工作:
清单 5. 用于在示例表上创建触发器的 SQL
//Trigger for row insertion
CREATE TRIGGER event_create
AFTER INSERT ON CUSTOMER REFERENCING NEW AS N
FOR EACH ROW MODE DB2SQL
call MYPROCEDURE(N.pkey,'create') ;
//Trigger for row updation
REATE TRIGGER event_udpate
AFTER UPDATE ON CUSTOMER REFERENCING NEW AS N
FOR EACH ROW MODE DB2SQL
call MYPROCEDURE(N.pkey,'update') ;
//Trigger for row deletion
CREATE TRIGGER event_delete
AFTER DELETE ON CUSTOMER REFERENCING OLD AS O
FOR EACH ROW MODE DB2SQL
call MYPROCEDURE(N.pkey,'delete') ;
|
回页首
创建 MDB
为了完成此方案,我们需要创建消息驱动的 Bean(Message Driven Bean,MDB)。此 MDB 将订阅队列,而我们的
Web 服务会将消息放入到此队列中。有关如何配置 MDB 的详细信息,请参考参考资料部分中列出的文章“将消息驱动 Bean 和
JMS 应用程序部署到服务集成总线中”。清单 6 显示了摘自 ibm-ejb-jar-bnd.xmi 文件和本文中使用的示例 MDB
的 onMessage 方法的代码片段。
注意:为了便于阅读,已将 ibm-ejb-jar-bnd.xmi 中的某些部分删除,而重要属性则使用了粗体进行标记。
清单 6. 摘自 onMessage 和 ibm-ejb-jar-bnd.xmi 文件的代码片段
.........................................................
<ejbBindings .....
activationSpecJndiName="eis/MyQueue_activation_spec" destinationJndiName="JMS/queue">
..........................................................
<resRefBindings ......... jndiName="JMS/MyQueue_CF">
..........................................................
<resRefBindings ..... jndiName="JMS/queue">
...........................................................
</ejbBindings>
public void onMessage (javax.jms.Message msg)
{
try {
System.out.println("***********************************");
System.out.println("***********************************");
System.out.println("***"+((TextMessage)msg).getText()+"***");
System.out.println("***********************************");
System.out.println("***********************************");
} catch (JMSException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
|
回页首
投入使用
我们已经完成所需的步骤。请在 WebSphere Application Server 中部署 Web 服务。现在,只要我们在
customer 表中创建、更新或删除行,就会由 MDB 向 WebSphere Application Server 的 SystemOut.log
写入一条消息。如果您在 customer 表中使用主键 100 创建一个行,对应的消息将与如下所示类似:"Message
from DB2: a row with primary key 100 is created"。
回页首
结束语
在本文中,我们了解了可以如何使用 Web 服务启用 HTTP 上的异步消息传递功能。这个方法的主要好处是它具有平台和语言独立性,允许您自由地编写
JMS 客户机来向 MOM 发送消息。
回页首
下载
描述 |
名字 |
大小 |
下载方法 |
Code samples for this article |
samples.zip |
10KB |
|
回页首
参考资料
|