BT

如何利用碎片时间提升技术认知与能力? 点击获取答案

Java多线程编程模式实战指南一:Active Object模式(上)

| 作者 黄文海 关注 6 他的粉丝 发布于 2014年11月22日. 估计阅读时间: 39 分钟 | QCon上海2018 关注大数据平台技术选型、搭建、系统迁移和优化的经验。

Active Object模式简介

Active Object模式是一种异步编程模式。它通过对方法的调用与方法的执行进行解耦来提高并发性。若以任务的概念来说,Active Object模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。这有点类似于System.gc()这个方法:客户端代码调用完gc()后,一个进行垃圾回收的任务被提交,但此时JVM并不一定进行了垃圾回收,而可能是在gc()方法调用返回后的某段时间才开始执行任务——回收垃圾。我们知道,System.gc()的调用方代码是运行在自己的线程上(通常是main线程派生的子线程),而JVM的垃圾回收这个动作则由专门的线程(垃圾回收线程)来执行的。换言之,System.gc()这个方法所代表的动作(其所定义的功能)的调用方和执行方是运行在不同的线程中的,从而提高了并发性。

再进一步介绍Active Object模式,我们可先简单地将其核心理解为一个名为ActiveObject的类,该类对外暴露了一些异步方法,如图1所示。

图 1. ActiveObject对象示例

doSomething方法的调用方和执行方运行在各自的线程上。在并发的环境下,doSomething方法会被多个线程调用。这时所需的线程安全控制封装在doSomething方法背后,使得调用方代码无需关心这点,从而简化了调用方代码:从调用方代码来看,调用一个Active Object对象的方法与调用普通Java对象的方法并无太大差别。如清单1所示。

清单 1. Active Object方法调用示例

ActiveObject ao=...;
Future future = ao.doSomething("data");
//执行其它操作
String result = future.get();
System.out.println(result);

Active Object模式的架构

当Active Object模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的操作)、调用方代码所传递的参数等,会被封装成一个对象。该对象被称为方法请求(Method Request)。方法请求对象会被存入Active Object模式所维护的缓冲区(Activation Queue)中,并由专门的工作线程负责根据其包含的上下文信息执行相应的操作。也就是说,方法请求对象是由运行调用方代码的线程通过调用Active Object模式对外暴露的异步方法生成的,而方法请求所代表的操作则由专门的线程来执行,从而实现了方法的调用与执行的分离,产生了并发。

Active Object模式的主要参与者有以下几种。其类图如图2所示。

图 2. Active Object模式的类图

(点击图像放大)

  • Proxy:负责对外暴露异步方法接口。当调用方代码调用该参与者实例的异步方法doSomething时,该方法会生成一个相应的MethodRequest实例并将其存储到Scheduler所维护的缓冲区中。doSomething方法的返回值是一个表示其执行结果的外包装对象:Future参与者的实例。异步方法doSomething运行在调用方代码所在的线程中。
  • MethodRequest:负责将调用方代码对Proxy实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及调用方代码传递的参数等上下文信息。它使得将Proxy的异步方法的调用和执行分离成为可能。其call方法会根据其所包含上下文信息调用Servant实例的相应方法。
  • ActivationQueue:负责临时存储由Proxy的异步方法被调用时所创建的MethodRequest实例的缓冲区。
  • Scheduler:负责将Proxy的异步方法所创建的MethodRequest实例存入其维护的缓冲区中。并根据一定的调度策略,对其维护的缓冲区中的MethodRequest实例进行执行。其调度策略可以根据实际需要来定,如FIFO、LIFO和根据MethodRequest中包含的信息所定的优先级等。
  • Servant:负责对Proxy所暴露的异步方法的具体实现。
  • Future:负责存储和返回Active Object异步方法的执行结果。

Active Object模式的序列图如图3所示。

图 3. Active Object模式的序列图

(点击图像放大)

第1步:调用方代码调用Proxy的异步方法doSomething。

