Archive

Posts Tagged ‘coroutine’

Coroutines in C++/Boost

January 19th, 2016 No comments

Starting with 1.56, boost/asio provides asio::spawn() to work with coroutines. I'll just paste the sample code here, with minor modifications:

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <iostream>
using namespace std;
using boost::asio::ip::tcp;


/*
 * One echo connection.
 *
 * go() spawns two coroutines on the same strand:
 *   - echo():    reads from the socket and writes the same bytes back,
 *                re-arming a 10 second deadline before every read;
 *   - timeout(): waits on the deadline timer and closes the socket once the
 *                deadline has really passed.
 * Both coroutines hold a shared_ptr to the session (via shared_from_this()),
 * so the object stays alive until both have finished.
 */
class session: public boost::enable_shared_from_this<session>
{
public:
    explicit session(boost::asio::io_service &io_service)
        : socket_(io_service), timer_(io_service), strand_(io_service)
    {
    }
    /* Socket to be handed to the acceptor before go() is called. */
    tcp::socket &socket()
    {
        return socket_;
    }
    /* Starts the echo and timeout coroutines; requires the session to be owned by a shared_ptr. */
    void go()
    {
        boost::asio::spawn(strand_, boost::bind(&session::echo, shared_from_this(), _1));
        boost::asio::spawn(strand_, boost::bind(&session::timeout, shared_from_this(), _1));
    }
private:
    /* Echo loop; any I/O error (including the socket being closed by timeout()) ends it. */
    void echo(boost::asio::yield_context yield)
    {
        try {
            char data[128];
            while (true) {
                /* Push the deadline forward before each read. */
                timer_.expires_from_now(boost::posix_time::seconds(10));
                size_t n = socket_.async_read_some(boost::asio::buffer(data), yield);
                boost::asio::async_write(socket_, boost::asio::buffer(data, n), yield);
            }
        } catch (const std::exception &) {
            /*
             * Use the error_code overloads: a throwing close()/cancel() inside
             * this handler would escape the coroutine and could take down
             * io_service::run() because of a single bad connection.
             */
            boost::system::error_code ignored_ec;
            socket_.close(ignored_ec);
            timer_.cancel(ignored_ec);
        }
    }
    /* Deadline watcher; runs until the socket is closed by either coroutine. */
    void timeout(boost::asio::yield_context yield)
    {
        while (socket_.is_open()) {
            boost::system::error_code ignored_ec;
            /* The wait also completes when echo() re-arms or cancels the timer. */
            timer_.async_wait(yield[ignored_ec]);
            /* Only close when the deadline has actually passed, not on a re-arm. */
            if (timer_.expires_from_now() <= boost::posix_time::seconds(0)) {
                socket_.close(ignored_ec);
            }
        }
    }
    tcp::socket socket_;
    boost::asio::deadline_timer timer_;
    boost::asio::io_service::strand strand_;
};

/*
 * Accept loop, meant to run as a coroutine.
 *
 * Listens on the given IPv4 port and, for every successfully accepted
 * connection, starts a session. A failed accept is skipped and the loop
 * simply tries again with a fresh session object.
 */
void do_accept(boost::asio::io_service &io_service, unsigned short port, boost::asio::yield_context yield)
{
    const tcp::endpoint listen_endpoint(tcp::v4(), port);
    tcp::acceptor acceptor(io_service, listen_endpoint);
    for (;;) {
        /* The session owns the socket the acceptor fills in. */
        boost::shared_ptr<session> client(new session(io_service));
        boost::system::error_code accept_error;
        acceptor.async_accept(client->socket(), yield[accept_error]);
        if (accept_error) {
            continue;
        }
        client->go();
    }
}

/*
 * Entry point: spawns the accept coroutine on port 2222 and runs the
 * io_service until it has no more work. Exceptions that reach this level are
 * reported on stderr; the exit code is 0 either way.
 */
