« boost::asio の味をしめてきました | トップページ | 本日休みを頂いております »

2010年3月21日 (日)

boost::asio 使ったチャットソフトを作ってみました

というわけで、なんとなく動いてるっぽい段階のものですが、備忘録の意味も兼ねてここに。
ご意見、ご助言などございましたら、ぜひお願いいたします。

#pragma once
#include "stdafx.h"

// チャット用のクライアント側クラスとサーバー側クラス
// ・公開クラスを派生してハンドラ関数をオーバーライドするやり方で使う
// ・公開クラス内部でスレッドが走り、ハンドラ関数はそのスレッドから呼ばれるので要注意。
// ・公開クラスがスレッドセーフになってない。排他処理を追加すればOKなハズ。
// ・ヘッダーと実データを一組として通信する。ヘッダーのエンディアン考慮はされてない。
class CChat
{
    class CSocket
    {
        std::deque<boost::shared_ptr<const std::vector<char>>>  m_queueSend;    // 送信キュー
        struct{
            boost::uint32_t     size;       // サイズ
            std::vector<char>   vData;      // 実データ
        }m_read;
    public:
        boost::asio::ip::tcp::socket    m_socket;
        CSocket(boost::asio::io_service &ioService)
            :m_socket(ioService)
            {}

        virtual void OnDataReceive(const std::vector<char> &vData)=0;
        virtual void OnError(const boost::system::error_code &error)=0;

        // 読み込み待ち
        void Await()
            {
                struct F{
                    // データ読み込みハンドラ
                    static void OnReadBody(CSocket &This,const boost::system::error_code &error)
                        {
                            if( error ){    // Error
                                This.OnError(error);
                                This.m_socket.close();
                                return;
                            }
                            This.OnDataReceive(This.m_read.vData);  // 通知
                            This.Await();                           // 次の読み込み待ち
                        }

                    // ヘッダ読み込みハンドラ
                    static void OnReadHeader(CSocket &This,const boost::system::error_code &error)
                        {
                            if( error ){    // Error
                                This.OnError(error);
                                This.m_socket.close();
                                return;
                            }
                            // バッファ作成
                            This.m_read.vData.resize( This.m_read.size );
                            // データ読み込み待ち
                            boost::asio::async_read(
                                This.m_socket,
                                boost::asio::buffer(This.m_read.vData),
                                boost::bind(
                                    &F::OnReadBody,
                                    boost::ref(This),
                                    _1));
                        }
                };

                // ヘッダ読み込み待ち
                boost::asio::async_read(
                    m_socket,
                    boost::asio::buffer(&m_read.size,sizeof(m_read.size)),
                    boost::bind(
                        &F::OnReadHeader,
                        boost::ref(*this),
                        _1));
            }

        // 送信
        void Write(boost::shared_ptr<const std::vector<char>> spData)
            {

                struct F{
                    // データ送信ハンドラ
                    static void OnWrite(CSocket &This,const boost::system::error_code &error)
                        {
                            if( error ){    // Error
                                This.OnError(error);
                                This.m_socket.close();
                                return;
                            }

                            This.m_queueSend.pop_front();       // 送信し終えたものを削除
                            if( !This.m_queueSend.empty() ){        // 次があれば
                                F::Write(This);
                            }
                        }
                    // async_write
                    static void Write(CSocket &This)
                        {
                            boost::asio::async_write(
                                This.m_socket,
                                boost::asio::buffer(*This.m_queueSend.front()),
                                boost::bind(
                                    &F::OnWrite,
                                    boost::ref(This),
                                    boost::asio::placeholders::error));
                        }
                };

                const bool bInProgress = !m_queueSend.empty();
                m_queueSend.push_back( spData );
                if( !bInProgress ){
                    F::Write(*this);
                }
            }

        // 送信用データ作成
        static boost::shared_ptr<const std::vector<char>> CreateData(const char *pData,size_t size)
            {
                const boost::uint32_t t = size;
                std::vector<char> *p = new std::vector<char>( sizeof(t) + t );
                ::memcpy( &(*p)[0], &t, sizeof(t) );        // サイズ(エンディアン考慮はまだ)
                ::memcpy( &(*p)[sizeof(t)], pData, t );
                return boost::shared_ptr<const std::vector<char>>(p);
            }
    };

public:

    // クライアントクラス
    class CClient
    {
        class CInner : public CSocket
        {
        protected:
            void OnDataReceive(const std::vector<char> &vData)
                {
                    m_client.OnDataReceive(vData);  // 通知
                }
            void OnError(const boost::system::error_code &error)
                {
                    m_client.OnError(error);    // 通知
                }
        public:
            CClient     &m_client;
            CInner(CClient &client)
                :CSocket(client.m_ioService)
                ,m_client(client)
                {}
            ~CInner()
                {
                    if( m_client.m_thread.joinable() ){
                        m_client.m_ioService.post( boost::bind( &boost::asio::ip::tcp::socket::close, &m_socket ) );    // m_socket.close()
                        m_client.m_thread.join();
                    }
                }
        };
    private:
        boost::asio::io_service m_ioService;
        boost::thread           m_thread;
        std::auto_ptr<CInner>   m_apInner;
    public:
        CClient()
            {}
        virtual ~CClient()
            {
                BOOST_ASSERT( !m_thread.joinable() );   // Stopを明示的に呼ぶべし(ウラでハンドラ処理が動いている可能性を考慮)
                Stop();
            }

