boost库开发笔记(二):boost::asio异步网络编程
boost::asio
boost::asio是C++高级异步网络编程库,主要用于异步编程、调度和通讯,包括定时器、信号处理、异步执行、socket等基本功能。
本文围绕了asio入门的基础、常用接口部件等进行讨论,从简单的asio对象开始,分成异步编程和网络编程两大板块,并且相应地结合源码进行记录。本文会随着项目进展更新,相信会成为比较完备的ASIO入门体系。
boost::asio异步编程
asio::io_context
每个asio程序至少存在一个io_context对象(boost
1.66前称io_service),io_context是asio工作的核心,其提供了一个事件循环队列机制(类似Qt),作为调度器完成事件调度;io_context本身不直接与通信对象通信,而是管理一系列IO资源如定时器、socket等。
asio定时器
asio支持三种定时器,为asio::system_timer、asio::steady_timer和asio::high_resolution_timer,另一种旧版本的deadline_timer已经被弃用;其中:
- asio::system_timer:与系统时间相关,用于执行与系统定时相关任务; 
- asio::steady_timer:使用独立、不受系统时间影响的时钟,适合执行普通定时、超时任务; 
- asio::high_resolution_timer:高精度高分辨率定时时钟。 
同步等待
steady_timer是最常用的任务定时器,支持同步等待和异步执行两种定时操作:
同步等待: 1
2
3asio::io_context ioc;
asio::steady_timer timer(ioc, asio::chrono::seconds(5));
timer.wait();5s;
异步执行
该代码实用异步操作去执行一个handler函数,这个函数需满足void(*)(const boost::system::error_code&)类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
using namespace std;
namespace asio = boost::asio;
void time_handler(const boost::system::error_code& error){
    std::this_thread::sleep_for(std::chrono::seconds(5));
    if(!error){
        cout << "time_Handler Called!"<<endl;
    }
    else{
        cout << "ERROR Code:" << error.value() << "-Info:" << error.message() <<endl;
    }
}
int main(){
    asio::io_context ioc;
    asio::steady_timer timer(ioc, asio::chrono::seconds(5));
    cout<< "1111111"<< endl;
    timer.async_wait(&time_handler);
    for(int i=0; i<10; i++){
        cout << "i=" <<i << endl;
    }
    ioc.run();
    for(int i=10; i<20; i++){
        cout << "i=" <<i << endl;
    }
    return 0;
}
io_context管理一系列异步对象,定时器的异步执行是最简单的一种,当异步对象初始化完成,io_context(以下均简称ioc)计数器会加1,代表待处理的异步事件数加1,异步事件开始处理的标志是ioc.run(),当对应事件返回,计数器减1,当计数器归零,ioc.run()才会返回,因此ioc.run()的作用并不是启动一个线程去执行异步任务,而是在调用者线程开始去处理事件循环队列中的异步任务,所以ioc.run()阻塞了10到20的打印任务。
周期性定时
通过设置timer的过期时间,当定时器过期,执行过期函数,并且重置过期时间,可以实现周期性定时目的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
using namespace std;
namespace asio = boost::asio;
class PeriodicTimer{
public:
    PeriodicTimer(asio::io_context& ioc,int period): timer(ioc){
        start(period);
    }
    void start(int period){
        if(cnt>=10){
            timer.cancel();
            return;
        }
        timer.expires_after(asio::chrono::milliseconds(period)); //定时器设定1s过期
        timer.async_wait([this,period](const boost::system::error_code& ec){
            if(!ec){ //过期后执行
                cout << "Called times:"<< ++cnt <<endl;
                start(period); //递归设定新过期周期 
            }
            else{
                cout << "Error" << ec.value() <<"-" << ec.message() <<endl;
            }
        });
    }
private:
    asio::steady_timer timer;
    int cnt = 0;
};
int main(){
    asio::io_context ioc;
    PeriodicTimer pridTimer(ioc, 500) ; //0.5定时执行一次
    ioc.run();
    return 0;
}
asio::io_context的重启
当下一次还需要定时时,仅需重新构造一个定时类?当写下这段代码,你会发现ioc返回后,尽管再提交任务也没有处理:
1
2
3
4
5
6
7
8
9asio::io_context ioc;
PeriodicTimer pridTimer(ioc, 500); //0.5定时执行一次
ioc.run();
//doSomething else
for(int i=0; i<10; i++){
    cout<< "i=" << i <<endl;
}
PeriodicTimer pridTimer1(ioc, 500) ; //0.5定时执行一次
ioc.run();
所以,当始终使用一个ioc管理时,当它从run返回时,必须进行手动的重启,才能使其重新处理事件循环:
1
2
3
4
5
6
7
8
9
10
11
12
13
14int main(){
    asio::io_context ioc;
    PeriodicTimer pridTimer(ioc, 500);
    ioc.run();
    //doSomething else
    for(int i=0; i<10; i++){
        cout<< "i=" << i <<endl;
    }
    if(ioc.stopped()) //you have to restart
        ioc.restart(); 
    PeriodicTimer pridTimer1(ioc, 500) ; 
    ioc.run();
    return 0;
}
控制ioc精确的返回给代码和逻辑带来复杂度,因此ioc可以通过警醒的方式,使其在无事件循环任务时也不会直接在run返回,只需要在每次构造ioc时,为其构造一个guard对象即可:
1
2asio::io_context ioc;
asio::executor_work_guard<asio::io_context::executor_type> work_guard(asio::make_work_guard(ioc)); //使其在无事件时保持忙碌,而不返回
具体使用参考超时检查。
函数执行的超时检查
当发生超时,函数退出,事件循环返回,当无超时时,函数打印无超时,事件循环阻塞(因为work_guard的效果):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
using namespace std;
namespace asio = boost::asio;
//必须使用public继承,否则抛出空weak_ptr异常
class TimerTask : public std::enable_shared_from_this<TimerTask>{
public:
TimerTask(asio::io_context& ioc, int timeout):
work_(asio::make_work_guard(ioc)), 
timer_(ioc, asio::chrono::seconds(timeout)){}
void start(){
    auto self(shared_from_this());
    timer_.async_wait([this, self](const boost::system::error_code& ec){
        if(!ec){
            cout << "Time OUT Happen" <<endl;
            work_.reset(); //允许ioc.run()返回
        }
    });
    //以下是工作函数
    asio::post(timer_.get_executor(), [this, self](){
    for(int i=0; i<10; i++){ //模拟工作
        cout << "i=" << i <<endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(500)); //模拟超时
        //超时发生,终止执行
        if(timer_.expiry() <= asio::steady_timer::clock_type::now())
            return;
    }
    timer_.cancel(); //工作函数完无超时取消定时器
    cout << "Task finished without timeout!" <<endl;
    });
}
private:
    asio::steady_timer timer_;
    asio::executor_work_guard<asio::io_context::executor_type> work_;
};
int main(){
    asio::io_context ioc;
    //enable_from_this类对象必须使用共享指针管理
    std::shared_ptr<TimerTask> timerTask = std::make_shared<TimerTask>(ioc, 6); 
    timerTask->start();
    ioc.run(); 
    return 0;
}
asio::post/defer/dispatch
除了定时器的async_wait,还有若干种方法可以向ioc提交执行函数,其区别是:
- asio::post:将任务提交到事件循环队列排队,当 - ioc.run()被调用时开始消费队列,在事件被处理时,对应函数被执行,asio::post能保证事件的执行顺序按照post提交顺序进行(但多线程调度时顺序则还决定于线程调度器),post仅仅提交handler任务,不保留上下文信息;
- asio::defer:仍是将任务提交到事件循环排队处理,其和post最大的两个区别是:一是直接将defer提交到ioc不会保证事件处理顺序就是提交顺序(存在事件性能优化,所以一般结合strand使用),二是defer提交的不仅仅是handler函数,还保留调用者的上下文信息,对于配合strand串行执行/协程调度等有重要作用。 
- asio::dispatch:最大的特点是,当前线程正在执行 - ioc.run()的上下文,而恰巧有通过dispatch提交的handler(例如来自事件循环中某些事件调用了dispatch接口),dispatch线程检查到当前线程就是目标ioc所在线程,那么该handler会立刻被执行;反之,如果当前线程并非在执行- ioc.run的上下文,那么它和post一样都会进入到事件循环等待消费,同时也不保留上下文信息。
如: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17asio::io_context ioc;
asio::post(ioc, [&ioc](){
    cout << "Task Begin" << endl;
    asio::dispatch(ioc, [](){
        cout << "dispatch func Called!" <<endl;
    });
    cout << "Task End" << endl;
});
asio::post(ioc, [&ioc](){
    cout << "Task Begin1" << endl;
    asio::post(ioc, [](){
        cout << "post func1 Called!" <<endl;
    });
    cout << "Task End1" << endl;
});
ioc.run();post是按照队列顺序执行,而dispatch则立刻调度。
1
2
3
4
5
6Task Begin
dispatch func Called!
Task End
Task Begin1
Task End1
post func1 Called!
关于defer问题延后讨论。
asio::strand
ioc是一种事件循环调度器,上述所有例子在单线程下是没问题的,ioc能很好维护事件队列运行,但是多线程下,单单依赖ioc是有问题的,例如它无法解决两个线程同时访问共享数据或者同时进入执行同一个异步任务的问题。
此处我们使用多个线程同一个的ioc对象去竞争执行异步任务,如果异步任务被两个线程同时执行,可能表现在读取到同一个num,导致出现相同的num打印,至少能看出不规范的打印乱码。输出虽然没有看到num值相同的竞争情况,因为加法运算对线程而言太过于简单,但是也看到输出了乱码,说明线程间是存在同时调用异步任务的情况:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
using namespace std;
namespace asio = boost::asio;
class Task{
public:
Task(asio::io_context& ioc):ioc_(ioc){}
void start(){
    for(int i=0; i<30; i++){
        asio::post(ioc_, [this](){
            cout << "num=" << num++ << " id: "<<this_thread::get_id()<< endl;
        });
    }
}
private:
    asio::io_context& ioc_;
    int num = 0;
};
int main(){
    asio::io_context ioc;
    Task task(ioc);
    std::vector<std::thread> threadPool;
    auto lambda = [&ioc](){
        ioc.run();
    };
    task.start();
    for(int i=0; i<20; i++){
        threadPool.emplace_back(lambda);
    }
    for(auto& t:threadPool){
        t.join();
    }
    return 0;
}
asio::strand的作用在于会将executor(此处是指ioc)的事件回调函数也通过队列管理起来,确保同一时刻只有一个线程在执行异步回调函数,避免了竞态条件。这也可以看出strand实现串行的同时,是会导致性能衰减的,所以不具备race
condition的回调,应该使用不同的strand对象管理,strand基本使用:
1
2
3
4
5
6
7
8//成员声明:
asio::strand<asio::io_context::executor_type> strand_;
//使用ioc初始化:
Task(asio::io_context& ioc):ioc_(ioc), strand_(asio::make_strand(ioc)){}
//post时对象为strand而不是原生executor:
asio::post(strand_, [](){
    cout << "num =" << num++ << " id: "<<this_thread::get_id()<< endl;
});
boost::asio网络编程
基于TCP的异步双线程CS架构
通过asio完成异步操作(类似Qt的connect),在读线程中当连接请求、消息到达,server加入事件循环等待被消费处理,客户端同理,同时server和client都会通过写线程中轮询输入流,当数据输入时将字符信息发送出去,以下:
服务端代码: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
using namespace std; 
namespace asio = boost::asio;
class Server{
public:
    Server(asio::io_context& ioc, int port):
    acceptor_(ioc, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)),
    socket_(ioc){
    accept();
}
private:
void accept(){
    acceptor_.async_accept(socket_, [this](const boost::system::error_code& ec){
        if(!ec){
            cout << "Client connected" <<endl;
            start_read();
            start_write();
        }
    });
}
void start_read(){
    static int num =0;
    socket_.async_read_some(asio::buffer(readBuffer_),[this](const boost::system::error_code& ec, std::size_t length){
        if(!ec){
            cout << "Received:" << std::string(readBuffer_.data(), length) <<endl;
            start_read();
        }
    });
}
void start_write(){
    std::thread([this](){
        try{
            while(1){
                std::string message;
                std::getline(std::cin, message);
                asio::write(socket_, asio::buffer(message));
            }
        }catch(std::exception& e){
            cerr << "ERROR:" << e.what() <<endl;
        }
    }).detach();
}
private:
    asio::ip::tcp::acceptor acceptor_;
    asio::ip::tcp::socket socket_;
    std::array<char, MAXBUFFER> readBuffer_;
};
int main(){
    cout << "server start..." << endl;
    asio::io_context ioc;
    asio::executor_work_guard<asio::io_context::executor_type> work_guard(asio::make_work_guard(ioc));
    Server server(ioc,8888);
    ioc.run();
    return 0;
}
客户端: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
using namespace std;
namespace asio = boost::asio;
class Client{
public:
Client(asio::io_context& ioc, const std::string& host, int port):socket_(ioc){
    asio::ip::tcp::resolver resolver(ioc);
    auto endpoints = resolver.resolve(host, std::to_string(port));
    asio::async_connect(socket_, endpoints, \
        [this](const boost::system::error_code& ec, asio::ip::tcp::endpoint /*endpoint*/){
        if(!ec){
            cout << "Connected success!"<<endl;
            start_read();
            start_write();
        }
    });
}
void start_read(){
    socket_.async_read_some(asio::buffer(readBuffer_),\
        [this](const boost::system::error_code& ec, std::size_t length){
        if(!ec){
            cout << "Received:" << std::string(readBuffer_.data(), length) <<endl;
            start_read();
        }
    });
}
void start_write(){
    std::thread([this](){
        try{
            while(1){
                std::string message;
                std::getline(std::cin, message);
                asio::write(socket_, asio::buffer(message));
            }
        }catch(std::exception& e){
            cerr << "ERROR:" << e.what() <<endl;
        }
    }).detach();
}
private:
    asio::ip::tcp::socket socket_;
    std::array<char,MAXBUFFER> readBuffer_;
};
int main(){
    cout << "client start..." <<endl;
    asio::io_context ioc;
    asio::executor_work_guard<asio::io_context::executor_type> work_guard(asio::make_work_guard(ioc));
    Client client(ioc, "127.0.0.1",8888);
    ioc.run();
    return 0;
}
拓展:多客户端响应的server设计
基于上述server,引入客户端id记忆就可以异步响应多个客户端的连接和信息请求,也可以向指定TCP客户端发送消息或者广播,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
using namespace std; 
namespace asio = boost::asio;
class Server{
public:
    Server(asio::io_context& ioc, int port):
    acceptor_(ioc, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port))
    /*,socket_(ioc)*/{
    accept(ioc);
}
private:
void accept(asio::io_context& ioc){
    auto socket_ = std::make_shared<asio::ip::tcp::socket>(ioc);
    acceptor_.async_accept(*socket_, \
        [this, socket_, &ioc](const boost::system::error_code& ec){
        if(!ec){
            clientGroup.push_back(socket_);
            cout << "Client connected" << ++clientId <<endl;
            start_read(socket_,clientId);
            start_write();
        }
        accept(ioc);
    });
}
void start_read(std::shared_ptr<asio::ip::tcp::socket> socket_, int id){
    static int num =0;
    socket_->async_read_some(asio::buffer(readBuffer_),\
        [this,socket_,id](const boost::system::error_code& ec, std::size_t length){
        if(!ec){
            cout << "Received from " << id << " : " \
                <<std::string(readBuffer_.data(), length)\
                <<endl;
            start_read(socket_,id);
        }
    });
}
void start_write(){
    std::thread([this](){
        try{
            while(1){
                std::string message;
                std::getline(std::cin, message);
                stringstream ss(message);
                string temp;
                //要求服务器发送消息使用空格间隔:如0 xxxxx,代表向0客户端发送xxx消息
                std::getline(ss, temp, ' '); //第一个空格前数字作为发送客户端编号,-1代表广播,0起为有效数字
                message.erase(0, temp.length()+1); //过滤编号信息才是要发送的信息
                int id = stoi(temp);
                if(id<-1 || id > clientId){
                    cerr << "Invalid Input, No Such Id in clientGroup!"<<endl;
                }
                else if(id == -1){
                    for(const auto& pos : clientGroup){
                        asio::write(*pos, asio::buffer(message));
                    }
                }
                else
                    asio::write(*(clientGroup[id]), asio::buffer(message));
            }
        }catch(std::exception& e){
            cerr << "ERROR:" << e.what() <<endl;
        }
    }).detach();
}
private:
    asio::ip::tcp::acceptor acceptor_;
    //asio::ip::tcp::socket socket_;
    std::array<char, MAXBUFFER> readBuffer_;
    int clientId = -1;
    std::vector<std::shared_ptr<asio::ip::tcp::socket>> clientGroup;
};
int main(){
    cout << "server start..." << endl;
    asio::io_context ioc;
    asio::executor_work_guard<asio::io_context::executor_type> work_guard(asio::make_work_guard(ioc));
    Server server(ioc,8888);
    ioc.run();
    return 0;
}
后记
记录asio中额外的技术细节。
asio::thread_pool
boost::asio::thread_pool定义了一种线程池,内部仍然是使用ioc管理的线程对象,用法类似std::vector<std::thread>,通过post向其提交任务可加入到任务队列处理,往往使用这种提交去异步处理一些短暂的任务;如果只需要开单线程、且希望串行按需执行提交任务,使用strand(注释部分):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
using namespace std;
namespace asio = boost::asio;
int main(){
    const int ThreadNum = 4; 
    asio::thread_pool pool(ThreadNum);
    //asio::strand<asio::thread_pool::executor_type> strand(pool.get_executor());
    for(int i=0; i<ThreadNum; i++){
        asio::post(pool, [i](){ //参数换strand
            cout << std::this_thread::get_id() << " - " << i <<endl;
        });
    }
    pool.join();
    return 0;
}
asio::thread_pool与package_task
当asio::post接收了package_task参数,重载函数返回类型不是void,而是future1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
using namespace std;
namespace asio = boost::asio;
int sum(int a, int b){
    return a+b;
}
int main(){
    const int ThreadNum = 1; 
    asio::thread_pool pool(ThreadNum);
    asio::strand<asio::thread_pool::executor_type> strand(pool.get_executor());
    //封装异步任务,packaged_task介绍参考本站另外的文章
    std::packaged_task<int(void)> task(std::bind(sum, 1,2)); //bind指定参数,task无需
    //std::future<int> result = task.get_future(); ///无需手动get_future
    auto result = asio::post(strand, std::move(task));
    cout << result.get() << endl;
    pool.join();
    return 0;
}
asio::io_context指定线程
有时候会看到一些代码: 1
asio::io_context ioc(5);
1
unsigned int thread_count = std::thread::hardware_concurrency();
参考链接:
