BT

如何利用碎片时间提升技术认知与能力? 点击获取答案

简化异步操作(下):构建AsyncTaskDispatcher简化多个异步操作之间的协作调用

| 作者 赵劼 关注 5 他的粉丝 发布于 2009年2月22日. 估计阅读时间: 51 分钟 | AICon 关注机器学习、计算机视觉、NLP、自动驾驶等20+AI热点技术和最新落地成功案例。

前言

在《简化异步操作(上):使用CCR和AsyncEnumerator简化异步操作》 一文中,我们谈到了异步编程的重要性,使用异步操作的难点,以及如何使用CCR和AsyncEnumerator来简化异步操作的调用。有了这些组件的辅 助,异步操作也可以真正使用传统的开发方式来编写了——这意味着各种缺陷,例如无法在“堆栈”中保存临时变量,无法try…catch…finally和 using等问题都不复存在了。这些组件让异步编程一下子美好了许多。

不过,现有的辅助还不足以面对一些复杂的场景。例如,要使多个有依赖的异步操作尽可能的“并行”,我们还需要构建额外的解决方案。在这片文章 里,我们将编写一个AsyncTaskDispatcher来简化此类场景下的开发。自然,您也可以下载它的代码后加以修改,使它能更进一步满足您的需 求。

AsyncTaskDispatcher的设计与实现

AsyncTaskDispatcher的目标是为了简化多个异步操作之间的协调调用,使得各异步操作之间能够尽可能的“并行化”。在我的设想中,AsyncTaskDispatcher应该这样使用:

  1. 首先创建一个AsyncTaskDispatcher对象。
  2. 在AsyncTaskDispatcher对象中注册各种异步任务,同时指定各任务之间的依赖关系。
  3. 使用AsyncTaskDispatcher对象的BeginDispatch和EndDispath方法组成了一个APM模式,把所有异步任务作为一个整体来调用。
  4. 如果任何一个异步任务抛出异常:
    • AsyncTaskDispatcher停止分派,并取消正在执行的任务,整个异步调用立即完成。
    • AsyncTaskDispatcher停止分派,并等待正在执行的任务完成后整个异步调用才算结束。

在经过了思考和对比之后,最终我选择基于AsyncEnumerator构造AsyncTaskDispatcher,因为:

  • AsyncEnumerator久经测试,可以让我把注意力集中在分配异步任务的逻辑上,而不需要过渡关注异步操作中的各种稀奇古怪的问题。
  • AsyncEnumerator的使用模型保证了一点:除了异步操作本身之外,其他的逻辑都可以使用串行的方式来执行。这样进一步避免了多线程环境下的各种问题,尤其是在控制各种状态的时候这点更为重要(当然CCR也有这个特点)。
  • AsyncEnumerator个头小巧,仅数十K大小,如果基于CCR开发,则需要一个几兆的再分发包(redistribution package)。
  • AsyncEnumerator自带了BeginExecute和EndExecute方法组成的APM模式,可以直接使用
  • AsyncEnumerator已经包含内置的“取消”功能。
  • AsyncEnumerator已经内置SynchronizationContext,能够在WinForm或WPF界面中直接使用。

那么我们就开始AsyncTaskDispatcher的实现。首先我们要确定一个异步任务包含哪些信息。的是“那些东西构成了一个异步任务”,或者说,“构造一个异步任务需要哪些信息”。AsyncTask的定义如下:

public class AsyncTask
{ private AsyncTask[] m_dependencies; private HashSet<AsyncTask> m_successors; public object Id { get; private set; } public object Context { get; private set; } public AsyncTaskStatus Status { get; internal set; } internal Func<object, DispatchPolicy> Predicate { get; private set; } internal Func<AsyncCallback, object, object, IAsyncResult> Begin { get; private set; } internal Action<IAsyncResult, bool, object> End { get; private set; } private AsyncTask() { } internal AsyncTask( object id, Func<object, DispatchPolicy> predicate, Func<AsyncCallback, object, object, IAsyncResult> begin, Action<IAsyncResult, bool, object> end, object context, IEnumerable<AsyncTask> dependencies) { this.Id = id; this.Predicate = predicate; this.Begin = begin; this.End = end; this.Context = context; this.Status = AsyncTaskStatus.Pending; this.m_successors = new HashSet<AsyncTask>(); this.m_dependencies = dependencies.Where(d => d != null).Distinct().ToArray(); foreach (var task in this.m_dependencies) { task.m_successors.Add(this); } } internal IEnumerable<AsyncTask> Successors { get
{ return this.m_successors; } } internal bool DependenciesSucceeded { get
{ return this.m_dependencies.All(d => d.Status == AsyncTaskStatus.Succeeded || d.Status == AsyncTaskStatus.MarkedAsSucceeded); } } internal void Close() { this.Predicate = null; this.Begin = null; this.End = null; this.m_dependencies = null; this.m_successors = null; } internal AsyncTask MakeSnapshot() { return new AsyncTask
{ Id = this.Id, Status = this.Status }; } }

以下是构造AsyncTask对象时需要提供的信息:

  • id:一个object类型对象,用于标识一个异步任务,在同一个AsyncTaskDispatcher中唯一。
  • context:一个object类型的上下文对象,它是注册异步任务时提供的任意对象,AsyncTaskDispatcher会在其他操作中提供该对象。
  • predicate:一个Func
  • begin:一个Func
  • end:一个Action
  • dependencies:当前任务所依赖的其他异步任务

此外,还有一些成员用于辅助开发:

  • Status属性:表明该异步任务的状态,它们是:
    • MarkedAsSucceeded:表示该任务在predicate委托执行时被标记为“成功”
    • Succeeded:表示该任务正常运行成功
    • Failed:表示该任务执行失败
    • Pending:表示该任务还处于等待状态
    • MarkedAsCancelled:表示该任务在predicate委托执行时被标记为“取消”
    • Cancelling:表明该任务已经发起,但因为其他任务的失败正处于取消阶段,等待结束。
    • Cancelled:表明该任务已经发起,但因为其他任务的失败而被取消。
    • Executing:表明该任务正在执行
  • Successors属性:该任务的所有后继异步任务
  • DependenciesSucceeded属性:表示该任务所依赖的异步任务是否全部成功
  • Close方法:清除该任务中所有非公开数据,使它们在AsyncTask被外部引用的情况下也能尽早回收
  • MakeSnapshot方法:返回表示当前AsyncTask对象状态的快照对象

AsyncTask的大部分成员非常容易理解。只有一点值得一提,就是在某个异步任务执行失败的情况下AsyncTaskDispatcher 的行为如何。我实现的AsyncTaskDispatcher在构造时可以接受一个参数cancelOnFailure,表明某个任务出错之后是立即结束 整个分派过程(cancelOnFailure为true),还是需要等待正在运行的任务完成(cancelOnFailure为false)。

public class AsyncTaskDispatcher
{ private Dictionary<object, AsyncTask> m_tasks; public bool CancelOnFailure { get; private set; } public DispatchStatus Status { get; private set; } ... public AsyncTaskDispatcher() : this(false) { } public AsyncTaskDispatcher(bool cancelOnFailure) { this.CancelOnFailure = cancelOnFailure; this.m_tasks = new Dictionary<object, AsyncTask>(); this.Status = DispatchStatus.NotStarted; } ... }

如果cancelOnFailure为true,作为一个整体的异步分派操作在遇到错误时会立即完成,外部逻辑便可继续执行下去。但是对于那些 已经开始的异步任务,它们的End委托在接下来的某一时刻依旧会被调用,否则就可能会造成资源泄露。一个异步任务的End委托可以通过它的 cancelling参数来判断当前调用是一个正常的结束,还是一个取消过程。如果cancelling为true,则说明整个分派过程已经结束,而其中 某个异步任务出现错误;如果cancelling为false,则表明整个分派过程还在进行,当前异步任务正常结束——但是这并不表示此时整体分派过程中 没有出现问题,有可能某个异步任务已经出错,但是由于cancelOnFailure为false,AsyncTaskDispatcher正在等待其他 异步任务(也包括当前任务)的“正常结束”。

