boost::asio 使ったチャットソフトを作ってみました
というわけで、なんとなく動いてるっぽい段階のものですが、備忘録の意味も兼ねてここに。
ご意見、ご助言などございましたら、ぜひお願いいたします。
#pragma once
#include "stdafx.h"
// チャット用のクライアント側クラスとサーバー側クラス
// ・公開クラスを派生してハンドラ関数をオーバーライドするやり方で使う
// ・公開クラス内部でスレッドが走り、ハンドラ関数はそのスレッドから呼ばれるので要注意。
// ・公開クラスがスレッドセーフになってない。排他処理を追加すればOKなハズ。
// ・ヘッダーと実データを一組として通信する。ヘッダーのエンディアン考慮はされてない。
class CChat
{
class CSocket
{
std::deque<boost::shared_ptr<const std::vector<char>>> m_queueSend; // 送信キュー
struct{
boost::uint32_t size; // サイズ
std::vector<char> vData; // 実データ
}m_read;
public:
boost::asio::ip::tcp::socket m_socket;
CSocket(boost::asio::io_service &ioService)
:m_socket(ioService)
{}
virtual void OnDataReceive(const std::vector<char> &vData)=0;
virtual void OnError(const boost::system::error_code &error)=0;
// 読み込み待ち
void Await()
{
struct F{
// データ読み込みハンドラ
static void OnReadBody(CSocket &This,const boost::system::error_code &error)
{
if( error ){ // Error
This.OnError(error);
This.m_socket.close();
return;
}
This.OnDataReceive(This.m_read.vData); // 通知
This.Await(); // 次の読み込み待ち
}
// ヘッダ読み込みハンドラ
static void OnReadHeader(CSocket &This,const boost::system::error_code &error)
{
if( error ){ // Error
This.OnError(error);
This.m_socket.close();
return;
}
// バッファ作成
This.m_read.vData.resize( This.m_read.size );
// データ読み込み待ち
boost::asio::async_read(
This.m_socket,
boost::asio::buffer(This.m_read.vData),
boost::bind(
&F::OnReadBody,
boost::ref(This),
_1));
}
};
// ヘッダ読み込み待ち
boost::asio::async_read(
m_socket,
boost::asio::buffer(&m_read.size,sizeof(m_read.size)),
boost::bind(
&F::OnReadHeader,
boost::ref(*this),
_1));
}
// 送信
void Write(boost::shared_ptr<const std::vector<char>> spData)
{
struct F{
// データ送信ハンドラ
static void OnWrite(CSocket &This,const boost::system::error_code &error)
{
if( error ){ // Error
This.OnError(error);
This.m_socket.close();
return;
}
This.m_queueSend.pop_front(); // 送信し終えたものを削除
if( !This.m_queueSend.empty() ){ // 次があれば
F::Write(This);
}
}
// async_write
static void Write(CSocket &This)
{
boost::asio::async_write(
This.m_socket,
boost::asio::buffer(*This.m_queueSend.front()),
boost::bind(
&F::OnWrite,
boost::ref(This),
boost::asio::placeholders::error));
}
};
const bool bInProgress = !m_queueSend.empty();
m_queueSend.push_back( spData );
if( !bInProgress ){
F::Write(*this);
}
}
// 送信用データ作成
static boost::shared_ptr<const std::vector<char>> CreateData(const char *pData,size_t size)
{
const boost::uint32_t t = size;
std::vector<char> *p = new std::vector<char>( sizeof(t) + t );
::memcpy( &(*p)[0], &t, sizeof(t) ); // サイズ(エンディアン考慮はまだ)
::memcpy( &(*p)[sizeof(t)], pData, t );
return boost::shared_ptr<const std::vector<char>>(p);
}
};
public:
// クライアントクラス
class CClient
{
class CInner : public CSocket
{
protected:
void OnDataReceive(const std::vector<char> &vData)
{
m_client.OnDataReceive(vData); // 通知
}
void OnError(const boost::system::error_code &error)
{
m_client.OnError(error); // 通知
}
public:
CClient &m_client;
CInner(CClient &client)
:CSocket(client.m_ioService)
,m_client(client)
{}
~CInner()
{
if( m_client.m_thread.joinable() ){
m_client.m_ioService.post( boost::bind( &boost::asio::ip::tcp::socket::close, &m_socket ) ); // m_socket.close()
m_client.m_thread.join();
}
}
};
private:
boost::asio::io_service m_ioService;
boost::thread m_thread;
std::auto_ptr<CInner> m_apInner;
public:
CClient()
{}
virtual ~CClient()
{
BOOST_ASSERT( !m_thread.joinable() ); // Stopを明示的に呼ぶべし(ウラでハンドラ処理が動いている可能性を考慮)
Stop();
}
// 接続開始
void Run(const std::string &sDomain="127.0.0.1",const std::string &sPort="80")
{
Stop();
m_apInner = std::auto_ptr<CInner>(new CInner(*this));
struct F{
static void Connect(CClient &client,const std::string sDomain,const std::string sPort)
{
boost::asio::ip::tcp::resolver resolver(client.m_ioService);
boost::asio::ip::tcp::resolver::query query(sDomain,sPort);
boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
CInner *pInner = client.m_apInner.get();
while(true){
try{
pInner->m_socket.connect(*iterator); // 途中で処理が挟まらないように同期関数を使う
pInner->Await(); // 読み込み待ち状態
break;
}catch( std::exception& ){
iterator++;
if( iterator != boost::asio::ip::tcp::resolver::iterator() ){ // 次があれば
pInner->m_socket.close();
continue;
}
throw;
}
}
}
static void Run(CClient &client)
{
try{
client.m_ioService.run();
}catch( std::exception &e ){
ATLTRACE(_T("\n%s"),CString(e.what()));
client.OnErrorException(e);
}
}
};
m_ioService.post( boost::bind(&F::Connect,boost::ref(*this),sDomain,sPort) );
m_thread = boost::thread( boost::bind(&F::Run,boost::ref(*this)) );
}
// 破棄
// ・スレッドが終了されるまで待機する
// ・破棄後は全ての呼び出しが失敗する
// ・内部スレッドからのコールは御法度
void Stop()
{
if( boost::this_thread::get_id() != m_thread.get_id() ){ // 呼び出しスレッドチェック
m_apInner.reset();
}else{
BOOST_ASSERT(false);
}
}
// メッセージ送信(送信キューに溜める)
bool Post(const std::string &s)
{
return Post(s.c_str(),s.size());
}
// メッセージ送信(送信キューに溜める)
bool Post(const char *pData,size_t size)
{
if( CInner *pInner = m_apInner.get() ){
if( size > 0 ){
m_ioService.post( boost::bind( &CInner::Write, pInner, CInner::CreateData(pData,size) ) );
}
return true;
}
return false;
}
protected:
// データ受信ハンドラ。内部スレッドでコールされる。オーバーライドすべし。
virtual void OnDataReceive(const std::vector<char> &vData){}
// 例外ハンドラ。内部スレッドでコールされる。オーバーライドすべし。
virtual void OnErrorException(const std::exception &error){}
// エラーハンドラ。内部スレッドでコールされる。オーバーライドすべし。
virtual void OnError(const boost::system::error_code &error){}
};
// サーバークラス
class CServer
{
class CSession
:public CSocket
,public boost::enable_shared_from_this<CSession>
{
public:
CServer &m_server;
CSession(CServer &server)
:CSocket(server.m_ioService)
,m_server(server)
{}
protected:
void OnDataReceive(const std::vector<char> &vData)
{
m_server.OnDataReceive(m_socket,vData); // 通知
}
void OnError(const boost::system::error_code &error)
{
{// 退出(delete)
struct F{
static void f(boost::shared_ptr<CSession> spSession)
{
spSession->m_server.m_apInner->m_setSessions.erase( spSession );
}
};
m_server.m_ioService.post( boost::bind( &F::f, shared_from_this() ) );
}
m_server.OnError(m_socket,error); // 通知
}
};
class CInner
{
public:
CServer &m_server;
std::auto_ptr<boost::asio::ip::tcp::acceptor> m_apAcceptor;
std::set<boost::shared_ptr<CSession>> m_setSessions;
CInner(CServer &server)
:m_server(server)
{}
~CInner()
{
if( m_server.m_thread.joinable() ){
struct F{
static void f(CInner &inner)
{
BOOST_ASSERT(inner.m_apAcceptor.get());
inner.m_apAcceptor->close();
for(std::set<boost::shared_ptr<CSession>>::iterator i=inner.m_setSessions.begin(); i!=inner.m_setSessions.end(); i++ ){
(*i)->m_socket.close();
}
}
};
m_server.m_ioService.post( boost::bind( &F::f, boost::ref(*this) ) );
m_server.m_thread.join();
}
}
};
private:
boost::thread m_thread;
boost::asio::io_service m_ioService;
std::auto_ptr<CInner> m_apInner;
public:
CServer()
{}
virtual ~CServer()
{
BOOST_ASSERT( !m_thread.joinable() ); // Stopを明示的に呼ぶべし(ウラでハンドラ処理が動いている可能性を考慮)
Stop();
}
// 破棄
// ・スレッドが終了されるまで待機する
// ・破棄後は全ての呼び出しが失敗する
// ・内部スレッドからのコールは御法度
void Stop()
{
if( boost::this_thread::get_id() != m_thread.get_id() ){ // 呼び出しスレッドチェック
m_apInner.reset();
m_ioService.reset();
}else{
BOOST_ASSERT(false);
}
}
// 開始
void Run(unsigned short nPort=80)
{
Stop();
m_apInner = std::auto_ptr<CInner>(new CInner(*this));
struct F{
static void OnAccept(boost::shared_ptr<CSession> spSession,const boost::system::error_code &error)
{
if( error ){
spSession->m_server.OnError(spSession->m_socket,error); // 通知
return;
}
spSession->m_server.m_apInner->m_setSessions.insert(spSession); // 入室
spSession->Await(); // 読み込み待ち状態
F::Start(spSession->m_server); // 次の接続待ち開始
spSession->m_server.OnAccept(spSession->m_socket); // 通知
}
static void Start(CServer &server)
{
boost::shared_ptr<CSession> spSession(new CSession(server));
server.m_apInner->m_apAcceptor->async_accept(
spSession->m_socket,
boost::bind(&F::OnAccept,spSession,_1));
}
static void Init(CServer &server,unsigned short nPort)
{
server.m_apInner->m_apAcceptor = std::auto_ptr<boost::asio::ip::tcp::acceptor>(
new boost::asio::ip::tcp::acceptor( server.m_ioService, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(),nPort)));
Start(server);
}
static void Run(CServer &server)
{
try{
server.m_ioService.run();
}catch( std::exception &e ){
ATLTRACE(_T("\n%s"),CString(e.what()));
server.OnErrorException(e);
}
}
};
m_ioService.post( boost::bind(&F::Init,boost::ref(*this),nPort) );
m_thread = boost::thread( boost::bind(&F::Run,boost::ref(*this)) );
}
// 全clientに送信
bool Broadcast(const char *pData,size_t size)
{
if( CInner *pInner = m_apInner.get() ){
struct F{
static void f(CServer &server,boost::shared_ptr<const std::vector<char>> spData)
{
std::for_each(
server.m_apInner->m_setSessions.begin(),
server.m_apInner->m_setSessions.end(),
boost::bind( &CSession::Write, _1, spData ) );
}
};
m_ioService.post( boost::bind( &F::f, boost::ref(*this), CSession::CreateData(pData,size)) );
return true;
}
return false;
}
protected:
// データ受信ハンドラ。内部スレッドでコールされる。オーバーライドすべし。
virtual void OnDataReceive(boost::asio::ip::tcp::socket &socket,const std::vector<char> &vData){}
// 接続されたハンドラ。内部スレッドでコールされる。オーバーライドすべし。
virtual void OnAccept(boost::asio::ip::tcp::socket &socket){}
// エラーハンドラ。内部スレッドでコールされる。オーバーライドすべし。
virtual void OnError(boost::asio::ip::tcp::socket &socket,const boost::system::error_code &error){}
// 例外ハンドラ。内部スレッドでコールされる。オーバーライドすべし。
virtual void OnErrorException(const std::exception &error){}
};
};
このクラスを使ったチャットソフトのソース一式と実行ファイルはこちらです。
VisualStudio2008のStandard版以上用です。
MDIのテンプレコードが大量に残っているので見難いのはすみません。
実行にはMFCのruntimeが必要になると思いますので、なければダウンロードお願いします。
| 固定リンク
この記事へのコメントは終了しました。












コメント