第2~7步:doSomething方法创建Future实例作为该方法的返回值。并将调用方代码对该方法的调用封装为MethodRequest对象。然后以所创建的MethodRequest对象作为参数调用Scheduler的enqueue方法,以将MethodRequest对象存入缓冲区。Scheduler的enqueue方法会调用Scheduler所维护的ActivationQueue实例的enqueue方法,将MethodRequest对象存入缓冲区。

第8步:doSomething返回其所创建的Future实例。

第9步:Scheduler实例采用专门的工作线程运行dispatch方法。

第10~12步:dispatch方法调用ActivationQueue实例的dequeue方法,获取一个MethodRequest对象。然后调用MethodRequest对象的call方法

第13~16步:MethodRequest对象的call方法调用与其关联的Servant实例的相应方法doSomething。并将Servant.doSomething方法的返回值设置到Future实例上。

第17步:MethodRequest对象的call方法返回。

上述步骤中,第1~8步是运行在Active Object的调用者线程中的,这几个步骤实现了将调用方代码对Active Object所提供的异步方法的调用封装成对象(Method Request),并将其存入缓冲区。这几个步骤实现了任务的提交。第9~17步是运行在Active Object的工作线程中,这些步骤实现从缓冲区中读取Method Request,并对其进行执行,实现了任务的执行。从而实现了Active Object对外暴露的异步方法的调用与执行的分离。

如果调用方代码关心Active Object的异步方法的返回值,则可以在其需要时,调用Future实例的get方法来获得异步方法的真正执行结果。

Active Object模式实战案例

某电信软件有一个彩信短号模块。其主要功能是实现手机用户给其它手机用户发送彩信时,接收方号码可以填写为对方的短号。例如,用户13612345678给其同事13787654321发送彩信时,可以将接收方号码填写为对方的短号,如776,而非其真实的号码。

该模块处理其接收到的下发彩信请求的一个关键操作是查询数据库以获得接收方短号对应的真实号码(长号)。该操作可能因为数据库故障而失败,从而使整个请求无法继续被处理。而数据库故障是可恢复的故障,因此在短号转换为长号的过程中如果出现数据库异常,可以先将整个下发彩信请求消息缓存到磁盘中,等到数据库恢复后,再从磁盘中读取请求消息,进行重试。为方便起见,我们可以通过Java的对象序列化API,将表示下发彩信的对象序列化到磁盘文件中从而实现请求缓存。下面我们讨论这个请求缓存操作还需要考虑的其它因素,以及Active Object模式如何帮助我们满足这些考虑。

首先,请求消息缓存到磁盘中涉及文件I/O这种慢的操作,我们不希望它在请求处理的主线程(即Web服务器的工作线程)中执行。因为这样会使该模块的响应延时增大,降低系统的响应性。并使得Web服务器的工作线程因等待文件I/O而降低了系统的吞吐量。这时,异步处理就派上用场了。Active Object模式可以帮助我们实现请求缓存这个任务的提交和执行分离:任务的提交是在Web服务器的工作线程中完成,而任务的执行(包括序列化对象到磁盘文件中等操作)则是在Active Object工作线程中执行。这样,请求处理的主线程在侦测到短号转长号失败时即可以触发对当前彩信下发请求进行缓存,接着继续其请求处理,如给客户端响应。而此时,当前请求消息可能正在被Active Object线程缓存到文件中。如图4所示。

图 4 .异步实现缓存

其次,每个短号转长号失败的彩信下发请求消息会被缓存为一个磁盘文件。但我们不希望这些缓存文件被存在同一个子目录下。而是希望多个缓存文件会被存储到多个子目录中。每个子目录最多可以存储指定个数(如2000个)的缓存文件。若当前子目录已存满,则新建一个子目录存放新的缓存文件,直到该子目录也存满,依此类推。当这些子目录的个数到达指定数量(如100个)时,最老的子目录(连同其下的缓存文件,如果有的话)会被删除。从而保证子目录的个数也是固定的。显然,在并发环境下,实现这种控制需要一些并发访问控制(如通过锁来控制),但是我们不希望这种控制暴露给处理请求的其它代码。而Active Object模式中的Proxy参与者可以帮助我们封装并发访问控制。