int main()
{
    const unsigned short listen_port = 2222;
    try {
        boost::asio::io_service io_service;
        boost::asio::spawn(io_service,
                           boost::bind(do_accept, boost::ref(io_service), listen_port, _1));
        io_service.run();
    } catch (const std::exception &error) {
        std::cerr << "Exception: " << error.what() << std::endl;
    }
    return 0;
}

The Python client code from my previous article can be used to test the code above. I also tried to write a TCP server using only the boost::coroutines classes. select() is used, since I want the code to be platform independent. NOTE: with coroutines, we have only _one_ thread.

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#include <windows.h>
#pragma comment(lib, "ws2_32.lib")
#pragma warning(disable: 4996)
#define sock_send(s, str, len)      send(s, str, len, 0)
#define sock_close(s)               closesocket(s)
#else
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#define sock_send(s, str, len)      send(s, str, len, MSG_NOSIGNAL)
#define sock_close(s)               close(s)
#endif
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <list>
#include <boost/bind.hpp>
#include <boost/coroutine/all.hpp>
#include <boost/shared_ptr.hpp>
using namespace std;


#ifdef _WIN32
/*
 * Initializes Winsock 2.2 when the global instance is constructed and
 * releases it when the instance is destroyed at program exit.
 *
 * The WSAStartup() result is checked: a failure is reported on stderr and
 * WSACleanup() is then skipped, since it must only balance a successful
 * WSAStartup().
 */
struct Win32SocketWrapper
{
    Win32SocketWrapper()
        : started_(false)
    {
        WSADATA wsaData;
        const int rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
        if (rc != 0) {
            fprintf(stderr, "WSAStartup failed, rc=%d.\n", rc);
        } else {
            started_ = true;
        }
    }
    ~Win32SocketWrapper()
    {
        if (started_) {
            WSACleanup();
        }
    }
private:
    bool started_; /* true only after a successful WSAStartup() */
} g_win32_socket_wrapper;
#endif


class session
{
    typedef boost::coroutines::symmetric_coroutine<void> coro_t;
public:
    explicit session(int sock)
        : socket_(sock)
    {
        echo_coro_ = coro_t::call_type(boost::bind(&session::echo, this, _1));
    }
    int socket()
    {
        return socket_;
    }
    void go()
    {
        echo_coro_();
    }
    void echo(coro_t::yield_type &yield)
    {
        int rc;
        char buffer[128];
        while (true) {
            memset(buffer, 0, sizeof(buffer));
            yield(); rc = recv(socket_, buffer, sizeof(buffer), 0);
            if (rc == 0 || rc == -1) { /* close or error */
                printf("socket[%d] closed, rc=%d..\n", socket_, rc);
                sock_close(socket_);
                socket_ = -1;
                /* do not release here, or the whole coroutine context will be invalid.. */
                break;
            } else {
                sock_send(socket_, buffer, rc);
            }
        }
    }
private:
    int socket_;
    coro_t::call_type echo_coro_;
};

void event_loop(int server_sock)
{
    list<boost::shared_ptr<session> > session_list;
    int rc, maxfd, client_sock;
    fd_set rdset;
    struct sockaddr_in client_addr;
    size_t addr_size = sizeof(struct sockaddr_in);

    while (true) {
        FD_ZERO(&rdset);
        FD_SET(server_sock, &rdset);
        maxfd = server_sock;
        list<boost::shared_ptr<session> >::iterator it = session_list.begin();
        while (it != session_list.end()) {
            if ((*it)->socket() == -1) {
                session_list.erase(it++);
            } else {
                FD_SET((*it)->socket(), &rdset);
                if (maxfd < (*it)->socket()) {
                    maxfd = (*it)->socket();
                }
                ++it;
            }
        }
        /* max fd value plus 1 */
        rc = select(maxfd+1, &rdset, 0, 0, NULL);
        if (rc == -1) {
            continue;
        } else {
            if (FD_ISSET(server_sock, &rdset)) {
                client_sock = (int)accept(server_sock, (struct sockaddr *)&client_addr, (socklen_t *)&addr_size);
                printf("socket[%d] accepted: %s:%d..\n", client_sock, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));
                boost::shared_ptr<session> new_session(new session(client_sock));
                new_session->go(); /* go first */
                session_list.push_back(new_session);
            }
            for (list<boost::shared_ptr<session> >::iterator it = session_list.begin(); it != session_list.end(); ++it) {
                if (FD_ISSET((*it)->socket(), &rdset)) {
                    (*it)->go();
                }
            }
        }
    }
}

