Erlang和PHP间的Socket通讯-01

分享ErlangPHPSocketTCP by 达达 at 2010-07-02

前段时间,在群里和发哥聊起memcached和APC,渐渐的聊到了/dev/shm,发哥说他用/dev/shm做缓存很好用。这次讨论触发了我对memcached、APC和dev/shm数据读写性能的测试。

测试中我想到了Erlang内置的ets和传说中的并发性能,如果用Erlang + ets做一个类似memcached这样的key-value的缓存服务器,性能会比memcached好吗?于是我动手做了试验,用Erlang编写了一个支持并发连接的Socket服务器,写了一个PHP的客户端。

测试的结果我先按下不表,放到文章结尾再附带说明,以免冲淡了本篇文章的主题。

这次试验最值得分享的经验是Erlang和PHP间的Socket通讯方式。

下面的Erlang代码和PHP代码都没有实际涉及到缓存功能,愿因文章最后会有说明,这里只关注通讯机制。

Erlang端的服务器代码,有一个很好听的名字叫mycached,启动服务的方式是:

mycached:start(10086, 10).

mycached:start的第一个参数是端口号,第二个参数是工作进程数。服务器启动后,会有n个工作进程同时等待accpet,我不知道这里会不会有“惊群”问题,我假定Erlang内部机制会妥善处理多个进程同时等待一个套接字的情况,之所以这么假定,是因为这段服务器代码的基础来自于Erlang官方文档,上面的示例代码就是这样一个模式。

当其中一个进程接收到来自客户端的连接后,就会陷入loop函数,处理客户端请求,直到客户端断开连接。

事实上,也可以用spawn来启动一个新进程执行loop函数,响应客户端请求,而工作进程继续回到accept的状态。只变了一行代码,服务器的工作模式立马就发生改变,Erlang真的很神奇。

我还编写了两个用于测试mycached自身的函数。一个叫test,用于执行单次请求并输出返回结果,主要起功能测试的作用。一个叫banch,用于执行批量请求,并输出执行时间,主要起压力测试的作用。

test函数的调用示例:

mycached:test("localhost", 10086, 1, "Hello").

test函数的第一个参数是服务器地址,第二个参数是服务器端口号,第三个参数是请求类型,第四个参数是请求参数。

banch函数的调用示例:

mycached:banch("localhost", 10086, 1, "Hello", 10, 1000).

banch函数的前四个参数都和test函数一致,增加的两个参数,一个是连接次数,一个是请求次数。上面的示例代码将会执行10次连接,每次连接会分别发起1000次的请求,总共1w次请求。

以下是Erlang端的完整代码:

-module(mycached).
-export([start/2, server/1, loop/1, test/4, for/3, banch/6, banch_call/6]).

-define(CMD_GET, 1).
-define(CMD_SET, 2).
-define(CMD_DEL, 3).


start(LPort, Num) ->
    case gen_tcp:listen(LPort, [binary, {active, false}, {packet, 2}]) of

        {ok, LSock} ->
            start_servers(LSock, Num),

            {ok, Port} = inet:port(LSock),

            Port;

        {error, Reason} ->
            {error, Reason}
    end.


start_servers(_, 0) ->
    ok;

start_servers(LSock, Num) ->
    spawn(?MODULE, server, [LSock]),

    start_servers(LSock, Num - 1).


server(LSock) ->
    case gen_tcp:accept(LSock) of

        {ok, CSock} ->
            loop(CSock),
            server(LSock);

        Other ->
            io:format("accept returned ~w - goodbye!~n", [Other]),
            ok
    end.


loop(CSock) ->
    inet:setopts(CSock, [{active, once}]),

    receive

        {tcp, CSock, Request} ->
            Response = process(Request),

            Response_Bin = list_to_binary(Response),

            gen_tcp:send(CSock, Response_Bin),

            loop(CSock);

        {tcp_closed, CSock} ->
            io:format("socket ~w closed [~w]~n", [CSock, self()]),
            ok
    end.


process(Request) ->
    try
        {<<Type>>, Params} = split_binary(Request, 1),

        case Type of
            ?CMD_GET ->
                "Command: GET";
            ?CMD_SET ->
                "Command: SET";
            ?CMD_DEL ->
                "Command: DEL";
            _ ->
                "Unknow Command"
        end
    catch
        _:E -> io:format("process failed: ~w [~w]~n", [E, self()]),
        "Server Error"
    end.


test(Host, Port, Command, Params) ->
    test_call(Host, Port, Command, Params, 1).


banch(Host, Port, Command, Params, Times, RTimes) ->
    {M, _} = timer:tc(?MODULE, banch_call, [Host, Port, Command, Params, Times, RTimes]),

    io:format("Time: ~p micro seconds~n", [M]),

    ok.


banch_call(Host, Port, Command, Params, Times, RTimes) ->
    for (0, Times,
        fun() ->
            test_call(Host, Port, Command, Params, RTimes)
        end
    ),
    ok.


