BT

如何利用碎片时间提升技术认知与能力? 点击获取答案

Node.js软肋之回调大坑

| 作者 吴海星 关注 0 他的粉丝 发布于 2014年4月23日. 估计阅读时间: 42 分钟 | QCon上海2018 关注大数据平台技术选型、搭建、系统迁移和优化的经验。

Node.js需要按顺序执行异步逻辑时一般采用后续传递风格,也就是将后续逻辑封装在回调函数中作为起始函数的参数,逐层嵌套。这种风格虽然可以提高CPU利用率,降低等待时间,但当后续逻辑步骤较多时会影响代码的可读性,结果代码的修改维护变得很困难。根据这种代码的样子,一般称其为"callback hell"或"pyramid of doom",本文称之为回调大坑,嵌套越多,大坑越深。

坑的起源

后续传递风格

为什么会有坑?这要从后续传递风格(continuation-passing style--CPS)说起。这种编程风格最开始是由Gerald Jay SussmanGuy L. Steele, Jr.AI Memo 349上提出来的,那一年是1975年,Schema语言的第一次亮相。既然JavaScript的函数式编程设计原则主要源自Schema,这种风格自然也被带到了Javascript中。

这种风格的函数要有额外的参数:“后续逻辑体”,比如带一个参数的函数。CPS函数计算出结果值后并不是直接返回,而是调用那个后续逻辑函数,并把这个结果作为它的参数。从而实现计算结果在逻辑步骤之间的传递,以及逻辑的延续。也就是说如果要调用CPS函数,调用方函数要提供一个后续逻辑函数来接收CPS函数的“返回”值。

回调

在JavaScript中,这个“后续逻辑体”就是我们常说的回调(callback)。这种作为参数的函数之所以被称为回调,是因为它一般在主程序中定义,由主程序交给库函数,并由它在需要时回来调用。而将回调函数作为参数的,一般是一个会占用较长时间的异步函数,要交给另一个线程执行,以便不影响主程序的后续操作。如下图所示:

在JavaScript代码中,后续传递风格就是在CPS函数的逻辑末端调用传入的回调函数,并把计算结果传给它。但在不需要执行处理时间较长的异步函数时,一般并不需要用这种风格。我们先来看个简单的例子,编程求解一个简单的5元方程:

x+y+z+u+v=16
x+y+z+u-v=10
x+y+z-u=11
x+y-z=8
x-y=2

对于x+y=a;x-y=b这种简单的二元方程我们都知道如何求解,这个5元方程的运算规律和这种二元方程也没什么区别,都是两式相加除以2求出前一部分,两式相减除以2求出后一部分。5元方程的前一部分就是4元方程的和值,依次类推。我们的程序写出来就是:

代码清单1. 普通解法-calnorm.js

var res = new Int16Array([16,10,11,8,2]),l= res.length;
var variables = [];
for(var i = 0;i < l;i++) {
    if(i === l-1) {
        variables[i] = res[i];
    }else {
      variables[i] = calculateTail(res[i],res[i+1]);
      res[i+1] = calculateHead(res[i],res[i+1]);
    }
}
function calculateTail(x,y) {
    return (x-y)/2;
}
function calculateHead(x,y) {
    return (x+y)/2;
}

方程式的结果放在了一个整型数组中,我们在循环中依次遍历数组中的头两个值res[i]和res[i+1],用calculateTail计算最后一个单值,比如第一和第二个等式中的v;用calculateHead计算等式的"前半部分",比如第一和第二个等式中的x+y+z+u部分。并用该结果覆盖原来的差值等式,即用x+y+z+u的结果覆盖原来x+y+z+u-v的结果,以便计算下一个tail,直到最终求出所有未知数。

如果calculateTail和calculateHead是CPU密集型的计算,我们通常会把它放到子线程中执行,并在计算完成后用回调函数把结果传回来,以免阻塞主进程。关于CPU密集型计算的相关概念,可参考本系列的上一篇Node.js软肋之CPU密集型任务。比如我们可以把代码改成下面这样:

代码清单2. 回调解法-calcb.js

var res = new Int16Array([16,10,11,8,2]),l= res.length;
var variables = [];
(function calculate(i) {
    if(i === l-1) {
        variables[i] = res[i];
        console.log(i + ":" + variables[i]); 
        process.exit();
    }else {
        calculateTail(res[i],res[i+1],function(tail) {
            variables[i] = tail;
            calculateHead(res[i],res[i+1],function(head) {
                res[i+1] = head;
                console.log('-----------------'+i+'-----------------')
                calculate(i+1);
            });
        });
    }
})(0);
function calculateTail(x,y,cb) {
   setTimeout(function(){
        var tail = (x-y)/2;
        cb(tail);
    },300);
}
function calculateHead(x,y,cb) {
    setTimeout(function(){
        var head = (x+y)/2;
        cb(head);
    },400);
}

跟上一段代码相比,这段代码主要有两个变化。第一是calculateTail和calculateHead里增加了setTimeout,把它们伪装成CPU密集型任务;第二是弃用for循环,改用函数递归。因为calculateHead的计算结果会影响下一轮的calculateTail计算,所以calculateHead计算要阻塞后续计算。而for循环是无法阻塞的,会产生错误的结果。此外就是calculateTail和calculateHead都变成后续传递风格的函数了,通过回调返回最终计算结果。

这个例子比较简单,既不能充分体现回调在处理异步非阻塞操作时在性能上的优越性,坑的深度也不够恐怖。不过也可以说明“用后续传递风格实现几个异步函数的顺序执行是产生回调大坑的根本原因”。下面有一个更抽象的回调样例,看起来更有代表性:

module.exports = function (param, cb) {
  asyncFun1(param, function (er, data) {
    if (er) return cb(er);
    asyncFun2(data,function (er,data) {
      if (er) return cb(er);
      asyncFun3(data, function (er, data) {
        if (er) return cb(er);
        cb(data);
      })
    })
  })
}

像function(er,data)这种回调函数签名很常见,几乎所有的Node.js核心库及第三方库中的CPS函数都接收这样的函数参数,它的第一个参数是错误,其余参数是CPS函数要传递的结果。比如Node.js中负责文件处理的fs模块,我们再看一个实际工作中可能会遇到的例子。要找出一个目录中最大的文件,处理步骤应该是:

  1. fs.readdir获取目录中的文件列表;
  2. 循环遍历文件,获取文件的stat
  3. 找出最大文件;
  4. 以最大文件的文件名为参数调用回调。

这些都是异步操作,但需要顺序执行,后续传递风格的代码应该是下面这样的:

代码清单3. 寻找给定目录中最大的文件

var fs = require('fs')
var path = require('path')
module.exports = function (dir, cb) {
  fs.readdir(dir, function (er, files) { // [1]
    if (er) return cb(er)
    var counter = files.length
    var errored = false
    var stats = []
    files.forEach(function (file, index) {
      fs.stat(path.join(dir,file), function (er, stat) { // [2]
        if (errored) return
        if (er) {
          errored = true
          return cb(er)
        }
        stats[index] = stat // [3]
        if (--counter == 0) { // [4]
          var largest = stats
            .filter(function (stat) { return stat.isFile() }) // [5]
            .reduce(function (prev, next) { // [6]
              if (prev.size > next.size) return prev
              return next
            })
          cb(null, files[stats.indexOf(largest)]) // [7]
        }
      })
    })
  })
}

对这个模块的用户来说,只需要提供一个回调函数function(er,filename),用两个参数分别接收错误或文件名:

var findLargest = require('./findLargest')
findLargest('./path/to/dir', function (er, filename) {
  if (er) return console.error(er)
  console.log('largest file was:', filename)
})

介绍完CPS和回调,我们接下来看看如何平坑。

解套平坑

