Unit testing with PySpark: unclosed socket warnings

I would like to use PySpark in my unit tests. The tests themselves run fine, but every test produces the following warnings:
  • ResourceWarning: unclosed <socket.socket [...]>
  • ResourceWarning: unclosed file <_io.BufferedWriter [...]>
  • DeprecationWarnings about invalid escape sequences.
I would like to understand why these warnings appear and how to get rid of them, so that they do not clutter my unit test output.
Here is a minimal working example:
# filename: pyspark_unittesting.py
# -*- coding: utf-8 -*-

import unittest


def insert_and_collect(val_in):
    from pyspark.sql import SparkSession
    with SparkSession.builder.getOrCreate() as spark:
        col = 'column_x'
        df = spark.createDataFrame([(val_in,)], [col])

        print('one')
        print(df.count())
        print('two')
        collected = df.collect()
        print('three')
        return collected[0][col]


class MyTest(unittest.TestCase):
    def test(self):
        val = 1
        self.assertEqual(insert_and_collect(val), val)
        print('four')


if __name__ == '__main__':
    val = 1
    print('inserted and collected is equal to original: {}'
          .format(insert_and_collect(val) == val))
    print('five')

Running this with python pyspark_unittesting.py gives the following output:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
one
1  
two
three
inserted and collected is equal to original: True
five

Invoking it with python -m unittest pyspark_unittesting instead gives:
/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py:1890: DeprecationWarning: invalid escape sequence \*
/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py:1890: DeprecationWarning: invalid escape sequence \*
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/readwriter.py:398: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/readwriter.py:759: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/readwriter.py:398: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/readwriter.py:759: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/streaming.py:618: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/streaming.py:618: DeprecationWarning: invalid escape sequence \`
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/functions.py:1519: DeprecationWarning: invalid escape sequence \d
/opt/spark/current/python/lib/pyspark.zip/pyspark/sql/functions.py:1519: DeprecationWarning: invalid escape sequence \d
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/usr/lib/python3.6/subprocess.py:766: ResourceWarning: subprocess 10219 is still running
  ResourceWarning, source=self)
/usr/lib/python3.6/importlib/_bootstrap.py:219: ImportWarning: can't resolve package from __spec__ or __package__, falling back on __name__ and __path__
  return f(*args, **kwds)
one
1                                                                               
two
/usr/lib/python3.6/socket.py:657: ResourceWarning: unclosed <socket.socket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 49330), raddr=('127.0.0.1', 44169)>
  self._sock = None
three
four
.
----------------------------------------------------------------------
Ran 1 test in 7.394s

OK
sys:1: ResourceWarning: unclosed file <_io.BufferedWriter name=5>
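The difference between the two runs comes down to warning filters rather than PySpark itself: CPython ignores ResourceWarning by default, whereas python -m unittest enables warnings for the test run (its documented default is the equivalent of -W default). A minimal, PySpark-free sketch showing the same unclosed-socket warning being silenced or surfaced purely by the active filter:

```python
import socket
import warnings


def leak_socket():
    # open a socket and drop the last reference without closing it;
    # CPython's finalizer then emits "ResourceWarning: unclosed <socket...>"
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    del s


# plain `python script.py`: ResourceWarning is ignored by default
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("ignore", ResourceWarning)
    leak_socket()
print("silenced run:", len(caught))   # -> silenced run: 0

# `python -m unittest` enables warnings, with roughly this effect
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always", ResourceWarning)
    leak_socket()
print("enabled run:", len(caught))    # -> enabled run: 1
```

The py4j client sockets that PySpark leaves open presumably hit exactly this finalizer path when the test tears its objects down.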

Edit, 2018-03-29:

Regarding @acue's answer, I tried invoking the script via subprocess.Popen, just as the unittest module does:

In [1]: import pathlib
      : import subprocess
      : import sys
      : 
      : here = pathlib.Path('.').absolute()
      : args = [sys.executable, str(here / 'pyspark_unittesting.py')]
      : opts = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd='/tmp')
      : p = subprocess.Popen(args, **opts)
      : out, err = [b.splitlines() for b in p.communicate()]
      : print(out)
      : print(err)
      : 
      : 
[b'one',
 b'1',
 b'two',
 b'three',
 b'inserted and collected is equal to original: True',
 b'five']

[b"Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties",
 b'Setting default log level to "WARN".',
 b'To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).',
 b'',
 b'[Stage 0:>                                                          (0 + 0) / 8]',
 b'[Stage 0:>                                                          (0 + 8) / 8]',
 b'                                                                                ']

The ResourceWarnings do not show up...
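That is expected: the child started via subprocess.Popen is a plain `python script.py` run, which again uses the default filters in which ResourceWarning is silenced. A hedged sketch reproducing the difference with a throwaway leaky one-liner instead of the PySpark script; the -W option stands in for what the unittest runner enables:

```python
import subprocess
import sys

# tiny stand-in for the question's script: it leaks a single socket
leaky = "import socket\ns = socket.socket()\ndel s\n"

# plain child run: default filters, the ResourceWarning stays silent
plain = subprocess.run([sys.executable, "-c", leaky],
                       capture_output=True, text=True)
print("plain run warns:", "ResourceWarning" in plain.stderr)   # -> False

# enabling the warning on the command line makes it appear on stderr
noisy = subprocess.run([sys.executable, "-W", "always::ResourceWarning", "-c", leaky],
                       capture_output=True, text=True)
print("noisy run warns:", "ResourceWarning" in noisy.stderr)   # -> True
```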

Did you ever find a solution to this? - adeandrade
@adeandrade Unfortunately, no, I did not. - akoeltringer
This worked for me: https://dev59.com/a18d5IYBdhLWcg3wpzv4#41334523 - Rebekah Waterbury
@RebekahWaterbury Thanks, yes, that solves it. Unfortunately the decorator has to be added to every single test; it would be nice to set it once :-| - akoeltringer
1 Answer

The warnings go away if you add instructions to ignore them in the setUp method of your test:
import unittest
import warnings

from pyspark_unittesting import insert_and_collect  # the question's module

class MyTest(unittest.TestCase):
    def test(self):
        val = 1
        self.assertEqual(insert_and_collect(val), val)
        print('four')

    def setUp(self):
        warnings.filterwarnings("ignore", category=ResourceWarning)
        warnings.filterwarnings("ignore", category=DeprecationWarning)


Running python3 -m unittest pyspark_unittesting.py now outputs:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
one
1                                                                               
two
three
four
.
----------------------------------------------------------------------
Ran 1 test in 11.124s

OK

Sure, but can you explain why that socket does not need to be closed? Do you think we can simply ignore the warning? It is a warning, so presumably it exists for a reason. - PHPirate
@PHPirate Sorry, I do not have the network programming experience to answer that; I only answered from the angle of suppressing the warnings... My guess would be that sockets opened by pyspark (or any other Python service, such as flask) are closed once the process ends? - Piyush Singh
