使用MQ进行多线程

4

我在多线程环境下使用MQSeries Perl模块时遇到了问题。以下是我尝试过的方法:

  • 在不同的线程中使用$mqMgr = MQSeries::QueueManager->new()创建两个句柄。我以为这会给我两个连接到MQ,但实际上第二次调用MQOPEN()返回码为2219,这可能意味着我从两个单独的调用new()方法中得到了相同的底层连接。
  • 只声明一个$mqMgr作为全局共享变量。但我无法将MQSeries::QueueManager对象的引用分配给$mqMgr。原因是:"传递给threads::shared::share的参数1的类型必须是[$@%]之一(而不是子例程条目)"
  • 只声明一个$mqMgr作为全局变量。仍然得到相同的2219代码。
  • 尝试将MQCNO_HANDLE_SHARE_NO_BLOCK传递到MQSeries::QueueManager->new()中,以便可以在线程间共享单个连接。但我找不到一种方法来传递它。

我的问题是,使用Perl模块MQSeries:

  • 如何/能否从不同的线程获取到与MQ队列管理器的单独连接?
  • 如何/能否在不同的线程之间共享与MQ队列管理器的连接?

我已经搜索过,但运气不佳,任何信息将不胜感激。

相关问题:


更新1:添加一个例子,即在两个线程中使用两个本地的MQSeries :: QueueManager对象会导致MQ错误代码2219。
use threads;
use Thread::Queue;
use MQSeries;
use MQSeries::QueueManager;
use MQSeries::Queue;

# globals
our $jobQ = Thread::Queue->new();
our $resultQ = Thread::Queue->new();

# ----------------------------------------------------------------------------
# sub routines
# ----------------------------------------------------------------------------

sub worker {
    # fetch work from $jobQ and put result to $resultQ
    # ...
}

sub monitor {
    # fetch result from $resultQ and put it onto another MQ queue
    my $mqQMgr = MQSeries::QueueManager->new( ... );

    # different queue from the one in main
    # this would cause error with MQ code 2219
    my $mqQ = MQSeries::Queue->new( ... );

    while (defined(my $result = $resultQ->dequeue())) {
        # create an mq message and put it into $mqQ
        my $mqMsg = MQSeries::Message->new();
        $mqQ->put($mqMsg);
    }   
}

# main
unless (caller()) {
    # create connection to MQ
    my $mqQMgr = MQSeries::QueueManager->new( ... );
    my $mqQ = MQSeries::Queue->new( ... );

    # create worker and monitor thread
    my @workers;
    for (1 .. $nThreads) {
        push(@workers, threads->create('worker'));
    }
    my $monitor = threads->create('monitor');

    while (True) {
        my $mqMsg = MQSeries::Message->new ();

        my $retCode = $mqQ->get(
            Message => $mqMsg,
            GetMsgOptions => $someOption,
            Wait => $sometime
        );

        die("error") if ($retCode == 0);
        next if ($retCode == -1); # no message

        # not we have some job to do
        $jobQ->enqueue($mqMsg->Data);
    }
}

可能包含一些示例代码会很有用 - 例如 MVCE - Sobrique
@Sobrique 感谢您的建议,已添加示例。 - Tianwei Chen
好的。刚注意到如果在Win32上,mqseries会执行一些注册表操作。 - Sobrique
1个回答

2
尝试在模块中使用多线程时,存在非常真实的危险,因为该模块可能不是线程安全的。由于线程的工作方式,有很多事情可能会出现问题。你克隆当前进程状态,包括文件句柄、套接字等。但是,如果你以异步/线程化的方式使用它们,它们将表现得非常奇怪,因为这些操作不是(必然)原子的。因此,除非你知道其他情况,否则请假设不能在线程之间共享。它可能是线程安全的,也可能不是。即使它看起来正常,但在并发条件下,你仍可能遇到难以解决的错误,导致竞争条件。共享标量/列表在threads::shared中明确描述为基本安全的(即使如此,如果没有锁定,你仍可能遇到非原子性的问题)。因此,我建议你需要做的是:1.拥有一个“通信”线程,执行与模块相关的所有工作,并使其他线程使用IPC与其通信。Thread::Queue可以很好地完成这项工作。2.将每个线程视为模块目的上完全独立的。这包括加载(使用require和import-而不是use,因为use的作用更早)和实例化。你可能能够通过在线程启动之前“加载”模块来解决问题,但实例化会执行一些操作,例如创建描述符、套接字等。3.在进行原子操作时锁定资源。以上大部分内容也适用于fork并行性-但不完全相同,因为fork使“共享”东西变得更加困难,因此你不太可能遇到这种情况。编辑:查看你发布的代码并与MQSeries源进行交叉引用后:1.存在一个BEGIN块,在你使用MQSeries时设置了一些内容。虽然我不能确定这是否是你的问题,但它让我非常警惕-请记住,在它执行此操作时,它设置了一些内容-然后当你的线程开始时,它们继承了该“BEGIN”块期间“所做的任何事情”的非共享副本。

鉴于我之前的建议 - 我建议你尝试(因为我不能确定,因为我没有参考实现):

require MQSeries; 
MQSeries->import;

将以下代码放入您的代码中,在线程启动之后使用,代替use。例如,在您执行creates并在线程子程序中执行within后使用。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接