原创 竹径 2025-06-16 18:30 上海
通过开启多线程 IO,并设置合适的 CPU 数量,可提升访问请求至一倍以上。Redis6.0多线程 IO 仍存在一些不足,未充分利用 CPU 核心,在最新的 Valkey8.0 版本中,引入异步 IO 将进一步大幅提升 Valkey 性能。
💡 Redis6.0引入多线程IO,主要用于处理网络数据的读写和协议解析,而命令执行仍保持单线程,默认不开启。通过设置io-threads参数大于1来启用,但不能超过128,且在运行时不可动态修改。
💡 Redis6.0多线程IO的执行流程包括:主线程接收连接,分配socket给IO线程;IO线程读取并解析请求数据;主线程执行命令;主线程将回写事件分配给IO线程;IO线程回写数据。主线程和IO线程分工协作,提高效率。
💡 主线程在beforeSleep函数中,将待读数据客户端分发给IO线程,并等待IO线程完成读操作。IO线程负责从socket读取数据、解析命令,主线程则负责执行命令。这种模式实现了读写分离,提高了并发处理能力。
原创 竹径 2025-06-16 18:30 上海
通过开启多线程 IO,并设置合适的 CPU 数量,可提升访问请求至一倍以上。Redis6.0多线程 IO 仍存在一些不足,未充分利用 CPU 核心,在最新的 Valkey8.0 版本中,引入异步 IO 将进一步大幅提升 Valkey 性能。
io-threads 表示IO线程数量, io-threads 设置为1时(代码中默认值),表示只使用主线程,不开启多线程IO。因此,若要配置开启多线程IO,需要设置 io-threads 大于1,但不可以超过最大值128。但在默认情况下,Redis只将多线程IO用于向客户端写数据,因为作者认为通常使用多线程执行读数据的操作帮助不是很大。如果需要使用多线程用于读数据和解析数据,则需要将参数 io-threads-do-reads 设置为 yes 。此两项配置参数在Redis运行期间无法通过 config set 命令修改,并且开启SSL时,不支持多线程IO特性。若机器CPU将至少超过4核时,则建议开启,并且至少保留一个备用CPU核,使用超过8个线程可能并不会有多少帮助。# io-threads 4 IO 线程数量
# io-threads-do-reads no 读数据及数据解析是否也用 IO 线程
如果 io_threads_num 的数量为1,则只运行主线程, io_threads_num 的IO线程数量不允许超过 128。序号为0的线程是主线程,因此实际的工作线程数目是io-threads - 1。初始化流程为包括主线程在内的每个线程分配list列表,用于后续保存待处理的客户端。为主线程以外的其他IO线程初始化互斥对象mutex,但是立即调用pthread_mutex_lock占有互斥量,将io_threads_pending[i]设置为0,接着创建对应的IO工作线程。占用互斥量是为了创建IO工作线程后,可暂时等待后续启动IO线程的工作,因为IOThreadMain函数在io_threads_pending[id] == 0时也调用了获取mutex,所以此时无法继续向下运行,等待启动。在startThreadedIO函数中会释放mutex来启动IO线程工作。何时调用startThreadedIO打开多线程IO,具体见下文的「多线程IO动态暂停与开启」。IO 线程主函数IO线程主函数代码如下所示:/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
从IO线程主函数逻辑可以看到:如果IO线程等待处理任务数量为0,则IO线程一直在空循环,因此后面主线程给IO线程分发任务后,需要设置IO线程待处理任务数 io_threads_pending[id] ,才会触发IO线程工作。如果IO线程等待处理任务数量为0,并且未获取到mutex锁,则会等待获取锁,暂停运行,由于主线程在创建IO线程之前先获取了锁,因此IO线程刚启动时是暂停运行状态,需要等待主线程释放锁,启动IO线程。IO线程待处理任务数为0时,获取到锁并再次释放锁,是为了让主线程可以暂停IO线程。只有io_threads_pending[id]不为0时,则继续向下执行操作,根据io_threads_op决定是读客户端还是写客户端,从这里也可以看出IO线程要么同时读,要么同时写。void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
如果开启多线程,并且开启多线程读(io_threads_do_reads 为 yes),则将客户端标记为CLIENT_PENDING_READ,并且加入clients_pending_read列表。然后readQueryFromClient函数中就立即返回,主线程没有执行从客户端连接中读取的数据相关逻辑,读取了客户端数据行为等待后续各个IO线程执行。void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;
......以下省略
}
/* Return 1 if we want to handle the client read later using threaded I/O.
* This is called by the readable handler of the event loop.
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
if (io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
先检查是否开启多线程,以及是否开启多线程读数据(io_threads_do_reads),未开启直接返回。检查队列clients_pending_read长度,为0直接返回,说明没有待读事件。遍历clients_pending_read队列,通过RR算法,将队列中的客户端循环分配给各个IO线程,包括主线程本身。设置io_threads_op = IO_THREADS_OP_READ,并且将io_threads_pending数组中各个位置值设置为对应各个IO线程分配到的客户端数量,如上面介绍,目的是为了使IO线程工作。主线程开始读取客户端数据,因为主线程也分配了任务。主线程阻塞等待,直到所有的IO线程都完成读数据工作。主线程执行命令。IO 线程读数据在IO线程主函数中,如果 io_threads_op == IO_THREADS_OP_READ ,则调用readQueryFromClient从网络中读取数据。IO 线程读取数据后,不会执行命令。在readQueryFromClient函数中,最后会执行processInputBuffer函数,在processInputBuffe函数中,如IO线程检查到客户端设置了CLIENT_PENDING_READ标志,则不执行命令,直接返回。/* When threaded I/O is also enabled for the reading + parsing side, the
* readable handler will just put normal clients into a queue of clients to
* process (instead of serving them synchronously). This function runs
* the queue using the I/O threads, and process them in order to accumulate
* the reads in the buffers, and also parse the first command available
* rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
/* Run the list of clients again to process the new buffers. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
}
processInputBuffer(c);
}
return processed;
}
......省略
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
...... 省略
在beforeSleep函数中调用handleClientsWithPendingWritesUsingThreads。void clientInstallWriteHandler(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket. This way before re-entering the event
* loop, we can try to directly write to the client sockets avoiding
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
}
判断clients_pending_write队列的长度,如果为0则直接返回。判断是否开启了多线程,若只有很少的客户端需要写,则不使用多线程IO,直接在主线程完成写操作。如果使用多线程IO来完成写数据,则需要判断是否先开启多线程IO(因为会动态开启与暂停)。遍历clients_pending_write队列,通过RR算法,循环将所有客户端分配给各个IO线程,包括主线程自身。设置io_threads_op = IO_THREADS_OP_WRITE,并且将io_threads_pending数组中各个位置值设置为对应的各个IO线程分配到的客户端数量,目的是为了使IO线程工作。主线程开始写客户端数据,因为主线程也分配了任务,写完清空任务队列。阻塞等待,直到所有IO线程完成写数据工作。再次遍历所有客户端,如果有需要,为客户端在事件循环上安装写句柄函数,等待事件回调。int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}
在写数据流程handleClientsWithPendingWritesUsingThreads函数中,stopThreadedIOIfNeeded返回0的话,就会执行下面的startThreadedIO函数,开启多线程IO。int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
从上面的代码中可以看出:开启多线程IO是通过释放mutex锁来让IO线程开始执行读数据或者写数据动作。暂停多线程IO则是通过加锁来让IO线程暂时不执行读数据或者写数据动作,此处加锁后,IO线程主函数由于无法获取到锁,因此会暂时阻塞。void startThreadedIO(void) {
serverAssert(server.io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
server.io_threads_active = 1;
}
void stopThreadedIO(void) {
/* We may have still clients with pending reads when this function
* is called: handle them before stopping the threads. */
handleClientsWithPendingReadsUsingThreads();
serverAssert(server.io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
server.io_threads_active = 0;
}
redis-benchmark -h 172.xx.xx.xx -t set,get -n 1000000 -r 100000000 --threads ${threadsize} -d ${datasize} -c ${clientsize}
单线程 threadsize 为 1,多线程 threadsize 为 4
datasize为value 大小,分别设置为 128/512/1024
clientsize 为客户端数量,分别设置为 256/2000
如:./redis-benchmark -h 172.xx.xx.xx -t set,get -n 1000000 -r 100000000 --threads 4 -d 1024 -c 256
AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。
鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