利用setcontext函数实现协程的实验

CGoErlangLinux协程性能 by 达达 at 2012-01-07

元旦期间我实验了最简单的setcontext族函数的使用,元旦过后决定利用setcontext族函数加pthread实现一个只针对Linux的协程库,昨天花了一天时间终于在深夜调通了程序。

这个协程库我打算做成像erlang那样的actor模式,libactor这名字已经有人用了,那是一个基于pthread封装的actor库,所以我做的这个库名叫做libact。

libact的基本原理就是程序启动时创建数个pthead线程作为调度器,协程列表则用一个头尾相连的环形链表表示,协程具有不同的状态,分别是:runable,running,waiting,exit。

调度器线程每次调度一个协程,调度算法就是从上一次被调度的协程开始找到最近的一个runable状态的协程,然后利用swapcontext函数把上下文切换到协程的最后状态。

我设想协程会在几种不同的情况下会让出调度器,分别是:

  1. 协程自然退出
  2. 协程调用act_yield函数,让出调度器
  3. 协程调用act_sleep函数,进入睡眠
  4. 协程调用了act_receive函数,但消息队列是空的
  5. 协程需要等待IO操作完成

作为一个实验原型,act_yield是最基础也最容易实现的,3 ~ 5的情况都需要做比较多的额外封装,所以我只先实现了act_yield函数。

这个实验性的库只有两个文件,下面是完整的代码:

act.h

#ifndef _ACT_H_
#define _ACT_H_

#include <stdint.h>

#define act_pid int

typedef enum {
    ACT_RUNABLE,
    ACT_RUNNING,
    ACT_WAITING,
    ACT_EXIT
} act_status;

typedef void (*act_func)(void *);

act_pid act_spawn(act_func func, void *data);

act_pid act_spawn2(act_func func, void *data, int stack_size);

void act_exit();

act_pid act_self();

void act_yield();

void act_loop(int num_schedulers);

#endif

act.c

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> 
#include <pthread.h>
#include <ucontext.h>

#include "act.h"

//==============================================================================
//    Private
//==============================================================================

#define act_access_procs_begin() do{ \
    pthread_mutex_lock(&act_procs_mutex); \
}while(0)

#define act_access_procs_end() do{ \
    pthread_mutex_unlock(&act_procs_mutex); \
}while(0)

static pthread_mutex_t act_procs_mutex = PTHREAD_MUTEX_INITIALIZER;

typedef struct act_proc act_proc_t;

struct act_proc {
    act_pid     pid;
    ucontext_t *ctx;
    act_status  status;

    act_proc_t *prev;
    act_proc_t *next;
};

static act_pid act_max_pid;

static act_proc_t *act_procs;

typedef struct act_sche act_sche_t;

struct act_sche {
  pthread_t   tid;
  ucontext_t *ctx;
  act_proc_t *running_proc;
};

static int act_sche_num;

static act_sche_t **act_sches;

static void *act_scheduler(void *data)
{
    int sche_id = (int)data;

    act_sche_t sche;

    ucontext_t main_ctx;

    act_proc_t *proc = NULL;
    act_proc_t *next_proc = act_procs;

    sche.ctx = &main_ctx;
    sche.tid = pthread_self();
    sche.running_proc = NULL;

    act_sches[sche_id] = &sche;

    //printf("scheduler %d begin\n", sche_id);

    //printf("thread id %ul\n", sche.tid);

    for (;;) {
        for (;;) {
            act_access_procs_begin();

            if (proc == NULL) {
                proc = act_procs;
            } else {
                proc = proc->next;
            }

            if (ACT_RUNABLE == proc->status) {
                proc->status = ACT_RUNNING;
                act_access_procs_end();
                break;
            }

            act_access_procs_end();

            //printf("scheduler %d wait\n", sche_id);

            //usleep(1000 * 10);
        }

        //printf("scheduler %d run proc %d begin\n", sche_id, proc->pid);

        sche.running_proc = proc;

        swapcontext(&main_ctx, proc->ctx);

        sche.running_proc = NULL;

        //printf("scheduler %d run proc %d end\n\n", sche_id, proc->pid);

        if (ACT_RUNNING == proc->status) {
            proc->status = ACT_RUNABLE;
        }
        else if (ACT_EXIT == proc->status) {
            act_access_procs_begin();

            proc->next->prev = proc->prev;
            proc->prev->next = proc->next;

            act_access_procs_end();

            free(proc->ctx->uc_stack.ss_sp);
            free(proc->ctx);
            free(proc);

            proc = NULL;
        }
    }
}

static act_sche_t *act_current_sche()
{
    int i;

    pthread_t tid;

    tid = pthread_self();

    for (i = 0; i < act_sche_num; i ++) {
        act_sche_t *sche = act_sches[i];

        if (sche != NULL && sche->tid == tid) {
            return sche;
        }
    }

    return NULL;
}

//==============================================================================
//    Public
//==============================================================================

int act_default_stack_size = 64 * 1024;

act_pid act_spawn(act_func func, void *data)
{
    return act_spawn2(func, data, act_default_stack_size);
}

