BT

如何利用碎片时间提升技术认知与能力? 点击获取答案

基于AWS技术实现发布/订阅服务

| 作者 Boris Lublinsky 关注 1 他的粉丝 ,译者 王丽娟 关注 0 他的粉丝 发布于 2013年8月21日. 估计阅读时间: 54 分钟 | ArchSummit北京2018 共同探讨机器学习、信息安全、微服务治理的关键点

AWS提供两种服务——Amazon简单通知服务(Simple Notification Service)和Amazon简单队列服务(Simple Queue Service),两者结合起来可以为完整的发布/订阅服务提供支撑。

现有的AWS功能

Amazon简单通知服务(Amazon SNS)是一个Web服务,能让应用、最终用户和设备立即从云端发送和接收通知。简化的SNS架构如下图所示(图1):

(点击查看大图)

图1:Amazon SNS的基础架构

多个发布应用和多个订阅应用可以将SNS主题作为中介互相通讯。这样实现的优点是发布者和订阅者不需要知道对方,因此,应用可以完全动态地进行集成。SNS支持用多种传输协议传递通知,包括HTTP、HTTPS、Email、SMS和Amazon简单队列(Simple Queue)。

Amazon简单队列服务(Amazon SQS)提供可靠、可伸缩的托管队列,用来存储计算机之间传输的消息。使用Amazon SQS,你可以在执行不同任务的应用分布式组件之间移动数据,而不会丢失消息,也不必要求每个组件始终都是可用的。SQS和SNS结合起来会带来两个额外的优势——解除时间上的耦合度,根据消费应用特定的情况提供负载均衡——这是SNS无法单独提供的。要做到第二个附加优势,需要同一个应用的多个实例从同一个队列里读取消息。下图展示了SNS和SQS结合的总体架构(图2)。其中的一个订阅应用显示为负载均衡的。

(点击查看大图)

图2:结合SNS和SQS

这个实现的主要缺点是,发布者和订阅者需要明确统一SNS主题的名称。此外,如果一个特定的消费者想从多个主题获取信息,那他需要把队列注册到多个主题上。

期望中的发布/订阅实现

这个问题的典型解决方案是采用基于树的主题组织,大部分发布/订阅引擎都是这样实现的。OASIS规范的Web Services Topics 1.3概述了这种组织的主要原则。

这个规范将主题定义为:

“……主题是一组通知的组织和分类方式。主题机制为订阅者推断出感兴趣的通知提供了便捷的方式……发布者可以将通知发布和一或多个主题关联起来。当订阅者创建订阅的时候,可以提供一个主题的过滤器表达式,将订阅和一或多个主题关联起来……每个主题都可以有零或多个子主题,子主题本身也可以进一步包含子主题。没有“父亲”的主题叫根主题。特定的根主题和它所有的后代会形成一个层次结构(称为主题树)。”

下面是手机销售的一个主题树例子(图3)。

图3:主题树示例

主题树的根表示销售。销售可以按区域细分(在我们的例子中有北美、欧洲和亚太地区)。特定区域的销售还可以按照手机类型进一步细分,依此类推。

在发布/订阅系统中,这样的结构之所以重要是因为树反映了数据的组织。如果消费者对北美的智能手机销售感兴趣,他可以监听这个特定的主题。如果他对北美所有的销售都感兴趣,那他就可以监听北美的主题,从子主题获取所有的通知。

当然,这种方法并不能解决所有的问题。比如说,如果消费者想监听所有智能手机销售的事件,他就需要明确订阅所有地区的智能手机销售事件。这种情况通常是主题树设计的问题。树的设计基于信息的组织和典型的使用模式。在某些情况下,会设计多个主题来满足不同的内部需求(参见Web Services Topics 1.3里的主题命名空间)。发布/订阅架构的另一个重要特性就是基于内容的消息过滤

“在基于内容的系统中,如果消息的属性或内容与订阅者定义的约束相匹配,消息就只会传递给这个订阅者。订阅者负责消息的分类。”

换句话说,订阅者在这种情况下可以使用正则表达式列表,明确指定他们感兴趣的消息内容。

把这种过滤和结构化的主题结构结合起来,可以创建出非常灵活和强大的发布/订阅实现。

我们将在本文中展示如何用AWS组件轻松构建这类系统。

发布/订阅架构建议

建议给大家的架构如下图所示(图4)。在这个架构中,发布/订阅服务器的实现是一个Tomcat容器里运行的Web应用。我们还充分利用了AWS的弹性负载均衡器(Elastic Load Balancer),它可以根据当前的负载动态扩展或缩减发布/订阅服务器集群的大小。此外,架构还用关系型数据服务(Relational Data Service)存储当前的配置,以便动态新增发布/订阅实例。为了提高整体性能,我们在内存里保留了当前的拓扑结构,尽量减少数据库访问的次数。这样的话,实际的消息路由会非常迅速。这个解决方案需要一种机制,能在拓扑结构发生变化的时候去通知所有的服务器(因为任何服务器都能处理负载均衡器)。Amazon SNS能轻而易举地做到这一点。最后,我们用Amazon SQS将通知分发给消费者。需要注意的是,一个消费者可以监听多个队列。

(点击查看大图)

图4:整体架构建议

发布/订阅服务器

这个实现的核心是一个自定义的发布/订阅服务器。服务器实现包括三个主要的层——持久化、域和服务。

持久化

服务器持久化层采用JPA 2.0实现,定义了三个主要的实体——主题、订阅和语义过滤器。

主题实体(清单1)描述了特定主题要存储的相关信息,包括主题ID(数据库的内部ID)、主题名称(标识主题的字符串)、一个布尔变量(定义该主题是否是个根主题)、到父主题和孩子主题的引用(以便对主题层次结构进行遍历),以及与给定主题关联的订阅列表。

@Entity
@NamedQueries({
    @NamedQuery(name="Topic.RootTopics",
                    query="SELECT t FROM Topic t where t.root='true'"),
    @NamedQuery(name="Topic.AllTopics",
	                     query="SELECT t FROM Topic t")
})
@Table(name = "Topic")
public class Topic {

	@Id @GeneratedValue(strategy=GenerationType.IDENTITY)
	private long id;    // 自动生成的ID

	@Column(name = "name",nullable = false, length = 32)
	private String name;                          // 主题名称
  
	@Column(name = "root",nullable = false)
	private Boolean root = false;           // 根主题标识    

	@ManyToOne(fetch=FetchType.LAZY)
	@JoinColumn(name="TOPIC_ID")
	private Topic parent;

	@OneToMany(mappedBy="parent",cascade=CascadeType.ALL,orphanRemoval=true)
	private List<Topic> children; 
	
	@OneToMany(mappedBy="topic",cascade=CascadeType.ALL,orphanRemoval=true) 
	private List<Subscription> subscriptions; 
	………………………………………………………………………………………………

清单1:主题实体

我们定义了两个命名的查询,用来访问主题:RootTopics获取从根开始的主题结构,AllTopics获取所有现有的主题。

这个实体提供了一个完整的主题定义,也可以支持多个主题树(而不是实现示例的一部分)。

订阅实体(清单2)描述了订阅相关的信息,包括订阅ID(数据库的内部ID)、队列名称(SQS队列的ARN,ARN即Amazon Resource Name)、对订阅关联主题的引用,还有一个语义过滤器列表。只有所有的过滤器都接受消息(见下文),通知才会分发给给定的队列(客户端)。如果通知不包含语义过滤器,那来自于关联主题的所有消息都会直接传递给队列。

@Entity
@NamedQueries({
    @NamedQuery(name="Subscription.AllSubscriptions",
	                    query="SELECT s FROM Subscription s")
})
@Table(name = "Subscription")
public class Subscription {

       @Id @GeneratedValue(strategy=GenerationType.IDENTITY)
       private long id;    // 自动生成的ID

       @Column(name = "queue",nullable = false, length = 128)
       private String queue;
 
