在另一个 QThread 中处理 QTcpSocket

3
我需要在单独的QThread上处理传入的tcp连接。在成功客户端身份验证后,相应的套接字应存储在QList对象中。
[简化的主/服务器端应用程序]
// First attempt: a QObject that owns the listening QTcpServer and the list of
// client sockets that handleWaiterThread() appends to.
class Server : public QObject
{
    Q_OBJECT

public:
    Server();

private:
    QList<QTcpSocket*> m_connections;   // filled by handleWaiterThread()
    QTcpServer m_server;                // NOTE(review): a value member here, but dereferenced with '->' in handleWaiterThread()
    // NOTE(review): handleWaiterThread() is connected through SLOT(...) in
    // handleIncomingConnection(), yet both functions are declared outside a
    // slots section.
    void handleIncomingConnection();
    void handleWaiterThread();

private slots:
    void treatFinishedWaiterThread();   // connected to QThread::finished() in handleIncomingConnection()
}
// NOTE(review): the class definition above is not terminated with ';'.

根据函数定义: handleIncomingConnection() 槽与服务器对象(m_server)的 newConnection() 信号相连接。
// Creates a new QThread, connects its started()/finished() signals to this
// object, moves this object into the thread and starts it.
void Server::handleIncomingConnection()
{
    QThread *waiter = new QThread();
    connect(waiter, SIGNAL(started()), this, SLOT(handleWaiterThread()));
    connect(waiter, SIGNAL(finished()), this, SLOT(treatFinishedWaiterThread()));
    // NOTE(review): this moves the Server object itself, not a per-connection
    // worker, into the new thread.
    moveToThread(waiter);
    waiter->start();
}

// Connected to QThread::started() in handleIncomingConnection(): fetches the
// pending connection, wires its disconnected() signal and stores the socket.
void Server::handleWaiterThread()
{
    // fetch requesting socket
    // NOTE(review): m_server is declared as a QTcpServer value in the class
    // shown above, but is used with '->' here.
    QTcpSocket *socket = m_server->nextPendingConnection();

    // HANDLE PASSWORD AUTHENTICATION HERE ...
    // IF SUCCESSFUL, CONTINUE
    
    // NOTE(review): no clientDisconnected() slot is declared in the Server
    // class shown in this snippet.
    connect(socket, SIGNAL(disconnected()), this, SLOT(clientDisconnected()));

    // add to list
    m_connections.append(socket);
}

// Slot for QThread::finished(): schedules the emitting thread object for
// deletion.
void Server::treatFinishedWaiterThread()
{
    // NOTE(review): the cast result is used without a null check; presumably
    // the sender is always a QThread here - confirm.
    QThread *caller = qobject_cast<QThread*>(sender());
    caller->deleteLater();
}

如果我尝试运行这个程序,线程会被创建但是当它们完成后没有发出SIGNAL信号,所以我无法在之后删除线程。此外,我还会收到以下消息:
QObject::moveToThread: Widgets cannot be moved to a new thread
如何解决这个问题?


[01.06.2016]

根据 QTcpServer::nextPendingConnection() 的说明:
返回的 QTcpSocket 对象不能从另一个线程中使用。如果您想要在另一个线程中使用传入的连接,需要重写 incomingConnection()。
因此最终我必须创建另一个继承自 QTcpServer 的类。


