Boost.asio的简单使用(timer,thread,io_service类)

Boost.asio的简单使用(timer,thread,io_service类) - 简单的日志 - 网易博客

Boost.asio的简单使用(timer,thread,io_service类)

2010-05-17 18:20:06| 分类:

C/C++之boost

| 标签:



|字号订阅


2. 同步Timer

本章介绍asio如何在定时器上进行阻塞等待(blocking wait).

实现,我们包含必要的头文件.

所有的asio类可以简单的通过include "asio.hpp"来调用.

  1. #include
  2. #include

此外,这个示例用到了timer,我们还要包含Boost.Date_Time的头文件来控制时间.

  1. #include

使用asio至少需要一个boost::asio::io_service对象.该类提供了访问I/O的功能.我们首先在main函数中声明它.

  1. intmain()
  2. {
  3. boost::asio::io_service io;

下一步我们声明boost::asio::deadline_timer对象.这个asio的核心类提供I/O的功能(这里更确切的说是定时功能),总是把一个io_service对象作为他的第一个构造函数,而第二个构造函数的参数设定timer会在5秒后到时(expired).

  1. boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));

这个简单的示例中我们演示了定时器上的一个阻塞等待.就是说,调用boost::asio::deadline_timer::wait()的在创建后5秒内(注意:不是等待开始后),timer到时之前不会返回任何值.

一个deadline_timer只有两种状态:到时,未到时.

如果boost::asio::deadline_timer::wait()在到时的timer对象上调用,会立即return.

  1. t.wait();

最后,我们输出理所当然的"Hello, world!"来演示timer到时了.

  1. std::cout <<"Hello, world! ";
  2. return0;
  3. }

完整的代码:

  1. #include
  2. #include
  3. #include
  4. intmain()
  5. {
  6. boost::asio::io_service io;
  7. boost::asio::deadline_timer t(io, boost::posix_time::seconds(5));
  8. t.wait();
  9. std::cout <<"Hello, world! ";
  10. return0;
  11. }

3. 异步Timer

  1. #include
  2. #include
  3. #include

asio的异步函数会在一个异步操作完成后被回调.这里我们定义了一个将被回调的函数.

  1. voidprint(constasio::error&/e/)
  2. {
  3. std::cout <<"Hello, world! ";
  4. }
  5. intmain()
  6. {
  7. asio::io_service io;
  8. asio::deadline_timer t(io, boost::posix_time::seconds(5));

这里我们调用asio::deadline_timer::async_wait()来异步等待

  1. t.async_wait(print);

最后,我们必须调用asio::io_service::run().

asio库只会调用那个正在运行的asio::io_service::run()的回调函数.

如果asio::io_service::run()不被调用,那么回调永远不会发生.

asio::io_service::run()会持续工作到点,这里就是timer到时,回调完成.

别忘了在调用asio::io_service::run()之前设置好io_service的任务.比如,这里,如果我们忘记先调用asio::deadline_timer::async_wait()asio::io_service::run()会在瞬间return.

  1. io.run();
  2. return0;
  3. }

完整的代码:

  1. #include
  2. #include
  3. #include
  4. voidprint(constasio::error&/e/)
  5. {
  6. std::cout <<"Hello, world! ";
  7. }
  8. intmain()
  9. {
  10. asio::io_service io;
  11. asio::deadline_timer t(io, boost::posix_time::seconds(5));
  12. t.async_wait(print);
  13. io.run();
  14. return0;
  15. }



4. 回调函数的参数

这里我们将每秒回调一次,来演示如何回调函数参数的含义

  1. #include
  2. #include
  3. #include
  4. #include