       @ManyToOne(fetch=FetchType.LAZY)
       @JoinColumn(name="TOPIC_ID")
       private Topic topic;
    
       @OneToMany(mappedBy="subscription",
         	                        cascade=CascadeType.ALL,orphanRemoval=true)
       private List<SemanticFilter> filters;   
       ……………………………………………………………………………………

清单2:订阅实体

我们还定义了一个命名的查询,获得所有存在的订阅。

最后,语义过滤器实体(清单3)描述了特定语义过滤器的信息,包括语义过滤器ID(数据库的内部ID)、该语义过滤器测试的属性名称、使用的正则表达式,以及对语义过滤器关联订阅的引用。

@Entity
@NamedQueries({
    @NamedQuery(name="SemanticFilter.AllSemanticFilters",
	                    query="SELECT sf FROM SemanticFilter sf")
})
@Table(name = "Filter")
public class SemanticFilter {

       @Id @GeneratedValue(strategy=GenerationType.IDENTITY)
       private long id;    // 自动生成的ID
    
       @Column(name = "attribute",nullable = false, length = 32)
       private String attribute;                     // 属性名称

       @Column(name = "filter",nullable = false, length = 128)
       private String filter;                       // 正则表达式过滤器

       @ManyToOne(fetch=FetchType.LAZY)
       @JoinColumn(name="SUBSCRIPTION_ID")
       private Subscription subscription;
       …………………………………………………………………… 

清单3:语义过滤器实体

我们一样定义一个命名的查询,用来获取所有现有的语义过滤器。

除了实体,持久化层还包含一个持久化管理类,负责:

管理数据库访问和事务

从数据库读取、写入对象

对域对象(见下文)和持久化实体进行相互转换

发送拓扑结构变化的通知

域模型

域模型对象的主要职责是支持服务操作,包括数据的订阅和发布,并把通知真正发布到订阅的队列上。在这个简单的实现里,域模型和持久化模型是合在一起的,但为了阐述得更清楚,我们分开介绍。这两层的数据模型是一样的,但域对象会多一些明确支持发布/订阅实现的方法。

过滤器处理的实现(清单4)利用了Java String里对正则表达式处理的内置支持

	public boolean accept(String value){
	       if(value == null)
	              return false;
	       return value.matches(_pattern);
	}

清单4:过滤器处理方法

发布实现(清单5)是订阅类的一个方法。请注意,这个方法对语义过滤器进行了或操作。如果给定的客户端能有多个订阅,或者对订阅实现进行扩展、让它支持Boolean函数,那就可以突破这个限制了。

public void publish(Map<String, String> attributes, String message){
    
    if((_filters != null) && (_filters.size() > 0)){
        for(DomainSemanticFilter f : _filters){
            String av = attributes.get(f.getField());
            if(av == null)
                return;
            if(!f.accept(av))
                return;
        }
    }
    SQSPublisher.getPublisher().sendMessage(_queue, message);
}

清单5:发布实现

这个实现利用了基于现有AWS Java API的SQSPublisher类(清单6)。

import java.io.IOException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.DeleteQueueRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class SQSPublisher {

	 private static SQSPublisher _publisher;
	  
	 private AmazonSQSClient _sqs;    
	    
	 private SQSPublisher()throws IOException {
	           AWSCredentials credentials = new PropertiesCredentials(
	                this.getClass().getClassLoader().
getResourceAsStream("AwsCredentials.properties"));
	           _sqs = new AmazonSQSClient(credentials);
	 }

	 public String createQueue(String name){
	          CreateQueueRequest request = new CreateQueueRequest(name);
	          return _sqs.createQueue(request).getQueueUrl();
	 }

	 public void sendMessage(String queueURL, String message){
	          SendMessageRequest request = new SendMessageRequest(queueURL, 
message);
	          _sqs.sendMessage(request);

	 }

         public void deleteQueue(String queueURL){
	          DeleteQueueRequest request = new DeleteQueueRequest(queueURL);
	          _sqs.deleteQueue(request);
	 }

	 public static synchronized SQSPublisher getPublisher(){
	          if(_publisher == null)
	                   try {
	                              _publisher = new SQSPublisher();
	                   }catch (IOException e) {
	                            e.printStackTrace();
	                   }
	          return _publisher;
	 }

}

清单6:SQS发布者

订阅者可以利用这个类的其他方法创建/销毁SQS队列。

除了SQS队列,我们的实现还利用SNS进行数据库变化的同步。与SNS的交互由SNSPubSub类实现(清单7),这个实现也利用了AWS SNS Java API。

import java.io.IOException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sns.model.UnsubscribeRequest;

public class SNSPubSub {

