在Flex和Rails上实现发布/订阅的消息通信

原文:Publish\Subscribe Messaging with Flex and Rails using Apache ActiveMQ, ActiveMessaging, and STOMP

这是一篇综合性应用很强的文章,介绍如何在Railsflex的基础上,使用ActiveMQ,Rails上的ActiveMessaging插件,和STOMP协议,创建一个发布/订阅的通信模式。这里使用了一个例子来讲解,flex客户端/终端(consumer)接受Rails服务端发送的消息(messages)。

里克:其实这就是用Rails和flex来实现java上的JMS。java上的东西好久没有看过了。所以在继续学习下面的东西,最好还是看看《JMS简介》吧。
JMS是访问企业消息系统的标准API,它便于消息系
统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
下面介绍一下本文要使用的各种技术:
h3. Apache ActiveMQ

Apache ActiveMQ受欢迎的,强大的通信代理[Message Broker]。
它快速,支持多种语言的客户端和协议,拥有许多高级特性,并完全支持JMS 1.1 和 J2EE 1.4。
Apache ActiveMQ 在Apache 2.0 License下发布。
h3. ActiveMessaging

ActiveMessaging将Rails带入到消息驱动开发[Messaging]中。消息驱动,或叫事件驱动结构[event-driven architecture],广泛的应用于企业级集成。这下面的框架包括java的JMS,产品包括ActiveMQ,Tibco,IBM MQSeries等。
h3. STOMP

ActiveMessaging支持Stomp协议。这是一个通过无线连接,使用TCP/IP,实现文本型消息流式传输的协议标准。其设计理念要求尽可能将协议简化,以求无论利用哪种语言开发客户端或服务端,都能相当轻松(比如Ruby Stomp客户端代码,连带注释和空行,总行数仅约400)。因此,它几乎可以集成任何系统。[摘自《Rails ActiveMessaging入门》]
h3. 需要

  • Ruby 1.8.6
  • Rails 1.2.3
  • ava 1.5.0_07+
  • MySQL(或其他支持Rails Migrations的数据库)
  1. 源文件
  1. 创建Rails应用

里克:创建一个叫sales_report的应用,具体的创建可以参照源码,或者按照自己的喜好啦。

主要的是:
script/generate model sale
001_create_sales.rb的代码:

  def self.up
    create_table :sales do |t|
      t.column :customer, :string
      t.column :product, :string
      t.column :quantity, :integer
    end
  end

创建一个controller
script/generate controller sales index
sales_controller.rb代码

class SalesController < ApplicationController
  scaffold :sale
end

下面要开始ActiveMessaging的部分了。
$ gem install daemons
$ gem install stomp
$ script/plugin install
http://activemessaging.googlecode.com/svn/trunk/plugins/activemessaging

来创建一个ActiveMessaging 的processor,其实本案并不需要一个proccessor,但是它会为我们创建一些其他的文件。
$ script/generate processor sale
create app/processors
create app/processors/sale_processor.rb
create test/functional/sale_processor_test.rb
create config/messaging.rb
create config/broker.yml
create app/processors/application.rb
create script/poller

打开config/broker.yml ,编辑stomp adapter


development:
adapter:
stomplogin: ""
passcode: ""
host: localhost
port: 61613
reliable: false

打开config/messaging.rb 文件

ActiveMessaging::Gateway.define do |s|
  s.destination :sale_queue, '/queue/Sale'
end

下面我们对sale model 创建一个Observer
$ script/generate observer sale

require 'activemessaging/processor'
class SaleObserver < ActiveRecord::Observer
  include ActiveMessaging::MessageSender
  publishes_to :sale_queue

  def after_save(sale)
    record = sale.to_xml
    publish :sale_queue, record
  end
end


这个类监视着sale model,当保存一条记录时,after_save方法被调用,这个方法将sale的AR实例转换成xml格式,发送到sale_queue中。
最后,启动我们的rails服务。
$ script/server
h3. 安装 ActiveMQ

很简单
1、先看看这个
2、这有其他的一些介绍
h3. flex应用和STOMP AS3客户端

使flex和ActiveMQ 通信,需要使用STOMP 协议,这有篇文章:ActionScript 3 STOMP client,这里我们需要做的是用flex通过STOMP连接上ActiveMQ,并能解释[consume]rails发布的xml文件。

