BT

如何利用碎片时间提升技术认知与能力? 点击获取答案

PHP开发者的BlazeDS和JMS指南,第一部分

| 作者 Nathan A. Good 关注 0 他的粉丝 ,译者 李强 关注 0 他的粉丝 发布于 2010年7月6日. 估计阅读时间: 43 分钟 | 都知道硅谷人工智能做的好,你知道 硅谷的运维技术 也值得参考吗?QCon上海带你探索其中的奥义

BlazeDS是来自Adobe公司的一个开源项目,它可以让您的Flex应用程序与数据服务进行连接。JMS(Java消息服务)是用Java编写的与服务相互通信的一种方法。本文有助于您体会使用JMS的优点,以及在Flex应用程序中如何使用BlazeDS通过JMS与Java服务进行通信。

JMS概述

JMS是一个API(应用编程接口)标准,它允许您使用Java EE技术发送和接收消息。在Java社区很多软件商都提供了JMS的商业和开源实现,您可以根据自己的需求自由选择软件商。

使用JMS的优点

使用JMS有几个优点,包括抽象、可靠的传递、异步消息,以及故障转移和高可用性。其中,抽象是非常重要的优点,因为JMS的消费者和生产者不必彼此相互依赖。无论是消费者端还是生产者端的代码都可以改变,只要JMS消息保持一致,两者间的连接就不会中断。

JMS通常支持两种形式的消息——持久化的消息和非持久化的消息。非持久化的消息是在内存中进行处理的,速度很快,但可能会因系统故障而丢失消息。持久化的消息则会把消息写入磁盘,即便系统发生故障也可找回消息,但相应的处理速度较慢。不管是否使用事务处理,JMS消息提供者都会确保所有持久化消息只被可靠传递一次。异步消息传递则提供了发送消息而无需等待响应的能力,其在Web应用中非常有用,当发送请求后应用程序无需阻塞或挂起等待响应完成。这种消息传递类型对于需要长时间处理的应用(如酒店预订或构建动态文档等应用)来说是一个理想选择。

只要配置正确,很多JMS的实现都提供了故障转移和高可用性能力。因此,如果某个正在运行JMS的服务器突然失效了,相同JMS实现的另一台服务器能够处理其负载。对于需要提供业务连续性的应用程序来说,故障转移是一个理想选择。

使用JMS和BlazeDS的优点

在使用BlazeDS连接JMS时,您的Flex应用程序可以获得使用JMS的所有好处,而过去这些好处往往只有完全在J2EE平台上实现的Web应用才能享受。

使用BlazeDS和JMS向服务发送消息,可以让您的Flex应用程序开发更加独立于服务。服务的JMS实现甚至可以在更换软件供应商,而您的Flex应用程序不用做任何修改。

使用BlazeDS和JMS的另一个主要优点是,如果您在使用JMS之前正在使用超文本传输协议(HTTP),那么您的Flex应用程序的代码基本不需要做什么改变,BlazeDS会为您处理的所有细节。

理解JMS消息

JMS提供了两种类型的消息传递模型:点对点(P2P)和发布/订阅(pub/sub)模型。简单地说,发布/订阅是一对多的方式广播消息,而点对点则是一对一的方式传递消息。

在JMS中的消息客户端被称为JMS客户端;而消息系统则被称为面向消息的中间件(MOM——Messaging Orientated Middleware),它也是JMS提供者。另外,产生消息的JMS客户端被称为是生产者,而接收消息的JMS客户端被称为消费者。

P2P模型

P2P消息模型允许JMS客户端通过名为队列(Queue)的虚拟信道发送和接收消息。JMS队列可视为消息的容器,就像一个邮箱。JMS发送者推送一条消息到JMS消息队列就如同寄件人把一封信件放到了邮箱。而后,接收者从邮箱中获得消息并对之采取某种动作。就像邮箱中的信件,使用JMS队列从发件人发送的消息由单个接收者读取。尽管JMS对于P2P也支持类似于发布/订阅模型所使用的“推”的方式,传统的P2P模型却是一个基于“拉”或基于“轮询”的模型,是从队列中请求消息,而不是把消息推送到客户端。