首先,调整一下timer的持续时间,开始一个异步等待.显示,回调函数需要访问timer来实现周期运行,所以我们再介绍两个新参数

  • 指向timer的指针
  • 一个int*来指向计数器

  • voidprint(constasio::error&/e/,

  • asio::deadline_timer t,int count)
  • {

我们打算让这个函数运行6个周期,然而你会发现这里没有显式的方法来终止io_service.不过,回顾上一节,你会发现当 asio::io_service::run()会在所有任务完成时终止.这样我们当计算器的值达到5时(0为第一次运行的值),不再开启一个新的异步等待就可以了.

  1. if(*count < 5)
  2. {
  3. std::cout << *count <<" ";
  4. ++(*count);
  5. ...

然后,我们推迟的timer的终止时间.通过在原先的终止时间上增加延时,我们可以确保timer不会在处理回调函数所需时间内的到期.

(原文:By calculating the new expiry time relative to the old, we can ensure that the timer does not drift away from the whole-second mark due to any delays in processing the handler.)

  1. t->expires_at(t->expires_at() + boost::posix_time::seconds(1));

然后我们开始一个新的同步等待.如您所见,我们用把print和他的多个参数用boost::bind函数合成一个的形为void(const asio::error&)回调函数(准确的说是function object).

在这个例子中,boost::bindasio::placeholders::error参数是为了给回调函数传入一个error对象.当进行一个异步操作,开始boost::bind时,你需要使用它来匹配回调函数的参数表.下一节中你会学到回调函数不需要error参数时可以省略它.

  1. t->async_wait(boost::bind(print,
  2. asio::placeholders::error, t, count));
  3. }
  4. }
  5. intmain()
  6. {
  7. asio::io_service io;
  8. intcount = 0;
  9. asio::deadline_timer t(io, boost::posix_time::seconds(1));

和上面一样,我们再一次使用了绑定asio::deadline_timer::async_wait()

  1. t.async_wait(boost::bind(print,
  2. asio::placeholders::error, &t, &count));
  3. io.run();

在结尾,我们打印出的最后一次没有设置timer的调用的count的值

  1. std::cout <<"Final count is "<< count <<" ";
  2. return0;
  3. }

完整的代码:

  1. #include
  2. #include
  3. #include
  4. #include
  5. voidprint(constasio::error&/e/,
  6. bsp; asio::deadline_timer t,int count)
  7. {
  8. if(*count < 5)
  9. {
  10. std::cout << *count <<" ";
  11. ++(*count);
  12. t->expires_at(t->expires_at() + boost::posix_time::seconds(1));
  13. t->async_wait(boost::bind(print,
  14. asio::placeholders::error, t, count));
  15. }
  16. }
  17. intmain()
  18. {
  19. asio::io_service io;
  20. intcount = 0;
  21. asio::deadline_timer t(io, boost::posix_time::seconds(1));
  22. t.async_wait(boost::bind(print,
  23. asio::placeholders::error, &t, &count));
  24. io.run();
  25. std::cout <<"Final count is "<< count <<" ";
  26. return0;
  27. }

5. 成员函数作为回调函数

本例的运行结果和上一节类似

  1. #include
  2. #include
  3. #include
  4. #include

