原文:Publish\Subscribe Messaging with Flex and Rails using Apache ActiveMQ, ActiveMessaging, and STOMP
这是一篇综合性应用很强的文章,介绍如何在Rails和flex的基础上,使用ActiveMQ,Rails上的ActiveMessaging插件,和STOMP协议,创建一个发布/订阅的通信模式。这里使用了一个例子来讲解,flex客户端/终端(consumer)接受Rails服务端发送的消息(messages)。
里克:其实这就是用Rails和flex来实现java上的JMS。java上的东西好久没有看过了。所以在继续学习下面的东西,最好还是看看《JMS简介》吧。
JMS(Java Message Service)是访问企业消息系统的标准API,它便于消息系
统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
下面介绍一下本文要使用的各种技术:
Apache ActiveMQ
Apache ActiveMQ受欢迎的,强大的通信代理[Message Broker]。
它快速,支持多种语言的客户端和协议,拥有许多高级特性,并完全支持JMS 1.1 和 J2EE 1.4。
Apache ActiveMQ 在Apache 2.0 License下发布。
ActiveMessaging
ActiveMessaging将Rails带入到消息驱动开发[Messaging]中。消息驱动,或叫事件驱动结构[event-driven architecture],广泛的应用于企业级集成。这下面的框架包括java的JMS,产品包括ActiveMQ,Tibco,IBM MQSeries等。
STOMP
ActiveMessaging支持Stomp协议。这是一个通过无线连接,使用TCP/IP,实现文本型消息流式传输的协议标准。其设计理念要求尽可能将协议简化,以求无论利用哪种语言开发客户端或服务端,都能相当轻松(比如Ruby Stomp客户端代码,连带注释和空行,总行数仅约400)。因此,它几乎可以集成任何系统。[摘自《Rails ActiveMessaging入门》]
需要
- Ruby 1.8.6
- Rails 1.2.3
- ava 1.5.0_07+
- MySQL(或其他支持Rails Migrations的数据库)
源文件
创建Rails应用
里克:创建一个叫sales_report的应用,具体的创建可以参照源码,或者按照自己的喜好啦。
主要的是:
script/generate model sale
001_create_sales.rb的代码:
def self.up
create_table :sales do |t|
t.column :customer, :string
t.column :product, :string
t.column :quantity, :integer
end
end
创建一个controller
script/generate controller sales index
sales_controller.rb代码
class SalesController < ApplicationController scaffold :sale end
下面要开始ActiveMessaging的部分了。
$ gem install daemons
$ gem install stomp
$ script/plugin install
http://activemessaging.googlecode.com/svn/trunk/plugins/activemessaging
来创建一个ActiveMessaging 的processor,其实本案并不需要一个proccessor,但是它会为我们创建一些其他的文件。
$ script/generate processor sale
create app/processors
create app/processors/sale_processor.rb
create test/functional/sale_processor_test.rb
create config/messaging.rb
create config/broker.yml
create app/processors/application.rb
create script/poller
打开config/broker.yml ,编辑stomp adapter
development: adapter: stomplogin: "" passcode: "" host: localhost port: 61613 reliable: false
打开config/messaging.rb 文件
ActiveMessaging::Gateway.define do |s| s.destination :sale_queue, '/queue/Sale' end
下面我们对sale model 创建一个Observer。
$ script/generate observer sale
require 'activemessaging/processor'
class SaleObserver < ActiveRecord::Observer
include ActiveMessaging::MessageSender
publishes_to :sale_queue
def after_save(sale)
record = sale.to_xml
publish :sale_queue, record
end
end
这个类监视着sale model,当保存一条记录时,after_save方法被调用,这个方法将sale的AR实例转换成xml格式,发送到sale_queue中。
最后,启动我们的rails服务。
$ script/server
安装 ActiveMQ
flex应用和STOMP AS3客户端
使flex和ActiveMQ 通信,需要使用STOMP 协议,这有篇文章:ActionScript 3 STOMP client,这里我们需要做的是用flex通过STOMP连接上ActiveMQ,并能解释[consume]rails发布的xml文件。
这是重要的一段代码:
private var sales : ArrayCollection = new ArrayCollection();
private var stomp : STOMPClient = new STOMPClient();
private var queue : String = "/queue/Sale";
private function init () : void
{
stomp.connect("localhost", 61613);
stomp.subscribe( queue );
stomp.addEventListener(MessageEvent.MESSAGE, handleMessages);
stomp.addEventListener(ReceiptEvent.RECIEPT, handleReceipts);
stomp.addEventListener(STOMPErrorEvent.ERROR, handleErrors);
}
private function handleMessages(event : MessageEvent) : void
{
var incomingMsg : XML = XML(event.message.body);
var processedSale : ObjectProxy = simplerXMLDecoder(incomingMsg);
orders.addItem(processedSale);
}
private function handleReceipts (event : ReceiptEvent) : void
{
trace ("Got receipt: " + event.receiptID)
}
private function handleErrors (event : STOMPErrorEvent) : void
{
trace ("Error: " + event.error.body)
}
private function simplerXMLDecoder (x : XML) : ObjectProxy
{
var xdoc : XMLDocument = new XMLDocument();
xdoc.ignoreWhite = true;
xdoc.parseXML(x.toXMLString());
var decoder : SimpleXMLDecoder = new SimpleXMLDecoder(true);
return decoder.decodeXML(XMLNode(xdoc.firstChild)) as ObjectProxy;
}
代码解释:
1、在开始的时候,我们创建了一个STOMPClient
2、接下来我们定义了queue,这是我们的订阅地址,就是在messaging.rb中定义的那个。
3、在这个应用加载的时候,init()方法会自动调用,它会自动连接STOMP代理(ActiveMQ),然后订阅[subscribe]这个地址。
4、这样我们就建立了一个检查来自ActiveMQ信息的监听。
所有实际的action操作,都是传递到handleMessages方法里:
1、当Rails应用在产生并发送一个xml是调用handleMessages 方法
2、当接受到一个新信息时,我们对这个信息体[body of the message](实际上是rails发送过来的xml),使用XMLDecoder 类转化成一个可绑定对象[bindable ObjectProxy]。
3、最后,一个新的销售记录增加到销售集合中,并展示在flex的数据表格中。
运行一下试试
http://localhost:3000/sales/new
当一个销售记录保存到数据库后,SaleObserver 发布这个消息,ActiveMQ 将这个发布给flex应用。
当把你的init()方法改成下面的样子的时候
private function init () : void
{
var ch : ConnectHeaders = new ConnectHeaders();
ch.clientID = "MYTOTALLYUNIQUECLIENTID";
stomp.connect("localhost", 61613, ch);
var sh : SubscribeHeaders = new SubscribeHeaders();
sh.amqSubscriptionName = "MYSUBSCRIPTION";
stomp.subscribe( queue, sh );
stomp.addEventListener(MessageEvent.MESSAGE, handleMessages);
stomp.addEventListener(ReceiptEvent.RECIEPT, handleReceipts);
stomp.addEventListener(STOMPErrorEvent.ERROR, handleErrors);
}
当连接的时候增加一个clientID和在订阅的时候增加一个subscriptionName,我们创建了一个永久的订阅者[durable subscriber]。
好了,可以再次运行看一下效果了。
本文中文翻译:里克,guxing203 at gmail dot com,http://railser.cn
参考资料:
Apache Geronimo 的 JMS 实现:ActiveMQ
ActiveMessaging is a messaging framework for Ruby and Ruby on Rails



为什么FLASH是在3000端口加载的,而按道理它是不能从66133端口加载数据的.
这个问题你以前没有遇到吗?
sorry,请google吧。也欢迎将经验发表在这里