        // 接続開始
        void Run(const std::string &sDomain="127.0.0.1",const std::string &sPort="80")
            {
                Stop();
                m_apInner = std::auto_ptr<CInner>(new CInner(*this));

                struct F{
                    static void Connect(CClient &client,const std::string sDomain,const std::string sPort)
                        {
                            boost::asio::ip::tcp::resolver resolver(client.m_ioService);
                            boost::asio::ip::tcp::resolver::query query(sDomain,sPort);
                            boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
                            CInner *pInner = client.m_apInner.get();
                            while(true){
                                try{
                                    pInner->m_socket.connect(*iterator);        // 途中で処理が挟まらないように同期関数を使う
                                    pInner->Await();                            // 読み込み待ち状態
                                    break;
                                }catch( std::exception& ){
                                    iterator++;
                                    if( iterator != boost::asio::ip::tcp::resolver::iterator() ){   // 次があれば
                                        pInner->m_socket.close();
                                        continue;
                                    }
                                    throw;
                                }
                            }
                        }
                    static void Run(CClient &client)
                        {
                            try{
                                client.m_ioService.run();
                            }catch( std::exception &e ){
                                ATLTRACE(_T("\n%s"),CString(e.what()));
                                client.OnErrorException(e);
                            }
                        }
                };
                m_ioService.post( boost::bind(&F::Connect,boost::ref(*this),sDomain,sPort) );
                m_thread = boost::thread( boost::bind(&F::Run,boost::ref(*this)) );
            }

        // 破棄
        // ・スレッドが終了されるまで待機する
        // ・破棄後は全ての呼び出しが失敗する
        // ・内部スレッドからのコールは御法度
        void Stop()
            {
                if( boost::this_thread::get_id() != m_thread.get_id() ){    // 呼び出しスレッドチェック
                    m_apInner.reset();
                }else{
                    BOOST_ASSERT(false);
                }
            }

        // メッセージ送信(送信キューに溜める)
        bool Post(const std::string &s)
            {
                return Post(s.c_str(),s.size());
            }
        // メッセージ送信(送信キューに溜める)
        bool Post(const char *pData,size_t size)
            {
                if( CInner *pInner = m_apInner.get() ){
                    if( size > 0 ){
                        m_ioService.post( boost::bind( &CInner::Write, pInner, CInner::CreateData(pData,size) ) );
                    }
                    return true;
                }
                return false;
            }
    protected:
        // データ受信ハンドラ。内部スレッドでコールされる。オーバーライドすべし。
        virtual void OnDataReceive(const std::vector<char> &vData){}
        // 例外ハンドラ。内部スレッドでコールされる。オーバーライドすべし。
        virtual void OnErrorException(const std::exception &error){}
        // エラーハンドラ。内部スレッドでコールされる。オーバーライドすべし。
        virtual void OnError(const boost::system::error_code &error){}
    };

    // サーバークラス
    class CServer
    {
        class CSession
            :public CSocket
            ,public boost::enable_shared_from_this<CSession>
        {
        public:
            CServer &m_server;
            CSession(CServer &server)
                :CSocket(server.m_ioService)
                ,m_server(server)
                {}
        protected:
            void OnDataReceive(const std::vector<char> &vData)
                {
                    m_server.OnDataReceive(m_socket,vData); // 通知
                }
            void OnError(const boost::system::error_code &error)
                {
                    {// 退出(delete)
                        struct F{
                            static void f(boost::shared_ptr<CSession> spSession)
                                {
                                    spSession->m_server.m_apInner->m_setSessions.erase( spSession );
                                }
                        };
                        m_server.m_ioService.post( boost::bind( &F::f, shared_from_this() ) );
                    }
                    m_server.OnError(m_socket,error);   // 通知
                }
        };

        class CInner
        {
        public:
            CServer                                         &m_server;
            std::auto_ptr<boost::asio::ip::tcp::acceptor>   m_apAcceptor;
            std::set<boost::shared_ptr<CSession>>           m_setSessions;

            CInner(CServer &server)
                :m_server(server)
                {}
            ~CInner()
                {
                    if( m_server.m_thread.joinable() ){
                        struct F{
                            static void f(CInner &inner)
                                {
                                    BOOST_ASSERT(inner.m_apAcceptor.get());
                                    inner.m_apAcceptor->close();
                                    for(std::set<boost::shared_ptr<CSession>>::iterator i=inner.m_setSessions.begin(); i!=inner.m_setSessions.end(); i++ ){
                                        (*i)->m_socket.close();
                                    }
                                }
                        };
                        m_server.m_ioService.post( boost::bind( &F::f, boost::ref(*this) ) );
                        m_server.m_thread.join();
                    }
                }
        };