我们先定义一个printer类

  1. classprinter
  2. {
  3. public:
  4. //构造函数有一个io_service参数,并且在初始化timer_时用到了它.用来计数的count_这里同样作为了成员变量
  5. printer(boost::asio::io_service& io)
  6. : timer_(io, boost::posix_time::seconds(1)),
  7. count_(0)
  8. {

boost::bind同样可以出色的工作在成员函数上.众所周知,所有的非静态成员函数都有一个隐式的this参数,我们需要把this作为参数bind到成员函数上.和上一节类似,我们再次用bind构造出void(const boost::asio::error&)形式的函数.

注意,这里没有指定boost::asio::placeholders::error占位符,因为这个print成员函数没有接受一个error对象作为参数.

  1. timer_.async_wait(boost::bind(&printer::print,this));

在类的折构函数中我们输出最后一次回调的count的值

  1. ~printer()
  2. {
  3. std::cout <<"Final count is "<< count_ <<" ";
  4. }


print函数于上一节的十分类似,但是用成员变量取代了参数.

  1. voidprint()
  2. {
  3. if(count_ < 5)
  4. {
  5. std::cout << count_ <<" ";
  6. ++count_;
  7. timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
  8. timer_.async_wait(boost::bind(&printer::print,this));
  9. }
  10. }
  11. private:
  12. boost::asio::deadline_timer timer_;
  13. intcount_;
  14. };

现在main函数清爽多了,在运行io_service之前只需要简单的定义一个printer对象.

  1. intmain()
  2. {
  3. boost::asio::io_service io;
  4. printer p(io);
  5. io.run();
  6. return0;
  7. }

完整的代码:

  1. #include
  2. #include
  3. #include
  4. #include
  5. classprinter
  6. {
  7. public:
  8. printer(boost::asio::io_service& io)
  9. : timer_(io, boost::posix_time::seconds(1)),
  10. count_(0)
  11. {
  12. timer_.async_wait(boost::bind(&printer::print,this));
  13. }
  14. ~printer()
  15. {
  16. std::cout <<"Final count is "<< count_ <<" ";
  17. }
  18. voidprint()
  19. {
  20. if(count_ < 5)
  21. {
  22. std::cout << count_ <<" ";
  23. ++count_;
  24. timer_.expires_at(timer_.expires_at() + boost::posix_time::seconds(1));
  25. timer_.async_wait(boost::bind(&printer::print,this));
  26. }
  27. }
  28. private:
  29. boost::asio::deadline_timer timer_;
  30. intcount_;
  31. };
  32. intmain()
  33. {
  34. boost::asio::io_service io;
  35. printer p(io);
  36. io.run();
  37. return0;
  38. }



6. 多线程回调同步

本节演示了使用boost::asio::strand在多线程程序中进行回调同步(synchronise).

先前的几节阐明了如何在单线程程序中用boost::asio::io_service::run()进行同步.如您所见,asio库确保 仅当当前线程调用boost::asio::io_service::run()时产生回调.显然,仅在一个线程中调用boost::asio::io_service::run()来确保回调是适用于并发编程的.

一个基于asio的程序最好是从单线程入手,但是单线程有如下的限制,这一点在服务器上尤其明显:

  • 当回调耗时较长时,反应迟钝.
  • 在多核的系统上无能为力

如果你发觉你陷入了这种困扰,可以替代的方法是建立一个boost::asio::io_service::run()的线程池.然而这样就允许回调函数并发执行.所以,当回调函数需要访问一个共享,线程不安全的资源时,我们需要一种方式来同步操作.

  1. #include
  2. #include
  3. #include
  4. #include
  5. #include

在上一节的基础上我们定义一个printer类,此次,它将并行运行两个timer

  1. classprinter
  2. {
  3. public:

除了声明了一对boost::asio::deadline_timer,构造函数也初始化了类型为boost::asio::strandstrand_成员.

boost::asio::strand可以分配的回调函数.它保证无论有多少线程调用了boost::asio::io_service::run(),下一个回调函数仅在前一个回调函数完成后开始,当然回调函数仍然可以和那些不使用boost::asio::strand分配,或是使用另一个boost::asio::strand分配的回调函数一起并发执行.

  1. printer(boost::asio::io_service& io)
  2. : strand_(io),
  3. timer1_(io, boost::posix_time::seconds(1)),
  4. timer2_(io, boost::posix_time::seconds(1)),
  5. count_(0)
  6. {

当一个异步操作开始时,用boost::asio::strand来 "wrapped(包装)"回调函数.boost::asio::strand::wrap()会返回一个由boost::asio::strand分配的新的handler(句柄),这样,我们可以确保它们不会同时运行.

  1. timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1,this)));
  2. timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2,this)));
  3. }
  4. ~printer()
  5. {
  6. std::cout <<"Final count is "<< count_ <<" ";
  7. }


多线程程序中,回调函数在访问共享资源前需要同步.这里共享资源是std::coutcount_变量.



  1. voidprint1()
  2. {
  3. if(count_ < 10)
  4. {
  5. std::cout <<"Timer 1: "<< count_ <<" ";
  6. ++count_;
  7. timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
  8. timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1,this)));
  9. }
  10. }
  11. voidprint2()
  12. {
  13. if(count_ < 10)
  14. {
  15. std::cout <<"Timer 2: "<< count_ <<" ";
  16. ++count_;
  17. timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
  18. timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2,this)));
  19. }
  20. }
  21. private:
  22. boost::asio::strand strand_;
  23. boost::asio::deadline_timer timer1_;
  24. boost::asio::deadline_timer timer2_;
  25. intcount_;
  26. };

main函数中boost::asio::io_service::run()在两个线程中被调用:主线程、一个boost::thread线程.

正如单线程中那样,并发的boost::asio::io_service::run()会一直运行直到完成任务.后台的线程将在所有异步线程完成后终结.



  1. intmain()
  2. {
  3. boost::asio::io_service io;
  4. printer p(io);
  5. boost::threadt(boost::bind(&boost::asio::io_service::run, &io));
  6. io.run();
  7. t.join();
  8. return0;
  9. }