act_pid act_spawn2(act_func func, void *data, int stack_size)
{
    act_proc_t *proc = NULL;

    proc = (act_proc_t *)malloc(sizeof(act_proc_t));

    proc->status = ACT_RUNABLE;

    proc->ctx = (ucontext_t *)malloc(sizeof(ucontext_t));
    proc->ctx->uc_stack.ss_sp = (char *)malloc(stack_size);
    proc->ctx->uc_stack.ss_size = stack_size;

    getcontext(proc->ctx);

    makecontext(proc->ctx, (void (*)(void))func, 1, data);

    act_access_procs_begin();

    proc->pid = ++ act_max_pid;

    if (act_procs != NULL) {
        proc->next = act_procs;
        proc->prev = act_procs->prev;

        proc->next->prev = proc;
        proc->prev->next = proc;
    }
    else {
        proc->next = proc;
        proc->prev = proc;

        act_procs = proc;
    }

    act_access_procs_end();
}

void act_exit()
{
    act_sche_t *sche = act_current_sche();

    sche->running_proc->status = ACT_EXIT;
}

act_pid act_self()
{
    act_sche_t *sche = act_current_sche();

    return sche->running_proc->pid;
}

void act_yield()
{
    act_sche_t *sche = act_current_sche();

    //printf("yield by thread %u\n", sche->tid);

    swapcontext(sche->running_proc->ctx, sche->ctx);
}

void act_loop(int num_schedulers)
{
    int i;
    int rc;
    void *status;

    pthread_mutex_init(&act_procs_mutex, NULL);

    act_sche_num = num_schedulers;

    act_sches = (act_sche_t **)malloc(num_schedulers * sizeof(act_sche_t *));

    pthread_attr_t attr;
    pthread_t schedulers[num_schedulers];

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    for (i = 0; i < num_schedulers; i ++) {
        act_sches[i] = NULL;

        if (rc = pthread_create(&schedulers[i], &attr, act_scheduler, (void *)i)) {
            printf("ERROR: return code from pthread_create() is %d\n", rc);
            exit(-1);
        }
    }

    pthread_attr_destroy(&attr);

    for (i = 0; i < num_schedulers; i ++) {
        if (rc = pthread_join(schedulers[i], &status)) {
            printf("ERROR: return code from pthread_join() is %d\n", rc);
            exit(-1);
        }
    }

    pthread_exit(NULL);
}

下面是用2调度器运行5个协程的一个示例,每个协程分别输出A,B,C,D,E。

main.c

#include <stdio.h>
#include <stdlib.h>

#include "act.h"

void proc(void *data)
{
    for (;;) {
        printf("%s\n", (char *)data);
        act_yield();
    }

    act_exit();
}

void main(void)
{
    act_spawn(proc, "A");
    act_spawn(proc, " B");
    act_spawn(proc, "  C");
    act_spawn(proc, "   D");
    act_spawn(proc, "    E");

    act_loop(3);
}

完整的项目代码我已经提交到GitHub上,有兴趣的朋友可以到上面下载。

需要强调一点:这只是一个实验性的项目,目的是实践C/C++中协程的可行性,产出的代码并没有实用价值,并且未在任何正式项目上使用,所以请大家不要把这里的代码用到实际项目中(setcontext不一定如想象的那么稳定,下面我会提到)。

我最初追求在C/C++下实现协程的目的是像Erlang那样设计服务端程序,虽然初步已经可以实现一个简单的协程库,但是服务端真的非用这种方式不可吗?

在做完这个原型的时候我跟同行的一位前辈进行的讨论,感谢这位前辈帮我理清了思路。

首先,使用协程模型和异步模型的目的都一样,就是在需要等待的时候把CPU让给别的事务,相较于异步,协程的优势在于它将不同事务隔离开,并让程序员可以不用自己管理大量的异步状态,这让程序设计更容易,但这写优势只有在异步方式设计中需要涉及到很多不同事务的不同状态时才会显现出来。

而服务端需要等待的操作其实并不多,无非是:磁盘IO、数据库、网络,而这些环节又是非逻辑性的代码,只需要初期在机制上设计一次就不需要再动到了,完全可以针对这几个主要环节进行针对性的异步设计,也可以用几个线程把事务隔离开,所以协程的优势其实不明显。

其次,因为协程的上下文切换依靠的是保存CPU寄存器,自己实现的协程需要格外小心,否则程序很容易出现很难调试的错误,例如我在网上就了解到setcontext和getcontext函数在i386的CPU架构上是跟pthread不兼容的,因为getcontext保存上下文的寄存器时会忽略存放指向当前线程结构体的寄存器,导致pthread_self()在跨上下文切换后会出现取到别的线程ID的情况。

并且自己实现的协程不是语言原生支持的,不能像Erlang那样公平调度,也不能像Go语言那样自动扩展栈空间,所以是有一定局限性的。

再加上上面说的协程需要上下文切换的几种情况都需要重新封装一组函数,我觉得目前自己花大量时间来做这些,然后再花大量时间排错和调试,不如直接用Go语言好了。

综合以上观点,的确如同行的前辈所说的,还是按传统套路做最好。

所以接下来一段时间,我会把关注点移到zmq和libevent等这些传统的开源库上,至于协程在C/C++中的实践,暂时就到这里,以后是否自己做个脚本语言自己实现更高效更保险的协程呢?那应该是要很久以后的事了。

有兴趣继续研究协程的朋友,可以了解下GUN Pth项目,我今天上去下了一份代码,发现它封装得很完善,这也间接证明了要做一个真正可用的协程库,不是一个人十天半个月就可以搞定的事情。