[01.07.2016 #1]

我修改了我的代码,并添加了自定义的服务器和服务员线程类。

[自定义服务器类]

// Second attempt: QTcpServer subclass that forwards every incoming native
// socket descriptor through the connectionRequest() signal instead of creating
// the socket itself.
class CustomServer : public QTcpServer
{
    Q_OBJECT

public:
    // NOTE(review): the constructor is named WServer although the class is
    // CustomServer, and it passes 'parent' while its parameter is unnamed.
    WServer(QObject* = nullptr) : QTcpServer(parent) {}

signals:
    // Carries the descriptor received by incomingConnection().
    void connectionRequest(qintptr);

protected:
    // Re-emits the descriptor; no QTcpSocket is created in this class.
    void incomingConnection(qintptr socketDescriptor)
    {
        emit connectionRequest(socketDescriptor);
    }
};

[自定义线程类]

// Second attempt: QThread subclass wrapping one incoming connection.
class Waiter : public QThread
{
    Q_OBJECT

public:
    // Creates the QTcpSocket for the given native descriptor inside the
    // constructor, with this thread object as its QObject parent.
    // NOTE(review): the constructor body presumably runs in the thread that
    // creates the Waiter, not in the thread executing run() - confirm.
    Waiter(qintptr socketDescriptor, QObject *parent = nullptr)
        : QThread(parent)
    {
        // Create socket
        m_socket = new QTcpSocket(this);
        m_socket->setSocketDescriptor(socketDescriptor);
    }

signals:
    // Emitted at the end of run() with the socket built in the constructor.
    void newSocket(QTcpSocket*);

protected:
    // Thread body: placeholder work (a 2500 ms sleep), then announces the socket.
    void run()
    {
        // DO STUFF HERE
        msleep(2500);
        
        emit newSocket(m_socket);
    }

private:
    QTcpSocket *m_socket;   // created in the constructor, parented to this object
};

新的主类
// Second attempt: main window widget that owns the CustomServer by value and
// keeps the list of client sockets.
class ServerGUI : public QWidget
{
    Q_OBJECT

public:
    // NOTE(review): declared as Server(QObject*) although the class is ServerGUI.
    Server(QObject*);

private:
    QList<QTcpSocket*> m_connections;
    CustomServer m_server;

private slots:
    // NOTE(review): the definitions shown after this class are qualified with
    // CustomServer::, not ServerGUI::.
    void handleConnectionRequest(qintptr);
    void handleNewSocket(QTcpSocket*);
}
// NOTE(review): the class definition above is not terminated with ';'.

// Creates a Waiter thread for the given descriptor (parented to this object),
// wires its signals and starts it.
void CustomServer::handleConnectionRequest(qintptr socketDescriptor)
{
    Waiter *nextWaiter = new Waiter(socketDescriptor, this);
    connect(nextWaiter, SIGNAL(newSocket(QTcpSocket*)), this, SLOT(handleNewSocket(QTcpSocket*)));
    // NOTE(review): the receiver of deleteLater() is 'this', not nextWaiter, so
    // a finished waiter schedules deletion of this object rather than of itself.
    connect(nextWaiter, SIGNAL(finished()), this, SLOT(deleteLater()));
    nextWaiter->start();
}

// Slot for Waiter::newSocket(): wires the socket's disconnected() signal.
// Adding the socket to the client list is left as a placeholder comment.
void CustomServer::handleNewSocket(QTcpSocket *socket)
{
    // DO STUFF HERE ...

    // NOTE(review): no clientDisconnected() slot is declared in the classes
    // shown for this attempt.
    connect(socket, SIGNAL(disconnected()), this, SLOT(clientDisconnected()));

    // FINALLY ADD TO ACTIVE-CLIENT LIST ...
}

信号和槽特定设置:

由于 CustomServer 被定义为我的主窗口类(处理 GUI,称为 ServerGUI)的类成员(m_server),m_server 的 connectionRequest(qintptr) 信号与 ServerGUI 实例的 handleConnectionRequest(qintptr) 槽相连接。

但现在我的应用程序在启动后立即崩溃,在调试窗口中显示以下消息:

HEAP[qtapp.exe]: Invalid address specified to RtlValidateHeap( 000002204F430000, 0000006E0090F4C0 )

可能是什么原因?




[01.10.2016 #2]

我根据user2014561的答案修改了我的代码。

对于CustomServer

// Third attempt: QTcpServer subclass that tracks one ServerPeer per client and
// relays connect/disconnect notifications as signals.
// NOTE(review): the constructor/destructor and several out-of-class definitions
// in this snippet use the name WServer instead of CustomServer.
class CustomServer : public QTcpServer
{
    Q_OBJECT

public:
    WServer(QHostAddress, quint16, quint16, QObject* = nullptr);
    ~WServer();

    // Remove all peers / the peer that reports the given id.
    void kickAll();
    void kickClient(qintptr);

    QHostAddress localAddress() const;
    quint16 serverPort() const;
    bool isReady() const;
    bool alreadyConnected(qintptr) const;
    bool clientLimitExhausted() const;

signals:
    // Relayed from ServerPeer::connected()/disconnected() in incomingConnection().
    void clientConnected(qintptr);
    void clientDisconnected(qintptr);

private slots:
    // Removes a destroyed peer from m_connections.
    void destroyedfunc(QObject*);

    // JUST FOR TESTING PURPOSES
    void waiterFinished();

private:
    QList<ServerPeer*> m_connections;   // appended in incomingConnection(), pruned in destroyedfunc()
    quint16 m_maxAllowedClients;
    bool m_readyState;

    void incomingConnection(qintptr);
};

对于kickAll()

// Asks every tracked peer to delete itself, one at a time, spinning a local
// event loop until the peer's thread object reports destroyed().
// The list shrinks through destroyedfunc(), which incomingConnection() connects
// to each peer's destroyed(QObject*) signal.
void WServer::kickAll()
{
    while (!m_connections.isEmpty())
    {
        ServerPeer *peer = m_connections.first();
        QEventLoop loop;
        // NOTE(review): in this attempt start() emits finished(), which quits
        // the waiter thread and lets it be deleted; peer->thread() may then no
        // longer be a live QThread - consistent with the reported
        // "(null)::destroyed()" warning. Confirm.
        connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit())); // ### PROBLEM ENCOUNTERED HERE
        QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
        loop.exec();
    }
}

