BT

如何利用碎片时间提升技术认知与能力? 点击获取答案

简化异步操作(上):使用CCR和AsyncEnumerator简化异步操作

| 作者 赵劼 关注 5 他的粉丝 发布于 2009年2月20日. 估计阅读时间: 31 分钟 | AICon 关注机器学习、计算机视觉、NLP、自动驾驶等20+AI热点技术和最新落地成功案例。

在以前的文章中,我曾多次强调应用程序中异步化的重要性。尤其对于IO密集型操作来说,异步执行对于应用程序的响应能力和伸缩性有非常关键的影响。 正确使用异步编程能够使用尽可能少的线程来执行大量的IO密集型操作。可惜的是,即使异步编程有避免线程阻塞等诸多好处,但是这种编程方式至今没有被大量 采用。其原因有很多,其中最主要的一点可能就是异步模型在编程上较为困难,导致许多开发人员不愿意去做。

异步,则意味着一个任务至少要被拆分为“二段式”的调用方式:一个方法用于发起异步请求,另一个方法用于异步任务完成后的回调。与传统方法的调用方 式相比,异步调用时的中间数据不能存放在线程栈上,方法之间的也不能简单地通过参数传递的方式来共享数据。此外,传统方法调用中可使用的 try…catch…finally,using等关键字都无法跨越方法边界,因此异步编程在处理异常,保护资源等方面也需要花更大的精力才行。如果一不 小心,轻则造成资源泄露,重则使整个应用程序崩溃。

因此,无论是微软官方还是社区中都出现了一些简化异步编程方式的组件,例如微软的CCR和Wintellect's .NET Power Threading Library中的AsyncEnumerator。但是它们都有同样的局限性,例如操作之间存在依赖,则很难让它们并行执行。对于这样的场景,我们还需 要构建额外的解决方案,使多个有依赖关系的异步操作之间的协作调用得以尽可能的简化。

传统异步操作使用方式

.NET平台中的异步编程方式为APM(Asynchronous Programming Model,异步编程模型)模式。它使用BeginXxx和EndXxx两个方法形成了二段式调用,并且通过回调函数(AsyncCallback)和异 步状态(IAsyncResult)进行协作完成整个异步操作。例如,如果我们要异步读取一个文件的内容,我们可能会使用这种方法:

FileStream fs = new FileStream(
 @"C:\Sample.data",
 FileMode.Open,
 FileAccess.Read,
 FileShare.Read, 8192,
 FileOptions.Asynchronous);

Byte[] data = new Byte[fs.Length];
fs.BeginRead(data, 0, data.Length, result =>
{
 Int32 bytesRead = fs.EndRead(result);
 ProcessData(data); // 处理数据
 fs.Close(); // 关闭文件流
}, null);

在这段代码中,通过调用FileStream对象的BeginRead方法来发起一个异步的读取操作,并且使用Lambda表达式构造一个匿名 AsyncCallback对象。AsyncCallback回调函数将在异步读取完成之后由框架调用,其中会执行FileSystem对象的 EndRead方法来获取异步操作的结果。最后,还必须显式地关闭文件流,而不能使用using关键字来进行辅助,这是由于文件流在发起异步操作之前打 开,而在回调函数中才能被关闭。在上面的代码中还有一个关键,那就是利用了.NET 2.0中的匿名函数特性形成了一个闭包。由于闭包内部也可以访问外部方法的局部变量(例如AsyncCallback回调函数访问了fs对象),在一定程 度使得“二段式”的异步调用模型共享了“栈”上的数据——其实编译器已经将需要共享的字段和匿名函数放入了一个托管堆上的辅助对象里了。

从上面的代码看来,使用一个异步操作并不如想象中的困难,但是在实际生产中几乎不会出现如此简单的场景。一个比较常见的场景是:在从一个数据源中获 取了一篇文章的所有评论之后,需要根据每条评论的用户ID去另一个数据源获取用户信息,再加以组合并显示给用户查看。对于上层应用(如本例中的UI层)来 说,这两个异步操作为一个整体,只有两个异步操作完成后,一个完整的异步操作才算结束。这种组合往往会使代码进入一种匿名函数互相嵌套的关系:

// 发起GetComment异步操作
Service.BeginGetComments( 1, // 对象ID
commentAsyncResult => // GetComments操作的回调函数
{ // GetComments操作完成, 获得一个IEnumerable<Comment>对象
var comments = Service.EndGetComments(commentAsyncResult); // 发起GetUsers异步操作
Service.BeginGetUsers( comments.Select(c => c.UserID), // 得到用户ID
userAsyncResult => // GetUsers操作的回调函数
{ // GetUsers操作完成
var users = Service.EndGetUsers(userAsyncResult); // 处理数据
ProcessData(comments, users); }, null); }, null);

根据应用的复杂程度不同,异步操作的数量会越来越多,如果一味地进行嵌套,代码的维护性将会越来越差。但是如果您为了避免嵌套而把方法拆开,那么在 分散在各处的回调函数间共享或传递数据又会成为新的问题。因此,无论是微软官方还是社区中都出现了一些组件用于简化异步编程模型的使用。其中最著名的可能 就是微软的“并行与协调运行时”和Wintellect's .NET Power Threading Library中的AsyncEnumerator。

使用CCR简化异步编程模型的使用

并行与协调运行时(Concurrency and Coordination Runtime,CCR)是微软面向机器人平台开发的一套框架,包含在Microsoft Robotics Developer Studio中。Microsoft Robotics Developer Studio 2008 Expression Edition不允许再次分发,但是可以免费用于商业和非商业的目的,您可以在这里阅读它的授权协议

CCR虽然源于机器人平台,但是它其实是一个轻量级的基于消息传递机制的编程框架。在CCR所构建的“消息——端口——队列”的处理模型中,几乎不会形成任何的线程阻塞,因此可以被用于各种需要高度并发的场景中,您可以在它的官方站点Channel 9上获得它的各种案例。有关CCR的资料并不多,但是在它的用户手册中提到了基于迭代器(Iterator)的异步开发方式

C# 2.0中引入了yield关键字,开发人员可以利用这个新特性轻松实现编写一个迭代器。例如如下代码便是选择出数组中所有大于5的元素:

static IEnumerator<int> Get(int[] array)
{
 foreach (var a in array)
 { 
 if (a > 5) yield return a;
 }
}

编译器在这里又一次大显神威,它会根据如上寥寥数行代码自动构建一个复杂的,上百行代码的IEnumerator<int>对象,实现 了Reset,Current,MoveNext,Dispose等多个成员。在调用一次迭代器的MoveNext的方法之后,代码将在yield return语句之处返回(假设在没有其他退出的情况下),直到下次MoveNext方法被调用时才接着之前yield return语句的下一行继续执行。

一般来说,我们会使用C#的foreach关键字来遍历一个迭代器中的所有元素,它会自动生成对于MoveNext/Current成员的调用。但 是现在,yield特性被巧妙地用在了异步编程模型上。试想,开发人员可以在迭代器中发起一个异步操作,之后立即使用yield return语句返回,并且通过某个机制在异步操作结束之后(例如利用异步操作的AsyncCallback回调函数)再次调用迭代器的MoveNext 方法,接着刚才的逻辑继续执行。通过这种“发起操作——yield return——完成操作——发起下一次操作——yield return——完成下一次操作……”的方式,我们可以使用接近于传统的开发方式来进行异步操作。我们现在使用CCR Iterator的方式,将之前异步获取评论和用户的代码进行改写:

static IEnumerator<ITask> GetEnumerator()
{
 var resultPort = new Port<IAsyncResult>();

 // 发起GetComments异步操作
Service.BeginGetComments(1, resultPort.Post, null); // 中断,等待GetComments操作完成
yield return resultPort.Receive(); // GetComments操作完成,获取结果
var comments = Service.EndGetComments((IAsyncResult)resultPort.Test()); // 发起GetUsers异步操作
Service.BeginGetUsers(comments.Select(c => c.UserID), resultPort.Post, null); // 中断,等待GetUsers操作完成
yield return resultPort.Receive(); // GetUsers操作完成,获取结果
var users = Service.EndGetUsers((IAsyncResult)resultPort.Test()); // 处理数据
ProcessData(comments, users); }

然后,我们可以使用如下方式调用这个迭代器:

Dispatcher dispatcher = new Dispatcher();
DispatcherQueue queue = new DispatcherQueue("read queue", dispatcher);
Arbiter.Activate(queue, Arbiter.FromIteratorHandler(CreateEnumerator));

使用这样方式来执行异步操作,不仅免去层层嵌套之苦,更在于它真真正正地使用了传统的开发方式——这意味着之前所谈到的各种缺陷,例如无法使用 try…catch…finally和using的问题都不复存在了。异步世界一下子美好了许多。当然,CCR的功能远不止如此,这里只是使用它的一小部 分功能而已,感兴趣的朋友们可以去之前给出的链接中更进一步了解CCR的强大功能。

使用AsyncEnumerator简化异步模型的使用

Wintellect's .NET Power Threading Library是由Jeffrey Richter开 发的一套类库,包含了许多与多线程和异步编程相关的组件,而AsyncEnumerator则是其中之一。AsyncEnumerator对于异步编程模 型的支持,在原理上与CCR相同,但是由于它是直接面向这种异步机制的辅助,因此在功能上更加完善,使用也较为方便。例如,之前的例子可以改写为:

static IEnumerator<int> GetEnumerator(AsyncEnumerator enumerator)
{
 // 发起GetComments异步操作
Service.BeginGetComments(1, enumerator.End(), null); // 中断,等待GetComments操作完成
yield return 1; // GetComments操作完成,获取结果
var comments = Service.EndGetComments(enumerator.DequeueAsyncResult()); // 发起GetUsers异步操作
Service.BeginGetUsers(comments.Select(c => c.UserID), enumerator.End(), null); // 中断,等待GetUsers操作完成
yield return 1; // GetUsers操作完成,获取结果
var users = Service.EndGetUsers(enumerator.DequeueAsyncResult()); // 处理数据
ProcessData(comments, users); }

在使用时,开发人员需要构造一个IEnumerator<int>对象来指引AsyncEnumerator的调度。 AsyncEnumerator的End方法会返回一个AsyncCallback对象,需要交给每个发起异步操作的方法,用于在一个异步操作完成时进行 通知。在AsyncEnumerator中会维护一个队列,某个异步操作完成后,它的IAsyncResult对象就会放入这个队列中,而 DequeueAsyncResult方法便可将IAsyncResult对象从队列中取出。每次yield return的值,则表明需要等AsyncEnumerator中存在“多少个”未出队列的IAsyncResult对象才继续执行下一行代码。利用这个 特性,我们可以在yield return语句之前发起多个异步操作,并且使用一句yield return来“等待”多个异步操作完成。例如在以下的代码中,只有在所有异步操作(即所有的GetResponse操作)完成之后才能从yield return的下一条语句开始继续执行:

static IEnumerator<int> GetEnumerator(AsyncEnumerator enumerator, IEnumerable<string> urls)
{
 int count = 0;
 foreach (string url in urls)
 {
 count++;
 WebRequest request = HttpWebRequest.Create(url);
 request.BeginGetResponse(enumerator.End(), request);
 }

 yield return count;

 for (int i = 0; i < count; i++)
 {
 IAsyncResult asyncResult = enumerator.DequeueAsyncResult();
 WebRequest request = (WebRequest)asyncResult.AsyncState;
 WebResponse response = request.EndGetResponse(asyncResult);

 ProcessResponse(response);
 }
}

在构建完IEnumerator之后,您可以使用AsyncEnumerator的Execute方法执行整个异步操作:

AsyncEnumerator asyncEnumerator = new AsyncEnumerator();
asyncEnumerator.Execute(GetEnumerator(asyncEnumerator, ...));

不过Execute方法会阻塞调用线程,因此,AsyncEnumerator也同样提供了BeginExecute和EndExecute方法组成了一个标准的APM模式。

如果您希望对AsyncEnumerator有更多了解,可以参考Jeffrey Richter在MSDN Magazine上的Concurrent Affairs专栏里的文章:《Simplified APM with C#》、《Simplified APM with the AsyncEnumerator》以及《More AsyncEnumerator Features》。

CCR或AsyncEnumerator的局限