下面,我们看该案例的相关代码通过应用Active Object模式在实现缓存功能时满足上述两个目标。首先看请求处理的入口类。该类就是本案例的Active Object模式的客调用方代码。如清单2所示。

清单 2. 彩信下发请求处理的入口类

public class MMSDeliveryServlet extends HttpServlet {

	private static final long serialVersionUID = 5886933373599895099L;

	@Override
	public void doPost(HttpServletRequest req, HttpServletResponse resp)
			throws ServletException, IOException {
		//将请求中的数据解析为内部对象
		MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream());
		Recipient shortNumberRecipient = mmsDeliverReq.getRecipient();
		Recipient originalNumberRecipient = null;

		try {
			// 将接收方短号转换为长号
			originalNumberRecipient = convertShortNumber(shortNumberRecipient);
		} catch (SQLException e) {

			// 接收方短号转换为长号时发生数据库异常,触发请求消息的缓存
			AsyncRequestPersistence.getInstance().store(mmsDeliverReq);

			// 继续对当前请求的其它处理,如给客户端响应
			resp.setStatus(202);
		}

	}

	private MMSDeliverRequest parseRequest(InputStream reqInputStream) {
		MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest();
		//省略其它代码
		return mmsDeliverReq;
	}

	private Recipient convertShortNumber(Recipient shortNumberRecipient)
			throws SQLException {
		Recipient recipent = null;
		//省略其它代码
		return recipent;
	}

}

清单2中的doPost方法在侦测到短号转换过程中发生的数据库异常后,通过调用AsyncRequestPersistence类的store方法触发对彩信下发请求消息的缓存。这里,AsyncRequestPersistence类相当于Active Object模式中的Proxy参与者。尽管本案例涉及的是一个并发环境,但从清单2中的代码可见,AsyncRequestPersistence类的调用方代码无需处理多线程同步问题。这是因为多线程同步问题被封装在AsyncRequestPersistence类之后。

AsyncRequestPersistence类的代码如清单3所示。

清单 3. 彩信下发请求缓存入口类(Active Object模式的Proxy)

// ActiveObjectPattern.Proxy
public class AsyncRequestPersistence implements RequestPersistence {
	private static final long ONE_MINUTE_IN_SECONDS = 60;
	private final Logger logger;
	private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0);
	private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0);
	
	// ActiveObjectPattern.Servant
	private final DiskbasedRequestPersistence 
						delegate = new DiskbasedRequestPersistence();
	// ActiveObjectPattern.Scheduler
	private final ThreadPoolExecutor scheduler;

	private static class InstanceHolder {
		final static RequestPersistence INSTANCE = new AsyncRequestPersistence();
	}

	private AsyncRequestPersistence() {
		logger = Logger.getLogger(AsyncRequestPersistence.class);
		scheduler = new ThreadPoolExecutor(1, 3, 
				60 * ONE_MINUTE_IN_SECONDS,
				TimeUnit.SECONDS,
				// ActiveObjectPattern.ActivationQueue
				new LinkedBlockingQueue(200), 
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						Thread t;
						t = new Thread(r, "AsyncRequestPersistence");
						return t;
					}

				});

		scheduler.setRejectedExecutionHandler(
				new ThreadPoolExecutor.DiscardOldestPolicy());

		// 启动队列监控定时任务
		Timer monitorTimer = new Timer(true);
		monitorTimer.scheduleAtFixedRate(
		    new TimerTask() {

			@Override
			public void run() {
				if (logger.isInfoEnabled()) {

					logger.info("task count:" 
							+ requestSubmittedPerIterval
							+ ",Queue size:" 
							+ scheduler.getQueue().size()
							+ ",taskTimeConsumedPerInterval:"
							+ taskTimeConsumedPerInterval.get() 
							+ " ms");
				}

				taskTimeConsumedPerInterval.set(0);
				requestSubmittedPerIterval.set(0);

			}
		}, 0, ONE_MINUTE_IN_SECONDS * 1000);
	}

	public static RequestPersistence getInstance() {
		return InstanceHolder.INSTANCE;
	}

	@Override
	public void store(final MMSDeliverRequest request) {
		/*
		 * 将对store方法的调用封装成MethodRequest对象, 并存入缓冲区。
		 */
		// ActiveObjectPattern.MethodRequest
		Callable methodRequest = new Callable() {
			@Override
			public Boolean call() throws Exception {
				long start = System.currentTimeMillis();
				try {
					delegate.store(request);
				} finally {
					taskTimeConsumedPerInterval.addAndGet(
							System.currentTimeMillis() - start);
				}

				return Boolean.TRUE;
			}

		};
		scheduler.submit(methodRequest);

		requestSubmittedPerIterval.incrementAndGet();
	}

}