在AsyncTaskDispatcher中,我们将维护一个字典,以便通过id来查找一个AsyncTask对象。开发人员可以通过RegisterTask方法来创建一个AsyncTask对象:

public AsyncTask RegisterTask(
 object id,
 Func<object, DispatchPolicy> predicate,
 Func<AsyncCallback, object, object, IAsyncResult> begin,
 Action<IAsyncResult, bool, object> end,
 object context,
 params object[] dependencies)
{
 lock (this.m_tasks)
 {
 if (this.Status != DispatchStatus.NotStarted)
 {
 throw new InvalidOperationException("Task can only be registered before dispatching.");
 }

 this.CheckRegisterTaskArgs(id, begin, end, dependencies);

 AsyncTask task = new AsyncTask(
 id,
 predicate, 
 begin, 
 end, 
 context, 
 dependencies.Select(d => this.m_tasks[d]));
 this.m_tasks.Add(id, task);

 return task;
 }
}

AsyncTaskDispatcher在各方法内部使用了lock,这是种非常粗略的锁行为——虽然确保了它在多线程环境下也能保持状态的一 致,但是其性能却比精打细算的锁控制要差一些。不过这毕竟不是目前的关键,更何况需要在多线程环境下使用AsyncTaskDispatcher的场景少 之又少。RegisterTask方法有多个重载,它们都将调用直接发送给上面的RegisterTask实现。这个方法会根据传入的依赖id找出新任务 所依赖的异步任务,构造一个AsyncTask对象,并将其放入字典中。

为了形成一个标准的APM模式,AsyncTaskDispatcher提供了BeginDispatch和EndDispatch方法:

private Dictionay<object, Exception> m_taskExceptions;

public IAsyncResult BeginDispatch(AsyncCallback asyncCallback, object asyncState)
{
 lock (this.m_tasks)
 {
 if (this.Status != DispatchStatus.NotStarted)
 {
 throw new InvalidOperationException("An AsyncTaskDispatcher can be started only once.");
 }

 this.Status = DispatchStatus.Dispatching;
 }

 this.m_taskExceptions = new Dictionay<object, Exception>();
 var taskToStart = this.m_tasks.Values.Where(t => t.DependenciesSucceeded).ToList();
 IEnumerator<int> enumerator = this.GetWorkerEnumerator(taskToStart);

 this.m_asyncEnumerator = new AsyncEnumerator();
 return this.m_asyncEnumerator.BeginExecute(enumerator, asyncCallback, asyncState);
}

public void EndDispatch(IAsyncResult asyncResult)
{
 this.m_asyncEnumerator.EndExecute(asyncResult);
 if (this.m_taskExceptions.Count > 0)
 {
 throw new DispatchException(this.GetTaskSnapshots(), this.m_taskExceptions);
 }
}

在BeginDispatch方法中,将会选择出所有不存在依赖的任务,它们会被作为首批发起的异步任务。它们会被传入 GetWorkerEnumerator方法用于构造一个IEnumerator对象,并将其交给AsyncEnumerator 执行。整个执行过程中遇到的异常都会被收集起来,并且在EndDispatch执行中抛出一个包含这些异常信息的DispatchException。异 步任务抛出的每个异常都会使用HandleTaskFailure方法进行处理:

private bool HandleTaskFailure(AsyncTask task, Exception ex)
{
 this.m_taskExceptions.Add(task.Id, ex);
 task.Status = AsyncTaskStatus.Failed;
 task.Close();

 if (this.CancelOnFailure)
 {
 lock (this.m_tasks)
 {
 var runningTasks = this.m_tasks.Values.Where(t => t.Status == AsyncTaskStatus.Executing);
 foreach (AsyncTask t in runningTasks)
 {
 t.Status = AsyncTaskStatus.Cancelling;
 }
 }

 this.Status = DispatchStatus.Cancelling;
 this.m_asyncEnumerator.Cancel(null);
 return true;
 }
 else
{ this.Status = DispatchStatus.Waiting; return false; } }

