在ForEach循环中使用Async/Await Node-Postgres查询

3

编辑:我正在使用node v8.0.0

我刚开始学习如何使用node-postgres访问SQL数据库,但是在访问多个数据库以便以可操作的格式收集数据方面遇到了一些困难,特别是在forEach循环中执行多个查询。经过几次尝试,我正在尝试使用async/await,但是我遇到了以下错误:

await client.connect()
  ^^^^^^
SyntaxError: Unexpected identifier

当我尝试使用连接池或连续调用.query时,我会得到类似以下的结果:
1
[]
could not connect to postgres Error: Connection terminated

这是我的代码的简化版本:

const { Client } = require('pg');
const moment = require('moment');
const _ = require('lodash');
const turf = require('@turf/turf');

const connString = // connection string
var collected = []
const CID = 300
const snaptimes = // array of times
var counter=0;
const client = new Client(connString);

function createArray(i,j) {
     // return array of i arrays of length j
}

await client.connect()

snaptimes.forEach(function(snaptime){
  var info = {}; // an object of objects
  // get information at given snaptime from database 1
  const query1 = // parametrized query selecting two columns from database 1
  const result1 = await client.query(query1, [CID,snaptime]);
  var x = result1.rows;
  for (var i = 0; i < x.length; i++) {
     // store data from database 1 into info
     // each row is an object with two fields
  }

  // line up subjects on the hole
  const query2 = // parametrized query grabbing JSON string from database 2
  const result2 = await client.query(query2, [CID,snaptime]);
  const raw = result2.rows[0].JSON_col;
  const line = createArray(19,0); // an array of 19 empty arrays
  for (var i = 0; i < raw.length; i++) {
     // parse JSON object and record data into line 
  }

  // begin to collect data
  var n = 0;
  var g = 0;
  // walk down the line
  for (var i = 18; i > 0; i--) {
    // if no subjects are found at spot i, do nothing, except maybe update g
    if ((line[i] === undefined || line[i].length == 0) && g == 0){
      g = i;
    } else if (line[i] !== undefined && line[i].length != 0) {
      // collect data for each subject if subjects are found
      line[i].forEach(function(subject){
        const query 3 = // parametrized query grabbing data for each subject 
        const result3 = await client.query(query3,[CID,subject,snaptime]);
        x = result3.rows;
        const y = moment(x[0].end_time).diff(moment(snaptime),'minutes');
        var yhat = 0;
        // the summation over info depends on g
        if (g===0){
          for (var j = i; j <= 18; j++){
            yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
          }
        } else {
          for (var j = i; j <= 18; j++){
            if (i<j && j<g+1) {
              yhat = moment.duration(info[j].field2).add(yhat,'m').asMinutes();
            } else {
              yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
            }
          }
        }
        collected.push([y,yhat,n,i]);
      });
    }
    n+=line[i].length;
    g=0;
  }
  // really rough work-around I once used for printing results after a forEach of queries
  counter++;
  if (counter===snaptimes.length){
    console.log(counter);
    console.log(collected);
    client.end(); 
  }
});
3个回答

4

问题是由于你的forEach回调函数不是async造成的:

snaptimes.forEach(function(snaptime){

should be:

snaptimes.forEach(async function (snaptime) {

需要注意的是,async函数会立即返回一个promise对象,该promise对象最终会通过async函数中的return语句被解析(或者在async函数内部未捕获的异常时被拒绝)。

但是,您还需要确保您的Node版本支持async/await

  • 自从Node 7.6之后,可以不使用--harmony标志。
  • 在Node 7.x的7.6之前,您必须使用--harmony标志。
  • 在Node 7.0之前,它不可用。

请参见:http://node.green/#ES2017-features-async-functions

此外,请注意您只能在使用async关键字声明的函数内部使用await。如果要在脚本或模块的顶层使用它,则需要将其包装在立即调用的函数表达式中:

// cannot use await here
(async () => {
  // can use await here
})();
// cannot use await here

例子:

const f = () => new Promise(r => setTimeout(() => r('x'), 500));

let x = await f();
console.log(x);

打印:

$ node t1.js 
/home/rsp/node/test/prom-async/t1.js:3
let x = await f();
              ^
SyntaxError: Unexpected identifier

但是这个:
const f = () => new Promise(r => setTimeout(() => r('x'), 500));

(async () => {
  let x = await f();
  console.log(x);
})();

打印:

$ node t2.js 
x

经过0.5秒的延迟,如预期所示。

对于不支持async/await的Node版本,第一个(错误的)示例将打印:

$ ~/opt/node-v6.7.0/bin/node t1.js 
/home/rsp/node/test/prom-async/t1.js:3
let x = await f();
              ^
SyntaxError: Unexpected identifier

而第二个(正确的)例子将会输出一个不同的错误:

$ ~/opt/node-v6.7.0/bin/node t2.js 
/home/rsp/node/test/prom-async/t2.js:3
(async () => {
       ^
SyntaxError: Unexpected token (

这很有用,因为不支持async/await的Node版本将不会给出有意义的错误提示,例如“不支持async/await”之类的内容,遗憾的是。


在任何函数调用await之前加上async非常好地解决了问题。然而,当我尝试将更多的snaptimes添加到第一个forEach调用的snaptime数组中时,我要么得到“Error: Connection terminated unexpectedly”或“Error: read ECONNRESET”。根据打印语句,这些错误立即发生在query1上。将client.end()移动到forEach循环之后只会导致代码不运行。 - Hansy Piou
@HansyPiou 当你使用async/await时,我建议使用for (let snaptime of snaptimes) { ... }而不是snaptimes.forEach(function (snaptime) { ... });这样你就不会调用其他函数,迭代步骤的同步也更容易。 - rsp
整个东西现在运行得非常好!使用for()循环意味着我必须将它们全部包装到一个更大的(异步)函数中,但这最终解决了node-postgres的问题,因为我可以在收集数据之前连接客户端,并在完成后关闭它。感谢您的所有帮助! - Hansy Piou

0

请确保您在外部使用async块,例如:

async function() {
  return await Promise.resolve('')
}

它在 Node 7.6.0 版本之后默认支持。在 7.6.0 版本之前,你应该使用 --harmony 选项来启用它。

首先运行 node -v 来检查你的版本。


0

首先,您还不够了解异步等待。别担心,它实际上很容易;但是您需要阅读文档才能使用这些内容。

更重要的是,您代码中的问题在于只能在async函数内部使用await;而您正在任何函数之外使用它。

首先,这里是最接近您编写的代码的解决方案:

const { Client } = require('pg');
const moment = require('moment');
const _ = require('lodash');
const turf = require('@turf/turf');

const connString = // connection string
var collected = []
const CID = 300
const snaptimes = // array of times
var counter=0;
const client = new Client(connString);

function createArray(i,j) {
    // return array of i arrays of length j
}

async function processSnaptime (snaptime) {
    var info = {}; // an object of objects
    // get information at given snaptime from database 1
    const query1 = // parametrized query selecting two columns from database 1
    const result1 = await client.query(query1, [CID,snaptime]);
    var x = result1.rows;
    for (var i = 0; i < x.length; i++) {
      // store data from database 1 into info
      // each row is an object with two fields
    }

    // line up subjects on the hole
    const query2 = // parametrized query grabbing JSON string from database 2
    const result2 = await client.query(query2, [CID,snaptime]);
    const raw = result2.rows[0].JSON_col;
    const line = createArray(19,0); // an array of 19 empty arrays
    for (var i = 0; i < raw.length; i++) {
      // parse JSON object and record data into line
    }

    // begin to collect data
    var n = 0;
    var g = 0;
    // walk down the line
    for (var i = 18; i > 0; i--) {
      // if no subjects are found at spot i, do nothing, except maybe update g
      if ((line[i] === undefined || line[i].length == 0) && g == 0){
        g = i;
      } else if (line[i] !== undefined && line[i].length != 0) {
        // collect data for each subject if subjects are found
        line[i].forEach(function(subject){
          const query 3 = // parametrized query grabbing data for each subject
          const result3 = await client.query(query3,[CID,subject,snaptime]);
          x = result3.rows;
          const y = moment(x[0].end_time).diff(moment(snaptime),'minutes');
          var yhat = 0;
          // the summation over info depends on g
          if (g===0){
            for (var j = i; j <= 18; j++){
              yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
            }
          } else {
            for (var j = i; j <= 18; j++){
              if (i<j && j<g+1) {
                yhat = moment.duration(info[j].field2).add(yhat,'m').asMinutes();
              } else {
                yhat = moment.duration(info[j].field1).add(yhat,'m').asMinutes();
              }
            }
          }
          collected.push([y,yhat,n,i]);
        });
      }
      n+=line[i].length;
      g=0;
    }
    // really rough work-around I once used for printing results after a forEach of queries
    counter++;
    if (counter===snaptimes.length){
      console.log(counter);
      console.log(collected);
    }
}

async function run () {
  for (let snaptime of snaptimes) {
    await processSnaptime(snaptime);
  }
}

/* to run all of them concurrently:
function run () {
  let procs = [];
  for (let snaptime of snaptimes) {
    procs.push(processSnaptime(snaptime));
  }
  return Promise.all(procs);
}
*/

client.connect().then(run).then(() => client.end());

client.connect 返回一个 Promise,我使用 then 来调用 run 一旦它被解决。当 那个 部分结束后,可以安全地调用 client.end()

run 是一个 async 函数,因此它可以使用 await 使代码更易读。对于 processSnaptime 也是如此。

当然,我实际上无法运行您的代码,所以我只能希望我没有犯任何错误。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接