test_call(Host, Port, Command, Params, Times) ->
    {ok, Sock} = gen_tcp:connect(Host, Port, [binary, {active, false}, {packet, 2}]),

    Request = [Command, Params],

    Request_Bin = list_to_binary(Request),

    case Times of
        1 ->
            {ok, Bin} = test_send(Sock, Request_Bin),

            ok = gen_tcp:close(Sock),

            Bin;

        _ ->
            for (0, Times,
                fun() ->
                    {ok, _} = test_send(Sock, Request_Bin)
                end
            ),

            ok = gen_tcp:close(Sock),

            ok
    end.


test_send(Sock, Request_Bin) ->
    ok = gen_tcp:send(Sock, Request_Bin),

    gen_tcp:recv(Sock, 0).


for (To, To, _) ->
    ok;

for (From, To, Callback) ->
    Callback(),
    for (From + 1, To, Callback).

PHP端最大的难点在于请求的封包和解包,事实上这部分都集中在php内置的pack和unpack函数的使用上。

mycached的通讯是基于Erlang的{packet, 2}模式的,Erlang会自动将数据包的前两个字节当作请求的长度,在Erlang端就不需要自己进行复杂的封包和解包工作了,只需要把精力都放在业务数据的解析上。而PHP端就没这么幸运了,你必须自己将在请求的头部加上两个字节的包大小信息,而接受到服务器响应时,必须先从头部读取两个字节的包大小信息,再解包。

完整的PHP客户端代码:

<?php

class mycached
{
    private $host;
    private $port;
    private $sock;

    function __construct ($host, $port)
    {
        $this->host = $host;
        $this->port = $port;

        $this->sock = @socket_create(AF_INET, SOCK_STREAM, getprotobyname('tcp'));

        if ($this->sock)
        {
            socket_connect($this->sock, $this->host, $this->port);
        }
    }

    public function set ($key, $value)
    {
    }

    public function get ($key)
    {
        $msg = $this->pack_data(1, $key);

        $sent = @socket_write($this->sock, $msg, strlen($msg));

        if ($sent === FALSE)
        {
            return null;
        }

        $buff = $this->socket_read_len($this->sock, 2, PHP_BINARY_READ);

        $head = unpack("H*", $buff);

        $len = hexdec($head[1]);

        $res = $this->socket_read_len($this->sock, $len, PHP_BINARY_READ);

        return $res;
    }

    public function remove ($key)
    {
    }

    public function remove_by_search ($key)
    {
    }

    private function pack_data ($type, $data)
    {
        $cmd = pack("C*", $type);

        $cmd_len = strlen($cmd);

        $body = pack("A*", $data);

        $body_len = strlen($body);

        $len = $cmd_len + $body_len;

        $head = pack("H*", $this->to_hex_str($len));

        return $head.$cmd.$body;
    }

    private function to_hex_str ($num)
    {
        $str = dechex($num);

        $str = str_repeat('0', 4 - strlen($str)).$str;

        return $str;
    }

    private function socket_read_len ($socket, $len, $type)
    {
        $offset = 0;
        $socketData = '';

        while ($offset < $len)
        {
            if (($data = @socket_read ($socket, $len - $offset, $type)) === false)
            {
                return false;
            }

            $dataLen = strlen ($data);

            $offset += $dataLen;

            $socketData .= $data;

            if ($dataLen == 0) { break; }
        }

        return $socketData;
    }
}

?>

以下是PHP端的测试代码:

<?php

$cache = new mycached('localhost', 10086);

$stime = microtime(true);

for ($i = 0; $i < 10000; $i ++)
{
    $res = $cache->get("hello");
}

$etime = microtime(true);

echo "Time: " . ($etime - $stime) . "n";

?>

好了,通讯相关代码都介绍完毕了,这里可以说说测试结果了。

测试结果是APC读写最快比memcached快差不多5倍左右,但是APC不能在进程间共享数据,不是我要找的东西。memcached和PHP读写/dev/shm差不多,memcached略胜一点点,但/dev/shm有个很大的好处就是可以很容易实现缓存的层级管理。

在我刚能让Erlang和PHP建立通讯后,我决定先测试一下通讯性能,如果还没有加入缓存操作逻辑的单纯通讯,性能都无法让人接受的话,那也就没必要再完整实现整个缓存服务器了。

试验结果真的很出人意料,通讯性能比之前想象的相差得太远,1万次请求响应时间在2s ~ 1s波动,而memcached的1万次写才0.3s,读还更快一点。这就是为什么上面分享的代码实际上没有涉及任何缓存操作的原因。

也许是我之前对Erlang性能有过高的估计,也许是我的代码优化得不够好。但不管怎么样,Erlang的开发效率是值得肯定的,实验过程中整个Socket服务器一直流畅的演化着。从无到有,从有到支持并发,从支持并发到支持测试,从支持测试到支持压力测试。

我想大家也许会有疑问,是不是PHP端效率太低,让我得出了错误的结论?这一点,大家可以放心,我在PHP和Erlang两端都提供了测试代码,大家可以自己都测试一遍。事实上,PHP端的性能损失反而是出乎我意料的小,呵呵。