swoole_table 实现原理剖析

2017年8月29日 355点热度 0人点赞 0条评论

Swoole项目从 2012 年推出到现在已经有 5 年的历史,现在越来越多的互联网企业使用 Swoole来开发各类后台应用。受限于 PHP 的 ZendVM实现,PHP 程序无法使用多线程进行编程开发。应用程序中实现并行处理只能使用多进程模式。

做过多进程开发的 PHPer 都知道进程的内存隔离性。在程序中声明的 global全局数组,实际上并不是数据共享的,在一个进程内修改数组的值,在另外一个进程中是无效的。

  1. $array = array();

  2. function process1() {

  3.    global $array;

  4.    $array['test'] = 'hello world';

  5. }

  6. function process2() {

  7.    global $array;

  8.    //这里读取不到test的值

  9.    var_dump($array['test']);

  10. }

这个进程隔离性给程序的开发带来的很多烦恼。比如实现一个聊天室程序,用户 A在进程1中处理,用户 B在进程2中处理, AB如果在同一个 group,这个 group在多线程环境中直接用 set表示, AB加到对应 groupset中即可。但多进程环境中,用 PHP 的 array无法实现。一般可以有2个思路解决问题:

  • 进程间通信,可以使用管道,向另外一个进程发送请求,获取数据的值

  • 借助存储实现,如 Redis、 MySQL、 文件

这2个方案虽然可以实现,但都存在明显的缺点。方案一实现较为复杂,开发困难。方案二实现简单,但存在额外的 IO消耗,不是 纯内存操作,有性能瓶颈。基于 /dev/shm实现内存文件读写的方案,是一个不错的方案,但需要注意锁的操作,读写时需要额外的系统调用开销。

想要解决这个问题,必须实现一个基于共享内存的数据结构。在 PHP 中也有一些扩展模块可以使用。如 APCuYacshm_put_var/shm_get_var

  • Yac:性能高,但由于底层实现的限制,无法保证一致性。只能作为 Cache来使用

  • APCu:支持 Key-Value式数据的读写,缺点是实现简单粗暴,锁的粒度太粗。高并发时存在大量锁的争抢,性能较差

  • shm系列函数:这个方案虽然能实现共享内存操作,但实际上底层实现非常简陋。一方面底层根本没有加锁,如果你要在并发环境中使用,需要自行实现锁的操作。另外,底层实际上是一个链表结构,数据较多时,查询性能非常差

swoole_table 介绍

为了解决多进程程序中数据共享的难题, Swoole扩展提供了 swoole_table数据结构。 Table的实现非常精巧,使用最方便,同时性能也是最好的。

  1. $table = new swoole_table(1024);

  2. $table->column('id', swoole_table::TYPE_INT, 4);

  3. $table->column('name', swoole_table::TYPE_STRING, 64);

  4. $table->column('num', swoole_table::TYPE_FLOAT);

  5. $table->create();

  6. $table->set('[email protected]', array('id' => 145, 'name' => 'rango', 'num' => 3.1415));

  7. $table->set('[email protected]', array('id' => 358, 'name' => "Rango1234", 'num' => 3.1415));

  8. $table->set('[email protected]', array('id' => 189, 'name' => 'rango3', 'num' => 3.1415));

  9. $ret1 = $table->get('[email protected]');

  10. $ret2 = $table->get('[email protected]');

  11. $table->del('[email protected]');

Table实现了一个二维 Map结构,有点像 PHP 的二维数组,简单易用。在最新的 1.9.19中还可以使用 ArrayAccess接口以 array的方式操作 Table

  1. $table = new swoole_table(1024);

  2. $table->column('id', swoole_table::TYPE_INT);

  3. $table->column('name', swoole_table::TYPE_STRING, 64);

  4. $table->column('num', swoole_table::TYPE_FLOAT);

  5. $table->create();

  6. $table['apple'] = array('id' => 145, 'name' => 'iPhone', 'num' => 3.1415);

  7. $table['google'] = array('id' => 358, 'name' => "AlphaGo", 'num' => 3.1415);

  8. $table['microsoft']['name'] = "Windows";

  9. $table['microsoft']['num'] = '1997.03';

  10. var_dump($table['apple']);

  11. var_dump($table['microsoft']);

  12. $table['google']['num'] = 500.90;

  13. var_dump($table['google']);

