How to solve PyTorch's multiprocessing problem on CPU?


I am running PyTorch inference on CPU. I found that PyTorch does not fully utilize all of the CPU cores while predicting. How can I make PyTorch use all the cores?


Can you add some of the references you used, and maybe even a code snippet for other searchers? I'm testing my answer, so answering is easy, but the question matters too! :-) - Clemens Tolboom
@rajamohan, did you ever solve this? I'm running into the same problem now. Any findings would really help. - asanoop24
2 Answers


Skeleton

Using the skeleton below I see 4 processes running. You should tweak n_train_processes. I set it to 10, which was too many, since my computer has only 8 cores; setting it to 6 works fine. (One way to derive the number from the core count is sketched after the skeleton.)

...
import torch.nn as nn            # needed for the nn.Module subclass below
import torch.multiprocessing as mp

class MyModel(nn.Module):
    ...


def train(model, rank):
    ...


def test(model):
    ...



n_train_processes = 3


if __name__ == '__main__':
    model = MyModel()
    model.share_memory()  # put the parameters in shared memory so every process sees the same weights

    processes = []
    for rank in range(n_train_processes + 1):  # + 1 for test process
        if rank == 0:
            p = mp.Process(target=test, args=(model,))
        else:
            p = mp.Process(target=train, args=(model, rank,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
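The skeleton hard-codes n_train_processes. Below is a minimal sketch of sizing it from the machine's core count instead; using os.cpu_count, reserving one core for the test process, and capping each worker at one intra-op thread with torch.set_num_threads are assumptions here, not something the original skeleton does:

import os
import torch
import torch.multiprocessing as mp

n_cores = os.cpu_count() or 1
n_train_processes = max(1, n_cores - 1)   # leave one core for the test process

def train(rank):
    # One intra-op thread per worker, so the workers map roughly 1:1 onto cores.
    torch.set_num_threads(1)
    print("worker", rank, "intra-op threads:", torch.get_num_threads())

if __name__ == '__main__':
    processes = [mp.Process(target=train, args=(rank,)) for rank in range(n_train_processes)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print("cores:", n_cores, "train processes:", n_train_processes)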

Full example

This example comes from https://github.com/seungeunrho/minimalRL, which also has other nice reinforcement learning examples. This is a3c.py:

# a3c.py
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import torch.multiprocessing as mp
import time

n_train_processes = 6

# Hyperparameters
learning_rate = 0.0002
update_interval = 5
gamma = 0.98
max_train_ep = 300
max_test_ep = 400


class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        self.fc1 = nn.Linear(4, 256)
        self.fc_pi = nn.Linear(256, 2)
        self.fc_v = nn.Linear(256, 1)

    def pi(self, x, softmax_dim=0):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob

    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v


def train(model, rank):
    # Each worker process runs its own CartPole environment and its own
    # optimizer, but all workers update the same shared model parameters.
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    env = gym.make('CartPole-v1')

    for n_epi in range(max_train_ep):
        done = False
        s = env.reset()
        while not done:
            s_lst, a_lst, r_lst = [], [], []
            for t in range(update_interval):
                prob = model.pi(torch.from_numpy(s).float())
                m = Categorical(prob)
                a = m.sample().item()
                s_prime, r, done, info = env.step(a)

                s_lst.append(s)
                a_lst.append([a])
                r_lst.append(r/100.0)

                s = s_prime
                if done:
                    break

            R = 0.0  # discounted return, accumulated backwards over the rollout
            R_lst = []
            for reward in r_lst[::-1]:
                R = gamma * R + reward
                R_lst.append([R])
            R_lst.reverse()

            done_mask = 0.0 if done else 1.0
            s_batch, a_batch, R_batch, s_final = \
                torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
                torch.tensor(R_lst), torch.tensor(s_prime, dtype=torch.float)

            td_target = R_batch + gamma * model.v(s_final) * done_mask
            advantage = td_target - model.v(s_batch)
            pi = model.pi(s_batch, softmax_dim=1)
            pi_a = pi.gather(1, a_batch)
            loss = -torch.log(pi_a) * advantage.detach() + \
                F.smooth_l1_loss(td_target.detach(), model.v(s_batch))

            optimizer.zero_grad()
            loss.mean().backward()
            optimizer.step()

    env.close()
    print("Training process {} reached maximum episode.".format(rank))


def test(model):
    # The test process only evaluates the shared model while the workers train it.
    env = gym.make('CartPole-v1')
    score = 0.0
    print_interval = 20

    for n_epi in range(max_test_ep):
        done = False
        s = env.reset()
        while not done:
            prob = model.pi(torch.from_numpy(s).float())
            a = Categorical(prob).sample().item()
            s_prime, r, done, info = env.step(a)
            s = s_prime
            score += r

        if n_epi % print_interval == 0 and n_epi != 0:
            print("# of episode :{}, avg score : {:.1f}".format(
                n_epi, score/print_interval))
            score = 0.0
            time.sleep(1)
    env.close()


if __name__ == '__main__':
    model = ActorCritic()
    model.share_memory()
    processes = []
    for rank in range(n_train_processes + 1):  # + 1 for test process
        if rank == 0:
            p = mp.Process(target=test, args=(model,))
        else:
            p = mp.Process(target=train, args=(model, rank,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()


I have only one GPU. While doing normalization and inference, my PyTorch code got stuck at one line, did not return any result, and eventually ended with a timeout error. The following worked for me:

  1. Use the spawn start method: import torch.multiprocessing as mp, then mp.set_start_method('spawn', force=True). The force=True is necessary, because otherwise it raises another error saying the context has already been set.

  2. Put the main guard (if __name__ == '__main__':) before the imports. Many answers on Stack Overflow show that start() and join() should be called inside the main guard, and that works fine; but I think because I am working with several scripts and modules, it could not identify the correct main entry point, so I had to include it in the very first lines of the first file. (A minimal sketch combining both points follows this list.)
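A minimal sketch of the two points above put together; the run_inference function and the tiny stand-in model are placeholders for illustration, not part of the original answer:

import torch
import torch.nn as nn
import torch.multiprocessing as mp

def run_inference(model, rank):
    # Placeholder for whatever normalization / inference each process does.
    with torch.no_grad():
        out = model(torch.randn(1, 4))
    print("process", rank, "output shape:", out.shape)

if __name__ == '__main__':
    # force=True avoids the "context has already been set" error when some
    # other module has already chosen a start method.
    mp.set_start_method('spawn', force=True)

    model = nn.Linear(4, 2)   # stand-in model for the sketch
    model.share_memory()

    processes = []
    for rank in range(2):
        p = mp.Process(target=run_inference, args=(model, rank))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()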