HandleTaskFailure接受两个参数,一是出错的AsyncTask对象,二便是被抛出的异常,并返回一个值表示是否应该立即中止 任务分派。在收集了异常信息,设定了任务状态并将其关闭之后。则会根据构造AsyncTaskDispatcher对象时所指定的 CancelOnFailure值来确定接下来的行为。如果CancelOnFailure为true(表示取消正在运行的对象),便将正在执行的异步任 务全部标记为Cancelling状态,并调用AsyncEnumerator的Cancel方法取消正在执行的异步任务。根据 CancelOnFailure的不同,AsyncTaskDispatcher自身状态将会变成Cancelling(表示正在取消已经分派的任务,这 个状态不会维持长久)或Waiting(表示正在等待已经分派的任务完成),而HandleTaskFailure方法也会返回对应的结果。

之前提到,如果一个异步任务的End委托对象被执行时,其cancelling参数为false,并不能说明其他任务没有遇到任何错误。不过从 这段实现中便可得知,开发人员只要配合AsyncTaskDispatcher的状态便可确认更多细节:如果此时AsyncTaskDispatcher 的状态为Waiting,则表示之前已经有任务失败了。

public IEnumerator<int> GetWorkerEnumerator(IEnumerable<AsyncTask> tasksToStart)
{
 this.m_runningTaskCount = 0;

 foreach (AsyncTask task in tasksToStart)
 {
 try
{ this.Start(task); } catch (AsyncTaskException ex) { if (this.HandleTaskFailure(ex.Task, ex.InnerException)) { this.Status = DispatchStatus.Finished; yield break; } break; } } ... }

以上代码片断为GetWorkerEnumerator方法的一部分,用于启动首批被分派的异步任务。我们将遍历每个异步任务,并调用 Start方法将其启动。Start方法可能会抛出一个类型为AsyncTaskException的内部异常,这个异常不对外释放,它的作用仅仅是为了 方法间的“通信”——“异常”的确是用来表示错误的一种优秀方式,它能够轻易地从逻辑中跳出,并且在合适的地方进行处理。在捕获 AsyncTaskException异常之后,就会调用之前的HandleTaskFailure方法进行处理,如果它返回true(表示需要立即结束 分派)则使用yiled break来跳出GetWorkerEnumerator方法,否则便使用break来跳出循环——仅仅是跳出循环而并非整个 GetWorkerEnumerator方法,因为此时并不要求整个分派立即结束,接下来的代码将会等待某个已经发起的任务完成,并进行处理:

public IEnumerator<int> GetWorkerEnumerator(IEnumerable<AsyncTask> tasksToStart)
{
 ...

 while (this.m_runningTaskCount > 0)
 {
 yield return 1;

 this.m_runningTaskCount--;
 IAsyncResult asyncResult = this.m_asyncEnumerator.DequeueAsyncResult();
 AsyncTask finishedTask = (AsyncTask)asyncResult.AsyncState;

 try
{ finishedTask.End(asyncResult, false, finishedTask.Context); finishedTask.Status = AsyncTaskStatus.Succeeded; if (this.Status == DispatchStatus.Dispatching) { this.StartSuccessors(finishedTask); } finishedTask.Close(); } catch (AsyncTaskException ex) { if (this.HandleTaskFailure(ex.Task, ex.InnerException)) break; } catch (Exception ex) { if (this.HandleTaskFailure(finishedTask, ex)) break; } } this.Status = DispatchStatus.Finished; }

在Start方法中,如果成功发起了这个异步请求,则会将m_runningTaskCount的数值加一,以此表示正在执行(确切地说,是已 经“开始”但还没有进行“结束”处理)的异步任务数目。while循环体每执行一遍,则表示处理完一个异步请求。因此一般情况下,在没有处理完所有异步请 求的时候while循环体会不断执行。与之前WebRequest的示例相似,AsyncTask对象作为AsyncState保存在 IAsyncResult对象中——由于增加了AsyncTask这一抽象层,因此我们成功地将所有异步操作的处理方式统一了起来。

