Redis源代码分析

22221cjp

贡献于2011-11-26

字数:0 关键词: NoSQL数据库

Redis源代码分析 文档审核人 文档拟制人 文档提交时间 胡戊 (huwu) 邹雨晗 (yuhanzou) Jun 17, 2011 Redis源代码分析 · Arch_Platform 1 / 32 文档修改记录 文档更新时间 变更内容 变更提出部门 变更理由 Jun 17, 2011 初稿 Redis源代码分析 · Arch_Platform 2 / 32 目 录 1. Redis介绍 ! 4 2. 基本功能 ! 4 2.1. 链表 (adlist.h/adlist.c)! 4 2.2. 字符串 (sds.h/sds.c)! 5 2.3. 哈希表 (dict.h/dict.c)! 6 2.4. 内存 (zmalloc.h/zmalloc.h)! 9 3. 服务器模型 ! 11 3.1. 事件处理 (ae.h/ae.c)! 11 3.2. 套接字操作 (anet.h/anet.c)! 15 3.3. 客户端连接 (networking.h/networking.c, redis.c/redis.h)! 15 3.4. 命令处理 ! 17 4. 虚拟内存 ! 19 4.1. 数据读取过程 ! 20 4.2. 数据交换策略 ! 23 5. 备份机制 ! 24 5.1. Snapshot! 24 5.2. AOF! 26 6. 主从同步 ! 27 6.1. 建立连接 ! 27 6.2. 指令同步 ! 30 6.3. 主从转换 ! 31 Redis源代码分析 · Arch_Platform 3 / 32 1. Redis介绍 Redis(http://redis.io)是一个开源的 Key-Value存储引擎。它支持 string、 hash、 list、 set和 sorted set等多种值类型。 2. 基本功能 2.1. 链表 (adlist.h/adlist.c) 链表 (list)是 Redis中最基本的数据结构,由 adlist.h和 adlist.c定义。 基本数据结构 listNode是最基本的结构,标示链表中的一个结点。结点有向前 (next)和向后 (prev)连个指针,链表是双向链表。保存的值是 void*类型 。 typedef struct listNode { struct listNode *prev; struct listNode *next; void *value; } listNode; 链表通过 list定义,提供头 (head)、尾 (tail)两个指针,分别指向头部的结点和尾 部的结点;提供三个函数指针,供用户传入自定义函数,用于复制 (dup)、释放 (free)和 匹配 (match)链表中的结点的值 (value);通过无符号整数 len,标示链表的长度。 typedef struct list { listNode *head; listNode *tail; void *(*dup)(void *ptr); void (*free)(void *ptr); int (*match)(void *ptr, void *key); unsigned int len; } list; listIter是访问链表的迭代器,指针 (next)指向链表的某个结点, direction标 示迭代访问的方向 (宏 AL_START_HEAD表示向前, AL_START_TAIL表示向后)。 typedef struct listIter { listNode *next; int direction; } listIter; 使用方法 Redis定义了一系列的宏,用于访问 list及其内部结点。 链表创建时 (listCreate),通过 Redis自己实现的 zmalloc()分配堆空间。链表释 放 (listRelease)或删除结点 (listDelNode)时,如果定义了链表 (list)的指针函数 free, Redis会使用它释放链表的每一个结点的值 (value),否则需要用户手动释放。结 点的内存使用 Redis自己实现的 zfree()释放。 对于迭代器,通过方法 listGetIterator()、 listNext()、 listReleaseIterator()、 listRewind()和 listRewindTail()使用,例如对于链 表 list,要从头向尾遍历,可通过如下代码: iter = listGetIterator(list, AL_START_HEAD); // 获取迭代器 Redis源代码分析 · Arch_Platform 4 / 32 while((node = listNext(iter)) != NULL) { DoItWithValue(listNodeValue(node)); // 用户实现 DoItWithValue } listReleaseIterator(iter); listDup()用于复制链表,如果用户实现了 dup函数,则会使用它复制链表结点的 value。 listSearchKey()通过循环的方式在 O(N)的时间复杂度下查找值,若用户实现 了 match函数,则用它进行匹配,否则使用按引用匹配。 2.2. 字符串 (sds.h/sds.c) 字符串是 Redis中最基本的数据。 Redis使用 key作为存取 value的唯一标示符,而 key的通俗理解就是字符串。 Redis中的字符串分为两类:二进制安全 (Binary Safe)和 非二进制安全。二进制安全的字符串,是指字符串中所有字符均可用 256个字符 (8bit)编 码 [2]。 Redis中的 value都是通过二进制安全的字符串存储的,而 key使用的是非二进制 安全的。 基本数据结构 Redis内部实现了字符串类型,由 sds.h和 sds.c定义。 sds本质是 char*: typedef char *sds; 通过 sdsnewlen()创建时, Redis内部会创建 sdshdr结构,这个结构的定义如下, 其中 len表示字符串的实际长度, free表示可以额外使用的字节数。 sdshdr分配的内存空 间,为 sizeof(int)+sizeof(int)+sizeof(char*)+len+free。 struct sdshdr { int len; int free; char buf[]; }; 在 sdsnewlen(const void *init, size_t initlen)中,会分配 sizeof (struct sdshdr)+initlen+1的空间,并将 *init指向的字符串拷贝到 buf中,在 buf 末尾补上 ’\0’。但是 sdsnewlen()中返回给外部的,只有 sdshdr->buf。 假如用户调用了 sdsnewlen(“qqmail”, 6),则 64位系统中, Redis会分配 24+ 6+1字节的空间,在内存中的结构如下: 6 0 qqmail 返回给用户的 sdshdr->buf,指向内存中 ”qqmail”的起点,想通过 buf获得指向 sdshdr的指针是很容易的,例如方法 sdslen()中: size_t sdslen(const sds s) { struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr))); return sh->len; } 通过 (s-(sizeof(struct sdshdr))即可得到 sdshdr的地址。 使用方法 了解了 sdshdr的结构之后,理解相关的方法就很容易了。而 Redis这样仅暴露 buf的 做法,使得应用程序可以将 sds简单地当成 char*使用。 Redis源代码分析 · Arch_Platform 5 / 32 创建 sds的方法有: sds sdsnewlen(const void *init, size_t initlen); sds sdsnew(const char *init); sds sdsempty(); 三个方法均用 zmalloc()分配内存空间, sdsempty()创建空字符串。 void sdsfree(sds s)用于释放字符串。 size_t sdslen(const sds s)会返回字符串的总 长度。而 size_t sdsavail(sds s)返回字符串中可用的字节数。以下几个函数会涉及 sds内存空间的分配: sds sdsgrowzero(sds s, size_t len); sds sdscatlen(sds s, void *t, size_t len); sds sdscat(sds s, char *t); sds sdscpylen(sds s, char *t, size_t len); sds sdscpy(sds s, char *t); 其中 sdsgrowzero()会增加 sds的内存空间,并用 ’\0’填充,用 free记录空余空 间。 sdscat()将字符串 t连接在 s的尾部, sdscpy()将字符串 t复制到 s中。若 s中空间不 够时,会调用 sdsgrowzero()分配更多内存。 此外, sds还提供了一些字符串相关的操作函数,在此不详细分析。 2.3. 哈希表 (dict.h/dict.c) Redis的哈希表最大的特色就是自动扩容。当它的哈希表容量不够时,可以 0/1切 换,然后自动扩容。下面具体分析哈希表的实现。 整个哈希系统由结构体 dict定义,其中 type包含一系列哈希表需要用的函数, dictht类型的数组 ht[2]表示两个哈希表实例,由 rehashidx指明下一个需要扩容的哈希 实例的编号, iterators记录外部使用哈希表的迭代器的数目。 typedef struct dict { dictType *type; void *privdata; dictht ht[2]; int rehashidx; /* rehashing not in progress if rehashidx == -1 */ int iterators; /* number of iterators currently running */ } dict; dictht为哈希表具体实现的结构体, table指向哈希中的记录,用数组+开链的形式 保存记录; size表示哈希表桶的大小,为 2的指数; sizemark= size-1,方便哈希值根 据 size取模; used记录了哈希表中的记录数目。 typedef struct dictht { dictEntry **table; unsigned long size; unsigned long sizemask; unsigned long used; } dictht; 哈希表使用开链的方式处理冲突,每条记录都是链表中的一个结点。 dictType在哈 希系统中包含了一系列可由应用程序定义的函数指针,包括 Hash函数、 Key复制、 Value 复制、 Key比较、 Key析构、 Value析构,以增加哈希系统的灵活性。 其中系统定义了三 种默认的 type,表示最常用的三种哈希表。 typedef struct dictType { unsigned int (*hashFunction)(const void *key); void *(*keyDup)(void *privdata, const void *key); Redis源代码分析 · Arch_Platform 6 / 32 void *(*valDup)(void *privdata, const void *obj); int (*keyCompare)(void *privdata, const void *key1, const void *key2); void (*keyDestructor)(void *privdata, void *key); void (*valDestructor)(void *privdata, void *obj); } dictType; extern dictType dictTypeHeapStringCopyKey; extern dictType dictTypeHeapStrings; extern dictType dictTypeHeapStringCopyKeyValue; Redis定义了一系列的宏用于操作哈希表,例如设置记录的 Value。对外提供的 API,除了常规的创建哈希表,增、删、改记录之外,有两类是比较特别的:自动扩容和迭 代器。 自动扩容 Redis用变量 dict_can_resize记录哈希是否可以自动扩容,由两个方法 dictEnableResize()和 dictDisableResize()设置该变量。 应用程序可以使用 dictResize()扩容,它首先判断是否允许扩容,及是否正在扩 容。若可以扩容,则调用 dictExpand()扩容。然后应用程序需要调用 dictRehashMilliseconds()启动扩容过程,并指定扩容过程中记录拷贝速度。 除了应用程序指定的扩容外,在调用 dictAdd()往哈希中添加记录时,系统也会通过 调用 _dictExpandIfNeeded()判断是否需要扩容。 _dictExpandIfNeeded()中,如 果正在扩容,则不会重复进行扩容;如果哈希表 size= 0,即桶数目为 0,则扩容到初始大 小;否则如果 used>=size,并且 can_resize==1或 used/size超过阀值(默认为 5) 时,以 max(used, size)的两倍为基数,调用 dictExpand()扩容。 if (dictIsRehashing(d)) return DICT_OK; if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE); if (d->ht[0].used >= d->ht[0].size && (dict_can_resize || d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) { return dictExpand(d, ((d->ht[0].size > d->ht[0].used) ? d->ht[0].size : d->ht[0].used)*2); } return DICT_OK; dictExpand()进行扩容时,会先选择一个满足 size需求的 2的指数,然后分配内存 空间,创建新的哈希表。如果此时 ht[0]为空,则直接将哈希表赋值给 ht[0];否则,赋值 给 ht[1],并启动拷贝过程,将 ht[0]的记录逐个桶地拷贝到 ht[1]中。置 rehashidx=0,表明正在扩容,且待拷贝的桶为 ht[0]->table[rehashidx]。 dictht n; /* the new hashtable */ unsigned long realsize = _dictNextPower(size); if (dictIsRehashing(d) || d->ht[0].used > size) return DICT_ERR; n.size = realsize; n.sizemask = realsize-1; n.table = zcalloc(realsize*sizeof(dictEntry*)); n.used = 0; if (d->ht[0].table == NULL) { d->ht[0] = n; Redis源代码分析 · Arch_Platform 7 / 32 return DICT_OK; } d->ht[1] = n; d->rehashidx = 0; return DICT_OK; dictIsRehashing()通过 rehashidx来判断是否正在扩容。这个方法在多处被调 用,当 dictAdd()往哈希表中添加记录时,也会通过该方法判断是否正在扩容。若正在扩 容,则调用 _dictRehashStep(),该函数判断,若此时 iterators==0,即没有迭代器 时,就从 ht[0]中拷贝一部分记录到 ht[1]。 拷贝过程在 dictRehash()中完成,该函数返回 0时,表示扩容结束, ht[0]中所有 记录都已拷贝到 ht[1],且 rehashidx被置为 -1;否则返回 1,表示扩容未结束。拷贝过 程中,将 ht[0]->table[rehashidx]拷贝到 ht[1]后, rehashidx++,直到 used==0,即所有记录拷贝完成。拷贝一个桶时,需要对桶中所有元素重新求哈希值,然 后一个个放入 ht[1]中。 dictRehash()通过参数,控制每次拷贝的桶的数目。该过程由 下面代码描述: int dictRehash(dict *d, int n) { if (!dictIsRehashing(d)) return 0; while(n--) { dictEntry *de, *nextde; if (d->ht[0].used == 0) { zfree(d->ht[0].table); d->ht[0] = d->ht[1]; _dictReset(&d->ht[1]); d->rehashidx = -1; return 0; } while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++; de = d->ht[0].table[d->rehashidx]; while(de) { unsigned int h; nextde = de->next; /* Get the index in the new hash table */ h = dictHashKey(d, de->key) & d->ht[1].sizemask; de->next = d->ht[1].table[h]; d->ht[1].table[h] = de; d->ht[0].used--; d->ht[1].used++; de = nextde; } d->ht[0].table[d->rehashidx] = NULL; d->rehashidx++; } return 1; } Redis源代码分析 · Arch_Platform 8 / 32 迭代器 迭代器提供了遍历哈希表中所有元素的方法,通过 dictGetIterator()获得迭代器 后,使用 dictNext(dictIterator *)方法获得下一个元素。当外部持有的迭代器数目 不为 0时,哈希表会暂停扩容操作。 迭代器遍历的过程,从 ht[0]开始,依次从第一个桶 table[0]开始遍历桶中的元 素,然后时 table[1], table[2], ..., table[size],若正在扩容,则会继续遍历 ht[1]中的桶。遍历桶中元素时,依次访问链表中的每个元素。 2.4. 内存 (zmalloc.h/zmalloc.h) 前文已提到, Redis通过自己的方法管理内存 ,,主要方法有 zmalloc(), zrealloc(), zcalloc()和 zfree(), 分别对应 C中的 malloc(), realloc()、 calloc()和 free()。相关代码在 zmalloc.h和 zmalloc.c中。 Redis自己管理内存的好处主要有两个:可以利用内存池等手段提高内存分配的性 能;可以掌握更多的内存信息,以便于 Redis虚拟内存 (VM)等功能中,决定何时将数据 swap到磁盘。 先回忆各个系统中常见的内存分配函数: malloc()分配一块指定大小的内存区域,并返回指向区域开头的指针,若分配失 败,则返回 NULL。 calloc()与 malloc()一样,分配一块指定大小的内存区域,成功时返回区域头指 针,失败返回 NULL。区别在于, calloc()的输入参数为 count和 size,即分配的项的数 目,及每一项的大小。 calloc()在成功分配内存空间后,会将空间内所有值置 0。 realloc()修改已分配的内存块的大小。若已分配的内存块后没有足够的空间用于扩 展内存块,则重新申请一块满足需要的内存块,并将旧的数据拷贝到新位置,释放旧的内存 块,返回指向新的内存块的指针;否则直接扩展原有的内存块。若分配失败,返回 NULL。 free()释放已分配的内存块。 内存分配 在 Redis中,如果系统中包含 TCMALLOC,则会使用 tc_malloc()等 TCMALLOC中的 方法代替 malloc()等原有的分配内存方法。 TCmalloc是 google perftools中的一个 组件。 #if defined(USE_TCMALLOC) #define malloc(size) tc_malloc(size) 首先看 zmalloc()和 zfree()两个最常用的方法。 Redis在申请内存时,除了申请需 要的 size外,还会多申请一块定长 (PREFIX_SIZE)的区域用于记录所申请的内存块的长 度。如果申请成功, Redis会使用宏函数( Redis中为性能考虑,大量使用宏函数) update_zmalloc_stat_alloc(size+PREFIX_SIZE, size)记录申请的内存块的相 关信息,以便监控内存使用状况;当内存块被 zfree()释放时,根据头部的信息可以快速 地获知被释放的内存区域的长度,然后通过宏函数 update_zmalloc_stat_free()标记 释放。源代码中,若系统支持 malloc_size()方法,则会使用它返回指针所指向的内存块 的大小( Mac OS X 10.4以上支持该方法 [3])。 有疑惑的是,在支持 malloc_size() 的系统中,为何还要多申请 PREFIX_SIZE的内存? Redis源代码分析 · Arch_Platform 9 / 32 void *zmalloc(size_t size) { void *ptr = malloc(size+PREFIX_SIZE); if (!ptr) zmalloc_oom(size); #ifdef HAVE_MALLOC_SIZE update_zmalloc_stat_alloc(redis_malloc_size(ptr),size); return ptr; #else *((size_t*)ptr) = size; // 在头部记录内存块的长度 update_zmalloc_stat_alloc(size+PREFIX_SIZE,size); return (char*)ptr+PREFIX_SIZE; #endif } 宏 update_zmalloc_stat_alloc()中,首先将要分配的空间与内存对齐,然后会 根据宏 zmalloc_thread_safe判断是否需要对内存信息记录表的相关操作加锁。虽然 Redis在大部分场景中是单线程读写的,即 thread_safe的,但启用虚拟内存 (VM),或持 久化 dump到磁盘等操作时会启动多线程,因此在多线程模式中,需要对部分操作加锁。 内存监控 used_memory记录了 Redis使用的内存总数。而多线程下 malloc()是线程安全的。 zmalloc_allocations[]记录了各个 size分配的内存块的数目,大于 256个字节的按 256算。应用程序可以通过 zmalloc_allocations_for_size(size)获得对应 size的 内存块的分配数目;也可以通过 zmalloc_used_memory()获得 Redis占用的总内存。这 些监控类的方法在 Redis的日志系统中被用到。 #define update_zmalloc_stat_alloc(__n,__size) do { \ size_t _n = (__n); \ size_t _stat_slot = (__size < ZMALLOC_MAX_ALLOC_STAT) ? __size : ZMALLOC_MAX_ALLOC_STAT; \ if (_n&(sizeof(long)-1)) _n += sizeof(long)-(_n&(sizeof(long)-1)); \ if (zmalloc_thread_safe) { \ pthread_mutex_lock(&used_memory_mutex); \ used_memory += _n; \ zmalloc_allocations[_stat_slot]++; \ pthread_mutex_unlock(&used_memory_mutex); \ } else { \ used_memory += _n; \ zmalloc_allocations[_stat_slot]++; \ } \ } while(0) 外部应用程序通过 zmalloc_enable_thread_safeness()方法开启 Redis内存模 块的线程安全模式,后文会分析哪些功能需要开启线程安全。 zcalloc(size)、 zrealloc()与 zmalloc()的处理策略类似,不再详述。 在部分操作系统中, Redis可以通过 zmalloc_get_rss()方法获得自己的进程占用 的内存信息。该信息通过操作系统提供,往往比 Redis自己记录的 used_memory更准确, 但其获取速度也较慢。这些信息也是用于虚拟内存功能。 除了内存相关的操作外, Redis在此还提供了一个复制字符串的方法 zstrdup(char *),该方法将申请一块与源字符串长度相同的内存区域,并用 memcpy()拷贝字符串的内 容。 Redis源代码分析 · Arch_Platform 10 / 32 3. 服务器模型 3.1. 事件处理 (ae.h/ae.c) Redis是单线程模型(虚拟内存等功能会启动其它线程 (进程 )),通过事件机制异步 地处理所有请求。 Redis的事件模型在不同的操作系统中提供了不同的实现, ae_epoll.h/ ae_epoll.c为 epoll的实现, ae_select.h/ae_select.c是 select的实现, ae_kqueue.h/ae_kqueue.c是 bsd中 kqueue的实现。 基本事件 Redis提供两种基本事件: FileEvent和 TimeEvent。前者是基于操作系统的异步 机制 (epoll/kqueue)实现的文件事件,后者是 Redis自己实现的定时器。 aeEventLoop是事件模型中最基本的结构体: typedef struct aeEventLoop { int maxfd; long long timeEventNextId; aeFileEvent events[AE_SETSIZE]; /* Registered events */ aeFiredEvent fired[AE_SETSIZE]; /* Fired events */ aeTimeEvent *timeEventHead; int stop; void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; } aeEventLoop; maxfd标示当前最大的事件描述符。 events和 fired分别保存了已注册和已注销的 FileEvent, AE_SETSIZE是 Redis中可以注册的事件的上限,默认为 1024*10。 timeEventHead指向一个 TimeEvent的链表, timeEventNextId标示下一个定时器。 beforesleep是每次事件轮询前都会执行的函数,相当于 hook。 stop用于停止事件轮 询。 typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent; aeFileEvent可以用于 socket事件的监听。 mask表示要监听的事件类型, rfileProc和 wfileProc分别为读事件和写事件的响应函数。 clientData。 typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent; aeTimeEvent用于定时器 (timer),其实现是一个链表,其中每一个结点是一个 timer,并有独立 id。 when_sec和 when_ms指定了定时器的触发时间, timeProc为响应 函数, finalizerProc为删除定时器时的 “析构函数 ”。 Redis源代码分析 · Arch_Platform 11 / 32 aeFiredEvent表示触发的 FileEvent,其中 fd为文件描述符, mask为事件类型。 相关操作 Redis提供了一系列的 API用于操作事件。 aeCreateEventLoop()和 aeDeleteEventLoop(aeEventLoop*)分别用于初始化和删除事件循环。初始化 eventLoop时,会初始化对应操作系统的实现,例如 Linux中会使用 ae_epoll.c中的 aeApiCreate()初始化事件使用的文件描述符和事件列表。 Redis中使用了三个宏 AE_NONE、 AE_READABLE和 AE_WRITABLE表示监听的 FileEvent的类型,初始时 eventLoop中所有 events的监听类型为 AE_NONE。 aeStop(aeEventLoop*)用户暂停 事件轮询。 aeCreateFileEvent()和 aeDeleteFileEvent()用于创建和删除 FileEvent, 在 Linux系统中,会使用 epoll监听文件描述符,并用 EPOLLIN和 EPOLLOUT监听文件的 Read和 Write事件,并在 eventLoop->apiData中记录相关信息。 aeCreateTimeEvent()和 aeDeleteTimeEvent()用于创建和删除 TimeEvent。 aeMain(aeEventLoop*)是启动事件轮询的入口,内部实现是一个死循环,每一次 循环中先执行 eventLoop->beforesleep()方法,然后使用 aeProcessEvents()轮询 事件。 aeProcessEvents()中,首先找到最近的 timer(最先超时的),记下距该 timer触发的时间,并作为检查 FileEvent的超时时间。 Redis通过不同操作系统实现 (Linux:ae_epoll.c)中的 aeApiPoll()方法,检查是否触发了 FileEvent,若触发则 根据 mask决定响应函数 (rfileProc/wfileProc)。 FileEvent的优先级高于 TimeEvent,因此可以将 TimerEvent的触发时间用作 FileEvent轮询的超时时间。 检查完 FileEvent后,通过调用 processTimeEvents()检查 TimeEvent,触发所 有已超时的 timer的 timeProc,根据 timeProc的返回值,决定是删除 timer还是继续增 加 timer的触发时间。例如, timeProc返回 100时,则将 timer的触发时间增加 100ms, 即 100ms后再次触发该 timer。若 timeProc一直返回 100,则该 timer会每隔 100ms触发 一次。其中重要代码如下: ... retval = te->timeProc(eventLoop, id, te->clientData); processed++; if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); } else { aeDeleteTimeEvent(eventLoop, id); } ... 除了上述异步的事件机制外, Redis还提供了一个同步处理事件的 API: aeWait (int fd, int mask, long long mileseconds), fd指明了需要监听的文件描述 符, mask说明监听的事件类型, mileseconds指明超时事件。该方法内部通过 select实 现了阻塞型等待。 Redis提供 aeSetBeforeSleepProc()用于设置事件轮询中每次循环前执行的方 法。 初始化事件轮询 Redis启动时便会初始化事件轮询并启动, int main()中,调用 initServer()初 始化服务端。在 initServer()中,初始化事件轮询,通过 TimeEvent定制一个定时器 Redis源代码分析 · Arch_Platform 12 / 32 serverCron,每 100ms执行一次服务器相关的操作。在 Redis所有初始化工作完成后,执 行 aeMain()启动事件轮询。 int initServer() { // ... server.el = aeCreateEventLoop(); // ... } int main(int argc, char **argv) { // ... initServer(); // .... aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); aeDeleteEventLoop(server.el); } 事件系统在处理 TimeEvent和 FileEvent之前,会调用 BeforeSleepProc, Redis中事件循环前会调用 beforeSleep()。该函数负责许多功能,例如处理已经 ready 的客户端连接,处理虚拟内存的换入换出机制,处理 aof等等。 serverCron()是 Redis中非常重要的函数,负责协调 Redis中许多功能。首先更新 全局的时间戳 server.unixtime, Redis中各个模块中对时间要求不严格的地方,均通过 这个时间戳获取系统时间,以避免调用 time(NULL)的时间消耗。然后更新虚拟内存换入换 出策略相关的时间戳 server.lruclock。 /* We take a cached value of the unix time in the global state because * with virtual memory and aging there is to store the current time * in objects at every object access, and accuracy is not needed. * To access a global var is faster than calling time(NULL) */ server.unixtime = time(NULL); /* We have just 22 bits per object for LRU information. * So we use an (eventually wrapping) LRU clock with 10 seconds resolution. * 2^22 bits with 10 seconds resoluton is more or less 1.5 years. * * Note that even if this will wrap after 1.5 years it's not a problem, * everything will still work but just some object will appear younger * to Redis. But for this to happen a given object should never be touched * for 1.5 years. * * Note that you can change the resolution altering the * REDIS_LRU_CLOCK_RESOLUTION define. */ updateLRUClock(); 然后,如果收到 SIGTERM信息,会停止服务器。然后打印一些数据库相关的运行日 志。当没有后台的虚拟内存控制进程和备份进程时,会尝试对哈希表进行扩容。 /* We don't want to resize the hash tables while a bacground saving * is in progress: the saving child is created using fork() that is * implemented with a copy-on-write semantic in most modern systems, so * if we resize the HT while there is the saving child at work actually * a lot of memory movements in the parent will cause a lot of pages * copied. */ if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) { if (!(loops % 10)) tryResizeHashTables(); if (server.activerehashing) incrementallyRehash(); } Redis源代码分析 · Arch_Platform 13 / 32 然后输出客户端连接的相关数据日志,并关闭超时连接。再次检查,如果有后台进程 在进行备份,则等待它们完成,并进行一些清理,然后开始执行哈希表扩容。如果没有上述 后台进程,则判断是否需要进行备份。 /* Check if a background saving or AOF rewrite in progress terminated */ if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) { int statloc; pid_t pid; if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { if (pid == server.bgsavechildpid) { backgroundSaveDoneHandler(statloc); } else { backgroundRewriteDoneHandler(statloc); } updateDictResizePolicy(); } } else { /* If there is not a background saving in progress check if * we have to save now */ time_t now = time(NULL); for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; if (server.dirty >= sp->changes && now-server.lastsave > sp->seconds) { redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, sp->seconds); rdbSaveBackground(server.dbfilename); break; } } } 若服务器为 master,则释放过期的 key;若为 slave,则等待同步指令 DEL。 /* Expire a few keys per cycle, only if this is a master. * On slaves we wait for DEL operations synthesized by the master * in order to guarantee a strict consistency. */ if (server.masterhost == NULL) activeExpireCycle(); 然后检查物理内存使用率,若超过阀值,则将部分不常用的 Entity换出到磁盘。 Redis的虚拟内存的换入换出机制分两种,根据 server.vm_max_threads设置分别使用 不同的机制。 /* Swap a few keys on disk if we are over the memory limit and VM * is enbled. Try to free objects from the free list first. */ if (vmCanSwapOut()) { while (server.vm_enabled && zmalloc_used_memory() > server.vm_max_memory) { int retval = (server.vm_max_threads == 0) ? vmSwapOneObjectBlocking() : vmSwapOneObjectThreaded(); if (retval == REDIS_ERR && !(loops % 300) && zmalloc_used_memory() > (server.vm_max_memory+server.vm_max_memory/10)) { redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!"); } Redis源代码分析 · Arch_Platform 14 / 32 /* Note that when using threade I/O we free just one object, * because anyway when the I/O thread in charge to swap this * object out will finish, the handler of completed jobs * will try to swap more objects if we are still out of memory. */ if (retval == REDIS_ERR || server.vm_max_threads > 0) break; } } 最后,若服务器为 slave,则每 10次 serverCron(即 1秒)检查一次与 master的连 接式否正常。 /* Replication cron function -- used to reconnect to master and * to detect transfer failures. */ if (!(loops % 10)) replicationCron(); // Todo slave不正常 3.2. 套接字操作 (anet.h/anet.c) Redis在 anet.h/c中封装了所有底层套接字操作 : int anetTcpConnect(char *err, char *addr, int port); int anetTcpNonBlockConnect(char *err, char *addr, int port); int anetUnixConnect(char *err, char *path); int anetUnixNonBlockConnect(char *err, char *path); int anetRead(int fd, char *buf, int count); int anetResolve(char *err, char *host, char *ipbuf); int anetTcpServer(char *err, int port, char *bindaddr); int anetUnixServer(char *err, char *path); int anetTcpAccept(char *err, int serversock, char *ip, int *port); int anetUnixAccept(char *err, int serversock); int anetWrite(int fd, char *buf, int count); int anetNonBlock(char *err, int fd); int anetTcpNoDelay(char *err, int fd); int anetTcpKeepAlive(char *err, int fd); anetTcpServer()是建立网络套接字服务器的方法,即对 socket()、 bind()、 listen()等一系列操作的封装,并返回 socket的 fd。 anetUnixServer()建立本地套 接字服务器。 anetTcpConnect()、 anetTcpNonBlockConnect()用于建立阻塞型和非阻塞型 网络套接字连接。 anetUnixConnect()、 anetUnixNonBlockConnect()用于建立阻 塞型和非阻塞型的本地套接字连接。 anetNonBlock()将一个 fd设置为非阻塞型。 anetTcpNoDelay()将 TCP连接设为 非延迟的,即屏蔽 Nagle算法。 anetTcpKeepAlive()开启连接检测,避免对方宕机、网 络中断时 fd永远阻塞。 anetRead()和 anetWrite()用于套接字上的读写功能, anetTcpAccept()和 anetUnixAccept()分别在网络套接字和本地套接字上新增连接。 Redis通过这些 API封装了所有套接字操作,但这些操作并没有什么特别之处,不再 详细分析其实现。 3.3. 客户端连接 (networking.h/networking.c, redis.c/redis.h) 本节关注 Redis的并发框架,及客户端连接的建立和请求处理。上述事件模型,及网 络操作,拼凑出 Redis的服务器并发框架。 Redis源代码分析 · Arch_Platform 15 / 32 初始化服务器 main()中先调用 initServerConfig()初始化服务器配置。包括端口、物理文件等 基本信息,虚拟内存的各种设置,及 command table(server.commands)等。前文已说 过, Redis的 dict支持自定义各种行为(例如 Hash函数,比较函数等),初始化 command table时,首先用 commandTableDictType指定将 command dict的各种行为,然后 populateCommand()初始化各种命令。 Redis中命令由结构体 redisCommand定义,后 文分析。这里, Redis会将一系列的命令载入 command table中。 然后 main()调用 initServer()初始化事件轮询,紧接着会调用 anetTcpServer ()初始化服务器。 initServer() { // .... server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr); // ... if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) oom("creating file event"); // ... } 完成服务器的初始化后, Redis已开始监听端口,通过 aeCreateFileEvent()监听 端口的 READABLE事件,若该事件触发,则表示有新连接建立。 acceptTcpHandler()响 应 READABLE事件,调用 anetTcpAccept()建立连接,获得客户端连接的 fd。获得 fd 后,通过 createClient()方法,创建连接相关信息,保存在结构体 redisClient中。该 结构体保存了连接的 fd,输入 buf,输出 buf等一系列的信息。 Redis在处理请求时完全 是用单线程异步模式,通过 epoll监听所有连接的读写事件,并通过相应的响应函数处理。 Redis使用这样的设计来满足需求,很大程度上因为它服务的是高并发的短连接,且请求处 理事件非常短暂。这个模型下,一旦有请求阻塞了服务,整个 Redis的服务将受影响。 Redis采用单线程模型可以避免系统上下文切换的开销。 Redis的连接由结构体 redisClient(redis.h)定义,默认使用非阻塞、非延迟型 的连接。连接由 createClient()方法创建,由 freeClient()方法释放。 createClient()的过程中,在连接的 fd上监听 READABLE事件,当该事件触发时说明接 收到客户端请求,调用 readQueryFromClient()方法处理请求。然后初始化连接的输入 缓冲区,参数列表等等。最后将连接加入全局的连接列表 server.clients中。 /* With multiplexing we need to take per-clinet state. * Clients are taken in a liked list. */ typedef struct redisClient { int fd; redisDb *db; int dictid; sds querybuf; int argc; robj **argv; int reqtype; int multibulklen; /* number of multi bulk arguments left to read */ long bulklen; /* length of bulk argument in multi bulk request */ list *reply; int sentlen; time_t lastinteraction; /* time of the last interaction, used for timeout 最 后一次交互的时间戳 */ int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... 连 接的状态标记,初始时为 0 */ Redis源代码分析 · Arch_Platform 16 / 32 int slaveseldb; /* slave selected db, if this client is a slave */ int authenticated; /* when requirepass is non-NULL */ int replstate; /* replication state if this is a slave */ int repldbfd; /* replication DB file descriptor */ long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ blockingState bpop; /* blocking state */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ /* Response buffer */ int bufpos; char buf[REDIS_REPLY_CHUNK_BYTES]; } redisClient; 以下几种情况会使用 freeClient()释放连接:从客户端读数据错误、已到达服务器 最大连接数 (server.maxclients)、 客户端的状态标记中,有三个状态会让 Redis不再处理客户端命令: REDIS_BLOCKED(处理阻塞操作)和 REDIS_IO_WAIT(等待虚拟内存)会让客户端读入 数据后阻塞,不处理命令。 REDIS_CLOSE_AFTER_REPLY标记会让客户端输出数据后立即 关闭,此时也不再处理输入的命令。 3.4. 命令处理 Redis的命令由结构体 redisCommand定义,包含了指向命令处理函数的指针。 typedef void redisCommandProc(redisClient *c); typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); struct redisCommand { char *name; redisCommandProc *proc; int arity; int flags; redisVmPreloadProc *vm_preload_proc; int vm_firstkey; int vm_lastkey; int vm_keystep; }; Redis在 readQueryFromClient()中处理客户端的 Input,每次尝试读取长度为 REDIS_IOBUF_LEN(默认为 1024)的数据,输入的数据保存在 redisClient结构体的 querybuf中 sdscatlen(c->querybuf,buf,nread),然后调用 processInputBuffer()处理数据。 若客户端处于 ready状态(即 flags中无 REDIS_BLOCKED、 REDIS_IO_WAIT和 REDIS_CLOSE_AFTER_REPLY)状态,则尝试处理输入数据。若客户端没有通过 reqtype 指定使用的协议,则根据数据的第一个字节判断,若 querybuf[0]==’*’,则使用 MULTIBULK协议,否则为 INLINE协议,分别调用对应的协议解包函数并处理命令,完成后 重置客户端 (resetClient())。 Redis源代码分析 · Arch_Platform 17 / 32 void processInputBuffer(redisClient *c) { while(sdslen(c->querybuf)) { if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; if (!c->reqtype) { if (c->querybuf[0] == '*') { c->reqtype = REDIS_REQ_MULTIBULK; } else { c->reqtype = REDIS_REQ_INLINE; } } if (c->reqtype == REDIS_REQ_INLINE) { if (processInlineBuffer(c) != REDIS_OK) break; } else if (c->reqtype == REDIS_REQ_MULTIBULK) { if (processMultibulkBuffer(c) != REDIS_OK) break; } else { redisPanic("Unknown request type"); } if (c->argc == 0) { resetClient(c); } else { if (processCommand(c) == REDIS_OK) resetClient(c); } } } INLINE协议非常简单,按换行符 (‘\r\n’)分隔,一行是一条命令。命令中参数用空 格分隔,通过 processInlineBuffer()解析后,存放在 argc/argv中,其中 argv是 redisObject对象的数组。 MULTIBULK协议 //TODO processCommand()中处理所有客户端的命令。首先是 ”quit”, Redis接收到这个 命令后将客户端连接断开;否则,使用 lookupCommand()从 command table中查找命 令,然后通过命令的处理函数 (cmd->proc())处理命令。 以 ‘get’命令为例,它在 readonlyCommandTable中定义,在 initServerConfig ()初始化过程中载入 command table,若服务器接收到 get命令,则通过 lookupCommand()找到对应的 redisCommand结构体的实例,通过处理函数 getCommand()处理命令。 struct redisCommand readonlyCommandTable[] = { {"get",getCommand,2,0,NULL,1,1,1}, // .... } redis将命令处理函数的实现写在 t_***.c中,例如 t_string.c实现了字符串操作 相关的命令,包括 getCommand()。 Redis源代码分析 · Arch_Platform 18 / 32 4. 虚拟内存 虚拟内存是 Redis中最复杂的模块,它几乎影响了 Redis所有模块的功能。 Redis作 为一个内存数据库,数据量受内存制约,为了存储更多的数据, Redis在 2.0加入了虚拟内 存功能,允许将不常访问的数据的 value放入磁盘中。 虚拟内存部分的分析分为两部分:获取在 vm中的数据的过程,数据在 vm中换入换出策 略。分析前者前,先看 Redis为支持虚拟内存的数据结构。 Redis底层用结构体 redisObject表示保存的数据项。 /* The actual Redis Object */ #define REDIS_LRU_CLOCK_MAX ((1<<21)-1) /* Max value of obj->lru */ #define REDIS_LRU_CLOCK_RESOLUTION 10 /* LRU clock resolution in seconds */ typedef struct redisObject { unsigned type:4; unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */ unsigned encoding:4; unsigned lru:22; /* lru time (relative to server.lruclock) */ int refcount; void *ptr; /* VM fields are only allocated if VM is active, otherwise the * object allocation function will just allocate * sizeof(redisObjct) minus sizeof(redisObjectVM), so using * Redis without VM active will not have any overhead. */ } robj; 其中 type表示 value的数据类型: string、 list、 set; storage表示存储的位 置, Redis共定义了四种存储状态,分别为内存,磁盘(虚拟内存),正在从内存换出到 磁盘和正在从磁盘换入内存; encoding; lru与 server.lruclock相关; refcount为 引用计数。 #define REDIS_VM_MEMORY 0 /* The object is on memory */ #define REDIS_VM_SWAPPED 1 /* The object is on disk */ #define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */ #define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */ 与此同时, Redis定义结构体 vmPointer,表示被从内存换出到磁盘上的数据项。 /* The VM pointer structure - identifies an object in the swap file. * * This object is stored in place of the value * object in the main key->value hash table representing a database. * Note that the first fields (type, storage) are the same as the redisObject * structure so that vmPointer strucuters can be accessed even when casted * as redisObject structures. * * This is useful as we don't know if a value object is or not on disk, but we * are always able to read obj->storage to check this. For vmPointer * structures "type" is set to REDIS_VMPOINTER (even if without this field * is still possible to check the kind of object from the value of 'storage').*/ typedef struct vmPointer { unsigned type:4; unsigned storage:2; /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ unsigned notused:26; unsigned int vtype; /* type of the object stored in the swap file */ off_t page; /* the page at witch the object is stored on disk */ off_t usedpages; /* number of pages used on disk */ } vmpointer; Redis源代码分析 · Arch_Platform 19 / 32 这个设计非常巧妙, vmpointer的前 32个字节与 redisObject是可以互相转换的。 vmPointer的具体定义待后文再行分析。 off_t vm_page_size; off_t vm_pages; off_t vm_next_page; /* Next probably empty page */ off_t vm_near_pages; /* Number of pages allocated sequentially */ unsigned char *vm_bitmap; /* Bitmap of free/used pages */ Redis将内存按页 (Page)管理,页的大小与数目在服务启动时指定。 Redis通过位图 索引 server.vm_bitmap管理页,其中 0表示未用, 1表示已用。 Redis中,一个页最多只 存放一条数据,而一条数据可以拆分存放在多个页中。 /* An I/O thread process an element taken from the io_jobs queue and * put the result of the operation in the io_done list. While the * job is being processed, it's put on io_processing queue. */ list *io_newjobs; /* List of VM I/O jobs yet to be processed */ list *io_processing; /* List of VM I/O jobs being processed */ list *io_processed; /* List of VM I/O jobs already processed */ Redis中,将 VM相关的读写都用任务 (job)来描述及执行, io_newjobs表示新增的 job, processing和 processed分别表示正在执行和完成的 job。 Job总共有 3种: LOAD、 PREPARE_SWAP和 DO_SWAP,分别为从磁盘读入内存、计算换出数据需要交换的页 及换出到磁盘。 /* VM threaded I/O request message */ #define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */ #define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */ #define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */ typedef struct iojob { int type; /* Request type, REDIS_IOJOB_* */ redisDb *db;/* Redis database */ robj *key; /* This I/O request is about swapping this key */ robj *id; /* Unique identifier of this job: this is the object to swap for REDIS_IOREQ_*_SWAP, or the vmpointer objct for REDIS_IOREQ_LOAD. */ robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */ off_t page; /* Swap page where to read/write the object */ off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */ int canceled; /* True if this command was canceled by blocking side of VM */ pthread_t thread; /* ID of the thread processing this entry */ } iojob; job的结构体种,记录了相关的信息。任何 Job,都通过 queueIOJob()加入 server.io_newjobs,当 Redis发现能启动新的 VM线程,或者有空闲的 VM线程时,会执 行 job,线程的入口为 IOThreadEntryPoint,该函数负责执行 job,并将 job从 server.io_newjobs移入 server.io_processing。然后 Redis通过管道在线程间交 流, VM线程将完成 job的信号通过管道发送到主线程,主线程进行善后。 Job具体的执行过 程,在后文的数据读取过程中有更详细的分析。 4.1. 数据读取过程 对于 getCommand(),从 Redis中查询数据, Redis先用 lookupKey()从哈希表中 查找对应的 dictEntity,校验其合法性后,如果服务器没有后台备份进程在工作,就更新 它的时间戳。但如果启动了虚拟内存功能,就必须判断 dictEntity的存储状态。如果该 dictEntity正在内存中 (REDIS_VM_MEMORY),则不做操作;如果它正在被换出到磁盘 Redis源代码分析 · Arch_Platform 20 / 32 (REDIS_VM_SWAPPING),则停止换出操作;如果它在磁盘,则调用 vmLoadObject()将 dictEntity从磁盘读出;如果它正在从磁盘换入到内存,则先调用 vmLoadObject()将 它立即从磁盘读出。具体来看这个过程,在命令处理的入口 processCommand()中, if (server.vm_enabled && server.vm_max_threads > 0 && blockClientOnSwappedKeys(c,cmd)) return REDIS_ERR; call(c,cmd); 在虚拟内存开启,并以多线程模式执行时,会调用 blockClienkOnSwappedKeys (),将可能已经换出到磁盘的数据加载到内存,并在加载未完成前阻塞客户端连接(将客 户端状态置为 REDIS_IO_WAIT)。在 blockClienkOnSwappedKeys()中,如果命令设 置了预加载函数 (vm_preload_proc),则调用预加载函数处理,否则调用 waitForMultipleSwappedKeys(),将需要的 key载入内存。 int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) { if (cmd->vm_preload_proc != NULL) { cmd->vm_preload_proc(c,cmd,c->argc,c->argv); } else { waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv); } /* If the client was blocked for at least one key, mark it as blocked. */ if (listLength(c->io_keys)) { c->flags |= REDIS_IO_WAIT; aeDeleteFileEvent(server.el,c->fd,AE_READABLE); server.vm_blocked_clients++; return 1; } else { return 0; } } waitForMultipleSwappedKeys和命令设置的预加载函数的载入过程均由 waitForSwappedKey()开始,该函数首先从数据库中找到 cmd需要的数据项的 dictEntity对象,根据它存储的位置,如果为 REDIS_VM_MEMORY,则直接返回,无需阻 塞客户端;如果为 REDIS_VM_SWAPPING,则用 vmCancelThreadedIOJob(o)取消正在 进行的换出操作,并直接返回,无需阻塞客户端。其余情况,均阻塞客户端。 每个客户端的结构体均有链表 c->io_keys,保存所有等待 swapping的对象的 key。 如果 cmd需要的数据项在磁盘上,则将它的 key加入 c->io_keys。然后将 {c, key}加入 c->db->io_keys,这个 db其实是全局的 server->db,它为每个 key,保存了一个阻塞 在它上面的客户端的队列。 接下来,如果该 key已经换出到磁盘,并且没有 job将它从磁盘读出,则创建一个 job,将它从磁盘换出, job中用 vmpoiner对象标记需要换出的 dictEntity。 int waitForSwappedKey(redisClient *c, robj *key) { struct dictEntry *de; robj *o; list *l; /* If the key does not exist or is already in RAM we don't need to * block the client at all. */ de = dictFind(c->db->dict,key->ptr); if (de == NULL) return 0; o = dictGetEntryVal(de); if (o->storage == REDIS_VM_MEMORY) { Redis源代码分析 · Arch_Platform 21 / 32 return 0; } else if (o->storage == REDIS_VM_SWAPPING) { /* We were swapping the key, undo it! */ vmCancelThreadedIOJob(o); return 0; } /* OK: the key is either swapped, or being loaded just now. */ /* Add the key to the list of keys this client is waiting for. * This maps clients to keys they are waiting for. */ listAddNodeTail(c->io_keys,key); incrRefCount(key); /* Add the client to the swapped keys => clients waiting map. */ de = dictFind(c->db->io_keys,key); if (de == NULL) { int retval; /* For every key we take a list of clients blocked for it */ l = listCreate(); retval = dictAdd(c->db->io_keys,key,l); incrRefCount(key); redisAssert(retval == DICT_OK); } else { l = dictGetEntryVal(de); } listAddNodeTail(l,c); /* Are we already loading the key from disk? If not create a job */ if (o->storage == REDIS_VM_SWAPPED) { iojob *j; vmpointer *vp = (vmpointer*)o; o->storage = REDIS_VM_LOADING; j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_LOAD; j->db = c->db; j->id = (robj*)vp; j->key = key; incrRefCount(key); j->page = vp->page; j->val = NULL; j->canceled = 0; j->thread = (pthread_t) -1; lockThreadedIO(); queueIOJob(j); unlockThreadedIO(); } return 1; } 由于 vm可能有多线程操作,所以需要将 job队列操作放入临界区。 queueIOJob的过 程非常简单,将 job加入 server->io_newjobs队列,若当前的 IO线程数少于上限,则创 建一个 IO线程处理该 Job。 void queueIOJob(iojob *j) { redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n", (void*)j, j->type, (char*)j->key->ptr); listAddNodeTail(server.io_newjobs,j); Redis源代码分析 · Arch_Platform 22 / 32 if (server.io_active_threads < server.vm_max_threads) spawnIOThread(); } IO线程的入口为 IOThreadEntryPoint(),该函数循环处理 server- >io_newjobs中的 job,正在被处理的任务,从 server->io_newjobs中移入 server- >processing,根据 job类型选择不同的处理方法,如 REDIS_IOJOB_LOAD的 job,由 vmReadObjectFromSwap()处理,将磁盘的数据读入内存。完成的 job被移入 server- >processed。最后,通过 server->io_ready_pipe_write通知其它线程,处理 server->processed中的 job,每当完成一个 job时,向 io_ready_pipe写入一个字 符。 server->io_ready_pipe_read和 server->io_ready_pipe_write向线程间 提供通信服务,在虚拟内存初始化过程 vmInit()中,监听 io_ready_pipe_read上的读 事件,事件响应函数 vmThreadedIOCompletedJob处理完成的 job。 vmThreadedIOCompletedJob根据 job的类型处理其 “善后 ”工作,例如 REDIS_IOJOB_LOAD,此时 key已被从 vm中换入内存,调用 handleClientsBlockedOnSwappedKey唤醒被该 key阻塞的客户端。如前文所述, c- >db->io_keys(即 server->db->io_keys)中保存了 {key, c},即阻塞在该 key上的 所有客户端。对于 db->io_keys中的所有 c,调用 dontWaitForSwappedKey()将他们从 db->io_keys中删除;同时, c->io_keys里保存了该客户端需要阻塞读取的 key,也将 被换出的 key从 c->io_keys中删除。最后,若 c已经不再被任何 key阻塞,将它加入 server.io_ready_clients,准备进行 “正常 ”的读写操作。 最后一个问题,阻塞状态的客户端,加入 server.io_ready_clients回到正常状 态后,在哪里重新监听 READABLE事件?前文分析 Redis的事件机制时,曾提到 aeMain() 在每次 epoll循环前,会执行 beforeSleep()方法, Redis在该方法中检查 server.io_ready_clients,若有 ready的客户端,则重新在它上面监听 READABLE事 件,并处理它的命令。 4.2. 数据交换策略 Redis的虚拟内存系统有两种可用的换入换出策略:阻塞式 (Blocking Virtual Memory)和多线程非阻塞式 (Threaded Virtual Memory IO),相关代码都在 vm.c 中。 Redis在第一次建库时(无论是 AOF备份还是 Snapshot备份),如果需要将数据换出 到 vm,均使用阻塞式交换;之后 Redis会在 serverCron()中与 vm交换数据。 /* Swap a few keys on disk if we are over the memory limit and VM * is enbled. Try to free objects from the free list first. */ if (vmCanSwapOut()) { while (server.vm_enabled && zmalloc_used_memory() > server.vm_max_memory) { int retval = (server.vm_max_threads == 0) ? vmSwapOneObjectBlocking() : vmSwapOneObjectThreaded(); // ... if (retval == REDIS_ERR || server.vm_max_threads > 0) break; } } Redis源代码分析 · Arch_Platform 23 / 32 vmSwapOneObjectBlocking()是阻塞式交换, vmSwapOneObjectThreaded() 是多线程交换,最终交换的代码都在 vmSwapOneObject()中。 Redis每次从每个库中随 机选择 5个数据,用 computeObjectSwappability()测试它们换出到磁盘的可行性,最 后选择 1个最需要换出到磁盘的,用阻塞或非阻塞的方式将它换出到磁盘。 多线程模式下,换出的方式和读入类似,也是创建一个类型为 REDIS_IOJOB_PREPARE_SWAP的 job,启动该线程后,按 Snapshot备份的编码方式,将 需要换出的数据编码,计算其长度,进而得到需要换出的页的数目。然后寻找一块连续的满 足要换出的页的数目的区域,创建类型为 REDIS_IOJOB_DO_SWAP的 job,进行换出,并 标记该数据的状态。 Redis在 vm中寻找连续空闲区域的算法比较简单,直接进行简单的枚 举:如果当前区域空闲,则判断是否有满足需求连续的空闲区域;如果当前区域有数据,则 标记遇到有数据的区域的次数,如果连续 REDIS_VM_MAX_RANDOM_JUMP/4次遇到非空闲 区域,则将指针往前跳跃一个不超过 REDIS_VM_MAX_RANDOM_JUMP的随机数。 阻塞式换出就比较简单了,只是将多线程模式下的几个步骤阻塞式执行。 5. 备份机制 Redis是内存数据库,支持两种方式 ——snapshot和 aof,将数据从内存 dump到磁 盘。 snapshot是快照式备份,每次将 Redis全库 dump到磁盘, aof是流水式备份,每次将 Redis数据库的修改日志 dump到磁盘,并定期整理日志。 5.1. Snapshot Redis支持 Snapshot(快照)式备份。 Redis可以自动检测备份时机,设置每 N秒检 查,若发生 M次以上数据更新操作,则开始 Snapshot备份,也可以由客户端发送 save或 bgsave命令手动启动备份。 save是阻塞式备份,备份过程中 Redis会停止服务; bgsave是后台备份, Redis将 新建一个备份进程负责将全库 dump到磁盘。 saveCommand()中,首先检查是否有 bgsave的后台备份进程,若没有,则执行 rdbSave()进行全库备份。 bgsaveCommand()中,也会检查是否有 bgsave的后台备份进程,若没有,则执行 rdbSaveBackground(),启动备份子进程,在后台全库备份。备份子进程最终也是通过 rdbSave()备份数据库。 rdbSaveBackground()中,首先通过前文所说的 waitEmptyIOJobsQueue()检查 是否有 vm的 IO线程在进行数据交换,如果有则阻塞,等待所有 vm的 IO线程完成工作。然后 fork()备份子进程。在父进程中, server.bgsavechildpid记录了备份子进程的 pid, 并通过 updateDictResizePolicy()禁止 Hash表自动扩容。在子进程中,如果启用了 vm,则打开 vm swap文件,然后调用 rdbSave()执行备份。因为之前已经检查了所有 vm的 IO线程是否完成,因此这里打开 vm的 swap文件并不会和 vm的 IO线程冲突。 int rdbSaveBackground(char *filename) { pid_t childpid; if (server.bgsavechildpid != -1) return REDIS_ERR; if (server.vm_enabled) waitEmptyIOJobsQueue(); server.dirty_before_bgsave = server.dirty; Redis源代码分析 · Arch_Platform 24 / 32 if ((childpid = fork()) == 0) { /* Child */ if (server.vm_enabled) vmReopenSwapFile(); if (server.ipfd > 0) close(server.ipfd); if (server.sofd > 0) close(server.sofd); if (rdbSave(filename) == REDIS_OK) { _exit(0); } else { _exit(1); } } else { /* Parent */ if (childpid == -1) { redisLog(REDIS_WARNING,"Can't save in background: fork: %s", strerror(errno)); return REDIS_ERR; } redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid); server.bgsavechildpid = childpid; updateDictResizePolicy(); return REDIS_OK; } return REDIS_OK; /* unreached */ } rdbSave()中首先通过 waitEmptyIOJobsQueue()检查 vm的 IO线程,并阻塞直到 所有 vm的 IO线程完成。因为备份过程中,需要备份 vm的 swap文件,如果有未完成的 vm的 IO线程,会导致数据不同步。 首先建立一个 tmp文件,向它写入备份数据。依次循环每个 db: server.db, server.db+1, ..., server.db+server.dbnum,通过 dict的迭代器 dictIterator访问 db中的所有数据,依次写入 tmp文件。 Redis在写备份文件时做了许多优化,例如 rdbSaveLen()用于写入长度信息,写入 的数字不超过 32bit整数。 Redis将长度数字按二进制长度分三类:小于 6bit, 6bit至 14bit, 14bit至 32bit,并用前缀 REDIS_RDB_6BITLEN(00)、 REDIS_RDB_14BITLEN(01)、 REDIS_RDB_32BITLEN(10)标示数字长度。将前缀与数 字一起写入磁盘。这个优化类似于 Protobuf里压缩数字的方法。类似的压缩还有很多,再 此不一一分析。 int rdbSaveLen(FILE *fp, uint32_t len) { unsigned char buf[2]; int nwritten; if (len < (1<<6)) { /* Save a 6 bit len */ buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6); if (rdbWriteRaw(fp,buf,1) == -1) return -1; nwritten = 1; } else if (len < (1<<14)) { /* Save a 14 bit len */ buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6); buf[1] = len&0xFF; if (rdbWriteRaw(fp,buf,2) == -1) return -1; nwritten = 2; } else { /* Save a 32 bit len */ buf[0] = (REDIS_RDB_32BITLEN<<6); Redis源代码分析 · Arch_Platform 25 / 32 if (rdbWriteRaw(fp,buf,1) == -1) return -1; len = htonl(len); if (rdbWriteRaw(fp,&len,4) == -1) return -1; nwritten = 1+4; } return nwritten; } 在备份文件中, Redis写入 type/key/value数据,并放弃所有过期的数据。特别 地,对于 vm中的数据,会先读入内存,再将数据写入备份文件。 父进程在 serverCron中检查备份进程,如果存在 bgsave进程或者 bgrewrite进程 ( AOF备份,下节分析),则 wait3()等待进程完成,完成后调用 backgroundSaveDoneHandler()或者 backgroundRewriteDoneHandler()处理。 snapshot式备份中, Redis会停止 vm的换入换出操作,停止更新 LRU策略中的标记 信息,但不会影响 Redis对内存中的数据的读写。 Redis通过调整 snapshot备份的粒度, 适应各种应用需求。 5.2. AOF 除了 Snapshot备份模式外 Redis还支持 AOF(流水式 )备份,它保存所有操作的 commit log,并能自动地进行全库备份和删除无用的 commit log。 Redis在写 commit log时, Redis并不是每次处理请求时都将请求写入磁盘,它会 用一段内存 Buffer缓存 commit log,并在一定时间将缓存中的内容统一写入磁盘。 为 了避免磁盘缓存的影响,可以设置三种策略进行 fsync():每次写操作后均调用 fsync (),每秒调用一次 fsync(),从不调用 fsync(),三种不同的策略获得不同的性能和一致 性。 Redis将所有操作保存在 commit log中, commit log的文件是追加写。当 commit log的大小无法承受时,可以手工通过 bgrewriteaof命令产生一个快照(这个不同于 Snapshot备份),具体构造方式后文分析。 AOF有四种操作: •feedAppendOnlyFile()-将命令写入 AOF。该方法在 call()中被调用,用于将 所有 Redis处理的命令写入 AOF;该方法在 Redis发现过期的 Keys被调用,记录清除过期 Key的操作 call()是 Redis中所有命令处理的入口,当启用了 AOF且数据有变化时 (server.dirty有变化),将命令用 feedAppendOnlyFile()写入 AOF,该函数中,将 命令编码后写入 server.aofbuf,当后台 rewriteaof进程存在时,同时将编码后的命令 写入 server.bgrewritebuf, bgrewritebuf的作用后文解释。 /* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); /* If a background append only file rewriting is in progress we want to * accumulate the differences between the child DB and the current one * in a buffer, so that when the child process will do its work we * can append the differences to the new append only file. */ if (server.bgrewritechildpid != -1) server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); Redis源代码分析 · Arch_Platform 26 / 32 •rewriteAppendOnlyFile()-产生快照,并更新 aof文件。 rewriteAppendOnlyFile()只在 rewriteAppendOnlyFileBackground()中 被调用。 Redis产生 AOF快照主要分为以下几步: (1) fork()一个 rewrite子进程,调用 rewriteAppendOnlyFile()产生快照。 (2) 当 rewrite进程开始后, rewriteAppendOnlyFile()扫描数据库中 所有数 据 ,将他们编码后写入临时文件。 AOF文件的编码形式为文本,即人肉可读的编码。 (3)rewrite时父进程照常接收请求,并将流水日志写入 server.bgrewriteaofbuf中。 (4)子进程完成工作,父进程在 serverCron()中通过 wait3()获知其状态后,调用 backgroundRewriteDoneHandler()将 server.bgrewriteaofbuf中的内容作为新 的 commit log,覆盖原有 server.aofbuf。 (5) 父进程将 rewrite子进程生成的临时文件改名,作为新的 aof文件。用于未来恢 复数据。 Redis的 AOF快照机制的基础,是数据库的大小一定小于操作日志的大小,否则产生 的快照文件可能远远超过 commit log文件的大小。因此,此处有一点值得考虑, rewrite过程是否可以只写入 aofbuf中改变了的数据的快照? •flushAppendOnlyFile()-将内存 buffer中的 commit log写到磁盘上。 Redis在 beforeSleep()中,及关闭 AOF功能时,将内存 buffer中的 log写入磁盘。 •loadAppendOnlyFile()-重放 AOF文件。该方法在 Redis启动时被调用,从 AOF 文件中重建数据库。 Redis通过 AOF重建时,构造一个虚拟的 client(),向自己发送重建的命令。而恢复 数据只需要将 AOF快照重新载入数据库,并回放 commit log中的操作。 6. 主从同步 Redis支持主从同步,其官方文档 http://www.redis.io/topics/ replication 简要介绍了它的主从同步机制。翻译官方文档的文章, Redis的主从同步 有以下特点: •一个 Master可以有多个 Slave •Slave可以接受其它 Slave的连接,因此 Masters和 Slaves之间可以行程一个网络 •同步时,作为 Master的一端(可能其状态是 Slave)的同步是非阻塞的,它可以继 续处理命令;作为 Slave一端的第一次同步是阻塞的。 •主从机制可以用于提高扩展性,比如可以用多个 Slaves处理复杂的读请求。 •可以通过 Slave备份数据,以降低 Master的负载。 6.1. 建立连接 接下来我们看 Redis中 Slave和 Master是如何建立连接的。 Redis通过 server.replstate表明 Master和 Slave的状态,初始时,状态均为 Redis源代码分析 · Arch_Platform 27 / 32 REDIS_REPL_NONE。 server.masterhost和 server.masterport为 Master的地址, Master结点的这两个变量为 NULL。 Slave的 server.replstate状态除了 REDIS_REPL_NONE外,还有 /* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ #define REDIS_REPL_CONNECT 1 /* Must connect to master */ #define REDIS_REPL_TRANSFER 2 /* Receiving .rdb from master */ #define REDIS_REPL_CONNECTED 3 /* Connected to master */ Master对于每个 Slave都维护一个状态,在结构体 redisClient.replstate中, 因为 Slave对于 Master,其本质也是一个客户端。 Master对每个 Slave标记状态,有四 种: /* Slave replication state - from the point of view of master * Note that in SEND_BULK and ONLINE state the slave receives new updates * in its output queue. In the WAIT_BGSAVE state instead the server is waiting * to start the next background saving in order to send updates to it. */ #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */ #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */ #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */ #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */ REDIS_REPL_WAIT_BGSAVE_START表示 Master准备 dump数据到磁盘,任何新建 的 Slave连接均处于该状态; Master开始数据 dump后,将 Slave的状态修改为 REDIS_REPL_WAIT_BGSAVE_END,当 dump完成后,通知 Slave,状态变为 REDIS_REPL_SEND_BULK,准备向其发送数据;当发送完成, Slave就绪后,状态变为 REDIS_REPL_ONLINE,至此 Master可以向状态为 REDIS_REPL_ONLINE的 Slave发送更 新命令。 对于 Slave,如果 server.replstate!=REDIS_REPL_CONNECTED,则无法处理 命令(获取 Slave状态的命令除外);对于 Master,如果 c.replstate! =REDIS_REPL_ONLINE,也不会向它发送命令。 serverCron()中每隔 10秒会调用 replicationCron(),处理主从同步相关的功 能。 replicationCron()会检测同步数据传输超时, Slave与 Master连接断开等问题。 在分析可能的故障前,先分析建立连接的过程。 6.1.1. Master Redis启动时可以通过配置文件指定为 Master状态或 Slave状态, Slave也可以通过 Sync命令与另一个 Master(可能是 Master,也可能是与 Master有连接的 Slave)建立 主从连接。下面分析 syncCommand()的处理。 首先,如果发送 sync命令的结点已经是 Slave状态,则无需再次同步。如果接受 sync指令的结点,本身也是 Slave,且没有与任何 Master建立连接,则无法响应 sync命 令。如果 Slave与 Master之间还有其它交互命令没有处理完,也无法继续响应 sync命令。 然后 Master开始检测 bgsave状态,如果已经有 bgsave进程,再判断是否正在响应 其它 Slave的 sync命令,且正在 dump数据( Slave的 replstate为 REDIS_REPL_BGSAVE_END),则直接将新 Slave的 replstate状态置为 REDIS_REPL_BGSAVE_END,并准备将 dump好的数据传给新 Slave。如果没有 dump好的 Redis源代码分析 · Arch_Platform 28 / 32 数据,则将新 Slave的状态置为 REDIS_REPL_BGSAVE_START,并等待 bgsave进程完 成,之后再启动 bgsave,为 Slave dump数据。 如果没有 bgsave进程,则启动新的 bgsave进程,并将 Slave的状态置为 REDIS_REPL_BGSAVE_END。 最后,将新 Slave加入 Master的 server.slaves列表。 void syncCommand(redisClient *c) { /* ignore SYNC if aleady slave or in monitor mode */ if (c->flags & REDIS_SLAVE) return; /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) { addReplyError(c,"Can't SYNC while not connected with my master"); return; } /* SYNC can't be issued when the server has pending data to send to * the client about already issued commands. We need a fresh reply * buffer registering the differences between the BGSAVE and the current * dataset, so that we can copy to other slaves if needed. */ if (listLength(c->reply) != 0) { addReplyError(c,"SYNC is invalid with pending input"); return; } redisLog(REDIS_NOTICE,"Slave ask for synchronization"); /* Here we need to check if there is a background saving operation * in progress, or if it is required to start one */ if (server.bgsavechildpid != -1) { /* Ok a background save is in progress. Let's check if it is a good * one for replication, i.e. if there is another slave that is * registering differences since the server forked to save */ redisClient *slave; listNode *ln; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { slave = ln->value; if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break; } if (ln) { /* Perfect, the server is already registering differences for * another slave. Set the right state, and copy the buffer. */ listRelease(c->reply); c->reply = listDup(slave->reply); c->replstate = REDIS_REPL_WAIT_BGSAVE_END; redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { /* No way, we need to wait for the next BGSAVE in order to * register differences */ c->replstate = REDIS_REPL_WAIT_BGSAVE_START; redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC"); } } else { /* Ok we don't have a BGSAVE in progress, let's start one */ redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC"); Redis源代码分析 · Arch_Platform 29 / 32 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) { redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE"); addReplyError(c,"Unable to perform background save"); return; } c->replstate = REDIS_REPL_WAIT_BGSAVE_END; } c->repldbfd = -1; c->flags |= REDIS_SLAVE; c->slaveseldb = 0; listAddNodeTail(server.slaves,c); return; } 如前文 Snapshot快照分析,当 bgsave进程备份完成时,在 serverCron()中会调用 backgroundSaveDoneHandler()处理,该函数中调用 updateSlavesWaitingBgsave()处理等待 bgsave的 Slave结点。 updateSlavesWaitingBgsave()检查 server.slaves列表,当发现状态为 REDIS_REPL_WAIT_BGSAVE_START的结点时,会再次启动 bgsave进程进行 dump数据。 当发现状态为 REDIS_REPL_WAIT_BGSAVE_START的结点时,会准备将数据发送给 Slave,并将 Slave的状态修改为 REDIS_REPL_SEND_BULK。 Master在 Slave上监听 WRITEABLE事件,当 Slave可写时,调用 sendBulkToSlave()将数据发送到 Slave。 在 Master中, sendBulkToSlave()将数据从磁盘读入,通过 socket发送到 Slave,完成后删除 WRITEABLE事件,并将 Slave状态改为 REDIS_REPL_ONLINE。然后 再次监听 Slave的 WRITEABLE事件,当 Slave可写时,表明 Slave已经准备好开始同步指 令。 6.1.2. Slave Redis可以在配置文件中指定为 Slave模式,并指定 Master的地址,这样的 Slave结 点启动时即为 REDIS_REPL_ONLINE状态,然后在 replicationCron()中调用 syncWithMaster()与 Master同步。 syncWithMaster()中,向 Master发送 sync命令,并准备接收数据的目录。然后通 过 readSyncBulkPayload()处理 Master发送的 dump的数据。 Slave通过 syncRead() 和 syncWrite()与 Master传输数据,这两个方法底层是异步 IO的,但封装成阻塞型,因 此 Slave第一次同步是阻塞的。 replicationCron()会定时 Master的 dump数据是否发送完成,如果长时间没有收 到 dump数据的数据包, Slave会通过 replicationAbortSyncTransfer()取消数据同 步。 6.2. 指令同步 在 call()中, Master会将改变了数据的命令通过 replicationFeedSlaves()同 步到 Slaves中。 Slave以处理普通命令的流程处理这些 Master发来的命令。 特别地在 serverCron()中, Master会删除过期的数据, Slave则等待 Master同步 DEL指令将过期数据删除。 Redis源代码分析 · Arch_Platform 30 / 32 replicationCron()中, Master会定时向所有 Slave发送心跳指令,同时, Slave的 replicationCron()会通过 Ping指令检查 Slave与 Master的连接状态,如果 Slave长时间没有收到主机的 Ping,则会断开与主机的连接。 6.3. 主从转换 slaveof可以转换 Redis中结点的状态。对于 Slave结点, slaveof no one可以将 Slave与 Master断开,并将 Slave转为主机。对于 Master, slaveof ip port可以讲 主机转为 Slave。 Redis的主从转换非常简单,如果同步未完成,则取消同步;否则,直接改变 server.masterhost等状态。 Redis源代码分析 · Arch_Platform 31 / 32 参考文献 [1] Redis, http://redis.io [2] Binary-safe, http://en.wikipedia.org/wiki/Binary_safe [3] malloc_size(), Mac OS X Manual Page, http:// developer.apple.com/library/mac/#documentation/Darwin/Reference/ ManPages/man3/malloc_size.3.html [4] BeansDB, http://code.google.com/p/beansdb/ [5] Redis进阶教程 -aof(append only file)日志文件 , http://lgone.com/ html/y2010/757.html [6] Redis源代码分析 , http://www.petermao.com/category/redis Redis源代码分析 · Arch_Platform 32 / 32

下载文档,方便阅读与编辑

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 20 金币 [ 分享文档获得金币 ] 19 人已下载

下载文档

相关文档