对于 kickClient(qintptr)

// Looks for the peer whose hasSocket(client_id) reports true, asks it to delete
// itself and waits in a local event loop for its thread object to be destroyed.
void WServer::kickClient(qintptr client_id)
{
    foreach (ServerPeer *peer, m_connections)
    {
        // NOTE(review): peerState has no initial value; if the invocation below
        // fails it is read uninitialized.
        bool peerState;
        // NOTE(review): hasSocket() is declared under 'public:' (not as a slot)
        // in the ServerPeer class of this attempt.
        QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection,
            Q_RETURN_ARG(bool, peerState), Q_ARG(qintptr, client_id));
        if (peerState)
        {
            QEventLoop loop;
            connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
            QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
            loop.exec();
            break;
        }
    }
}

对于destroyedfunc(QObject*)函数:

// Slot for a peer's destroyed(QObject*) signal: drops that peer from the list.
void CustomServer::destroyedfunc(QObject *obj)
{
    // Only the address is compared, so the cast pointer is never dereferenced.
    m_connections.removeAll(static_cast<ServerPeer*>(obj));
}

对于 incomingConnection(qintptr)

// Creates a ServerPeer plus a dedicated QThread for the new descriptor, records
// the peer, wires all notifications and queues ServerPeer::start(handle) before
// starting the thread.
void WServer::incomingConnection(qintptr handle)
{
    ServerPeer *peer = new ServerPeer();
    QThread *waiter = new QThread();

    m_connections.append(peer); // add to list
    peer->moveToThread(waiter);
    
    // notify about client connect
    connect(peer, SIGNAL(connected(qintptr)), this, SIGNAL(clientConnected(qintptr)));
    // stop waiter thread by indirectly raising finished() signal
    // NOTE(review): ServerPeer::start() emits finished() at its end, so the
    // waiter is asked to quit right after start() returns.
    connect(peer, SIGNAL(finished()), waiter, SLOT(quit()));
    // notify about client disconnect
    connect(peer, SIGNAL(disconnected(qintptr)), this, SIGNAL(clientDisconnected(qintptr)));
    // remove client from list
    connect(peer, SIGNAL(destroyed(QObject*)), this, SLOT(destroyedfunc(QObject*)));
    // notify about finished waiter thread; only for debug purposes
    connect(waiter, SIGNAL(finished()), this, SLOT(waiterFinished()));
    // remove waiter thread when finished
    connect(waiter, SIGNAL(finished()), waiter, SLOT(deleteLater()));

    // Queued so that start() is delivered through the peer's event queue.
    QMetaObject::invokeMethod(peer, "start", Qt::QueuedConnection,
        Q_ARG(qintptr, handle));

    waiter->start();
}