int main() 
{
    int rc, server_sock;
    struct sockaddr_in server_addr;

    server_sock = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(2222);
    rc = bind(server_sock, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in));
    if (rc < 0) {
        fprintf(stderr, "bind: %s.\n", strerror(errno));
        return -1;
    }
    listen(server_sock, 5);
    /* loop */
    event_loop(server_sock);
    sock_close(server_sock);
    return 0;
}
Categories: C/C++ Tags: , ,

Coroutines in Python

January 11th, 2016 No comments

Python 3.5 added native support for coroutines. Actually, there were several steps towards the current implementation. See Wikipedia, and it seems a bit messy to me:

  • Python 2.5 implements better support for coroutine-like functionality, based on extended generators (PEP 342).
  • Python 3.3 improves this ability, by supporting delegating to a subgenerator (PEP 380).
  • Python 3.4 introduces a comprehensive asynchronous I/O framework as standardized in PEP 3156, which includes coroutines that leverage subgenerator delegation.
  • Python 3.5 introduces explicit support for coroutines with async/await syntax (PEP 0492).

Before Python 2.5, there were only generators.

In Python 2.5, yield was refined to be an expression rather than a statement, which made it possible to implement simple coroutines. But a lot of work was still left to programmers; for instance, a simple coroutine scheduler was required.

In Python 3.3, yield from was added to support subgenerators. Nothing to do with coroutines.

In Python 3.4, the Father of Python (Guido van Rossum) wrote a PEP himself to add an asyncio module to simplify coroutine usage in Python. An official scheduler was added. We can use @asyncio.coroutine to decorate a function. We can use yield from expressions to yield to a specific coroutine.

In Python 3.5, async/await syntax was added, borrowed from C#. The newest PEP made coroutines a native Python language feature, and clearly separated them from generators. A native coroutine is now declared with async def syntax, and yield from is replaced with an await expression. This removes the generator/coroutine ambiguity. So in Python 3.5, coroutines used with asyncio may be implemented using the async def statement, or by using generators. Generator-based coroutines should be decorated with @asyncio.coroutine, although this is not strictly enforced. The decorator enables compatibility with async def coroutines, and also serves as documentation. See Python documents here.

The implementation can be found in this commit.

I wrote an echo server/client sample to try coroutines. Server code first:

#!/usr/bin/python3
import asyncio

@asyncio.coroutine
def start_server():
    """Start the echo server on 127.0.0.1:2222 and return the server object.

    The server object must be returned: the script stores the result of
    running this coroutine and later calls close() and wait_closed() on it.
    """
    server = yield from asyncio.start_server(client_connected_handler, '127.0.0.1', 2222)
    return server

@asyncio.coroutine
def client_connected_handler(client_reader, client_writer):
    """Echo everything a client sends back to it until it disconnects.

    client_reader / client_writer are the stream pair that
    asyncio.start_server() passes to its connection callback. Received data
    is also printed to stdout. The writer is always closed when the handler
    ends, whether the peer disconnected cleanly or the connection failed.
    """
    peer = client_writer.get_extra_info('peername')
    print('Connected..%s:%s' % (peer[0], peer[1]))
    try:
        while True:
            data = yield from client_reader.read(1024)
            if not data:
                print('Disconnected..%s:%s\n' % (peer[0], peer[1]))
                break
            # A 1024-byte read can split a multi-byte character or carry
            # non-UTF-8 bytes; never let the log line kill the connection.
            print(data.decode(errors='replace'), end='')
            client_writer.write(data)
            # Wait for the write buffer to drain so a slow client cannot
            # make the server buffer unbounded amounts of data.
            yield from client_writer.drain()
    except ConnectionError:
        # The peer reset or dropped the connection; treat it as a disconnect.
        print('Disconnected..%s:%s\n' % (peer[0], peer[1]))
    finally:
        client_writer.close()