接着便依次调用End委托(请注意cancelling参数为false)、设置属性、并且在AsyncTaskDispatcher为 Dispatching状态时使用StartSuccessors方法来分派可执行的后继任务——AsyncTaskDispatcher的状态也有可能 是Waiting,这表示已经有任务出错,此时正在等待已经开始的任务完成,这种情况下就不应该发起新的异步任务了。值得注意的是,用户实现的End委托 可能会抛出异常,而StartSuccessors方法也可能抛出AsyncTaskException以表示某个异步任务启动失败。这时就要使用 HandleTaskFailure方法来处理异常,并且在合适的时候立即终止整个分派操作(跳出循环)。

跳出while循环则表示整个派送操作已经完成,此时自然要将AsyncTaskDispatcher的状态设为Finished。

而最后剩下的,只是用于启动任务的Start方法,用于取消任务的CancelTask方法,以及发起后继任务的StartSuccessors方法了:

private void Start(AsyncTask task)
{
 DispatchPolicy policy;
 try
{ policy = task.Predicate == null ? DispatchPolicy.Normal : task.Predicate(task.Context); } catch (Exception ex) { throw new AsyncTaskException(task, ex); } if (policy == DispatchPolicy.Normal) { try
{ task.Begin(this.m_asyncEnumerator.EndVoid(0, this.CancelTask), task, task.Context); this.m_runningTaskCount++; task.Status = AsyncTaskStatus.Executing; } catch (Exception ex) { throw new AsyncTaskException(task, ex); } } else if (policy == DispatchPolicy.MarkAsCancelled) { task.Status = AsyncTaskStatus.MarkedAsCancelled; task.Close(); } else // policy == DispatchPolicy.Succeeded
{ task.Status = AsyncTaskStatus.MarkedAsSucceeded; this.StartSuccessors(task); task.Close(); } } private void StartSuccessors(AsyncTask task) { Func<AsyncTask, bool> predicate = t => t.Status == AsyncTaskStatus.Pending && t.DependenciesSucceeded; foreach (AsyncTask successor in task.Successors.Where(predicate)) { this.Start(successor); } } private void CancelTask(IAsyncResult asyncResult) { AsyncTask task = (AsyncTask)asyncResult.AsyncState; try
{ task.End(asyncResult, true, task.Context); } catch { } finally
{ this.m_runningTaskCount--; task.Status = AsyncTaskStatus.Cancelled; task.Close(); } }

在Start方法中,首先获取异步任务的Predicate委托的执行结果,并根据这个结果选择是将当前异步任务进行正常处理,还是标记为“取 消”或“成功”。如果应该正常执行,则调用Begin委托以发起异步任务,把任务状态标记为Executing,并将 m_runningTaskCount计数加一。如果选择标记为“成功”,那么还需要调用StartSuccessors来发起后继任务,自然在 StartSuccessors方法中也是使用Start方法来发起单个任务——这是一种间接递归,使用“深度优先”的方式发起所有依赖已经全部完成的异 步请求。由于Start方法需要抛出AsyncTaskException来表示任务出错,因此在每段由用户实现的逻辑(即Predicate和 Begin委托)执行时都要小心地捕获异常。

值得一提的是,如果需要使用AsyncEnumerator的Cancel方法来取消已经发起的异步任务,那么在从 AsyncEnumerator获取异步回调委托(AsyncCallback对象)的时候还必须提供一个委托作为取消这个任务时的回调函数。这便是 CancelTask方法的作用,在CancelTask方法中也会调用End委托(请注意cancelling参数为true),以确保资源能够被正确 释放。执行End委托时抛出的任何异常都会被默默“吞下”。一个任务被取消之后,它的状态会被修改为Cancelled,最后则被关闭。

至此,AsyncTaskDispatcher已经全部实现到这里就告一段落了。可见,有了AsyncEnumerator的协助,实现这样一 个组件并不那么困难,从头至尾总共只有200多行代码。事实上,写目前这篇文章所消耗的时间和精力已经数倍于实现一个完整的 AsyncTaskDispatcher。当然,目前的实现远不完美,它虽然较好地实现了有依赖关系的多个异步操作之间的协作调用,但是还缺少一些有用的 功能。例如,您可能需要在分派过程中动态添加新的任务,或是改变任务之间的依赖关系;而且,对于异步操作往往都会指定一个超时时间,可惜目前的 AsyncTaskDispatcher并不包含超时功能。

