我有一个运行/驱动程序,它监听 Kafka 主题并在收到主题上的新消息时使用 ThreadPoolExecutor 分派任务(如下所示):
consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
enable_auto_commit=False,
auto_offset_reset='latest',
max_poll_records=1,
max_poll_interval_ms=300000)
with ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for message in consumer:
futures.append(executor.submit(SOME_FUNCTION, ARG1, ARG2))
这里有一些代码,但在这里它不重要,所以我跳过了。
SOME_FUNCTION来自于另一个Python脚本,该脚本被导入(事实上,在后续阶段会有一系列的导入操作)。重要的是在这些脚本中的某个点上,我调用了Multiprocessing
Pool,因为我需要对数据进行并行处理(SIMD-单指令多数据),并使用apply_async函数来实现。
for loop_message_chunk in loop_message_chunks:
res_list.append(self.pool.apply_async(self.one_matching.match, args=(hash_set, loop_message_chunk, fields)))
现在,我有两个版本的运行程序/驱动程序:
基于Kafka的(如上所示)
- 此版本会生成启动多进程的线程
监听Kafka -> 启动线程 -> 启动多进程
基于REST的(使用Flask通过REST调用完成相同任务)
- 此版本不会启动任何线程,并直接调用多进程
监听REST端点 -> 启动多进程
你问为什么要有两个运行程序/驱动程序?因为这个微服务将由多个团队使用,有些团队想要同步的基于REST的,而一些团队则想要基于KAFKA的实时异步系统。
当我从并行化函数(以上例子中的self.one_matching.match
)进行日志记录时,通过REST版本调用时可以工作,但是通过KAFKA版本调用时不起作用(即当多进程由线程启动时不起作用)。
同时注意到只有来自并行化函数的日志记录不起作用。从运行程序到调用apply_async的脚本的其他层次结构 - 包括从线程内调用的脚本 - 都能成功记录日志。
其他细节:
- 我使用yaml文件配置记录器
- 我在运行程序脚本本身中为KAFKA或REST版本配置记录器
- 我在每个在运行程序脚本之后被调用的其他脚本中进行
logging.getLogger
,以获取特定的记录器并将其记录到不同的文件中
记录器配置(值已替换为通用名称,因为我无法共享确切名称):
version: 1
formatters:
simple:
format: '%(asctime)s | %(name)s | %(filename)s : %(funcName)s : %(lineno)d | %(levelname)s :: %(message)s'
custom1:
format: '%(asctime)s | %(filename)s :: %(message)s'
time-message:
format: '%(asctime)s | %(message)s'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
handler1:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 5
formatter: simple
level: DEBUG
filename: logs/logfile1.log
handler2:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 30
formatter: custom1
level: INFO
filename: logs/logfile2.log
handler3:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 30
formatter: time-message
level: DEBUG
filename: logs/logfile3.log
handler4:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 30
formatter: time-message
level: DEBUG
filename: logs/logfile4.log
handler5:
class: logging.handlers.TimedRotatingFileHandler
when: midnight
backupCount: 5
formatter: simple
level: DEBUG
filename: logs/logfile5.log
loggers:
logger1:
level: DEBUG
handlers: [console, handler1]
propagate: no
logger2:
level: DEBUG
handlers: [console, handler5]
propagate: no
logger3:
level: INFO
handlers: [handler2]
propagate: no
logger4:
level: DEBUG
handlers: [console, handler3]
propagate: no
logger5:
level: DEBUG
handlers: [console, handler4]
propagate: no
kafka:
level: WARNING
handlers: [console]
propogate: no
root:
level: INFO
handlers: [console]
propogate: no
SOME_FUNCTION
不变(每次调用都创建自己的池,而不是回调到全局ProcessPoolExecutor),它仍然应该以相同的方式工作。我只是在想,不保持创建和销毁独立池可能会减少总开销。 - Aaronfork
方式创建的,它可能会工作,但如果是通过spawn
方式创建的,则不行。QueueHandler可能不够用,你需要使用SocketHandler才能确保正常工作。你可以阅读这个线程以了解更多信息:https://stackoverflow.com/questions/64335940/python-multiprocessing-returning-results-with-logging-and-running-frozen-on-wind - dragon2fly