Cronjob 停止执行 Node.js

5
大家好,我有一个问题非常困扰我。基本上,我有一个 Node.js 应用程序,它向电商网站发出请求以检索订单,我们的流程基本如下:我们向该网站发出 GET 请求,然后检索等待接受的订单并“接受”它们,以便我们可以读取运输地址,之后我们解析订单、其产品和运输地址,以便我们的系统可以注册它。现在每5分钟都会执行此操作,之前是每10分钟一次,但我们将其降低以减少丢失订单的数量 :(,所以这是通过我们在 NodeJS 上安排的 cron 作业完成的。这是 cron:*/5 * * * * 此应用程序使用 NodeJs LTS 编写,并在 Ubuntu Docker 容器内运行。 以下是安排 cron 的代码:
    export function createOrderCrons(): void
{
        cron.schedule(
                process.env.ORDER_REGISTER_TIMELAPSE,
                async () =>
                {
                        await consumeRegisterOrders();
                        cronLogger.info('Runing get orders task');
                },
                cronOptions
        );
        cron.schedule(..........);//redacted
}

这里是cron调用的函数:
export async function consumeRegisterOrders(): Promise<void>
{
        try
        {
                await Axios.post(`${process.env.HOST}/orders/`);
                cronLogger.info(`${process.env.HOST}/orders/`);
        }
        catch (error)
        {
                if (!HTTP.TOO_MANY_REQUESTS)
                {
                        errorLogger.info('Consume Register Orders endpoint error:', error);
                }
        }
}

这是读取订单的代码:
export async function getOrders(auth: any): Promise<any[]>
   {
       try
       {
           const orders: IOrder[] = [];
           const ordersToGet: string[] = [];

           const res = await miraklApi.get(
               '/orders?order_state_codes=WAITING_ACCEPTANCE&paginate=false',
               {
                   headers: auth.headers
               }
           );
           errorLogger.info(`orders retrieved from waiting_acceptance ${res.data.orders.length}`);

           for(let i = 0; i < res.data.orders.length; i++)
           {
               const order = res.data.orders[i];
               order.warehouseSialId = auth.warehouseSialId;

               await validateOrder(order, true, auth);
               await ordersToGet.push(order.order_id);
           }
           errorLogger.info(`orders to get with folios ${ordersToGet.length}`);

           if(ordersToGet.length > 0)
           {
               const response = await miraklApi.get(
                   `/orders?order_ids=${ordersToGet.join(',')}&paginate=false`,
                   {
                       headers: auth.headers
                   }
               );

               errorLogger.info(`orders retrieved with address ${response.data.orders.length}`);

               await orders.push(...response.data.orders);
           }
           errorLogger.info(`orders to return ${orders.length}`);
           return await orders;
       }
       catch (error)
       {
           errorLogger.info(`error inside of get orders function ${error}`);
           return [];
       }
   }

这是端点级别的代码:

ordersRouter.post(
  '/',
  async(ctx: Context): Promise < void > => {
    try {
      const auth = {
        headers: {
          // eslint-disable-next-line @typescript-eslint/naming-convention
          Authorization: `${
                                                process.env.MIRAKL_API_TOKEN
                                        }`
        },
        warehouseSialId: process.env.WAREHOUSE_ID
      };

      const orders = await getOrders(auth);

      for (let i = 0; i < orders.length; i++) {
        const order = orders[i];
        let task: ITask = null;

        try {

          order.warehouseSialId = auth.warehouseSialId;
          const products: IProduct[] = await mapToMindiProducts(order.order_lines);
          const mappedOrder = await mapToMindiOrder(order, products);
          await registerOrder(mappedOrder);

          if (onfleetIsEnabled) {
            task = await getOnfleetTaskById(order.shipping.externalId);
          }

        } catch (error) {
          errorLogger.info(`error inside of one cycle to register an order ${JSON.stringify(error)}`);
          continue;
        }

        errorLogger.info(`before change status to shipped ${order.order_id}`);
        await sendTaskToMirakl(task, order, auth);
      }
    } catch (e) {
      errorLogger.info(`error at endpoint /orders ${JSON.stringify(e)}`);
    }

    ctx.status = HTTP.OK;
  }
);

这是一个快速浏览,您可以看到我在第一次迭代中接受了两个订单,然后在第二次迭代中只接受了一个订单,它从未超出此getOrders函数,并且日志中也没有错误:( 似乎没有确定的模式,在我们将cron配置为10分钟时,我注意到每隔一段时间cron运行时,订单无法从getOrders函数中退出,也永远不会注册。

enter image description here


1
在Node中,async/await有一个主要缺点:未捕获的Promise。直到最新的Node(v15),这个问题才被“解决”,但实际上:如果其中任何一个await失败,它们将默默地失败,就像你所看到的那样。我强烈建议你使用try/catch块包装你的await,作为一种快速而简单的“解决”方法。 - Patrick Mascari
1
有可能其中一个函数本身内部存在未捕获的异步调用。我可能错了,但我认为在所有调用的根处仅使用简单的try/catch是不够的。我会进行一些验证。 - Patrick Mascari
1
沿着同样的思路,你不需要在这里使用 await:return await orders; - Marcin
1
@CarlosFranco 这可能不太可能,但如果你的 cron 在你的主机上运行,你能确认你的主机操作系统和容器的系统时间和时区是否相同吗? - Gaurav Agarwal
1
不要在 Node.js 代码中使用代码片段。如果它无法直接在浏览器上运行,它应该是一个代码块,而不是代码片段。 - ElectricShadow
显示剩余5条评论
1个回答

1

你应该使用try catch包装函数consumeRegisterOrders,因为它可能会抛出错误,导致计划任务被终止。

export function createOrderCrons(): void {
    cron.schedule(process.env.ORDER_REGISTER_TIMELAPSE, async () => {
        try {
            await consumeRegisterOrders();
            cronLogger.info('Runing get orders task');
        } catch (err) {
            console.error(err);
        }
    }, cronOptions);
}

谢谢,我从没想过那个。 - Carlos Franco

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接