Table 的优势

  • 性能极高,全部是纯内存操作,没有任何系统调用和IO的开销。在 酷睿I5机器上测试, Table单进程单线程每秒可完成写操作 300万次,读操作每秒可完成 150万次。在 24核服务器上,理论上每秒可实现数千万次读写操作。

  • 使用数据行锁,底层使用了数据行锁自旋锁。多进程并发执行时,读写不同的 key不存在锁的争抢问题。只有同一 CPU时间读写同一个 Key才需要进行加锁操作。而且 Table本身锁的粒度非常小, get、 set操作内部只有少量内存读写的指令,可以在数百纳秒内完成操作。

Table 的局限性

  • Key最大长度不得超过 64字节

  • 必须在创建前规划好容量,一旦写满后,再 set新的数据会出现内存分配导致失败,无法实现动态扩容

因此使用 Table时尽可能地设置较大的内存尺寸,这样虽然会带来一定的内存浪费,但实际上现代服务器内存非常廉价,这个局限性在实际项目中的问题并不大。

swoole_table 实现原理

Table底层基于共享内存实现,所占内存取决于表格的尺寸 size、冲突率(默认 20%)、 column的设置(如上面的示例中每行需要 8+64+8字节)、 64字节 KEY的存储空间、管理结构的内存消耗。

Table 的内存申请

  1. size_t row_num = table->size * (1 + table->conflict_proportion);

  2. size_t row_memory_size = sizeof(swTableRow) + table->item_size;

  3. size_t memory_size = row_num * row_memory_size;

  4. memory_size += sizeof(swMemoryPool) + sizeof(swFixedPool) + ((row_num - table->size) * sizeof(swFixedPool_slice));

  5. memory_size += table->size * sizeof(swTableRow *);

  6. void *memory = sw_shm_malloc(memory_size);

swoole_table本身是一个 HashTable结构, Key会计算为 hash值,来散列到每一行。 HashTable结构会遇到 Hash冲突问题,两个完全不同的 Key可能计算的 hash值是同一个,这时需要使用链表来解决 Hash冲突Swoole底层会创建一个浮动的内存池 swFixedPool结构来管理这些冲突 Key的内存。默认会创建 size*20%数量的浮动内存池。在 1.9.19中可以自行定义冲突率。

  1. $table = new swoole_table(65536, 0.9);

假如你的场景中 Hash冲突较多,可以调高冲突率,以申请一块较大的浮动内存池。

  1. static swTableRow* swTable_hash(swTable *table, char *key, int keylen)

  2. {

  3. #ifdef SW_TABLE_USE_PHP_HASH

  4.    uint64_t hashv = swoole_hash_php(key, keylen);

  5. #else

  6.    uint64_t hashv = swoole_hash_austin(key, keylen);

  7. #endif

  8.    uint64_t index = hashv & table->mask;

  9.    assert(index < table->size);

  10.    return table->rows[index];

  11. }

  12. swTableRow* swTableRow_set(swTable *table, char *key, int keylen, swTableRow **rowlock)

  13. {

  14.    if (keylen > SW_TABLE_KEY_SIZE)

  15.    {

  16.        keylen = SW_TABLE_KEY_SIZE;

  17.    }

  18.    swTableRow *row = swTable_hash(table, key, keylen);

  19.    *rowlock = row;

  20.    swTableRow_lock(row);

  21. #ifdef SW_TABLE_DEBUG

  22.    int _conflict_level = 0;

  23. #endif

  24.    if (row->active)

  25.    {

  26.        for (;;)

  27.        {

  28.            if (strncmp(row->key, key, keylen) == 0)

  29.            {

  30.                break;

  31.            }

  32.            else if (row->next == NULL)

  33.            {

  34.                table->lock.lock(&table->lock);

  35.                swTableRow *new_row = table->pool->alloc(table->pool, 0);

  36. #ifdef SW_TABLE_DEBUG

  37.                conflict_count ++;

  38.                if (_conflict_level > conflict_max_level)

  39.                {

  40.                    conflict_max_level = _conflict_level;

  41.                }

  42. #endif

  43.                table->lock.unlock(&table->lock);

  44.                if (!new_row)

  45.                {

  46.                    return NULL;

  47.                }

  48.                //add row_num

  49.                bzero(new_row, sizeof(swTableRow));

  50.                sw_atomic_fetch_add(&(table->row_num), 1);

  51.                row->next = new_row;

  52.                row = new_row;

  53.                break;

  54.            }

  55.            else

  56.            {

  57.                row = row->next;

  58. #ifdef SW_TABLE_DEBUG

  59.                _conflict_level++;

  60. #endif

  61.            }

  62.        }

  63.    }

  64.    else

  65.    {

  66. #ifdef SW_TABLE_DEBUG

  67.        insert_count ++;

  68. #endif

  69.        sw_atomic_fetch_add(&(table->row_num), 1);

  70.    }

  71.    memcpy(row->key, key, keylen);

  72.    row->active = 1;

  73.    return row;

  74. }

  • 使用 swTable_hash计算 hash值,散列到对应的行

  • Key发生冲突时,需要调用 table->pool->alloc从浮动内存池中分配内存

  • 浮动内存池内存不足时, alloc失败,这时无法写入数据到 Table