AsyncRequestPersistence类所实现的接口RequestPersistence定义了Active Object对外暴露的异步方法:store方法。由于本案例不关心请求缓存的结果,故该方法没有返回值。其代码如清单4所示。

清单 4. RequestPersistence接口源码

public interface RequestPersistence {

	 void store(MMSDeliverRequest request);
}

AsyncRequestPersistence类的实例变量scheduler相当于Active Object模式中的Scheduler参与者实例。这里我们直接使用了JDK1.5引入的Executor Framework中的ThreadPoolExecutor。在ThreadPoolExecutor类的实例化时,其构造器的第5个参数(BlockingQueue<Runnable> workQueue)我们指定了一个有界阻塞队列:new LinkedBlockingQueue<Runnable>(200)。该队列相当于Active Object模式中的ActivationQueue参与者实例。

AsyncRequestPersistence类的实例变量delegate相当于Active Object模式中的Servant参与者实例。

AsyncRequestPersistence类的store方法利用匿名类生成一个java.util.concurrent.Callable实例methodRequest。该实例相当于Active Object模式中的MethodRequest参与者实例。利用闭包(Closure),该实例封装了对store方法调用的上下文信息(包括调用参数、所调用的方法对应的操作信息)。AsyncRequestPersistence类的store方法通过调用scheduler的submit方法,将methodRequest送入ThreadPoolExecutor所维护的缓冲区(阻塞队列)中。确切地说,ThreadPoolExecutor是Scheduler参与者的一个“近似”实现。ThreadPoolExecutor的submit方法相对于Scheduler的enqueue方法,该方法用于接纳MethodRequest对象,以将其存入缓冲区。当ThreadPoolExecutor当前使用的线程数量小于其核心线程数量时,submit方法所接收的任务会直接被新建的线程执行。当ThreadPoolExecutor当前使用的线程数量大于其核心线程数时,submit方法所接收的任务才会被存入其维护的阻塞队列中。不过,ThreadPoolExecutor的这种任务处理机制,并不妨碍我们将它用作Scheduler的实现。

methodRequest的call方法会调用delegate的store方法来真正实现请求缓存功能。delegate实例对应的类DiskbasedRequestPersistence是请求消息缓存功能的真正实现者。其代码如清单5所示。

清单 5. DiskbasedRequestPersistence类的源码

public class DiskbasedRequestPersistence implements RequestPersistence {
	// 负责缓存文件的存储管理
	private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage();
	private final Logger logger = Logger
	                                 .getLogger(DiskbasedRequestPersistence.class);