可能用到JMS消息和Flex客户端的一个实例是:从Flex客户端初始化一个进程,该进程需要服务层花费一些时间才能完成,比如创建一个按需定制的PDF文件。

发布/订阅模式

在发布/订阅模式中,生产者可以通过名为主题(Topic)的虚拟信道发送一个信息给多个消费者。主题与印刷杂志类似:订阅者向出版商注册订阅,接收指定的杂志。出版商定期向不同的订阅者发送同一杂志的副本。发布/订阅模型总的来说是基于推的模型,消息被广播到消费者,而不是通过请求或拉的方式。

使用JMS主题,您可以从服务层一次发送信息到多个Flex客户端。在Flex客户端使用JMS主题的一个例子是用户上线。在这个例子中,当用户登录到该系统,服务层会经由JMS主题发送一个消息,告诉其他所有的Flex客户端该用户已上线。

JMS消息

JMS提供一种方式来构建信息,使得主题和队列使用起来更加容易。当编写Java代码时,JMS API提供的方法能以一致的方式来正确构建消息。而使用BlazeDS的一个好处是,您不必担心怎样创建正确的信息,BlazeDS会辅助实现这一点的。

JMS的实现

JMS的实现有若干,既有开源项目也有商业项目。本文使用的JMS实现是ActiveMQ,它是Apache软件基金会的一个开放源码的项目,以Apache许可证发布。ActiveMQ除了提供一个JMS的实现外,还提供使用其它编程语言发送消息的功能,包括PHP。其它的JMS实现还包括IBM的WebSphere MQ,JBoss的HornetQ(注:前身是JBoss Messaging,开放的消息队列),以及OpenJMS。

配置BlazeDS使用JMS

若要BlazeDS使用JMS,其配置包括:建立正确的端点(endpoint)并在终点(destination)配置中设置JMS属性。本文的示例用到了一个简单的Flex界面、BlazeDS库,以及ActiveMQ。

JMSAdapter

当消息到达MessageBroker Servlet后,BlazeDS使用JMSAdapter来完成Flex应用程序和JMS的实现之间的消息转换。JMSAdapter在messages-config.xml文件中配置:

<adapters>
	<adapter-definition id="actionscript" class="flex.messaging.services.messaging.adapters.ActionScriptAdapter" default="true" />
	<adapter-definition id="jms" class="flex.messaging.services.messaging.adapters.JMSAdapter" />
</adapters>

端点的不同

一定要记住,BlazeDS的Java库和MessageBroker servlet需要进行配置并运行于Web容器。由于MessageBroker必须处于运行状态,使用JMS适配器的终点(destination)仍要使用AMF(Adobe Action Message Format)通道。在本例中使用的端点是:

<?xml version="1.0" encoding="UTF-8"?>
<services-config>
   <services>
       <service-include file-path="messaging-config.xml" />
   </services>

   <security/>

   <channels>
      <channel-definition id="my-amf" class="mx.messaging.channels.AMFChannel">
         <endpoint url="http://{server.name}:8161/blazeds/messagebroker/amf" class="flex.messaging.endpoints.AMFEndpoint"/>
      </channel-definition>

      <channel-definition id="my-streaming-amf" class="mx.messaging.channels.StreamingAMFChannel">
         <endpoint url="http://{server.name}:8161/blazeds/messagebroker/streamingamf" class="flex.messaging.endpoints.StreamingAMFEndpoint"/>
      </channel-definition>

      <channel-definition id="my-polling-amf" class="mx.messaging.channels.AMFChannel">
         <endpoint url="http://{server.name}:8161/blazeds/messagebroker/amfpolling" class="flex.messaging.endpoints.AMFEndpoint"/>
         <properties>
            <polling-enabled>true</polling-enabled>
            <polling-interval-seconds>1</polling-interval-seconds>
         </properties>
      </channel-definition>
   </channels>
</services-config>

JMS属性

JMS需要一些自己的配置,比如连接工厂用于建立JMS连接的JNDI(Java命名和目录接口)。队列名或主题名也是使用JNDI进行解析的。JNDI是一个Java API,允许把资源的物理位置抽象成名字。JNDI的常见用法是数据库命名,应用程序使用此JNDI名获得数据库连接。