对于 ServerPeer

// Third attempt: worker object representing one client connection.
class ServerPeer : public QObject
{
    Q_OBJECT

public:
    ServerPeer(QObject* = nullptr);
    ~ServerPeer();

    // NOTE(review): declared as a plain public member function, but
    // WServer::kickClient() calls it by name through QMetaObject::invokeMethod.
    bool hasSocket(qintptr) const;

signals:
    void connected(qintptr);
    void disconnected(qintptr);
    // Emitted at the end of start(); connected to the waiter thread's quit().
    void finished();

public slots:
    // Creates the socket from the given native descriptor.
    void start(qintptr);
    // Aborts and deletes the socket.
    // NOTE(review): shares its name with QObject::disconnect().
    void disconnect();

private slots :
    void notifyConnect();
    void notifyDisconnect();

private:
    QTcpSocket *m_peer;
    qintptr m_id;   // NOTE(review): not assigned anywhere in the code shown for this attempt
};

对于 ServerPeer(QObject*)

// Constructs a peer without a socket; the socket is created later in start().
// NOTE(review): m_id is left uninitialized here.
ServerPeer::ServerPeer(QObject *parent) : QObject(parent), m_peer(nullptr)
{

}

对于~ServerPeer()

// Tears down the socket through ServerPeer::disconnect().
ServerPeer::~ServerPeer()
{
    disconnect();
}

对于 start(qintptr)

// Creates the QTcpSocket for the given native descriptor, wires the
// disconnect handling and schedules the manual "connected" notification.
// The peer schedules its own deletion if the descriptor cannot be adopted or
// the (placeholder) verification fails.
void ServerPeer::start(qintptr handle)
{
    qDebug() << "New waiter thread has been started.";

    m_peer = new QTcpSocket(this);
    if (!m_peer->setSocketDescriptor(handle))
    {
        this->deleteLater();
        return;
    }

    if (true /*verification here*/)
    {
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(notifyDisconnect()));
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(deleteLater()));

        // manually do connected notification
        QTimer::singleShot(0, this, SLOT(notifyConnect()));
    }
    else
    {
        this->deleteLater();
    }
    
    // NOTE(review): emitted on every path that reaches this point; in
    // WServer::incomingConnection() this signal is connected to the waiter
    // thread's quit().
    emit finished();
}

对于 disconnect()

void ServerPeer::disconnect()
{
    if (m_peer != nullptr)
    {
        if (m_peer->state() != QAbstractSocket::SocketState::ClosingState
            && m_peer->state() != QAbstractSocket::SocketState::UnconnectedState)
            m_peer->abort();

        delete m_peer;
        m_peer = nullptr;
    }
}

对于notifyConnect()

// Emits the connected notification (scheduled from start() via a zero timer).
// NOTE(review): the signal is declared as connected(qintptr) but is passed the
// QTcpSocket pointer m_peer here.
void ServerPeer::notifyConnect()
{
    emit connected(m_peer);
}

对于notifyDisconnect()

// Slot for the socket's disconnected() signal: emits the peer-level notification.
// NOTE(review): the signal is declared as disconnected(qintptr) but is passed
// the QTcpSocket pointer m_peer here.
void ServerPeer::notifyDisconnect()
{
    emit disconnected(m_peer);
}

针对ServerGUI

// Third attempt: main window widget that owns the server through a pointer and
// reacts to client connect/disconnect notifications.
class ServerGUI : public QWidget
{
    Q_OBJECT

public:
    ServerGUI(QWidget* = nullptr);

private:
    Ui::ServerWindow ui;
    CustomServer *m_server;   // set to nullptr by the constructor when the server is not ready

private slots:
    // For further handling, e.g. updating client view
    void handleNewClient(qintptr);
    void handleRemovedClient(qintptr);
}
// NOTE(review): the class definition above is not terminated with ';'.

对于 ServerGUI(QWidget*):