	@Override
	public void store(MMSDeliverRequest request) {
		// 申请缓存文件的文件名
		String[] fileNameParts = storage.apply4Filename(request);
		File file = new File(fileNameParts[0]);
		try {
			ObjectOutputStream objOut = new ObjectOutputStream(
			new FileOutputStream(file));
			try {
				objOut.writeObject(request);
			} finally {
				objOut.close();
			}
		} catch (FileNotFoundException e) {
			storage.decrementSectionFileCount(fileNameParts[1]);
			logger.error("Failed to store request", e);
		} catch (IOException e) {
			storage.decrementSectionFileCount(fileNameParts[1]);
			logger.error("Failed to store request", e);
		}

	}

	class SectionBasedDiskStorage {
		private Deque sectionNames = new LinkedList();
		/*
		 * Key->value: 存储子目录名->子目录下缓存文件计数器
		 */
		private Map sectionFileCountMap 
						= new HashMap();
		private int maxFilesPerSection = 2000;
		private int maxSectionCount = 100;
		private String storageBaseDir = System.getProperty("user.dir") + "/vpn";

		private final Object sectionLock = new Object();

		public String[] apply4Filename(MMSDeliverRequest request) {
			String sectionName;
			int iFileCount;
			boolean need2RemoveSection = false;
			String[] fileName = new String[2];
			synchronized (sectionLock) {
				//获取当前的存储子目录名
				sectionName = this.getSectionName();
				AtomicInteger fileCount;
				fileCount = sectionFileCountMap.get(sectionName);
				iFileCount = fileCount.get();
				//当前存储子目录已满
				if (iFileCount >= maxFilesPerSection) {
					if (sectionNames.size() >= maxSectionCount) {
						need2RemoveSection = true;
					}
					//创建新的存储子目录
					sectionName = this.makeNewSectionDir();
					fileCount = sectionFileCountMap.get(sectionName);

				}
				iFileCount = fileCount.addAndGet(1);

			}

			fileName[0] = storageBaseDir + "/" + sectionName + "/"
			    + new DecimalFormat("0000").format(iFileCount) + "-"
			    + request.getTimeStamp().getTime() / 1000 + "-" 
                               + request.getExpiry()
			    + ".rq";
			fileName[1] = sectionName;

			if (need2RemoveSection) {
				//删除最老的存储子目录
				String oldestSectionName = sectionNames.removeFirst();
				this.removeSection(oldestSectionName);
			}

			return fileName;
		}

		public void decrementSectionFileCount(String sectionName) {
			AtomicInteger fileCount = sectionFileCountMap.get(sectionName);
			if (null != fileCount) {
				fileCount.decrementAndGet();
			}
		}

		private boolean removeSection(String sectionName) {
			boolean result = true;
			File dir = new File(storageBaseDir + "/" + sectionName);
			for (File file : dir.listFiles()) {
				result = result && file.delete();
			}
			result = result && dir.delete();
			return result;
		}

		private String getSectionName() {
			String sectionName;

			if (sectionNames.isEmpty()) {
				sectionName = this.makeNewSectionDir();

			} else {
				sectionName = sectionNames.getLast();
			}

			return sectionName;
		}

		private String makeNewSectionDir() {
			String sectionName;
			SimpleDateFormat sdf = new SimpleDateFormat("MMddHHmmss");
			sectionName = sdf.format(new Date());
			File dir = new File(storageBaseDir + "/" + sectionName);
			if (dir.mkdir()) {
				sectionNames.addLast(sectionName);
				sectionFileCountMap.put(sectionName, new AtomicInteger(0));
			} else {
				throw new RuntimeException(
                                 "Cannot create section dir " + sectionName);
			}

			return sectionName;
		}
	}
}

methodRequest的call方法的调用者代码是运行在ThreadPoolExecutor所维护的工作者线程中,这就保证了store方法的调用方和真正的执行方是分别运行在不同的线程中:服务器工作线程负责触发请求消息缓存,ThreadPoolExecutor所维护的工作线程负责将请求消息序列化到磁盘文件中。