除了连接工厂、主题或队列的JNDI名,JMS变量还允许您定义用户证书、消息类型、终点类型,等等。下面展示的是在本例中使用的JMS属性:

<destination id="my-jms-destination">
   <properties>
      <jms>
         <destination-type>Topic</destination-type>
         <message-type>javax.jms.TextMessage</message-type>
         <connection-factory>ConnectionFactory</connection-factory>
         <destination-jndi-name>dynamicTopics/MyTopic</destination-jndi-name>
         <delivery-mode>NON_PERSISTENT</delivery-mode>
         <message-priority>DEFAULT_PRIORITY</message-priority>
         <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
         <initial-context-environment>
            <property>
               <name>Context.INITIAL_CONTEXT_FACTORY</name>
               <value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value>
            </property>
            <property>
               <name>Context.PROVIDER_URL</name>
               <value>tcp://localhost:61616</value>
            </property>
         </initial-context-environment>
      </jms>
   </properties>
   <channels>
      <channel ref="my-amf" />
   </channels>
   <adapter ref="jms" />
</destination>

您可以查看BlazeDS文档以获得进一步的属性解释。

从BlazeDS发送消息

要尝试本文的例子,请在创建您的测试Flex项目之前,按照下面的步骤行事。下载ActiveMQ二进制发布包并解压,您可以启动ActiveMQ并完成ActiveMQ文档中的验证步骤,但在继续下一步之前请先关闭ActiveMQ。

下载BlazeDS二进制发布包

首先要下载BlazeDS二进制发布包

将BlazeDS发布包blazeds.war解压到ActiveMQ的webapps目录下。这么做可保证您运行的例子中只使用了ActiveMQ。

在Jetty服务器中建立Web应用上下文,Jetty是一个Web容器,ActiveMQ二进制发布包自带了Jetty。如要增加上下文,需修改jetty.xml文件,它位于ActiveMQ目录下的conf目录:

<webAppContext contextPath="/blazeds" resourceBase="${activemq.base}/webapps/blazeds" logUrlOnStart="true"/> 

启动ActiveMQ,在浏览器地址栏输入http://localhost:8161/blazeds。应该可以看到BlazeDS的目录列表。这一步是验证你已经把BlazeDS WAR放在正确的位置,并配置正确。

创建一个Flex项目,它包含两个配置文件,配置文件位于src/WEB-INF/flex目录,分别为:messaging-config.xml和services-config.xml。使用文章前述的例子来设置JMS的终点,并添加JMSAdapter。

把Flex应用项目中的services-config.xml和messaging-config.xml文件复制到ActiveMQ的webapps/blazeds/WEB-INF/flex目录。

发送消息(示例)

为测试从Flex发送的消息,本文使用了ActiveMQ自带的命令行例子。为从Flex应用程序中发送一条消息到命令行客户端,您可以在Flex代码中建立一个生产者(producer),然后用不同的参数运行命令行例子,并查看结果。

您需要在Flex应用程序中创建一个生产者来发送消息。作为一个终点,这个新生产者将使用配置在messaging-config.xml文件中的my-jms-destination。下面这个例子是一个非常简单的带有消息生产者(producer)的Flex表单:

<?xml version="1.0" encoding="utf-8"?> 
<mx:Application xmlns:mx="http://www.adobe.com/2006/mxml" layout="absolute" initialize="consumer.subscribe()"> 
   <mx:Script> 
      <![CDATA[ 
         import mx.messaging.messages.IMessage; 
         import mx.messaging.messages.AsyncMessage; 
         import mx.messaging.events.MessageEvent; 
         private function sendMessage(value : String) : void 
         { 
            var message : IMessage = new AsyncMessage(); 
            message.body = value; 
            producer.send(message); 
          } 
      ]]> 
   </mx:Script> 
   <mx:Producer id="producer" 
      channelConnect="trace('producer connected')" 
        destination="my-jms-destination" 
        fault="trace(event.faultDetail)" 
   /> 
   <mx:VBox> 
      <mx:Form> 
         <mx:FormItem label="Message:"> 
         <mx:TextInput id="textInput" /> 
      </mx:FormItem> 
      </mx:Form> 
      <mx:Button id="sendButton" label="Send Message" click="sendMessage(textInput.text)" /> 
   </mx:VBox> 