// Builds the generated UI, creates the server on localhost:1234 with a limit of
// two clients, and subscribes to its client notifications. If the server did
// not come up it is deleted again and m_server is left as nullptr.
ServerGUI::ServerGUI(QWidget *parent) : QWidget(parent)
{
    // GUI elements come from the generated *.ui code.
    ui.setupUi(this);

    m_server = new WServer(QHostAddress::LocalHost, 1234, 2, this);
    if (m_server->isReady())
    {
        connect(m_server, SIGNAL(clientConnected(qintptr)), this, SLOT(handleNewClient(qintptr)));
        connect(m_server, SIGNAL(clientDisconnected(qintptr)), this, SLOT(handleRemovedClient(qintptr)));
    }
    else
    {
        qDebug() << "Server could not start!";
        delete m_server;
        m_server = nullptr;
    }
}

这是我的主函数:

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    ServerGUI w;
    
    w.show();
    return a.exec();
}

在给定的代码中,如果我尝试踢出(选定的)客户端,则会弹出以下消息:

QMetaObject::invokeMethod: 没有 ServerPeer::hasSocket(qintptr) 方法

QObject::connect: 无法将 (null)::destroyed() 连接到 QEventLoop::quit()

如何解决这个问题?


class Server { 你不应该继承QObject吗? - Arpegius
你说得对,已经修复编辑问题。 - neuronalbit
@neuronalbit,你的编辑是朝错误的方向迈出的一步。需要在单独的线程中完成哪些处理?服务器对象是否有子项?这将解释movetothread失败的原因。 - UmNyobe
@UmNyobe请参考我于2016年1月7日的新内容。新的服务员线程的目的是单独处理每个传入的连接,并完成客户端身份验证(包括密码检查)。如果身份验证失败(例如由于提交了错误的密码或客户端超时),则服务员通过不发出connectionRequest(qintptr)信号来丢弃连接。否则,“ServerGUI”实例最终必须使用相应的套接字描述符创建套接字。 - neuronalbit
1个回答

1
如果我理解正确,您想在单独的线程上运行服务器的每个对等体,如果是这样,那么以下内容可能会对您有所帮助:
  1. 创建一个QTcpServer子类
  2. 重新实现incomingConnection()方法
  3. 创建 QThread 和 ServerPeer 的实例(均无父对象)并启动线程
  4. 进行SIGNAL - SLOT连接,以从列表中删除对等体并删除线程和对等体实例
  5. 将 ServerPeer 添加到您的 QList 中
  6. 一旦开始,进行凭据验证; 如果拒绝它们,则中止连接

编辑,注意事项:

你收不到 connected 信号(SIGNAL),因为在设置 socketDescriptor 时套接字已经处于连接状态;因此可以直接认为在 setSocketDescriptor 之后套接字已连接,并按你的需要进行后续操作。

关于关闭时出现的错误,这是因为你没有正确释放线程,请查看我的编辑,了解如何解决这个问题。

最后,QTcpSocket 不能被不同的线程访问;如果你需要从另一个线程调用 ServerPeer,请使用 QMetaObject::invokeMethod(配合 QueuedConnection 或 BlockingQueuedConnection),或者使用 SIGNAL/SLOT 机制。

编辑2:

现在,服务器和它的对等体将在MainWindow::closeEvent上被删除,这样你就可以看到断开连接的函数被调用。我猜问题取决于类的删除顺序。

您可以通过套接字与其交互,包括向其发送数据,但我认为最好使用已经提到的Qt跨线程调用方法,这样更加简便。在我的示例中,您可以轻松地向特定对等方或所有对等方编写内容。

customserver.h:

//Step 1

#include <QtCore>
#include <QtNetwork>
#include "serverpeer.h"

class CustomServer : public QTcpServer
{
    Q_OBJECT
public:
    explicit CustomServer(const QHostAddress &host, quint16 port, quint16 maxconnections, QObject *parent = nullptr);
    ~CustomServer();

    void kickAll();
    void kickClient(qintptr id);
    void writeData(const QByteArray &data, qintptr id);
    void writeData(const QByteArray &data);

    QHostAddress localAddress();
    quint16 serverPort();
    bool isReady();

signals:
    void clientConnected(qintptr);
    void clientDisconnected(qintptr);

private slots:
    void destroyedfunc(QObject *obj);

private:
    void incomingConnection(qintptr handle);