DiskbasedRequestPersistence类的store方法中调用的SectionBasedDiskStorage类的apply4Filename方法包含了一些多线程同步控制代码(见清单5)。这部分控制由于是封装在DiskbasedRequestPersistence的内部类中,对于该类之外的代码是不可见的。因此,AsyncRequestPersistence的调用方代码无法知道该细节,这体现了Active Object模式对并发访问控制的封装。

小结

本篇介绍了Active Object模式的意图及架构,并以一个实际的案例展示了该模式的代码实现。下篇将对Active Object模式进行评价,并结合本文案例介绍实际运用Active Object模式时需要注意的一些事项。


感谢张龙对本文的审校。

给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ)或者腾讯微博(@InfoQ)关注我们,并与我们的编辑和其他读者朋友交流。

评价本文

专业度
风格

您好,朋友!

您需要 注册一个InfoQ账号 或者 才能进行评论。在您完成注册后还需要进行一些设置。

获得来自InfoQ的更多体验。

告诉我们您的想法

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

请问什么不直接用一个Queue by 李 斐然

既然对异步执行的结果不关心, 为什么不直接把MethodRequest放入一个Queue里, 另外一个工作线程负责存储?

相对于Queue,使用ActiveObject的优势是什么?

谢谢。

Re: 请问什么不直接用一个Queue by 李 斐然

又仔细阅读了代码, 发现本质上就是Queue。 ActiveObject模式在Queue的基础上加入了异步执行管理和调度的工作。

个人觉得, 如果仅仅是文中所提到的目的, 那么使用ActiveObject模式有点复杂了。 如果目的是提供一个 “通用的异步暂存机制”,那么ActiveObject会是很好的选择。

谢谢你的分享。

Re: 请问什么不直接用一个Queue by 黄 文海

正如你所表达的,我们不是为了用模式而用模式,是需要根据当前的场景和要实现的目标来作出权衡的。本文案例之所以不是直接使用Queue,然后写自己写线程去“消耗”Queue里的条目,是因为这里还涉及线程池的问题:即消耗Queue里条目的线程1个足够了还是需要多个,如需要多个那具体是多少?从这个角度出发,把整个操作放入队列(而非仅仅是数据),直接利用ExecutorService接口来执行队列条目表示的操作更加方便。另外,还有其它的考虑,如“错误隔离”等,这些会在本文的下篇(正式发布时间是2014年11月25日)介绍到。详见:
www.infoq.com/cn/articles/Java-multithreaded-pr...

Re: 请问什么不直接用一个Queue by xiong liu

请问这段代码:
private static class InstanceHolder {
final static RequestPersistence INSTANCE = new AsyncRequestPersistence();
}
public static RequestPersistence getInstance() {
return InstanceHolder.INSTANCE;
}
有何用意?

本人感觉这个的实质就是在线程池的基础上进行了一下封装就成了所谓的新模式(异步任务处理都是这么做的吧)这个有点缺乏说服力,作者说的错误隔离?什么意思?我直接在提交到任务队列的Runnable或者Callable里面Try Catch也能做吧?

Re: 请问什么不直接用一个Queue by xiong liu

看了Active Object模式(下),利用了动态代理,恩,理解作者的意图了!

Re: 请问什么不直接用一个Queue by cat tom

不明白这种写法有什么优势,或者又有什么通用性,总的感觉是在用线程实现异步调用的过程。

Re: 请问什么不直接用一个Queue by cat tom

不明白这种写法有什么优势,或者又有什么通用性,总的感觉是在用线程实现异步调用的过程。

Re: 请问什么不直接用一个Queue by 黄 文海