</mx:Application> 

界面上有一个按钮sendButton,点击此按钮会调用sendMessage()函数。sendMessage()函数先创建一个AsyncMessage,其是IMessage接口的一个实现,然后使用producer的send()方法发送此消息。赋值给fault属性的trace()方法会把给定的信息打印到控制台,从而帮助您调试任何可能发生的问题。

验证消息:运行Flex应用程序;切换到命令行,运行ActiveMQ自带的消费者(consumer)实例,脚本如下:

$ ant consumer -Dtopic=true -Dsubject=MyTopic -Dmax=2 

如果你输入框中输入消息,然后点击发送消息按钮,该消息会出现在消费者端的命令行上。在收到两条消息后,消费者端的命令行例子会自动退出。

接收消息(示例)

要接收JMS消息,您需要构建一个使用JMS终点的消费者(consumer)。在例子中为简单起见,消费者重用了生产者用来发送消息的终点。创建消费者的代码大致如下:

<mx:Consumer id="consumer" channelConnect="trace('consumer connected')" 
   channelFault="trace(event.faultDetail)" 
   fault="trace(event.faultDetail)" 
   destination="my-jms-destination" 
   message="messageHandler(event)" /> 
... 
<mx:TextArea id="results" width="100%" /> 

在<mx:Script>代码块中处理消息的函数大致如下:

private function messageHandler(event : MessageEvent) : void { 
   results.text += event.message.body + "\n"; 
} 

添加了代码后,启动Flex应用程序。当Flex应用启动后,在命令行运行生产者的例子,脚本如下:

ant producer -Dtopic=true -Dsubject=MyTopic -Dmax=5 

消息将出现在Flex的文本框中。

通过REST服务与PHP集成

Web服务是集成PHP Web应用和Java的方法之一。REST Web服务在Java中作为Servlets以及在PHP中作为脚本实现都比较容易。

REST概述

REST Web服务比SOAP Web服务更为简单,因为REST Web服务没有一个标准的消息格式(例如,单个的网页可以是一个REST Web服务)。URL地址就是服务端点(endpoint),而返回的Web页面则是该服务的响应。

编写REST Web服务的一个优点是可被多种类型的客户端使用。REST Web服务示例如下,它作为一个Java Servlet,能被其它语言如PHP调用,而无需修改服务。

Java Servlet示例

本例使用了一个简单的Java Servlet来作为REST Web服务。此Servlet处理PUT请求,从PUT请求的内容和各种参数中得到消息,例如:得到目标对象(JMS终点的名字),此消息是否是一个主题, JMS连接使用的URL等。

package com.example.servlets;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URLDecoder;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;

public class MessageService extends HttpServlet {
    private static final long serialVersionUID = -8129137144911681674L;

    private Destination destination;
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;

    @Override
    protected void doPut(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        Connection connection = null;            

        try {
            boolean isTopic = "true".equals(request.getParameter("topic"));
            boolean isPersistent = "true".equals(request.getParameter("persistent"));
            String subject = request.getParameter("subject");
            String url = URLDecoder.decode(request.getParameter("url"), "UTF-8");
            String messageText = "";

            BufferedReader buffer = new BufferedReader(new InputStreamReader(request.getInputStream()));

            messageText = buffer.readLine();

            System.out.println("Using URL:  <" + url + ">");
            System.out.println("Using Subject:  <" + subject + ">");
            System.out.println("Sending Message Text:  <" + messageText + ">");

            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            if (isTopic) {
                destination = session.createTopic(subject);
            } else {
                destination = session.createQueue(subject);
            }

            // Create the producer.
            MessageProducer producer = session.createProducer(destination);
            if (isPersistent) {
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            } else {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }

            TextMessage message = session.createTextMessage(messageText);

            producer.send(message);

            System.out.println("Done.");

            // Use the ActiveMQConnection interface to dump the connection
            // stats.
            ActiveMQConnection c = (ActiveMQConnection)connection;
            c.getConnectionStats().dump(new IndentPrinter());

        } catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        } finally {
            try {
                connection.close();
            } catch (Throwable ignore) {
            }
        }

    }

}