您可以在这里获得AsyncTaskDispatcher的完整代码,并且根据需要添加任何功能——尤其是刚才提到的缺陷,实现起来实际并不困难。

AsyncTaskDispatcher的使用

现在我们基于AsyncTaskDispatcher来重新实现上一篇文章中提到的场景。假设您在开发一个ASP.NET页面用于展示一篇文章,其中需要显示各种信息:

  1. 文章内容
  2. 评论信息
  3. 对评论内容进行打分的用户
  4. 打分者的收藏

由于程序架构的原因,数据需要从各个不同服务或数据源中获取(这是个很常见的情况)。因此,程序中已经准备了如下的数据读取接口:

  1. Begin/EndGetContent:根据文章ID(Int32),获取文章内容(String)
  2. Begin/EndGetComments:根据文章ID(Int32),获取所有评论(IEnumerable
  3. Begin/EndGetUsers:根据多个用户ID(IEnumerable),获取一批用户(Dictionary
  4. Begin/EndGetCommentRaters:根据多个评论ID(IEnumerable),获取所有打分者(IEnumerable
  5. Begin/EndGetFavorites:根据多个用户ID(IEnumerable),获取所有收藏(IEnumerable

我们可以轻易得出五个异步操作之间的依赖状况:

因此,我们可以编写如下的代码。请注意,我们使用异步任务的id来表示它们之间的依赖关系:

private void RegisterAsyncTasks(AsyncTaskDispatcher dispatcher, int articleId)
{
 // 获取文章内容
string taskGetContent = "get content"; dispatcher.RegisterTask( taskGetContent, // 任务ID (cb, state, context) => // Begin委托对象 { return Service.BeginGetContent(articleId, cb, state); }, (ar, cancelling, context) => // End委托对象 { this.Content = Service.EndGetContent(ar); }); // 获取评论
string taskGetComments = "get comments"; IEnumerable<Comment> comments = null; dispatcher.RegisterTask( taskGetComments, (cb, state, context) => { return Service.BeginGetComments(articleId, cb, state); }, (ar, cancelling, context) => { comments = Service.EndGetComments(ar); }); // 获取评论者信息,并结合评论绑定至控件
string taskGetCommentUsers = "get comment users"; dispatcher.RegisterTask( taskGetCommentUsers, (cb, state, context) => { return Service.BeginGetUsers(comments.Select(c => c.UserID), cb, state); }, (ar, cancelling, context) => { var users = Service.EndGetUsers(ar); this.rptComments.DataSource = from c in comments select new
{ Comment = c, User = users[c.UserID] }; this.rptComments.DataBind(); }, taskGetComments); // 指定任务之间的依赖关系 // 获取评论的打分者,并绑定至控件
string taskGetCommentRaters = "get comment raters"; IEnumerable<User> raters = null; dispatcher.RegisterTask( taskGetCommentRaters, (cb, state, context) => { return Service.BeginGetCommentRaters(comments.Select(c => c.CommentID), cb, state); }, (ar, cancelling, context) => { raters = Service.EndGetCommentRaters(ar); this.rptRaters.DataSource = raters; this.rptRaters.DataBind(); }, taskGetComments); // 获取打分者的收藏内容,并绑定至控件
string taskGetFavorites = "get favorites"; dispatcher.RegisterTask( taskGetFavorites, (cb, state, context) => { return Service.BeginGetFavorites(raters.Select(u => u.UserID), cb, state); }, (ar, cancelling, context) => { this.rptFavorites.DataSource = Service.EndGetFavorites(ar); this.rptFavorites.DataBind(); }, taskGetCommentRaters); }

与之前的作法相比,似乎代码量提高了,但是观察后可以发现,多出来的代码其实都是在创建匿名的委托对象,而一个个匿名的委托对象将代码进行了有 条理的分割,并充分利用“匿名方法”形成的闭包,使各委托对象能够共享“调用堆栈”上的数据。现在的实现使用了一种直观的方式表现了各异步操作之间的依赖 关系,代码一下子变得条理清晰,易于维护了。此外还有一点非常重要:虽然异步任务为“并行”执行,但是其中所有的委托对象只会依次调用,因此开发人员可以 放心地编写代码,而不用担心线程安全方面的问题。

我们可以把上面的代码结合ASP.NET WebForm页面的异步特性一起使用:

protected void Page_Load(object sender, EventArgs e)
{
 this.AddOnPreRenderCompleteAsync(
 new BeginEventHandler(BeginAsyncOperation),
 new EndEventHandler(EndAsyncOperation));
}

private IAsyncResult BeginAsyncOperation(
 object sender,
 EventArgs e,
 AsyncCallback callback,
 object extraData)
{ 
 this.m_dispatcher = new AsyncTaskDispatcher();
 this.RegisterAsyncTasks(this.m_dispatcher, 1);
 return this.m_dispatcher.BeginDispatch(callback, extraData);
}

private void EndAsyncOperation(IAsyncResult ar)
{
 this.m_dispatcher.EndDispatch(ar);
}

这个页面与AsyncTaskDispatcher的源代码同时发布,并且增加了一些简单的时间统计代码。您会发现,原本使用“串行”方式需要10秒钟才能完成的任务,使用如今的“并行”方式只需6秒钟即可完成:

  1. Get Content: 00:00:00 - 00:00:02.0010000
  2. Get Comments: 00:00:00.0010000 - 00:00:02.0010000
  3. Get Comment Users: 00:00:02.0010000 - 00:00:04.0010000
  4. Get Comment Raters: 00:00:02.0010000 - 00:00:04.0010000
  5. Get Favorites: 00:00:04.0010000 - 00:00:06.0010000

这与上一篇文章中提到的“理想情况” 完全一致:

总结

由于CCR和AsyncEnumerator难以“并行”地执行异步代码,因此我们需要提出新的解决方案来满足这方面的需求。在 AsyncEnumerator的基础上开发一个AsyncTaskDispatcher并不困难,但是这个组件能够有效地简化多个异步操作之间的协作调 用。一般来说,这样的做法能够使应用程序的性能与伸缩性得到比较明显的提高。AsyncTaskDispatcher的代码在MSDN Code Gallery上完全公开,您可以自由修改,使它更好地满足您的需求。

相关文章简化异步操作(上)──使用CCR和AsyncEnumerator简化异步操作


给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家加入到InfoQ中文站用户讨论组中与我们的编辑和其他读者朋友交流。

评价本文

专业度
风格

您好,朋友!

您需要 注册一个InfoQ账号 或者 才能进行评论。在您完成注册后还需要进行一些设置。

获得来自InfoQ的更多体验。

告诉我们您的想法

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

关于事务的处理 by 张 逸

好文章。

文中提到在执行异步操作时,如果抛出异常,可能会取消异步操作,或等待当前操作执行完毕后,然后取消。不过这里可能会牵涉到事务的问题。老赵提到的场景中,步骤之间除了存在依赖关系外,其实可以看作是一个完整的事务。目前的组件,似乎只有取消,而没有会滚。

Re: 关于事务的处理 by Jeffrey Zhao

End委托有个cancelling参数,如果它是true的话,则表明这个异步操作正在取消,则可以会滚事务。如果把所有操作认为是一个整体的事务话,可以在EndDispatch抛出异常后进行回滚。
实际上这个组件在设计和使用上只考虑了异步读取的情况……希望可以多多讨论,把这个组件的功能补充完整。:)

如果AysncTask的Predicate返回的是DispatchPolicy.MarkedAsSucceeded by 周 威

如果AysncTask的Predicate返回的是DispatchPolicy.MarkedAsSucceeded,在当前Task执行完成之后会执行:dispatcher.StartSuccessors(task),即执行当前任务的后继任务,但是好像没有真正执行吧。还是在dispatcher.GetWorkerEnumerator()方法得以执行当前Task的后继任务的。

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

3 讨论

登陆InfoQ,与你最关心的话题互动。


找回密码....

Follow

关注你最喜爱的话题和作者

快速浏览网站内你所感兴趣话题的精选内容。

Like

内容自由定制

选择想要阅读的主题和喜爱的作者定制自己的新闻源。

Notifications

获取更新

设置通知机制以获取内容更新对您而言是否重要

BT