编写正确的并发程序归根结底是要让尽可能多的操作同步进行,但各操作的先后顺序仍能正确无误。服务端的代码一般逻辑比较复杂,步骤多,此时用嵌套实现异步函数的顺序执行会比较痛苦,所以应该尽量避免嵌套,或者降低嵌套代码的复杂性,少用匿名函数。这一般有几种途径:

  1. 最简单的是把匿名函数拿出来定义成单独的函数,然后或者像原来一样用嵌套方式调用,或者借助流程控制模块放在数组里逐一调用;
  2. 用Promis;
  3. 如果你的Node版本>=0.11.2,可以用generator。

我们先介绍最容易理解的流程控制模块。

流程控制模块

Nimble是一个轻量、可移植的函数式流程控制模块。经过最小化和压缩后只有837字节,可以运行在Node.js中,也可以用在各种浏览器中。它整合了underscoreasync一些最实用的功能,并且API更简单。

nimble有两个流程控制函数,_.parallel和_.series。顾名思义,我们要用的是第二个,可以让一组函数串行执行的_.series。下面这个命令是用来安装Nimble的:

npm install nimble

如果用.series调度执行上面那个解方程的函数,代码应该是这样的:

...
var flow = require('nimble');
(function calculate(i) {
    if(i === l-1) {
        variables[i] = res[i];
        process.exit();
    }else {
        flow.series([
            function (callback) {
                calculateTail(res[i],res[i+1],function(tail) {
                    variables[i] = tail;
                    callback();
                });
            },
            function (callback) {
                calculateHead(res[i],res[i+1],function(head) {
                    res[i+1] = head;
                    callback();
                });
            },
            function(callback){
                calculate(i+1);
            }]);
    }
})(0);
...

.series数组参数中的函数会挨个执行,只是我们的calculateTail和calculateHead都被包在了另一个函数中。尽管这个用流程控制实现的版本代码更多,但通常可读性和可维护性要强一些。接下来我们介绍Promise。

Promise

什么是Promise呢?在纸牌屋的第一季第一集中,当琳达告诉安德伍德不能让他做国务卿后,他说:“所谓Promise,就是说它不会受不断变化的情况影响。”

Promise不仅去掉了嵌套,它连回调都去掉了。因为按照Promise的观点,回调一点也不符合函数式编程的精神。回调函数什么都不返回,没有返回值的函数,执行它仅仅是因为它的副作用。所以用回调函数编程天生就是指令式的,是以副作用为主的过程的执行顺序,而不是像函数那样把输入映射到输出,可以组装到一起。

最好的函数式编程是声明式的。在指令式编程中,我们编写指令序列来告诉机器如何做我们想做的事情。在函数式编程中,我们描述值之间的关系,告诉机器我们想计算什么,然后由机器(底层框架)自己产生指令序列完成计算。Promise把函数的结果变成了一个与时间无关的值,就像算式中的未知数一样,可以用它轻松描述值之间的逻辑计算关系。虽然要得出一个函数最终的结果需要先计算出其中的所有未知数,但我们写的程序只需要描述出各未知数以及未知数和已知数之间的逻辑关系。而CPS是手工编排控制流,不是通过定义值之间的关系来解决问题,因此用回调函数编写正确的并发程序很困难。比如在代码清单2中,caculateHead被放在caculateTail的回调中执行,但实际上在计算同一组值时,两者之间并没有依赖关系,只是进入下一轮计算前需要两者都给出结果,但如果不用回调嵌套,实现这种顺序控制比较麻烦。

当然,这和我们的处理方式(共用数组)有关,就这个问题本身而言,caculateHead完全不依赖于任何caculateTail。

这里用的Promis框架是著名的Q,可以用npm install q安装。虽然可用的Promis框架有很多,但在它们用法上都大同小异。我们在这里会用到其中的三个方法。

第一个负责将Node.js的CPS函数变成Promise。Node.js核心库和第三方库中有非常多的CPS函数,我们的程序肯定要用到这些函数,要解决回调大坑,就要从这些函数开始。这些函数的回调函数参数大多遵循一个相同的模式,即函数签名为function(err, result)。对于这种函数,可以用简单直接的Q.nfcall和Q.nfapply调用这种Node.js风格的函数返回一个Promise:

return Q.nfcall(FS.readFile, "foo.txt", "utf-8");
return Q.nfapply(FS.readFile, ["foo.txt", "utf-8"]);

也可以用Q.denodeify或Q.nbind创建一个可重用的包装函数,比如:

var readFile = Q.denodeify(FS.readFile);
return readFile("foo.txt", "utf-8");

var redisClientGet = Q.nbind(redisClient.get, redisClient);
return redisClientGet("user:1:id");

第二个是then方法,这个方法是Promise对象的核心部件。我们可以用这个方法从异步操作中得到返回值(履约值),或抛出的异常(拒绝的理由)。then方法有两个可选的参数,分别对应Promis对象的两种执行结果。成功时调用的onFulfilled函数,错误时调用onRejected函数:

var promise = asyncFun()
promise.then(onFulfilled, onRejected)

Promise被解决时(异步处理已经完成)会调用onFulfilled 或onRejected 。因为只会有一种可能,所以这两个函数中仅有一个会被触发。尽管then方法的名字让人觉得它跟某种顺序化操作有关,并且那确实是它所承担的职责的副产品,但你真的可以把它当作unwrap来看待。Promise对象是一个存放未知值的容器,而then的任务就是把这个值从Promise中提取出来,把它交给另一个函数。

var promise = readFile()
var promise2 = promise.then(readAnotherFile, console.error)

这个promise表示 onFulfilled 或 onRejected 的返回结果。既然结果只能是其中之一,所以不管是什么结果,Promise都会转发调用:

var promise = readFile()
var promise2 = promise.then(function (data) {
  return readAnotherFile() // if readFile was successful, let's readAnotherFile
}, function (err) {
  console.error(err) // if readFile was unsuccessful, let's log it but still readAnotherFile
  return readAnotherFile()
})
promise2.then(console.log, console.error) // the result of readAnotherFile

因为then 返回的是Promise,所以promise可以形成调用链,从而避免出现回调大坑:

readFile()
  .then(readAnotherFile)
  .then(doSomethingElse)
  .then(...)

第三个是all和spread方法。我们可以把几个Promise放到一个数组中,用all将它们变成一个Promise,而spread跟在all后面就相当于then,只是它同时接受几个结果。如果数组中的N个Promise都成功,那spread的onFulfilled参数就能收到对应的N个结果;如果有一个失败,它的onRejected就会得到第一个失败的Promise抛出的错误。

下面是用Q改写的解方程程序代码:

....
var Q = require('q');
var qTail = Q.denodeify(calculateTail);
var qHead = Q.denodeify(calculateHead);
(function calculate(i) {
    Q.all([
      qTail(res[i],res[i+1]),
      qHead(res[i],res[i+1])])
    .spread(function(tail,head){
        variables[i] = tail;
        res[i+1] = head;
        return i+1;
    })
    .then(function(i){
        if(i === l-1) {
            variables[i] = res[i];
            process.exit();
        }else {
          calculate(i);
        }           
    });
})(0);
function calculateTail(x,y,cb) {
    setTimeout(function(){
        var tail = (x-y)/2;
        cb(null,tail);
    },300);
}
function calculateHead(x,y,cb) {
    setTimeout(function(){
        var head = (x+y)/2;
        cb(null,head);
    },400);
}
...

注意calculateTail和calculateHead中的cb调用,为了满足denodeify的要求,我们给它增加了值为null的err参数。此外还用到了上面提到的denodeify、all和spread、then。其实除了流程控制,Promise在异常处理上也比回调做得好。甚至有些开发团队坚决反对在代码中使用CPS函数,将Promise作为编码规范强制推行。

再来看一下那个找最大文件的例子用Promise实现的样子:

var fs = require('fs')
var path = require('path')
var Q = require('q')
var fs_readdir = Q.denodeify(fs.readdir) // [1]
var fs_stat = Q.denodeify(fs.stat)
module.exports = function (dir) {
  return fs_readdir(dir)
    .then(function (files) {
      var promises = files.map(function (file) {
        return fs_stat(path.join(dir,file))
      })
      return Q.all(promises).then(function (stats) { // [2]
        return [files, stats] // [3]
      })
    })
    .then(function (data) { // [4]
      var files = data[0]
      var stats = data[1]
      var largest = stats
        .filter(function (stat) { return stat.isFile() })
        .reduce(function (prev, next) {
        if (prev.size > next.size) return prev
          return next
        })
      return files[stats.indexOf(largest)]
    })
}

这时这个模块的用法变成了:

var findLargest = require('./findLargest')
findLargest('./path/to/dir')
  .then(function (er, filename) {
    console.log('largest file was:', filename)
  })
  .fail(function(err){
    console.error(err);
  });

因为模块返回的是Promise,所以客户端也变成了Promise风格的,调用链中的所有异常都可以在这里捕获到。不过Q也有可以支持回调风格函数的nodeify方法。

generators

generator科普

在计算机科学中,generator实际上是一种迭代器。它很像一个可以返回数组的函数,有参数,可以调用,并且会生成一系列的值。然而generator不是把数组中的值都准备好然后一次性返回,而是一次yield一个,所以它所需的资源更少,并且调用者可以马上开始处理开头的几个值。简言之,generator看起来像函数,但行为表现像迭代器

Generator也被称为半协程,是协程的一种特例(别协程弱),它总是把控制权交回给调用者(同时返回一个结果值),而不是像协程一样跳转到指定的目的地。如果要说得具体一点儿,就是虽然它们两个都可以yield多次,暂停执行并允许多次进入,但协程可以指定yield之后的去向,而generator不行,它只能把控制权交回给调用者。因为generator主要是为了降低编写迭代器的难度的,所以generator中的yield语句不是用来指明程序要跳到哪里去的,而是用来把值传回给父程序的。

2008年7月,Eich宣布开始ECMAScript Harmony(即ECMAScript 6)项目,从2011年7月开始,ECMAScript Harmony草案开始定期公布,预计到2014年12月正式发布。generator就是在这一过程中出现在ECMAScript中的,随后不久就被引入了V8引擎中

Node.js对generator的支持是从v0.11.2开始的,但因为Harmony还没正式发布,所以需要指明--harmony或--harmony-generator参数启用它。我们用node --harmony进入REPL,创建一个generator:

function* values() {
  for (var i = 0; i < arguments.length; i++) {
    yield arguments[i];
  }
}

注意generator的定义,用的是像函数可又不是函数的function*,循环时每次遇到yield,程序就会暂停执行。那么暂停后,generator何时会再次执行呢?在REPL中执行o.next():

>var o = values(1, 2, 3);
> o.next();
{ value: 1, done: false }
> o.next();
{ value: 2, done: false }
> o.next();
{ value: 3, done: false }
> o.next();
{ value: undefined, done: true }
> 

第一次执行o.next(),返回了一个对象{ value: 1, done: false },执行到第四次时,value变成了undefined,状态done变成了true。表现得像迭代器一样。next()除了得到generator的下一个值并让它继续执行外,还可以把值传给generator。有些文章提到了send(),不过那是老黄历了,也许你看这篇文章的时候,本文中也有很多内容已经过时了,IT技术发展得就是这样快。我们再看一个例子,还是在REPL中:

function* foo(x) {
    yield x + 1;
    var y = yield null;
    return x + y;
}

再次执行next():

>var f = foo(2);
> f.next();
{ value: 3, done: false }
> f.next();
{ value: null, done: false }
> f.next(5);
{ value: 7, done: true }