    QList<ServerPeer*> m_connections;
    int m_maxAllowedClients;
};

customserver.cpp:

#include "customserver.h"

// Stores the client limit and starts listening on host:port right away.
// Failure is not fatal here: callers are expected to check isReady().
CustomServer::CustomServer(const QHostAddress &host, quint16 port, quint16 maxconnections, QObject *parent) :
    // Base class first: that is the order in which initialization actually
    // happens, and it avoids the compiler's reorder warning.
    QTcpServer(parent), m_maxAllowedClients(maxconnections)
{
    if (!listen(host, port))
        qDebug() << "CustomServer: cannot listen on" << host << port << errorString();
}

// Removes all remaining peers (and waits for their threads) via kickAll().
CustomServer::~CustomServer()
{
    kickAll();
}

//Step 2
// Called for every new native socket descriptor.
// Over the client limit the connection is closed at once; otherwise a
// ServerPeer is created, moved into its own QThread, wired up, asked to start
// on the descriptor, and recorded in m_connections.
void CustomServer::incomingConnection(qintptr handle)
{
    // handle client limit
    if (m_connections.size() >= m_maxAllowedClients)
    {
        qDebug() << "Can't allow new connection: client limit reached!";
        // Adopt the descriptor only to close it. A stack object replaces the
        // former new/delete pair and cannot leak.
        QTcpSocket rejected;
        rejected.setSocketDescriptor(handle);
        rejected.abort();
        return;
    }

    //Step 3
    // Neither object gets a parent: the peer must be parentless to be moved to
    // another thread, and the thread deletes itself once finished (below).
    ServerPeer *peer = new ServerPeer();
    QThread *waiter = new QThread();

    peer->moveToThread(waiter);

    //Step 4
    connect(peer, SIGNAL(connected(qintptr)), this, SIGNAL(clientConnected(qintptr)));
    connect(peer, SIGNAL(disconnected(qintptr)), this, SIGNAL(clientDisconnected(qintptr)));
    // Peer gone -> stop its thread and forget it; thread finished -> delete it.
    connect(peer, SIGNAL(destroyed()), waiter, SLOT(quit()));
    connect(peer, SIGNAL(destroyed(QObject*)), this, SLOT(destroyedfunc(QObject*)));
    connect(waiter, SIGNAL(finished()), waiter, SLOT(deleteLater()));

    // Queued so that start() is delivered through the peer's event queue.
    QMetaObject::invokeMethod(peer, "start", Qt::QueuedConnection, Q_ARG(qintptr, handle));

    waiter->start();

    //Step 5
    m_connections.append(peer);
}

// Removes every peer, one at a time.
// Each iteration queues deleteLater() on the first peer and spins a local event
// loop until that peer's thread object emits destroyed(). The list itself
// shrinks through destroyedfunc(), which incomingConnection() connects to the
// peer's destroyed(QObject*) signal.
void CustomServer::kickAll()
{
    while (!m_connections.isEmpty())
    {
        ServerPeer *peer = m_connections.first();
        QEventLoop loop;
        // Wait for the thread object, not just the peer, to be destroyed.
        connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
        QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
        loop.exec();
    }
}

// Removes the peer that reports the given id, if any.
// Each peer is asked through a blocking cross-thread call; the first match is
// deleted and its thread object's destruction is awaited in a local event loop.
void CustomServer::kickClient(qintptr id)
{
    foreach (ServerPeer *peer, m_connections)
    {
        // Start from "no match" and honour the call result: if the invocation
        // fails (e.g. the method is not invokable) the value must not be read
        // uninitialized.
        ServerPeer::State hassocket = ServerPeer::MyFALSE;
        const bool invoked = QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection,
            Q_RETURN_ARG(ServerPeer::State, hassocket), Q_ARG(qintptr, id));
        if (!invoked || hassocket != ServerPeer::MyTRUE)
            continue;

        QEventLoop loop;
        connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
        QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
        loop.exec();
        break;
    }
}

void CustomServer::writeData(const QByteArray &data)
{
    foreach (ServerPeer *peer, m_connections)
        QMetaObject::invokeMethod(peer, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, data));
}