	  private static SNSPubSub _topicPublisher;
	  private static String _topicARN;
	  private static String _endpoint;
	    
	  private AmazonSNSClient _sns;
	  private String _protocol = "http";
	  private String _subscriptionARN;
	    
	  private SNSPubSub()throws IOException {
	            AWSCredentials credentials = new PropertiesCredentials(
	                this.getClass().getClassLoader().
getResourceAsStream("AwsCredentials.properties"));
	            _sns = new AmazonSNSClient(credentials);
	  }

	  public void publish(String message){
	            PublishRequest request = new PublishRequest(_topicARN, message);
	            _sns.publish(request);
	  }
	  
	  public void subscribe(){
	            SubscribeRequest request = new SubscribeRequest
(_topicARN, _protocol, _endpoint);
	            _sns.subscribe(request);
	  }
	    
	  public void confirmSubscription(String token){
	            ConfirmSubscriptionRequest request = new
ConfirmSubscriptionRequest(_topicARN, token);          ConfirmSubscriptionResult result = _sns
.confirmSubscription(request);          _subscriptionARN = result.getSubscriptionArn();   }        public void unSubscribe(){          if(_subscribed){              UnsubscribeRequest request = new UnsubscribeRequest(_subscriptionARN);              _sns
.unsubscribe(request);          }   }        public static void configureSNS(String topicARN, String endpoint){          _topicARN = topicARN;        _endpoint = endpoint;   }          public static synchronized SNSPubSub getSNS(){          if(_topicPublisher == null){              try{                 _topicPublisher = new SNSPubSub();              }              catch(Exception e){                 e.printStackTrace();              }          }          return _topicPublisher;   } }

清单7:SNS Pub/Sub

使用SNS

使用SNS的时候要谨记:订阅主题并不意味着你已经准备好监听主题。SNS订阅的过程包含两个步骤。向SNS发送订阅请求时,SNS返回的响应表明确认订阅的必要性。这正是清单8既有subscribe方法又有confirmSubscription方法的原因。

<xsd:complextype name="NotificationType">
    <xsd:sequence>
        <xsd:element name="Type" type="xsd:string" />
        <xsd:element name="MessageId" type="xsd:string" />
        <xsd:element name="Token" type="xsd:string" minoccurs="0" />
        <xsd:element name="TopicArn" type="xsd:string" />
        <xsd:element name="Message" type="xsd:string" />
        <xsd:element name="SubscribeURL" type="xsd:string" minoccurs="0" />
        <xsd:element name="Timestamp" type="xsd:string" />
        <xsd:element name="SignatureVersion" type="xsd:string" />
        <xsd:element name="Signature" type="xsd:string" />
        <xsd:element name="SigningCertURL" type="xsd:string" />
        <xsd:element name="UnsubscribeURL" type="xsd:string" minoccurs="0" />
    </xsd:sequence>
</xsd:complextype>

上面的Schema描述了两种消息类型——确认请求和实际的通知。两种类型通过Type元素进行区分。如果元素值是“SubscriptionConfirmation”,那它就是订阅确认的请求,如果是“Notification”,就表明是个真正的通知。

主题类实现了两个方法(清单8),以便支持发布。

public void publish(Map<String, String> attributes, String message){
    
    if(_subscriptions == null)
        return;
    for(DomainSubscription ds : _subscriptions)
        ds.publish(attributes, message);
}
    
public void processPublications(List<DomainTopic> tList, StringTokenizer st) throws PublicationException{
    
    tList.add(this);
    if(!st.hasMoreTokens())
        return;
    String topic = st.nextToken();
    for(DomainTopic dt : _children){
        if(topic.equalsIgnoreCase(dt.getName())){
            dt.processPublications(tList, st);
            return;
        }
    }
    throw new PublicationException("Subtopic " + topic + " is not found in topic " + _name);
}

清单8:主题对发布的支持

processPublications方法创建了一个主题列表,这些主题与给定的消息相关联。这个方法有一个标记过的主题树字符串,如果标记和主题名称相对应,就会把当前的主题添加到列表中。主题的publish方法维护一个消息属性的映射,对主题相关的每个订阅来说,publish方法还会尝试着去发布一条消息。

上面的方法都由Domain管理器类的publish方法调用(清单9)。这个方法首先标记主题字符串,然后用processPublications方法创建一个订阅者感兴趣的主题列表。列表一旦被创建好,就会构建一个消息属性的映射(我们假设是一个XML消息),并把这个映射发布给列表里的所有主题。