注意最后的f.next(5),value变成了7,因为最后一个next将5压进了这个generator的栈中,y变成了5。如果要总结一下,那么generator就是:

  • yield可以出现在任何表达式中,所以可以随时暂停执行,比如foo(yield x, yield y)或在循环中。
  • 调用generator只是看起来像函数,但实际上是创建了一个generator对象。只有调用next才会再次启动generator。next还可以向generator对象中传值。
  • generator返回的不是原始值,而是有两个属性的对象:value和done。当generator结束时,done会变成true,之前则一直是false。

可是generator和回调大坑有什么关系呢?因为yield可以暂停程序,next可以让程序再次执行,所以只需稍加控制,就能让异步回调代码顺序执行。

用generator平坑

Node.js社区中有很多借助generator实现异步回调顺序化的库,比如suspendco等,不过我们重点介绍的还是Q。它提供了一个spawn方法。这个方法可以立即运行一个generator,并将其中未捕获的错误发给Q.onerror。比如前面那个解方程的函数,用spawn和generator实现就是:

....
(function calculate(i) {
    Q.spawn(function* () {
        i = yield Q.all([qTail(res[i],res[i+1]),qHead(res[i],res[i+1])])
            .spread(function(tail,head){
                variables[i] = tail;
                res[i+1] = head;
                return i+1;
            })
        if(i === l-1) {
            variables[i] = res[i];
            console.log(i + ":" + variables[i]); 
            process.exit();
        }else {
          calculate(i);
        }           
    });
})(0);
...

代码和前面用Promise实现时并没有太大变化,只是去掉了then,看起来更简单了。不过记得执行时要用>=v0.11.2版本的Node.js,并且要加上--harmony或--harmony-generator。你会看到和前面相同的结果。至于寻找最大文件那个例子,在spawn里定义一个generator,然后在有then的地方放上yield就行了。具体实现就交给你了。

由于篇幅所限,本文没有展开介绍Promise和generator在错误处理上的优势,这在实际工作中也是很重要的部分,应该认真研究。参考资料7中对此介绍得比较详细,建议认真阅读。

参考资料

  1. Managing Node.js Callback Hell with Promises, Generators and Other Approaches,Marc Harter,2014.02.03
  2. 在Node.js 中用 Q 实现Promise – Callbacks之外的另一种选择
  3. 指令式Callback,函数式Promise:对node.js的一声叹息
  4. How-to Compose Node.js Promises with Q,Marc Harter,2013.08.14
  5. Analysis of generators and other async patterns in node
  6. generators in v8,Andy Wingo,2013.05.08
  7. A Study on Solving Callbacks with JavaScript Generators,James Long,2013.06.05
  8. V8 JavaScript Engine-Issue 2355:Implement generators
  9. 本文源码

评价本文

专业度
风格

您好,朋友!

您需要 注册一个InfoQ账号 或者 才能进行评论。在您完成注册后还需要进行一些设置。

获得来自InfoQ的更多体验。

告诉我们您的想法

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

不错 by 张章 鸥翔鱼游

还真的不错的 支持支持!

赞一个 by 张章 鸥翔鱼游

好文章。总是支持一下的

额。 by 袁 锋

“是协程的一种特例(别协程弱)”,这里应该是“比协程弱”吧。

这篇文章值得收藏 by 曦 杨

这篇文章值得收藏,对写 Node.js 的人帮助很大

123 by lin yf

var findLargest = require('./findLargest')
findLargest('./path/to/dir')
.then(function (er, filename) {
console.log('largest file was:', filename)
})
.fail(function(err){
console.error(err);
});

.then 那里的er

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

允许的HTML标签: a,b,br,blockquote,i,li,pre,u,ul,p

当有人回复此评论时请E-mail通知我

5 讨论

登陆InfoQ,与你最关心的话题互动。


找回密码....

Follow

关注你最喜爱的话题和作者

快速浏览网站内你所感兴趣话题的精选内容。

Like

内容自由定制

选择想要阅读的主题和喜爱的作者定制自己的新闻源。

Notifications

获取更新

设置通知机制以获取内容更新对您而言是否重要

BT