这是重要的一段代码:

private var sales : ArrayCollection = new ArrayCollection();

private var stomp : STOMPClient = new STOMPClient();
private var queue : String = "/queue/Sale";

private function init () : void
{
	stomp.connect("localhost", 61613);
	stomp.subscribe( queue );

	stomp.addEventListener(MessageEvent.MESSAGE, handleMessages);
	stomp.addEventListener(ReceiptEvent.RECIEPT, handleReceipts);
	stomp.addEventListener(STOMPErrorEvent.ERROR, handleErrors);

}

private function handleMessages(event : MessageEvent) : void
{
	var incomingMsg : XML = XML(event.message.body);
	var processedSale : ObjectProxy = simplerXMLDecoder(incomingMsg);
	orders.addItem(processedSale);
}

private function handleReceipts (event : ReceiptEvent) : void
{
	trace ("Got receipt: " + event.receiptID)
}
private function handleErrors (event : STOMPErrorEvent) : void
{
	trace ("Error: " + event.error.body)
}

private function simplerXMLDecoder (x : XML) : ObjectProxy
{
	var xdoc : XMLDocument =  new XMLDocument();
	xdoc.ignoreWhite = true;
	xdoc.parseXML(x.toXMLString());
	var decoder : SimpleXMLDecoder =  new SimpleXMLDecoder(true);
	return decoder.decodeXML(XMLNode(xdoc.firstChild)) as ObjectProxy;
}

代码解释:
1、在开始的时候,我们创建了一个STOMPClient
2、接下来我们定义了queue,这是我们的订阅地址,就是在messaging.rb中定义的那个。
3、在这个应用加载的时候,init()方法会自动调用,它会自动连接STOMP代理(ActiveMQ),然后订阅[subscribe]这个地址。
4、这样我们就建立了一个检查来自ActiveMQ信息的监听。

所有实际的action操作,都是传递到handleMessages方法里:
1、当Rails应用在产生并发送一个xml是调用handleMessages 方法
2、当接受到一个新信息时,我们对这个信息体[body of the message](实际上是rails发送过来的xml),使用XMLDecoder 类转化成一个可绑定对象[bindable ObjectProxy]。
3、最后,一个新的销售记录增加到销售集合中,并展示在flex的数据表格中。
h3. 运行一下试试

http://localhost:3000/sales/new
rails_sales_report.png

在你的flex中可以看到
flex_sales_report.png

当一个销售记录保存到数据库后,SaleObserver 发布这个消息,ActiveMQ 将这个发布给flex应用。

当把你的init()方法改成下面的样子的时候

private function init () : void
{
        var ch : ConnectHeaders =  new ConnectHeaders();
	ch.clientID = "MYTOTALLYUNIQUECLIENTID";
	stomp.connect("localhost", 61613, ch);

	var sh : SubscribeHeaders = new SubscribeHeaders();
	sh.amqSubscriptionName = "MYSUBSCRIPTION";
	stomp.subscribe( queue, sh );

	stomp.addEventListener(MessageEvent.MESSAGE, handleMessages);
	stomp.addEventListener(ReceiptEvent.RECIEPT, handleReceipts);
	stomp.addEventListener(STOMPErrorEvent.ERROR, handleErrors);
}

当连接的时候增加一个clientID和在订阅的时候增加一个subscriptionName,我们创建了一个永久的订阅者[durable subscriber]。
好了,可以再次运行看一下效果了。

本文中文翻译:里克

参考资料:
Apache Geronimo 的 JMS 实现:ActiveMQ

ActiveMQ 实践之路(一) 启动你的ActiveMQ

http://activemq.apache.org/

ActiveMessaging is a messaging framework for Ruby and Ruby on Rails

Rails ActiveMessaging入门


  • 2016年目标:工作拿出成绩,写知乎专栏,写自习室公众号,写《Rails 实践》第二版。

  • hi,我是里克,这是我的技术博客,从2007年开始,我在这里记录工作中的点点滴滴,同时,它也让我认识了很多的朋友。我相信,『坚持』就会有好运。

  • 我的邮箱:hi(at)liwei.me

  • 《Rails 实践》rails-practice.com

  • 简书主页

  • 知乎主页

公众号
Tags
rss