    private:
        boost::thread           m_thread;
        boost::asio::io_service m_ioService;
        std::auto_ptr<CInner>   m_apInner;
    public:
        CServer()
            {}
        virtual ~CServer()
            {
                BOOST_ASSERT( !m_thread.joinable() );   // Stopを明示的に呼ぶべし(ウラでハンドラ処理が動いている可能性を考慮)
                Stop();
            }

        // 破棄
        // ・スレッドが終了されるまで待機する
        // ・破棄後は全ての呼び出しが失敗する
        // ・内部スレッドからのコールは御法度
        void Stop()
            {
                if( boost::this_thread::get_id() != m_thread.get_id() ){    // 呼び出しスレッドチェック
                    m_apInner.reset();
                    m_ioService.reset();
                }else{
                    BOOST_ASSERT(false);
                }
            }

        // 開始
        void Run(unsigned short nPort=80)
            {
                Stop();
                m_apInner = std::auto_ptr<CInner>(new CInner(*this));

                struct F{
                    static void OnAccept(boost::shared_ptr<CSession> spSession,const boost::system::error_code &error)
                        {
                            if( error ){
                                spSession->m_server.OnError(spSession->m_socket,error); // 通知
                                return;
                            }
                            spSession->m_server.m_apInner->m_setSessions.insert(spSession); // 入室
                            spSession->Await();                 // 読み込み待ち状態
                            F::Start(spSession->m_server);      // 次の接続待ち開始
                            spSession->m_server.OnAccept(spSession->m_socket);      // 通知
                        }

                    static void Start(CServer &server)
                        {
                            boost::shared_ptr<CSession> spSession(new CSession(server));
                            server.m_apInner->m_apAcceptor->async_accept(
                                spSession->m_socket,
                                boost::bind(&F::OnAccept,spSession,_1));
                        }

                    static void Init(CServer &server,unsigned short nPort)
                        {
                            server.m_apInner->m_apAcceptor = std::auto_ptr<boost::asio::ip::tcp::acceptor>(
                                new boost::asio::ip::tcp::acceptor( server.m_ioService, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),nPort)));
                            Start(server);
                        }

                    static void Run(CServer &server)
                        {
                            try{
                                server.m_ioService.run();
                            }catch( std::exception &e ){
                                ATLTRACE(_T("\n%s"),CString(e.what()));
                                server.OnErrorException(e);
                            }
                        }
                };
                m_ioService.post( boost::bind(&F::Init,boost::ref(*this),nPort) );
                m_thread = boost::thread( boost::bind(&F::Run,boost::ref(*this)) );
            }

        // 全clientに送信
        bool Broadcast(const char *pData,size_t size)
            {
                if( CInner *pInner = m_apInner.get() ){
                    struct F{
                        static void f(CServer &server,boost::shared_ptr<const std::vector<char>> spData)
                            {
                                std::for_each(
                                    server.m_apInner->m_setSessions.begin(),
                                    server.m_apInner->m_setSessions.end(),
                                    boost::bind( &CSession::Write, _1, spData ) );
                            }
                    };
                    m_ioService.post( boost::bind( &F::f, boost::ref(*this), CSession::CreateData(pData,size)) );
                    return true;
                }
                return false;
            }
    protected:
        // データ受信ハンドラ。内部スレッドでコールされる。オーバーライドすべし。
        virtual void OnDataReceive(boost::asio::ip::tcp::socket &socket,const std::vector<char> &vData){}
        // 接続されたハンドラ。内部スレッドでコールされる。オーバーライドすべし。
        virtual void OnAccept(boost::asio::ip::tcp::socket &socket){}
        // エラーハンドラ。内部スレッドでコールされる。オーバーライドすべし。
        virtual void OnError(boost::asio::ip::tcp::socket &socket,const boost::system::error_code &error){}
        // 例外ハンドラ。内部スレッドでコールされる。オーバーライドすべし。
        virtual void OnErrorException(const std::exception &error){}
    };

};

このクラスを使ったチャットソフトのソース一式と実行ファイルはこちらです。
VisualStudio2008のStandard版以上用です。
MDIのテンプレコードが大量に残っているので見難いのはすみません。
実行にはMFCのruntimeが必要になると思いますので、なければダウンロードお願いします。

|

« boost::asio の味をしめてきました | トップページ | 本日休みを頂いております »

コメント

この記事へのコメントは終了しました。

トラックバック


この記事へのトラックバック一覧です: boost::asio 使ったチャットソフトを作ってみました:

« boost::asio の味をしめてきました | トップページ | 本日休みを頂いております »