有了CCR或AsyncEnumerator的支持,开发由多个异步操作组合而成的异步调用并非难事,因为现在的异步开发从编码方式上就已经与普通 的方法非常接近了。无论从逻辑控制还是资源管理,都可以使用传统的手段进行开发,异步操作似乎从来没有那么容易过。但是光靠这样的辅助并不能够在某些场景 下得到最好的解决方案。试想您在开发一个ASP.NET页面用于展示一篇文章,其中需要显示各种信息:

  1. 文章内容
  2. 评论信息
  3. 对评论内容进行打分的用户
  4. 打分者的收藏

由于程序架构的原因,数据需要从各个不同服务或数据源中获取(这是个很常见的情况)。因此,程序中已经准备了如下的数据读取接口:

  1. Begin/EndGetContent:根据文章ID(Int32),获取文章内容(String)
  2. Begin/EndGetComments:根据文章ID(Int32),获取所有评论(IEnumerable<Comment>)
  3. Begin/EndGetUsers:根据多个用户ID(IEnumerable<int>),获取一批用户(Dictionary<int, User>)
  4. Begin/EndGetCommentRaters:根据多个评论ID(IEnumerable<int>),获取所有打分者(IEnumerable<User>)
  5. Begin/EndGetFavorites:根据多个用户ID(IEnumerable<int>),获取所有收藏(IEnumerable<string>)

如果使用AsyncEnumerator辅助开发,您可能会写出如下的代码:

private IEnumerator<int> GetSerialEnumerator(AsyncEnumerator enumerator, int articleId)
{
 // 获取文章内容
Service.BeginGetContent(articleId, enumerator.End(), null); yield return 1; this.Content = Service.EndGetContent(enumerator.DequeueAsyncResult()); // 获取评论
Service.BeginGetComments(articleId, enumerator.End(), null); yield return 1; var comments = Service.EndGetComments(enumerator.DequeueAsyncResult()); // 获取评论者信息,并结合评论绑定至控件
Service.BeginGetUsers(comments.Select(c => c.UserID), enumerator.End(), null); yield return 1; var users = Service.EndGetUsers(enumerator.DequeueAsyncResult()); this.rptComments.DataSource = from c in comments select new
{ Comment = c, User = users[c.UserID] }; this.rptComments.DataBind(); // 获取评论的打分者,并绑定至控件
Service.BeginGetCommentRaters(comments.Select(c => c.CommentID), enumerator.End(), null); yield return 1; var raters = Service.EndGetCommentRaters(enumerator.DequeueAsyncResult()); this.rptRaters.DataSource = raters; this.rptRaters.DataBind(); // 获取打分者的收藏,并绑定至控件
Service.BeginGetFavorites(raters.Select(u => u.UserID), enumerator.End(), null); yield return 1; this.rptFavorites.DataSource = Service.EndGetFavorites(enumerator.DequeueAsyncResult()); this.rptFavorites.DataBind(); }

似乎一切正常,不是吗?为了发现问题,我们做一个略为夸张的假设:“每个操作都会使用2秒钟实现才能完成”,并且使用示意图来表现所有操作的运行时段:

诚然,由于充分地并且合理地利用了异步操作,因此在整个执行过程中只有极小部分时间才会占用线程进行运算。但是,相信您也已经发现了问题:由于 所有操作都是串行的,因此总共需要10秒钟时间才能完成全部操作。这大可不必。导致所有操作串行的原因往往是它们的之间存在着的依赖关系(自然也有可能是 其他原因,例如“资源的竞争”,但是我们这里暂时不考虑这些因素),在我们的示例中最明显的依赖关系,便是一个操作的输出将作为另一个操作的输入。如果我 们将这种关系绘制成图示,那么操作之间的依赖便一目了然了:

这五个操作刚好可以分为三个阶段,其中A和B,C和D均可同时运行。因此,在理想情况下,五个操作的执行阶段应该如下图所示:

在资源充足的情况下,并行的性能往往优于串行,这是不争的事实,但是要做到这一点其实并不容易。CCR或AsyncEnumerator的优势 在于把各异步操作使用普通编程方式串连了起来,因此我们才能使用try…catch…finally和using等关键字来简化我们的逻辑实现。如果一旦 要求并行,那用传统的编程方式则又无法实现了。如果看了之前的内容,您可能会觉得使用AsyncEnumerator也可以实现并行,只要“在yield return语句之前发起多个异步操作”不就可以了吗?其实不然,因为无论是CCR还是AsyncEnumerator都有个“硬伤”:在获取一个 IAsyncResult对象之后,必须由开发人员来指定这个对象的归属问题,这样才能将它交由合适的End方法来完成一个异步操作。在“串行的异步”中 做到这点并不困难,因为yield return语句后获得的IAsyncResult对象必然属于之前发起的异步操作。如果在“并行”的情况下,则需要通过额外的机制来保持这种异步关系。

例如,在之前WebRequest的示例中,我们使用asyncState来保存IAsyncResult对象所对应的WebRequest。 但是我们这么做需要一个前提:并行的操作完全相同,只是从不同的对象发起。也只有如此,才能让开发人员确定IAsyncRequst对象的操作方式,否则 繁琐的if…else无可避免。就拿上例五个异步操作来说,虽然操作A肯定比操作E要提前开始,但是我们很可能无法保证A比E要提前完成。此外,一个异步 操作可能会依赖于其他多个异步操作,因此一个异步操作完成之后,并不能说明依赖于它的异步操作已经能够开始。我们几乎可以肯定,直接使用 AsyncEnumerator会编写出混乱而难以维护的“并行”代码。

在下一片文章中,我们将构建一个组件来解决这方面的问题,使多个有依赖关系的异步操作之间的协作调用得以尽可能的简化。

总结

对于IO密集型操作来说,异步执行对于应用程序的响应能力和伸缩性有非常关键的影响。正确使用异步编程能够使用尽可能少的线程来执行大量的 IO密集型操作。可惜的是异步模型在编程上较为困难,导致许多开发人员不愿意去做。微软推出的CCR,以及Wintellect's .NET Power Threading Library中的AsyncEnumerator都能够在一定程度上简化异步程序的开发。不过,现有的辅助还不足以面对一些复杂的场景。例如,要使多个 有依赖的异步操作尽可能的“并行”,我们还需要构建额外的解决方案。


给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家加入到InfoQ中文站用户讨论组中与我们的编辑和其他读者朋友交流。

评价本文

专业度
风格

您好,朋友!

您需要 注册一个InfoQ账号 或者 才能进行评论。在您完成注册后还需要进行一些设置。

获得来自InfoQ的更多体验。

告诉我们您的想法

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

不错 by Wei Todd

异步的难点在于多个异步的协调同步,期待看到你的解决方案。

by 春涛 管

不错,好东西。可惜不能用于并行处理。
有好的解决方案了,早日发上来

相见恨晚 by Liu Xiaosonl

最近在做SilverLight 处处用于异步编程 一直苦于思考有没有一种简化的编程机制 没想到今天刚好看好 受益匪浅

Re: 相见恨晚 by Jeffrey Zhao

这篇文章里提到的CCR和AsyncEnumerator不一定适合运用在Silverlight上(因为我没有测试过),不过建议您可以去了解一下Power Threading类库中的AsyncEnumerator(在Wintellect.Threading.Silverlight.dll中),而不是这篇文章里的内容(在Wintellect.Threading.dll中)。

异步 by 朱 才华

在.net4里面这个问题能很好的解决吗?

Re: 异步 by Jeffrey Zhao

据我了解,.NET 4.0没有关注这方面的问题。.NET 4.0主要在多线程操作上进行了增强,而这篇文章讲述的是“异步操作”,两者还是有区别的,尤其是在异步IO这一方面。

好像有点小错误 by yang yang

fs.BeginRead(data, 0, data.Length, result =>
{
Int32 bytesRead = fs.EndRead(result);
ProcessData(data); // 处理数据
fs.Close(); // 关闭文件流
}, null);
这个里面 ProcessData(data)这个方法里面传进去的应该是异步请求返回的结果吧 ProcessData(bytesRead );

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

7 讨论

登陆InfoQ,与你最关心的话题互动。


找回密码....

Follow

关注你最喜爱的话题和作者

快速浏览网站内你所感兴趣话题的精选内容。

Like

内容自由定制

选择想要阅读的主题和喜爱的作者定制自己的新闻源。

Notifications

获取更新

设置通知机制以获取内容更新对您而言是否重要

BT