# Start the listening server on the default event loop, then serve until
# the user interrupts the process.
event_loop = asyncio.get_event_loop()
echo_server = event_loop.run_until_complete(start_server())
try:
    event_loop.run_forever()
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop; continue with the shutdown below.
    pass

# Shutdown: stop listening, wait for the server to finish closing, then
# release the event loop.
echo_server.close()
event_loop.run_until_complete(echo_server.wait_closed())
event_loop.close()

Client code here, or you can simply use telnet command:

#!/usr/bin/python3
import asyncio

@asyncio.coroutine
def tcp_echo_client():
    """Send four lines to the echo server at 127.0.0.1:2222 and print the replies.

    Reading stops at end of stream or once the echoed 'EOF' line arrives;
    the connection is closed afterwards.
    """
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 2222)
    for outgoing in (b'first line\n', b'second line\n', b'third line\n', b'EOF\n'):
        writer.write(outgoing)
    print("Lines received..")
    received = yield from reader.readline()
    while received:
        text = received.decode()
        print(text, end='')
        if text == 'EOF\n':
            # The server echoed our terminator; nothing more to expect.
            break
        received = yield from reader.readline()
    writer.close()

# Run the client coroutine to completion on the default event loop,
# then release the loop.
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(tcp_echo_client())
event_loop.close()

Server output:

Connected..127.0.0.1:27643
first line
second line
third line
EOF
Disconnected..127.0.0.1:27643

Client output:

Lines received..
first line
second line
third line
EOF

With Python 3.5 on Ubuntu 16.04, we can also use async/await:

#!/usr/bin/python3
import asyncio

async def start_server():
    """Start the echo server on 127.0.0.1:2222 and return the server object.

    The server object must be returned: the script stores the result of
    running this coroutine and later calls close() and wait_closed() on it.
    """
    return await asyncio.start_server(client_connected_handler, '127.0.0.1', 2222)

async def client_connected_handler(client_reader, client_writer):
    """Echo everything a client sends back to it until it disconnects.

    client_reader / client_writer are the stream pair that
    asyncio.start_server() passes to its connection callback. Received data
    is also printed to stdout. The writer is always closed when the handler
    ends, whether the peer disconnected cleanly or the connection failed.
    """
    peer = client_writer.get_extra_info('peername')
    print('Connected..%s:%s' % (peer[0], peer[1]))
    try:
        while True:
            data = await client_reader.read(1024)
            if not data:
                print('Disconnected..%s:%s\n' % (peer[0], peer[1]))
                break
            # A 1024-byte read can split a multi-byte character or carry
            # non-UTF-8 bytes; never let the log line kill the connection.
            print(data.decode(errors='replace'), end='')
            client_writer.write(data)
            # Wait for the write buffer to drain so a slow client cannot
            # make the server buffer unbounded amounts of data.
            await client_writer.drain()
    except ConnectionError:
        # The peer reset or dropped the connection; treat it as a disconnect.
        print('Disconnected..%s:%s\n' % (peer[0], peer[1]))
    finally:
        client_writer.close()

# Start the listening server on the default event loop, then serve until
# the user interrupts the process.
event_loop = asyncio.get_event_loop()
echo_server = event_loop.run_until_complete(start_server())
try:
    event_loop.run_forever()
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop; continue with the shutdown below.
    pass

# Shutdown: stop listening, wait for the server to finish closing, then
# release the event loop.
echo_server.close()
event_loop.run_until_complete(echo_server.wait_closed())
event_loop.close()
Categories: Python Tags: ,