完整的代码:

  1. #include
  2. #include
  3. #include
  4. #include
  5. #include
  6. classprinter
  7. {
  8. public:
  9. printer(boost::asio::io_service& io)
  10. : strand_(io),
  11. timer1_(io, boost::posix_time::seconds(1)),
  12. timer2_(io, boost::posix_time::seconds(1)),
  13. count_(0)
  14. {
  15. timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1,this)));
  16. timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2,this)));
  17. }
  18. ~printer()
  19. {
  20. std::cout <<"Final count is "<< count_ <<" ";
  21. }
  22. voidprint1()
  23. {
  24. if(count_ < 10)
  25. {
  26. std::cout <<"Timer 1: "<< count_ <<" ";
  27. ++count_;
  28. timer1_.expires_at(timer1_.expires_at() + boost::posix_time::seconds(1));
  29. timer1_.async_wait(strand_.wrap(boost::bind(&printer::print1,this)));
  30. }
  31. }
  32. voidprint2()
  33. {
  34. if(count_ < 10)
  35. {
  36. std::cout <<"Timer 2: "<< count_ <<" ";
  37. ++count_;
  38. timer2_.expires_at(timer2_.expires_at() + boost::posix_time::seconds(1));
  39. timer2_.async_wait(strand_.wrap(boost::bind(&printer::print2,this)));
  40. }
  41. }
  42. private:
  43. boost::asio::strand strand_;
  44. boost::asio::deadline_timer timer1_;
  45. boost::asio::deadline_timer timer2_;
  46. intcount_;
  47. };
  48. intmain()
  49. {
  50. boost::asio::io_service io;
  51. printer p(io);
  52. boost::threadt(boost::bind(&boost::asio::io_service::run, &io));
  53. io.run();
  54. t.join();
  55. return0;
  56. }