// Queues a writeData(data) call on the peer that reports the given id.
// Nothing is sent if no peer matches.
void CustomServer::writeData(const QByteArray &data, qintptr id)
{
    foreach (ServerPeer *peer, m_connections)
    {
        // Start from "no match" and honour the call result so a failed
        // invocation never leads to reading an uninitialized value.
        ServerPeer::State hassocket = ServerPeer::MyFALSE;
        const bool invoked = QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection,
            Q_RETURN_ARG(ServerPeer::State, hassocket), Q_ARG(qintptr, id));
        if (!invoked || hassocket != ServerPeer::MyTRUE)
            continue;

        QMetaObject::invokeMethod(peer, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, data));
        break;
    }
}

// Returns QTcpServer::serverAddress() for this server.
QHostAddress CustomServer::localAddress()
{
    return QTcpServer::serverAddress();
}

// Returns the base class's port. The explicit QTcpServer:: qualification is
// required: this function has the same name, so an unqualified call would
// recurse.
quint16 CustomServer::serverPort()
{
    return QTcpServer::serverPort();
}

// Reports whether the server is listening (QTcpServer::isListening()).
bool CustomServer::isReady()
{
    return QTcpServer::isListening();
}

// Slot for a peer's destroyed(QObject*) signal: drops that peer from the list.
void CustomServer::destroyedfunc(QObject *obj)
{
    // Only the address is compared, so the cast pointer is never dereferenced.
    m_connections.removeAll(static_cast<ServerPeer*>(obj));
}

serverpeer.h:

#include <QtCore>
#include <QtNetwork>

// Worker object representing one client connection. It is moved into its own
// thread by CustomServer and is driven only through queued/blocking invocations
// and signals.
class ServerPeer : public QObject
{
    Q_OBJECT
public:
    explicit ServerPeer(QObject *parent = nullptr);
    ~ServerPeer();

    // Result type of hasSocket().
    enum State
    {
        MyTRUE,
        MyFALSE
    };

signals:
    // Carry the id (native descriptor) stored by start().
    void connected(qintptr id);
    void disconnected(qintptr id);

public slots:
    // Declared as slots so they can be called by name via QMetaObject::invokeMethod.
    ServerPeer::State hasSocket(qintptr id);
    void start(qintptr handle);
    void writeData(const QByteArray &data);

private slots:
    void readyRead();
    void notifyConnect();
    void notifyDisconnect();

private:
    // Default member initializers: hasSocket() may compare m_id before start()
    // has stored a real descriptor, so it must never hold an indeterminate value.
    QTcpSocket *m_peer = nullptr;
    qintptr m_id = -1;
};

serverpeer.cpp:

#include "serverpeer.h"

// Constructs a peer without a socket; the socket is created later in start().
// m_id starts at -1 so hasSocket() never compares an indeterminate value when
// it runs before start() has stored the descriptor.
ServerPeer::ServerPeer(QObject *parent) : QObject(parent), m_peer(nullptr), m_id(-1)
{

}

// Aborts the connection if a socket exists. The socket is not deleted here;
// start() creates it with this object as its QObject parent.
ServerPeer::~ServerPeer()
{
    if (m_peer)
        m_peer->abort();
}

// Reports MyTRUE when the given id equals the id stored by this peer,
// MyFALSE otherwise.
ServerPeer::State ServerPeer::hasSocket(qintptr id)
{
    return (m_id == id) ? MyTRUE : MyFALSE;
}
// Creates the socket for the given native descriptor and, once the client is
// accepted, stores the id and wires read/disconnect handling.
// The peer schedules its own deletion when the descriptor cannot be adopted or
// the (placeholder) verification fails.
void ServerPeer::start(qintptr handle)
{
    m_peer = new QTcpSocket(this);
    // setSocketDescriptor() can fail; without this check the peer would stay in
    // the server's list with a socket that never connects or disconnects.
    if (!m_peer->setSocketDescriptor(handle))
    {
        qDebug() << "ServerPeer: cannot use socket descriptor" << handle << m_peer->errorString();
        this->deleteLater();
        return;
    }

    //Step 6
    if (true /*verification here*/)
    {
        m_id = handle;
        // The socket is already connected after setSocketDescriptor(), so no
        // connected() signal will arrive; announce the client manually.
        QTimer::singleShot(0, this, SLOT(notifyConnect()));
        connect(m_peer, SIGNAL(readyRead()), this, SLOT(readyRead()));
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(notifyDisconnect()));
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(deleteLater()));
    }
    else
    {
        m_peer->abort();
        this->deleteLater();
    }
}

