How to run TensorFlow on multiple cores and threads


I should say up front that I know nothing about parallel/multithreaded/multiprocess programming of any kind.

I now have the opportunity to run my TensorFlow CNN on 32 cores (each with 2 hyperthreads). I have spent a lot of time trying to understand how to modify my code (if necessary) to take full advantage of all that computing power. Unfortunately, I have gotten nowhere. I was hoping TF would take care of this automatically, but when I launch my model and check CPU usage with top, I mostly see 100% CPU usage, with occasional spikes to 200%.

If all the cores were being used, I would expect to see 100 * 64 = 6400% usage (is that right?). How can I achieve that?

Should I do something like what is explained here?

If so, do I understand correctly that all of that multithreading applies only to computations involving queues?

Is that really all that can be done to use all the available computing power (as far as I can tell, queues are only used for reading and batching training samples)?

Here is what my code looks like, in case it is needed (main.py):

# pylint: disable=missing-docstring
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf
from pylab import *

import argparse
import cnn
import freader_2

training_feats_file = ["file_name"]
training_lbls_file = ["file_name"]
test_feats_file = 'file_name'
test_lbls_file = 'file_name'
learning_rate = 0.1
batch_size = 1000
testset_size = 793
tot_samples = 810901
max_steps = 3300

def placeholder_inputs(batch_size):

    images_placeholder = tf.placeholder(tf.float32, shape=(batch_size, cnn.IMAGE_HEIGHT, cnn.IMAGE_WIDTH, 1))
    labels_placeholder = tf.placeholder(tf.float32, shape=(batch_size, 15))
    return images_placeholder, labels_placeholder

def reader(images_file, lbls_file, images_pl, labels_pl, im_height, im_width):

    images = loadtxt(images_file)
    labels_feed = loadtxt(lbls_file)
    images_feed = reshape(images, [images.shape[0], im_height, im_width, 1])

    feed_dict = {
        images_pl: images_feed,
        labels_pl: labels_feed,
    }

    return feed_dict

tot_training_loss = []
tot_test_loss = []
tot_grad = []

print('Starting TensorFlow session...')
with tf.Graph().as_default():

    DS = freader_2.XICSDataSet()
    images, labels = DS.trainingset_files_reader(training_feats_file, training_lbls_file)
    keep_prob = tf.placeholder(tf.float32) 
    logits = cnn.inference(images, batch_size, keep_prob)
    loss = cnn.loss(logits, labels)
    global_step = tf.Variable(0, trainable=False)
    train_op, grad_norm = cnn.training(loss, learning_rate, global_step)
    summary_op = tf.merge_all_summaries()   

    test_images_pl, test_labels_pl = placeholder_inputs(testset_size)
    test_pred = cnn.inference(test_images_pl, testset_size, keep_prob, True)
    test_loss = cnn.loss(test_pred, test_labels_pl)

    saver = tf.train.Saver()
    sess = tf.Session()
    summary_writer = tf.train.SummaryWriter("CNN", sess.graph)

    init = tf.initialize_all_variables()
    sess.run(init)
    tf.train.start_queue_runners(sess=sess)
    test_feed = reader(test_feats_file, test_lbls_file, test_images_pl, test_labels_pl, DS.height, DS.width)
    test_feed[keep_prob] = 1.    

    # Start the training loop.
    print('Starting training loop...')
    start_time = time.time()
    for step in xrange(max_steps):

        _, grad, loss_value = sess.run([train_op, grad_norm, loss], feed_dict={keep_prob: 0.5})
        tot_training_loss.append(loss_value)
        tot_grad.append(grad)

        _, test_loss_val = sess.run([test_pred, test_loss], feed_dict=test_feed)
        tot_test_loss.append(test_loss_val)

        if step % 1 == 0:        
            duration = time.time() - start_time
            print('Step %d (%.3f sec):\n training loss = %f\n test loss = %f ' % (step, duration, loss_value, test_loss_val))
            print(' gradient = %f'%grad)
#            summary_str = sess.run(summary_op)#, feed_dict=feed_dict)
#            summary_writer.add_summary(summary_str, step)
#            summary_writer.flush()

        if (step+1) % 100 == 0:
            print('Saving checkpoint...')
            saver.save(sess, "chkpts/medias-res", global_step = global_step)

        if test_loss_val < 0.01:# or grad < 0.01:
            print("Stopping condition reached.")
            break

    print('Saving final network...')
    saver.save(sess, "chkpts/final.chkpt")
    print('Total training time: ' + str((time.time() - start_time)/3600) + ' h')

cnn.py:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math

import tensorflow as tf

NUM_OUTPUT = 15

IMAGE_WIDTH = 195
IMAGE_HEIGHT = 20
IMAGE_PIXELS = IMAGE_WIDTH * IMAGE_HEIGHT

def inference(images, num_samples, keep_prob, reuse=None):

    with tf.variable_scope('conv1', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[3, 30, 1, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))        
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(images, kernel, [1, 1, 5, 1], padding='VALID')
        # output dim: 18x34
        biases = tf.Variable(tf.constant(0.0, shape=[5]), name='biases')
        bias = tf.nn.bias_add(conv, biases)
        conv1 = tf.nn.relu(bias, name='conv1')

    pool1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool1')    
    #output dim: 9x17

    with tf.variable_scope('conv2', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[2, 2, 5, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(pool1, kernel, [1, 1, 1, 1], padding='VALID')
        #output dim: 8x16
        biases = tf.Variable(tf.constant(0.1, shape=[5]), name='biases')
        bias = tf.nn.bias_add(conv, biases)
        conv2 = tf.nn.relu(bias, name='conv2')


    pool2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool2')
    #output dim: 4x8

    h_fc1_drop = tf.nn.dropout(pool2, keep_prob)

    with tf.variable_scope('fully_connected', reuse=reuse):
        reshape = tf.reshape(h_fc1_drop, [num_samples, -1])
        dim = reshape.get_shape()[1].value
        weights = tf.get_variable(name='weights', shape=[dim, 20], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([20]), name='biases')
        fully_connected = tf.nn.relu(tf.matmul(reshape, weights) + biases, name='fully_connected')

    with tf.variable_scope('identity', reuse=reuse):
        weights = tf.get_variable(name='weights', shape=[20,NUM_OUTPUT], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([NUM_OUTPUT]), name='biases')
        output = tf.matmul(fully_connected, weights) + biases

    return output


def loss(outputs, labels):

    rmse = tf.sqrt(tf.reduce_mean(tf.square(tf.sub(labels, outputs))), name="rmse")
    loss_list = tf.get_collection('losses')
    loss_list.append(rmse)
    rmse_tot = tf.add_n(loss_list, name='total_loss')  
    return rmse_tot


def training(loss, starter_learning_rate, global_step):

    tf.scalar_summary(loss.op.name, loss)
#    optimizer = tf.train.AdamOptimizer()
    learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step, 200, 0.8, staircase=True)
    optimizer = tf.train.MomentumOptimizer(learning_rate, 0.8)
    grads_and_vars = optimizer.compute_gradients(loss)
    grad_norms = [tf.nn.l2_loss(g[0]) for g in grads_and_vars]
    grad_norm = tf.add_n(grad_norms)
    train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
#    train_op = optimizer.minimize(loss, global_step=global_step)
    return train_op, grad_norm

freader_2.py:

# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os 
import collections
import numpy as np

from six.moves import xrange  
import tensorflow as tf

class XICSDataSet:    
    def __init__(self, height=20, width=195, batch_size=1000, noutput=15):
        self.depth = 1
        self.height = height
        self.width = width
        self.batch_size = batch_size
        self.noutput = noutput

    def trainingset_files_reader(self, im_file_name, lb_file_name, nfiles=1):

        im_filename_queue = tf.train.string_input_producer(im_file_name, shuffle=False)
        lb_filename_queue = tf.train.string_input_producer(lb_file_name, shuffle=False)

        imreader = tf.TextLineReader()
        lbreader = tf.TextLineReader()
        imkey, imvalue = imreader.read(im_filename_queue)
        lbkey, lbvalue = lbreader.read(lb_filename_queue)
        im_record_defaults = [[.0]]*self.height*self.width
        lb_record_defaults = [[.0]]*self.noutput
        im_data_tuple = tf.decode_csv(imvalue, record_defaults=im_record_defaults, field_delim = ' ')
        lb_data_tuple = tf.decode_csv(lbvalue, record_defaults=lb_record_defaults, field_delim = ' ')
        features = tf.pack(im_data_tuple)
        label = tf.pack(lb_data_tuple)

        depth_major = tf.reshape(features, [self.height, self.width, self.depth])

        min_after_dequeue = 10
        capacity = min_after_dequeue + 3 * self.batch_size
        example_batch, label_batch = tf.train.shuffle_batch([depth_major, label], batch_size=self.batch_size, capacity=capacity,
                                                            min_after_dequeue=min_after_dequeue)

        return example_batch, label_batch

A similar question has already been answered here: https://dev59.com/yFkS5IYBdhLWcg3w1JlK - Sergii Gryshkevych
Oh, great. I spent hours looking for an answer and never came across that question. I will try applying what it says and see whether it works for me too. Thanks. - bored_to_death
You may be hitting a bottleneck. That is, if data cannot reach the multithreaded ops fast enough, your ops will not run on all cores. One way to debug this is to look at the timeline, as shown here: https://github.com/tensorflow/tensorflow/issues/1824#issuecomment-225754659 - Yaroslav Bulatov
I cannot check the timeline right now. Can you guess what the bottleneck might be? Could it be that all my training data is in one big file, and I should be using several smaller files instead? - bored_to_death
The bottleneck could be your GPU bandwidth, which determines how fast you can move data between RAM, the CPU, and the GPU. https://dev59.com/PW7Xa4cB1Zd3GeqPut0o - Kershaw
3 Answers


This is really a comment, but I am posting it as an answer because I do not have enough reputation to comment. Marco D.G.'s answer is correct; I just want to add the interesting fact that with tf.device('/cpu:0') automatically tries to use all available cores. Happy computing!
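
A minimal sketch of what that looks like (the toy matmul graph below is an illustration, not part of the original answer, and uses the same TF 1.x-era API as the rest of this page):

import tensorflow as tf

# Build the graph under an explicit CPU device. TensorFlow still
# parallelizes individual ops across cores via its internal thread pools.
with tf.Graph().as_default():
    with tf.device('/cpu:0'):
        a = tf.random_normal([1000, 1000])
        b = tf.random_normal([1000, 1000])
        c = tf.matmul(a, b)  # a single op that TF can parallelize internally

    with tf.Session() as sess:
        sess.run(c)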


According to TensorFlow:

The two configurations listed below are used to optimize CPU performance by adjusting the thread pools.

  • intra_op_parallelism_threads: Nodes that can use multiple threads to parallelize their execution will schedule the individual pieces into this pool.
  • inter_op_parallelism_threads: All ready nodes are scheduled in this pool.

These configurations are set via tf.ConfigProto and passed to tf.Session in the config attribute, as shown in the snippet below. For both configuration options, if they are unset or set to 0, they will default to the number of logical CPU cores. Testing has shown that the default is effective for systems ranging from one CPU with 4 cores to multiple CPUs with 70+ combined logical cores. A common alternative optimization is to set the number of threads in both pools equal to the number of physical cores rather than logical cores.

config = tf.ConfigProto()
config.intra_op_parallelism_threads = 44
config.inter_op_parallelism_threads = 44
sess = tf.Session(config=config)

In versions of TensorFlow before 1.2, a multithreaded, queue-based input pipeline was the recommended way to improve performance. As of TensorFlow 1.4, however, the tf.data module is recommended instead.
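As a minimal sketch of such a tf.data pipeline (assuming TF 1.4+; the file name and the per-line parsing below are placeholders modeled on the question's 20x195 images, not the asker's actual format):

import tensorflow as tf

# Placeholder pipeline: parse whitespace-delimited text lines in parallel.
def parse_line(line):
    values = tf.string_to_number(tf.string_split([line], ' ').values)
    return tf.reshape(values, [20, 195, 1])

dataset = (tf.data.TextLineDataset('features.csv')  # placeholder file name
           .map(parse_line, num_parallel_calls=4)   # parallel parsing threads
           .shuffle(buffer_size=1000)
           .batch(1000)
           .prefetch(1))                            # overlap input with training
iterator = dataset.make_one_shot_iterator()
example_batch = iterator.get_next()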

On Linux, you can check CPU usage with the top command, pressing 1 to show per-CPU usage. Note: the percentages depend on Irix/Solaris mode.


Could you please update this answer with information that applies to TensorFlow 2.0? That would help me! - Dan Rayson
tf.config.threading.set_inter_op_parallelism_threads(8) tf.config.threading.set_intra_op_parallelism_threads(8) - Akshar


For me, it worked like this:

from multiprocessing.dummy import Pool as ThreadPool 
....
pool = ThreadPool()
outputs = pool.starmap(run_on_sess, [(tf_vars, data1), (tf_vars, data2)])
pool.close()
pool.join()

You need to initialize the session and make the session-related variables globally available as part of tf_vars. Create a function named run_on_sess that performs the sess.run step and any subsequent computation for a single batch (data1 and data2 above) in a Python multithreading environment.
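A self-contained sketch of that setup (the graph, the run_on_sess body, and the dummy data are assumptions; only the names come from this answer):

from multiprocessing.dummy import Pool as ThreadPool

import numpy as np
import tensorflow as tf

# The graph and session are created once and shared across threads;
# tf.Session.run is thread-safe.
x = tf.placeholder(tf.float32, shape=[None, 10])
y = tf.reduce_sum(tf.square(x), axis=1)
sess = tf.Session()
tf_vars = (sess, x, y)

def run_on_sess(tf_vars, data):
    sess, x, y = tf_vars
    return sess.run(y, feed_dict={x: data})  # one batch per worker thread

data1 = np.random.rand(1000, 10)
data2 = np.random.rand(1000, 10)

pool = ThreadPool()
outputs = pool.starmap(run_on_sess, [(tf_vars, data1), (tf_vars, data2)])
pool.close()
pool.join()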


Threading will not speed anything up. - Stefan Falk
