Linux socket编程(三) 简单的多线程聊天室

要用到多线程以及线程的读写锁,之前写的Socket类、ServerSocket要做相应的改变

因为服务器端要维持着一个存储客户端Socket信息到数据结构,当多个线程同时访问这个结构时,要做同步处理,所以要在适当的时候加上读锁或写锁。

新的ServerSocket类

#ifndef SERVERSOCKET_H
#define SERVERSOCKET_H

#include "Socket.h"
#include <list>
#include <semaphore.h>
#include "ThreadReadWriteLock.h"

using std::list;

class ServerSocket:public Socket
{
    public:
        ServerSocket(const int port);
        ServerSocket();
        virtual ~ServerSocket();

        void Accept(Socket& socket);
        //run server to connect multi-clients
        void Run();

    private:
        //accept multi-clients
        bool Accept();
        void AddClient(Socket* clientSocket);
        static void DeleteClient(Socket* clientSocket);
        static void* ProcessMessage(void* arg);
        static void SendMsgToAllUsers(const std::string& message);

        static list<Socket*> clientSockets;
        static bool serviceFlag;
        //use thread-read-write-lock to synchronize threads
        static ThreadReadWriteLock readWriteLock;
};

#endif

其中有static成员函数,因为创建一个新的线程时,要传递一个函数指针,不过类普通的成员函数的函数指针与一般的函数指针是不兼容的,所以要传递static成员函数的函数指针。

以下是ServerSocket的新实现:

ServerSocket.cpp

#include "ServerSocket.h"
#include "SocketException.h"
#include <pthread.h>
#include <iostream>

list<Socket*> ServerSocket::clientSockets;
ThreadReadWriteLock ServerSocket::readWriteLock;
bool ServerSocket::serviceFlag=true;

ServerSocket::ServerSocket(const int port)
{
      if ( ! Socket::Create() )
        {
          throw SocketException ( "Could not create server socket." );
        }

      if ( ! Socket::Bind ( port ) )
        {
          throw SocketException ( "Could not bind to port." );
        }

      if ( ! Socket::Listen() )
        {
          throw SocketException ( "Could not listen to socket." );
        }
}

ServerSocket::~ServerSocket()
{
    list<Socket*>::iterator iter;
    for(iter=clientSockets.begin();iter!=clientSockets.end();iter++)
        delete (*iter);
}

void ServerSocket::Accept(Socket& socket)
{
      if ( ! Socket::Accept ( socket ) )
        {
          throw SocketException ( "Could not accept socket." );
        }
}

bool ServerSocket::Accept()
{
    Socket* clientSocket=new Socket;
    Accept(*clientSocket);
    AddClient(clientSocket);

    //create new thread for a new client
    pthread_t newThread;
    int result=pthread_create(&newThread,NULL,ProcessMessage,static_cast<void*>(clientSocket));
    if(result!=0)
        return false;
    //detach the newThread    //so when newThread exits it can release it's resource    result=pthread_detach(newThread);    if(result!=0)        perror("Failed to detach thread");
    return true;
}

void ServerSocket::Run()
{
    while(serviceFlag)
    {
        if(clientSockets.size()>=static_cast<unsigned int>(MAXCONNECTION))
            serviceFlag=false;
        else
            serviceFlag=Accept();
        sleep(1);
    }
}

void* ServerSocket::ProcessMessage(void* arg)
{
    std::string message;
    Socket* clientSocket=static_cast<Socket*>(arg);

    Send(*clientSocket,"Welcome!");

    while(serviceFlag)
    {
        Receive(*clientSocket,message);
        if(message=="exit")
        {
            Send(*clientSocket,"user_exit");
            DeleteClient(clientSocket);
            break;
        }
        else
            SendMsgToAllUsers(message);
        sleep(1);
    }
    pthread_exit(NULL);
    return NULL;
}


void ServerSocket::AddClient(Socket* socket)
{
    if(readWriteLock.SetWriteLock())
    {
        clientSockets.push_back(socket);

        std::cout<<"Now "<<clientSockets.size()<<" users..";
        std::cout<<"New User: "<<socket->GetAddress()<<" "<<socket->GetPort()<<"n";

        readWriteLock.UnLock();
    }
    else
        serviceFlag=false;
}

void ServerSocket::DeleteClient(Socket* socket)
{
    if(readWriteLock.SetWriteLock())
    {
        list<Socket*>::iterator iter;
        for(iter=clientSockets.begin();iter!=clientSockets.end();iter++)
            if((*iter)->GetAddress()==socket->GetAddress()
               && (*iter)->GetPort()==socket->GetPort())
            {
                //delete socket* in list                delete (*iter);                clientSockets.erase(iter);
                std::cout<<"Now "<<clientSockets.size()<<" users..n";
                break;
            }
        readWriteLock.UnLock();
    }
    else
        serviceFlag=false;
}

接下来是读写锁操作的封装ThreadReadWriteLock.h

这个类封装了对线程读写锁pthread_rwlock_t的操作,这些操作包括pthread_rwlock_init,pthread_rwlock_rlock,pthread_rwlock_wrlock,pthread_rwlock_unlock

#ifndef THREADREADWRITELOCK_H
#define THREADREADWRITELOCK_H

#include <pthread.h>

class ThreadReadWriteLock
{
    public:
        ThreadReadWriteLock();
        ~ThreadReadWriteLock();

        bool SetReadLock();
        bool SetWriteLock();
        void UnLock();

    private:
        pthread_rwlock_t readWriteLock;
};

#endif

然后客户端做些许改变即可(开多一个线程接收服务器发来的信息,这样发送和接收就可以并行了)

以下是测试结果:

服务器:

Linux socket编程(三) 简单的多线程聊天室

客户端1:

Linux socket编程(三) 简单的多线程聊天室

客户端2:

Linux socket编程(三) 简单的多线程聊天室

客户端3:

Linux socket编程(三) 简单的多线程聊天室

可以看到服务器端显示的客户端数量的变化。。。。

测试一下当连接的客户端数量超过我们设置的最大连接数时的情况(在Socket.h中定义这个连接数等于5)

Linux socket编程(三) 简单的多线程聊天室

最后说一下要注意的地方

由于pthread 库不是 Linux 系统默认的库,连接时需要使用静态库 libpthread.a,否则在使用一些与线程有关的函数时会报错

如使用pthread_create会提示以下错误undefined reference to `pthread_create’,解决方法如下:

(1)使用gcc或g++要在编译中要加 -lpthread参数

(2)如果使用eclipse的话,要设置

Project->Properties->C/C++ Build->Settings->GCC C++ Linker->Libraries

在Libraries(-l)中添加pthread即可

在Libraries(-l)中添加crypto即可

最后说一下程序的不足:

(1)首先我把服务器处理消息的操作暂时都放在了ProcessMessage函数里面了,如果处理的消息很复杂的话,那么ServerSocket这个类就会很臃肿,

所以必要时要将这些功能拆分。

(2)像检测客户端是否非正常的掉线,这些操作还没做,可以通过设置Socket的keep alive来检测,就是通过一个心跳包,在服务器和客户端没有通信时,隔一段时间发送一个

数据包,若客户端没有反应则认为客户端已经掉线了。

(3)毕竟这只是个小程序,当然还有其他不足,如现在只有群聊功能,还可以加上私聊的功能。。。。。。
原文链接: https://www.cnblogs.com/-Lei/archive/2012/09/06/2672759.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/62019

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年2月9日 上午10:15
下一篇 2023年2月9日 上午10:15

相关推荐