除了用ActiveMQ特定类,你还可以使用JNDI来写代码。对于想了解使用JNDI来获得连接的更多信息,请参阅ActiveMQ文档

编译后的Servlet类文件(比如前面的MessageService类)必须放在ActiveMQ的webapps目录下。要添加新目录,则需要在conf目录的jetty.xml配置文件中添加入口(与前面的步骤4相似)。

PHP示例

PHP脚本使用CURL库(译者注:原文的CURL库的链接有错)调用REST服务,将数据发送到REST服务的URL(由Java Servlet处理)。如果您的PHP版本没有包含CURL库,您可以使用Ajax来调用REST Web服务。在Web应用的实际产品中,Ajax可能会提供更好的用户体验。  

PHP脚本包含一个表单来提交自身。当提交表单时,该脚本从文本框取得消息,并把消息作为数据发送到REST服务的URL。请看下面的例子:

<?php 
   if (isset($_POST['submit'])) { 
      $url = 'http://localhost:8161/messageservice/?url=tcp://localhost:61616&subject=MyTopic&topic=true'; 
      function doPut($ch, $a_data) { 
         curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'PUT'); 
         curl_setopt($ch, CURLOPT_HTTPHEADER, array('Content-Length: '.strlen($a_data))); 
         curl_setopt($ch, CURLOPT_POSTFIELDS, $a_data); 
         return curl_exec($ch); 
      } 
      $data = $_POST['message']; 
      $ch = curl_init(); 
      curl_setopt($ch, CURLOPT_URL, $url); 
      $response = doPut($ch, $data); 
      echo 'Message sent'; 
   } else { 
      ?> 
         <html><head><title>Post a message</title></head> 
         <body> 
         <form action="<?php echo $_SERVER['PHP_SELF'];?>" method="post"> 
         <p>
            <label for="message">Message: 
               <input type="text" id="message" name="message" /> 
            </label>
         </p> 
         <input type="submit" value="submit" name="submit" id="submit" /> 
         </form> 
         </body> 
      </html> 
      <?php 
   } 
?> 

在URL上传递的JMS服务器的topic、subject和URL信息都赋值给了$url。

用BlazeDS发送/接收的示例

要想看BlazeDS发送/接收的实例,启动Flex应用程序。当应用程序打开,在不同的浏览器上打开PHP脚本,给消息框添加一个值,然后单击提交。消息框的值将会出现在Flex应用的文本框中。

小结

JMS是一个消息传递服务,支持主题和队列两种模式以及其它的很多特征,是可靠消息传递的不错选择。 BlazeDS可以让您只需做少量工作即可完成从Flex客户端向JMS发送消息。

PHP应用程序可以以多种方式连接到JMS。使用REST Web服务是把PHP和Java应用程序整合在一起的一种方式——REST Web服务提供了一种相对简单的接口供客户端使用。使用Java实现REST服务,您可以利用现有的Java代码来发送JMS消息。

本系列的第2部分主要讲通过JMS整合PHP和Flex应用的其它方法:PHP / Java桥和STOMP。

译者注:STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。

查看英文原文:BlazeDS and JMS for PHP Developers, Part 1


译者简介:李强,计算机硕士,毕业于电子科技大学,目前在四川长虹电器股份有限公司技术中心从事研发工作,主要研究领域是SOA、ESB、Web服务以及分布式应用等。

感谢宋玮对本文的审校。

给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家加入到InfoQ中文站用户讨论组中与我们的编辑和其他读者朋友交流。

评价本文

专业度
风格

您好,朋友!

您需要 注册一个InfoQ账号 或者 才能进行评论。在您完成注册后还需要进行一些设置。

获得来自InfoQ的更多体验。

告诉我们您的想法

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我
社区评论

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

讨论

登陆InfoQ,与你最关心的话题互动。


找回密码....

Follow

关注你最喜爱的话题和作者

快速浏览网站内你所感兴趣话题的精选内容。

Like

内容自由定制

选择想要阅读的主题和喜爱的作者定制自己的新闻源。

Notifications

获取更新

设置通知机制以获取内容更新对您而言是否重要

BT