Airflow DAG在PythonOperator时失败,出现错误"Negsignal.SIGKILL"。

13

我在Cloud Composer v1.16.16上运行Airflowv1.10.15。

我的DAG看起来像这样:

from datetime import datetime, timedelta

# imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from scripts import workday_extract, workday_config_large

default_args = {
    'owner': 'xxxx',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 14),
    'email_on_failure': True,
    'email': ['xxxx'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    'catchup': False
}


# Define the DAG with parameters
dag = DAG(
    dag_id='xxxx_v1',
    schedule_interval='0 20 * * *',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    concurrency=1
)

def wd_to_bq(key, val, **kwargs):
    logger.info("workday to BQ ingestion")
    workday_extract.fetch_wd_load_bq(key, val)


start_load = DummyOperator(task_id='start', dag=dag)

end_load = DummyOperator(task_id='end', dag=dag)

for key, val in workday_config_large.endpoint_tbl_mapping.items():
    # Task 1: Process the unmatched records from the view
    workday_to_bq = PythonOperator(
        dag=dag,
        task_id=f'{key}',
        execution_timeout=timedelta(minutes=60),
        provide_context=True,
        python_callable=wd_to_bq,
        op_kwargs={'key': key, 'val': val}
    )
    start_load >> workday_to_bq >> end_load

任务失败并显示错误 - 任务以负信号.SIGKILL返回代码退出。在我的本地机器上,Python脚本运行良好并在15分钟内完成。有多个端点从中提取报告。但是,需要最长时间(约15分钟)的一个失败并显示此错误,其他端点成功。

我尝试了很多选项,但似乎没有一种有效。有人能帮忙吗?


Cloud Composer为您提供监控仪表板。我建议只运行失败的任务,并在此期间检查Airflow worker上的内存和CPU压力。这将告诉您需要增加哪些资源。 - GregK
如果我的答案解决了你的问题,请考虑接受并点赞它。如果没有,让我知道以便我可以改进我的答案。 - Shipra Sarkar
5个回答

10

1
仅提供链接的回答是无用的。您能详细解释一下吗? - Toto
要下载的数据文件大约有100 MB。我正在使用一个由3个节点组成的集群,每个节点都配备n1-standard-1,即3.75 G RAM。这是否足够?我应该将其增加到7.5 G吗? - saurabh saraff
你按照 Greg@ 的建议做了吗?这个错误与资源有关,所以你只需要增加资源。但是,在 Airflow 中,这是一个已知的问题,他们会在有解决方案时发布官方版本。 - Jaime Lopez

3

如果你在airflow任务日志中看到以下类似的信息,那么说明内核/操作系统已经杀死了你的进程。SIGKILL(信号9)是一个立即杀死进程的指令。

{{local_task_job.py:102}} INFO - Task exited with return code Negsignal.SIGKILL

很有可能您正在执行的任务(在本例中是函数 - workday_to_bq)正在工作容器上使用大量资源。我假设您正在摄取和处理某些数据,这可能会占用大量内存
您已经提到它在本地运行正常,但在Airflow云中失败。这可能是因为您的本地系统具有大量RAM或者您的云Composer Airflow worker正在处理占用worker内存的其他DAG。为确认这是内存问题,您可以检查云服务提供的仪表板。
Airflow在工作器上运行其任务,因此您需要升级带有更好硬件的工作器。尝试增加RAM。
  • 对于像Cloud Composer、MWAA这样的完全托管的服务,云提供商应该允许您增加底层硬件。
  • 如果您正在使用dockerdocker-desktop(增加docker桌面的总体内存链接)、swarm或kubernetes,则请检查为worker设置的容器/ pod内存限制。然后可以相应地在清单文件中进行增加。
请注意,Airflow的目的是调度ETL任务并编排管道。您不应该将大量数据加载到Airflow worker中并利用其全部CPU/内存。这将减慢整个Airflow环境或随机SIGKILL您的DAGS。在大多数情况下,只有使用过多内存的DAG/进程将被OOM killer杀死,但有时它会同时杀死同一工作程序上的其他DAG/进程。
要加载/处理/写入大量数据,请使用ETL工具(如fivetran、airbyte、databricks、nifi、azure data factory等),并使用Airflow进行调度和编排。

这个回答比被接受的回答好多了!我的问题并不是由于任何特定的内存密集型进程,而是因为我正在以异步方式启动大量的并行作业:50000个小计算 + 发布/订阅消息。有时候作业会失败并需要重新启动。 - undefined

2

当分配的资源不足时,就会出现此错误。DAG执行受RAM限制。根据DAG的性质,可能会消耗更多的内存。因此,最好使用具有更高内存的机器类型。由于您正在使用Cloud Composer 1,无法自动扩展资源。增加资源将是更好的选择。


1
我也遇到了这个问题,但采取了不同的方法。
您是否考虑过您的脚本如何使用更少的内存/更好地利用内存,而不仅仅是增加可用内存?
    with db_connector_warehouse.create_session() as session:
        query = session.query(offers_table)\
            .yield_per(chunk_size).enable_eagerloads(False)
        
        for df in pd.read_sql(query.statement, session.bind, chunksize=chunk_size):
            yield df

在上面的例子中,将chunksize传递给pandas的底部部分将使其以更小的块拉取数据框,但是pandas仍然会先将所有内容加载到内存中,然后再提供您请求的部分(对于read_sql和可能的其他加载函数,例如csv / xlsx,但我没有研究过这一点)。
因此,您必须确保不要加载整个数据集 - 如果使用SQL Alchemy的ORM,则需要使用yield_per参数。对于普通连接,您可以设置连接以流式传输结果
如果您希望减少内存使用量,以下是一些有用的资源: 如何从SQL查询创建大型pandas数据框,而不会耗尽内存?

https://pythonspeed.com/articles/pandas-sql-chunking/

如果您不熟悉yield流程控制“yield”关键字是什么?

0

我曾经在使用ThreadPoolExecutor时遇到过这个问题,该工具直到所有的futures都完成后才会释放任何资源。为了避免这些错误,我改为每次处理四个元素:

while True:
    chunk = itertools.islice(documents, 0, 4)
    if not chunk:
        break
    with ThreadPoolExecutor(max_workers=4) as executor:
        for each in executor.map(TextScraper(), chunk):
            pass

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接