     public void publish (String topic, String message){
	      StringTokenizer st = new StringTokenizer(topic, ".");
	      List<DomainTopic> topics = new LinkedList<Domaintopic>(); 	
	      DomainTopic root = PersistenceManager.getPersistenceManager().getRoot();
	      try { 	
	               if(!st.hasMoreTokens()) 
	                      return; 	
	               String t = st.nextToken(); 	
	               if(!t.equalsIgnoreCase(root.getName())) 	
	                      throw new PublicationException("Unrecognized subtopic name " + topic); 	
	               root.processPublications(topics, st); 	
	      }catch (PublicationException e) { 	
	               e.printStackTrace(); 	
	               return; 	
	      } 	
	      MessageType msg = null; 	
	      try { 	
	               JAXBElement<MessageType> msgEl = (JAXBElement<MessageType>)
	                      _unmarshaller.unmarshal(new ByteArrayInputStream(message.getBytes()));	
	               msg = msgEl.getValue(); 	
	      } catch (JAXBException e) { 	
	               e.printStackTrace(); 	
	               return; 	
	      } 	
	      Map<String, String> attributes = new HashMap<String, String>();
	      MessageEnvelopeType envelope = msg.getEnvelope();
	      if(envelope != null){
	               for(MessageAttributeType attribute : envelope.getAttribute()){
	                      attributes.put(attribute.getName(), attribute.getValue());
	               }
	      }
	      for(DomainTopic t : topics)
	               t.publish(attributes, message);
    }

清单9:发布方法实现

服务模型

我们用一组REST服务对发布/订阅功能进行访问(清单10)。

@Path("/")
public class PubSubServiceImplementation {

	  // 功能方法
	  @POST
	  @Path("publish")
	  @Consumes("application/text")
	  public void publish (@QueryParam("topic")String topic, String message) throws PublicationException{
	           DomainManager.getDomainManager().publish(topic, message);
	  }

	  @GET
	  @Path("publish")
	  public void publishGet (@QueryParam("topic")String topic, @QueryParam("message")String message)  throws
PublicationException{
	           DomainManager.getDomainManager().publish(topic, message);
	  }

	  @POST
	  @Path("synch")
	  @Consumes("text/plain")
	  public void getSynchNotification (Object message){
	           PersistenceManager.setUpdated();
	  }

	  // 配置方法

	  @GET
	  @Path("root")
	  @Produces("application/json")
	  public TopicType getRoot()throws PublicationException {
	           return DomainManager.getDomainManager().getRoot();
	  }

	  @GET
	  @Path("filters")
	  @Produces("application/json")
	  public FiltersType getFilters() throws PublicationException {
	           return DomainManager.getDomainManager().getFilters();
	  }

	  @POST
	  @Path("filter")
	  @Consumes("application/json")
	  public long addFilter(FilterType filter) throws PublicationException {
	           return DomainManager.getDomainManager().addFilter(filter);
	  }