// Slot for the socket's readyRead(): drains all available bytes and logs them
// together with the current thread.
void ServerPeer::readyRead()
{
    qDebug() << m_peer->readAll() << QThread::currentThread();
}

void ServerPeer::writeData(const QByteArray &data)
{
    m_peer->write(data);
    m_peer->flush();
}

// Emits connected() with the stored id; scheduled from start() via a zero timer.
void ServerPeer::notifyConnect()
{
    emit connected(m_id);
}

// Slot for the socket's disconnected(): emits disconnected() with the stored id.
void ServerPeer::notifyDisconnect()
{
    emit disconnected(m_id);
}

mainwindow.cpp:

// Builds the UI, registers qintptr for queued signal/slot arguments, creates
// the server on localhost:1024 with a limit of two clients and subscribes to
// its client notifications. If the server did not come up it is deleted again
// and m_server is left as nullptr.
MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow)
{
    ui->setupUi(this);

    qRegisterMetaType<qintptr>("qintptr");

    m_server = new CustomServer(QHostAddress::LocalHost, 1024, 2, this);

    if (m_server->isReady())
    {
        connect(m_server, SIGNAL(clientConnected(qintptr)), this, SLOT(handleNewClient(qintptr)));
        connect(m_server, SIGNAL(clientDisconnected(qintptr)), this, SLOT(handleRemovedClient(qintptr)));
    }
    else
    {
        qDebug() << "Server could not start!";
        delete m_server;
        m_server = nullptr;
    }
}

// Releases the UI object allocated in the constructor.
MainWindow::~MainWindow()
{
    delete ui;
}

// Deletes the server when the window closes; the CustomServer destructor kicks
// all peers. Deleting a null pointer is a no-op, so no explicit check is needed.
void MainWindow::closeEvent(QCloseEvent *)
{
    delete m_server;
    m_server = nullptr;
}

// Slot for CustomServer::clientConnected(): logs the id, greets the new client
// and announces it to every client.
void MainWindow::handleNewClient(qintptr id)
{
    qDebug() << __FUNCTION__ << id;

    const QByteArray greeting = QString("Hello client id: %0\r\n").arg(id).toLatin1();
    const QByteArray announcement = QString("New client id: %0\r\n").arg(id).toLatin1();

    m_server->writeData(greeting, id);
    m_server->writeData(announcement);
}

// Slot for CustomServer::clientDisconnected(): logs the id of the removed client.
void MainWindow::handleRemovedClient(qintptr id)
{
    qDebug() << __FUNCTION__ << id;
}

我已经根据你的答案修改了我的代码,但是出现了一个错误,也许你可以快速看一下这个问题 :) - neuronalbit
看起来很吸引人,我现在正在手动发出 connected(QTcpSocket*) 信号。目前我正在尝试弄清楚为什么如果我在 ServerPeer 析构函数中使用 abort(),我的套接字不会抛出 disconnected() 信号!只是为了澄清:我只需要在开始时处理多个客户端连接请求的线程,以便单独处理它们的身份验证/超时。之后,我只需要在列表中使用普通的 ServerPeer 对象,因此只应删除线程。稍后我需要向客户端发送消息,因此我肯定必须使用他们的套接字。 - neuronalbit
我已经添加了一个如何在 QMetaObject::invokeMethod 中接收枚举的示例。 - Antonio Dias
@neuronalbit:QMetaObject::invokeMethod 只能调用 SIGNAL 或 SLOT,所以在你的情况下,hasSocket 必须放在 public slots 中而不是只放在 public 中。更多信息请参见文档 - Antonio Dias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接