数据自旋锁

当同一 CPU时间,多个进程同时读取某一行时,需要锁的争抢。

  1. swTableRow_lock(row);

  2. //内存操作

  3. swTableRow_unlock(_rowlock);

swTableRow_lock 本身是一个自选锁,这里使用了 gcc编译器提供的 __sync_bool_compare_and_swap函数进行 CPU原子操作。多个进程同时读写某一行数据时,先得到锁的进程会执行内存读写操作,未得到锁的进程会进行 CPU自旋等待进程释放锁。

  1. static sw_inline void sw_spinlock(sw_atomic_t *lock)

  2. {

  3.    uint32_t i, n;

  4.    while (1)

  5.    {

  6.        if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))

  7.        {

  8.            return;

  9.        }

  10.        if (SW_CPU_NUM > 1)

  11.        {

  12.            for (n = 1; n < SW_SPINLOCK_LOOP_N; n <<= 1)

  13.            {

  14.                for (i = 0; i < n; i++)

  15.                {

  16.                    sw_atomic_cpu_pause();

  17.                }

  18.                if (*lock == 0 && sw_atomic_cmp_set(lock, 0, 1))

  19.                {

  20.                    return;

  21.                }

  22.            }

  23.        }

  24.        swYield();

  25.    }

  26. }

返回结果

使用 table::get方法时,从 Table共享内存中,读取数据写入到 PHP本地内存数组中。底层会根据列信息 table->columns,计算内存指针的偏移量,得到对应字段的值。


相关文章推荐

PHP 完整实战 23种设计模式

那些 PHP 开发者可能用得上的工具

2017 年 PHP 程序员未来路在何方

  1. static inline void php_swoole_table_row2array(swTable *table, swTableRow *row, zval *return_value)

  2. {

  3.    array_init(return_value);

  4.    swTableColumn *col = NULL;

  5.    swTable_string_length_t vlen = 0;

  6.    double dval = 0;

  7.    int64_t lval = 0;

  8.    char *k;

  9.    while(1)

  10.    {

  11.        col = swHashMap_each(table->columns, &k);

  12.        if (col == NULL)

  13.        {

  14.            break;

  15.        }

  16.        if (col->type == SW_TABLE_STRING)

  17.        {

  18.            memcpy(&vlen, row->data + col->index, sizeof(swTable_string_length_t));

  19.            sw_add_assoc_stringl_ex(return_value, col->name->str, col->name->length + 1, row->data + col->index + sizeof(swTable_string_length_t), vlen, 1);

  20.        }

  21.        else if (col->type == SW_TABLE_FLOAT)

  22.        {

  23.            memcpy(&dval, row->data + col->index, sizeof(dval));

  24.            sw_add_assoc_double_ex(return_value, col->name->str, col->name->length + 1, dval);

  25.        }

  26.        else

  27.        {

  28.            switch (col->type)

  29.            {

  30.            case SW_TABLE_INT8:

  31.                memcpy(&lval, row->data + col->index, 1);

  32.                sw_add_assoc_long_ex(return_value, col->name->str, col->name->length + 1, (int8_t) lval);

  33.                break;

  34.            case SW_TABLE_INT16:

  35.                memcpy(&lval, row->data + col->index, 2);

  36.                sw_add_assoc_long_ex(return_value, col->name->str, col->name->length + 1, (int16_t) lval);

  37.                break;

  38.            case SW_TABLE_INT32:

  39.                memcpy(&lval, row->data + col->index, 4);

  40.                sw_add_assoc_long_ex(return_value, col->name->str, col->name->length + 1, (int32_t) lval);

  41.                break;

  42.            default:

  43.                memcpy(&lval, row->data + col->index, 8);

  44.                sw_add_assoc_long_ex(return_value, col->name->str, col->name->length + 1, lval);

  45.                break;

  46.            }

  47.        }

  48.    }

  49. }

图片

50980swoole_table 实现原理剖析

这个人很懒,什么都没留下

文章评论