	  @DELETE
	  @Path("filter/{id}")
	  public void deleteFilter(@PathParam("id")long id) throws PublicationException {
	           DomainManager.getDomainManager().removeFilter(id);
	  }

	  @GET
	  @Path("subscriptions")
	  @Produces("application/json")
	  public SubscriptionsType getSubscriptions() throws PublicationException {
	           return DomainManager.getDomainManager().getSubscriptions();
	  }

	  @POST
	  @Path("subscription")
	  @Consumes("application/json")
	  public long addSubscription(SubscriptionType s) throws PublicationException {
	           return DomainManager.getDomainManager().addSubscription(s, null);
	  }

	  @DELETE
	  @Path("subscription/{id}")
	  public void deleteSubscription(@PathParam("id")long id) throws PublicationException {
	           DomainManager.getDomainManager().removeSubscription(id);
	  }

	  @POST
	  @Path("subscriptionFilters/{sid}")
	  @Consumes("application/json")
	  public long assignFilersToSubscription(@PathParam("sid")long sid, IDsType ids)throws PublicationException{
	           return DomainManager.getDomainManager().assignFilersToSubscription(sid, ids);

	  }    

	  @POST
	  @Path("topic")
	  @Consumes("application/json")
	  public long addTopic(TopicType t) throws PublicationException {
	           return DomainManager.getDomainManager().addTopic(t, null);
	  }

	  @DELETE
	  @Path("topic/{id}")
	  public void deleteTopic(@PathParam("id")long id) throws PublicationException {
	           DomainManager.getDomainManager().removeTopic(id);
	  }

	  @POST
	  @Path("topicsubscription/{tid}")
	  @Consumes("application/json")
	  public void assignTopicHierarchy(@PathParam("tid")long tid, IDsType ids) throws PublicationException{
	           DomainManager.getDomainManager().assignTopicHierarchy(tid, ids);
	  }

	  @POST
	  @Path("topicsubscription/{tid}")
	  @Consumes("application/json")
	  public long assignTopicSubscriptions(@PathParam("tid")long tid, IDsType ids)throws PublicationException{
	           return DomainManager.getDomainManager().assignTopicSubscriptions(tid, ids);
	  }

清单10:发布/订阅服务

这些服务的使用者有消息发布者(publish方法)、服务订阅者(创建/删除语义过滤器,订阅,还有订阅和主题订阅相关的过滤器)、内部的发布/订阅实现(获取同步的服务)和管理应用。

结论

这个实现虽然简单,但创建了一个非常强大、可扩展的发布/订阅实现,同时利用了很多现有的AWS功能和少量的Java定制代码。另外它还充分利用了现有AWS部署功能对负载均衡和容错的支持。

作者简介

Boris Lublinsky博士是Nokia的主要架构师,参与大数据、SOA、BPM和中间件实现的相关工作。Boris去Nokia前是Herzum软件的主要架构师,负责为客户设计大型、可伸缩的SOA系统;在此之前,他是CNA保险的企业架构师,参与CNA集成和SOA策略的设计及实现,构建应用框架,实现面向服务的架构。Boris在企业技术架构和软件工程方面有二十五年多的经验。他是OASIS SOA RM委员会的活跃成员,和他人一起编著了《Applied SOA: Service-Oriented Architecture and Design Strategies》一书,另外他还写了很多关于架构、编程、大数据、SOA和BPM的文章。

查看英文原文:基于AWS技术实现发布/订阅服务

评价本文

专业度
风格

您好,朋友!

您需要 注册一个InfoQ账号 或者 才能进行评论。在您完成注册后还需要进行一些设置。

获得来自InfoQ的更多体验。

告诉我们您的想法

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我
社区评论

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

讨论

登陆InfoQ,与你最关心的话题互动。


找回密码....

Follow

关注你最喜爱的话题和作者

快速浏览网站内你所感兴趣话题的精选内容。

Like

内容自由定制

选择想要阅读的主题和喜爱的作者定制自己的新闻源。

Notifications

获取更新

设置通知机制以获取内容更新对您而言是否重要

BT