我认为,这个问题可以从面向对象编程中的“接口”(interface)和“实现”(implementation)这两个方面去考虑。正如本文开头所讲的,使用Active Object,从调用方代码的角度来看,Active Object与我们常见的对象并无太大区别,也就是说调用方代码看到的是一个“接口”。这个接口就是由Active Object模式对外呈现的。ThreadPoolExecutor(或者ExecutorService)也好,或者直接使用BlockingQueue也好,都是Active Object模式的一个具体实现(implementation),它(在Active Object模式中)对外是不可见的。
如本文的清单1代码所示,调用方代码根本看不出“躲在”ao背后的是什么,是一个Queue?,是一个ThreadPoolExecutor(是单线程的,还是多线程的,具体多少个线程)?这些细节调用方代码都不需要知道,这就带来一些便利。例如,便于编写测试代码、变换实现细节(如在单线程和多线程之间做变换)。
ActiveObject ao=...;
Future future = ao.doSomething("data");
//执行其它操作
String result = future.get();
System.out.println(result);

完全可以用JMS搞定的事情 by Wang Super

收到之后直接扔到某个待处理Queue或者Topic里,然后Consumer去查数据库做转换,失败了在丢进另一个Queue或者Topic里做持久化或者定期重试。完美屏蔽任何底层细节。

显然作者也没有考虑过这个框架跑在分布式环境下会遇到什么问题吧。

Re: 完全可以用JMS搞定的事情 by 黄 文海

JMS其实也只是对各种MQ套上一层外包装。最后还得对实际提供消息队列功能的软件进行选型。商用产品如IBM MQ我们在其它项目里用过,开源的MQ如RabbitMQ我们没有用过,也不想贸然去使用。其次,本文案例涉及的是其所在模块的一个故障处理,并非其核心功能所在。并且,该模块的JVM最大堆内存规划只有384MB,恐怕资源上供应不上某些MQ。所以,我们使用的是对于我们来说相对简单而“够用”的方案。

异步? by 陈 亮

一个store一个线程,这不是异步了吧

Re: 异步? by 黄 文海

store方法是在线程池中执行的:
scheduler.submit(methodRequest);
另外,store方法不管是在一个线程池中的一个还是多个线程中执行,这些线程和web容器的工作线程是各自在各自的时间线上执行的,因而是“异步”的。好比我们烧开水:把水灌入电热水壶,打开开关。然后,我们可能就在看网上的视频(而不是站在厨房等水开!)。烧开水和看视频这两个事件就是“异步”的(各自在各自的时间线上执行着)。

模式之上的架构思想 by guo jinfei

作者介绍的ActiveObject模式看起来很普通,就是一些成熟模式的混合应用,但从软件架构设计的角度来看,其中蕴含了很好的的架构思想。就像文章中的举例,在架构阶段,我们仅仅对问题域给出解决方案(如果短号转换为长号的过程中如果出现数据库异常,可以先将整个下发彩信请求消息缓存到磁盘中,等到数据库恢复后,再从磁盘中读取请求消息,进行重试),在概要设计阶段,明确了异步处理的设计(ActiveObject模式)并给出了明确的接口定义(RequestPersistence 的store就是明确的接口),在详细设计阶段我们才会考虑具体的异步实现(如选择独立Queue、ThreadPoolExecutor、还是JMS)。

Re: 模式之上的架构思想 by 黄 文海

:)这也是本系列文章采用实战案例的一个原因。不仅仅是分享某某模式是什么样的,更重要的是分享关于在什么场景下可以使用什么模式,具体使用时还有哪些细节需要考虑的。

Re: 模式之上的架构思想 by 黄 文海

文章对大家有帮助,对于作者而言是一种极大的鼓励!InfoQ上的这个系列我已经进行了扩充和完善,形成一本书《Java多线程编程实战指南(设计模式篇)》,现已出版上市,各大书店都可以买到。www.amazon.cn/dp/B016IW624G/ref=viscent-douban

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

15 讨论

登陆InfoQ,与你最关心的话题互动。


找回密码....

Follow

关注你最喜爱的话题和作者

快速浏览网站内你所感兴趣话题的精选内容。

Like

内容自由定制

选择想要阅读的主题和喜爱的作者定制自己的新闻源。

Notifications

获取更新

设置通知机制以获取内容更新对您而言是否重要

BT