7. TCP客户端:对准时间

  1. #include
  2. #include
  3. #include
    本程序的目的是访问一个时间同步服务器,我们需要用户指定一个服务器(如time-nw.nist.gov),用IP亦可.

    (译者注:日期查询协议,这种时间传输协议不指定固定的传输格式,只要求按照ASCII标准发送数据。)
  4. usingboost::asio::ip::tcp;
  5. intmain(intargc,char* argv[])
  6. {
  7. try
  8. {
  9. if(argc != 2)
  10. {
  11. std::cerr <<"Usage: client "<< std::endl;
  12. return1;
  13. }
    用asio进行网络连接至少需要一个boost::asio::io_service对象
  14. boost::asio::io_service io_service;


    我们需要把在命令行参数中指定的服务器转换为TCP上的节点.完成这项工作需要boost::asio::ip::tcp::resolver对象
  15. tcp::resolver resolver(io_service);


    一个resolver对象查询一个参数,并将其转换为TCP上节点的列表.这里我们把argv[1]中的sever的名字和要查询字串daytime关联.
  16. tcp::resolver::query query(argv[1],"daytime");


    节点列表可以用boost::asio::ip::tcp::resolver::iterator来进行迭代.iterator默认的构造函数生成一个end iterator.
  17. tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
  18. tcp::resolver::iterator end;
    现在我们建立一个连接的sockert,由于获得节点既有IPv4也有IPv6的.所以,我们需要依次尝试他们直到找到一个可以正常工作的.这步使得我们的程序独立于IP版本
  19. tcp::socket socket(io_service);
  20. boost::asio::error error = boost::asio::error::host_not_found;
  21. while(error && endpoint_iterator != end)
  22. {
  23. socket.close();
  24. socket.connect(*endpoint_iterator++, boost::asio::assign_error(error));
  25. }
  26. if(error)
  27. throwerror;
    连接完成,我们需要做的是读取daytime服务器的响应.

    我们用boost::array来保存得到的数据,boost::asio::buffer()会自动根据array的大小暂停工作,来防止缓冲溢出.除了使用boost::array,也可以使用char []std::vector.
  28. for(;;)
  29. {
  30. boost::array<char, 128> buf;
  31. boost::asio::error error;
  32. size_tlen = socket.read_some(
  33. boost::asio::buffer(buf), boost::asio::assign_error(error));
    当服务器关闭连接时,boost::asio::ip::tcp::socket::read_some()会用boost::asio::error::eof标志完成, 这时我们应该退出读取循环了.
  34. if(error == boost::asio::error::eof)
  35. break;// Connection closed cleanly by peer.
  36. elseif(error)
  37. throwerror;// Some other error.
  38. std::cout.write(buf.data(), len);
  39. 如果发生了什么异常我们同样会抛出它
  40. }
  41. catch(std::exception& e)
  42. {
  43. std::cerr << e.what() << std::endl;
  44. }


    运行示例:在windowsXPcmd窗口下

    输入:upload.exetime-a.nist.gov
    输出:54031 06-10-23 01:50:45 07 0 0 454.2 UTC(NIST) *

完整的代码:

  1. #include
  2. #include
  3. #include
  4. usingasio::ip::tcp;
  5. intmain(intargc,char* argv[])
  6. {
  7. try
  8. {
  9. if(argc != 2)
  10. {
  11. std::cerr <<"Usage: client "<< std::endl;
  12. return1;
  13. }
  14. asio::io_service io_service;
  15. tcp::resolver resolver(io_service);
  16. tcp::resolver::query query(argv[1],"daytime");
  17. tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
  18. tcp::resolver::iterator end;
  19. tcp::socket socket(io_service);
  20. asio::error error = asio::error::host_not_found;
  21. while(error && endpoint_iterator != end)
  22. {
  23. socket.close();
  24. socket.connect(*endpoint_iterator++, asio::assign_error(error));
  25. }
  26. if(error)
  27. throwerror;
  28. for(;;)
  29. {
  30. boost::array<char, 128> buf;
  31. asio::error error;
  32. size_tlen = socket.read_some(
  33. asio::buffer(buf), asio::assign_error(error));
  34. if(error == asio::error::eof)
  35. break;// Connection closed cleanly by peer.
  36. elseif(error)
  37. throwerror;// Some other error.
  38. std::cout.write(buf.data(), len);
  39. }
  40. }
  41. catch(std::exception& e)
  42. {
  43. std::cerr << e.what() << std::endl;
  44. }
  45. return0;
  46. }

8. TCP同步时间服务器

  1. #include
  2. #include
  3. #include
  4. #include
  5. usingasio::ip::tcp;
    我们先定义一个函数返回当前的时间的string形式.这个函数会在我们所有的时间服务器示例上被使用.
  6. std::string make_daytime_string()
  7. {
  8. usingnamespacestd;// For time_t, time and ctime;
  9. time_tnow = time(0);
  10. returnctime(&now);
  11. }
  12. intmain()
  13. {
  14. try
  15. {
  16. asio::io_service io_service;
    新建一个asio::ip::tcp::acceptor对象来监听新的连接.我们监听TCP端口13,IP版本为V4
  17. tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));


    这是一个iterative server,也就是说同一时间只能处理一个连接.建立一个socket来表示一个和客户端的连接, 然后等待客户端的连接.
  18. for(;;)
  19. {
  20. tcp::socket socket(io_service);
  21. acceptor.accept(socket);
    当客户端访问服务器时,我们获取当前时间,然后返回它.
  22. std::string message = make_daytime_string();
  23. asio::write(socket, asio::buffer(message),
  24. asio::transfer_all(), asio::ignore_error());
  25. }
  26. }
    最后处理异常
  27. catch(std::exception& e)
  28. {
  29. std::cerr << e.what() << std::endl;
  30. }
  31. return0;
  32. 运行示例:运行服务器,然后运行上一节的客户端,在windowsXPcmd窗口下

    输入:client.exe127.0.0.1

    输出:Mon Oct 23 09:44:48 2006

完整的代码:

  1. #include
  2. #include
  3. #include
  4. #include
  5. usingasio::ip::tcp;
  6. std::string make_daytime_string()
  7. {
  8. usingnamespacestd;// For time_t, time and ctime;
  9. time_tnow = time(0);
  10. returnctime(&now);
  11. }
  12. intmain()
  13. {
  14. try
  15. {
  16. asio::io_service io_service;
  17. tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 13));
  18. for(;;)
  19. {
  20. tcp::socket socket(io_service);
  21. acceptor.accept(socket);
  22. std::string message = make_daytime_string();
  23. asio::write(socket, asio::buffer(message),
  24. asio::transfer_all(), asio::ignore_error());
  25. }
  26. }
  27. catch(std::exception& e)
  28. {
  29. std::cerr << e.what() << std::endl;
  30. }
  31. return0;
  32. }

原文链接: https://www.cnblogs.com/lexus/archive/2012/09/26/2703959.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/63901

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年2月9日 上午11:11
下一篇 2023年2月9日 上午11:12

相关推荐