mysql的thread_running数量分析
发布时间:2021-12-17 09:41:41 所属栏目:MySql教程 来源:互联网
导读:本篇内容主要讲解mysql的thread_running数量分析,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习mysql的thread_running数量分析吧! thread pool的原理: 已在server层完成解析; 层创建多组常驻线程,用于接收客
本篇内容主要讲解“mysql的thread_running数量分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“mysql的thread_running数量分析”吧! thread pool的原理: 已在server层完成解析; 层创建多组常驻线程,用于接收客户端连接发送的query并代为执行,而不是为每个连接单独创建一个线程。 层进行running thread数量判断,如果达到阈值则直接报错或sleep。 状态变量记录了当前并发执行stmt/command的数量,执行前加1执行后减1; 突然飙高的诱因: 客户端连接暴增; 系统性能瓶颈,如CPU,IO或者mem swap; 异常sql; 会表现出hang住的假象。 执行,为此引入两个阈值low_watermark和high_watermark,以及变量threads_running_ctl_mode(selects或者all ); 前,检查thread_running, 若其已达high_watermark阈值则直接拒绝执行并返回错误:mysql server is too busy 若其位于low和high之间,则sleep 5ms,然后继续尝试,累计等待100ms后则执行 对于已经开启事务和super用户,不做限制 控制query类型:SELECTS/ALL,默认为SELECTS,表示只影响SELECT语句 源码见注1 优化为基于FIFO的cond-wait/signal(实现8个FIFO); 高水位限流(这点保持不变); 低水位优化;其他解决方案:mariadb开发thread pool,percona在其上实现了优先队列; 优势:思路与thread pool一致,但代码更简洁(不到1000行);而且增加了特定query的过滤; 代码见注2 新增thread_active记录并发线程数,位于mysql_execute_command(sql解析之后),高水位则在query解析之前判断; 只统计select/DML,而commit/rollback则放过。 采用FIFO,当thread_active >= thread_running_low_watermark时进程进入FIFO等待,其他线程执行完sql后唤醒FIFO; 内,同时引入threads_running_wait_timeout控制线程在FIFO最大等待时间,超时则直接报错返回。 引入8个FIFO,降低了进出FIFO的锁竞争,线程采用RR分配到不同fifo,每个队列限制并发运行线程为threads_running_low_watermark/8。 ,开始执行query,[解析后进行低水位判断,若通过则执行],执行当前sql完毕后,thread可能发起新query,则重复[]过程。 :进入FIFO排队最长时间,等待超时后sql被拒,默认100,单位为毫秒ms。 当前并发SELECT/INSERT/UPDATE/DELETE执行的线程数目; :当前进入到FIFO中等待的线程数目; 未打补丁版本,设置innodb_thread_concurrency=0 未打补丁版本,innodb_thread_concurrency=32 低水位限流补丁版本(活跃线程数不超过64) 注1 http://www.gpfeng.com/wp-content/d/file/uploads/2013/09/threads_running_control.txt +static my_bool thread_running_control(THD *thd, ulong tr) +{ + int slept_cnt= 0; + ulong tr_low, tr_high; + DBUG_ENTER("thread_running_control"); + + /* + Super user/slave thread will not be affected at any time, + transactions that have already started will continue. + */ + if ( thd->security_ctx->master_access & SUPER_ACL|| --对于super权限的用户和已经开启的事务不做限制 + thd->in_active_multi_stmt_transaction() || + thd->slave_thread) + DBUG_RETURN(FALSE); + + /* + To promise that tr_low will never be greater than tr_high, + as values may be changed between these two statements. + eg. + (low, high) = (200, 500) + 1. read low = 200 + 2. other sessions: set low = 20; set high = 80 + 3. read high = 80 + Don't take a lock here to avoid lock contention. + */ + do + { + tr_low= thread_running_low_watermark; + tr_high= thread_running_high_watermark; + + } while (tr_low > tr_high); + +check_buzy: + + /* tr_high is promised to be non-zero.*/ + if ((tr_low == 0 && tr < tr_high) || (tr_low != 0 && tr < tr_low)) + DBUG_RETURN(FALSE); + + if (tr >= tr_high) + { + int can_reject= 1; + + /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */ + if (thread_running_ctl_mode == 0) + { + int query_is_select= 0; + if (thd->query_length() >= 8) + { + char *p= thd->query(); --读取query text的前6个字符,以判断是否为select + if (my_toupper(system_charset_info, p[0]) == 'S' && + my_toupper(system_charset_info, p[1]) == 'E' && + my_toupper(system_charset_info, p[2]) == 'L' && + my_toupper(system_charset_info, p[3]) == 'E' && + my_toupper(system_charset_info, p[4]) == 'C' && + my_toupper(system_charset_info, p[5]) == 'T') + + query_is_select= 1; + } + + if (!query_is_select) + can_reject= 0; + } + + if (can_reject) + { + inc_thread_rejected(); + DBUG_RETURN(TRUE); + } + else + DBUG_RETURN(FALSE); + } + + if (tr_low != 0 && tr >= tr_low) + { + /* + If total slept time exceed 100ms and thread running does not + reach high watermark, let it in. + */ + if (slept_cnt >= 20) + DBUG_RETURN(FALSE); + + dec_thread_running() + + /* wait for 5ms. */ + my_sleep(5000UL); + + slept_cnt++; + tr= inc_thread_running() - 1; + + goto check_buzy; + } + + DBUG_RETURN(FALSE); +} + +/** Perform one connection-level (COM_XXXX) command. @param command type of command to perform @@ -1016,7 +1126,8 @@ thd->set_query_id(get_query_id()); if (!(server_command_flags[command] & CF_SKIP_QUERY_ID)) next_query_id(); - inc_thread_running(); + /* remember old value of thread_running for *thread_running_control*. */ + int32 tr= inc_thread_running() - 1; if (!(server_command_flags[command] & CF_SKIP_QUESTIONS)) statistic_increment(thd->status_var.questions, &LOCK_status); @@ -1129,6 +1240,13 @@ { if (alloc_query(thd, packet, packet_length)) break; // fatal error is set + + if (thread_running_control(thd, (ulong)tr)) + { + my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0)); + break; + } + MYSQL_QUERY_START(thd->query(), thd->thread_id, (char *) (thd->db ? thd->db : ""), &thd->security_ctx->priv_user[0]) 注2 http://www.gpfeng.com/wp-content/d/file/uploads/2014/01/tr-control.diff_.txt +/** Perform one connection-level (COM_XXXX) command. @param command type of command to perform @@ -1177,7 +1401,7 @@ command= COM_SHUTDOWN; } thd->set_query_id(next_query_id()); - inc_thread_running(); + int32 tr= inc_thread_running(); if (!(server_command_flags[command] & CF_SKIP_QUESTIONS)) statistic_increment(thd->status_var.questions, &LOCK_status); @@ -1209,6 +1433,15 @@ goto done; } + if (command == COM_QUERY && alloc_query(thd, packet, packet_length)) + goto endof_case; // fatal error is set + + if (thread_running_control_high(thd, tr)) + { + my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0)); + goto endof_case; + } + switch (command) { case COM_INIT_DB: { @@ -1311,8 +1544,6 @@ } case COM_QUERY: { - if (alloc_query(thd, packet, packet_length)) - break; // fatal error is set MYSQL_QUERY_START(thd->query(), thd->thread_id, (char *) (thd->db ? thd->db : ""), &thd->security_ctx->priv_user[0], @@ -1751,6 +1982,7 @@ my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0)); break; } +endof_case: done: DBUG_ASSERT(thd->derived_tables == NULL && @@ -2502,12 +2734,37 @@ Opt_trace_array trace_command_steps(&thd->opt_trace, "steps"); DBUG_ASSERT(thd->transaction.stmt.cannot_safely_rollback() == FALSE); + bool count_active= false; if (need_traffic_control(thd, lex->sql_command)) { thd->killed = THD::KILL_QUERY; goto error; } + + switch (lex->sql_command) { + + case SQLCOM_SELECT: + case SQLCOM_UPDATE: + case SQLCOM_UPDATE_MULTI: + case SQLCOM_DELETE: + case SQLCOM_DELETE_MULTI: + case SQLCOM_INSERT: + case SQLCOM_INSERT_SELECT: + case SQLCOM_REPLACE: + case SQLCOM_REPLACE_SELECT: + count_active= true; + break; + default: + break; + } + + if (count_active && thread_running_control_low_enter(thd)) + { + my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, myf(0)); + goto error; + } + status_var_increment(thd->status_var.com_stat[lex->sql_command]); switch (gtid_pre_statement_checks(thd)) @@ -4990,6 +5247,9 @@ finish: + if (count_active) + thread_running_control_low_exit(thd); + DBUG_ASSERT(!thd->in_active_multi_stmt_transaction() || thd->in_multi_stmt_transaction_mode()); +static my_bool thread_running_control_high(THD *thd, int32 tr) +{ + int32 tr_high; + DBUG_ENTER("thread_running_control_high"); + + tr_high= (int32)thread_running_high_watermark; + + /* thread_running_ctl_mode: 0 -> SELECTS, 1 -> ALL. */ + if ((!tr_high || tr <= tr_high) || + thd->transaction.is_active() || + thd->get_command() != COM_QUERY || + thd->security_ctx->master_access & SUPER_ACL || + thd->slave_thread) + DBUG_RETURN(FALSE); + + const char *query= thd->query(); + uint32 len= thd->query_length(); + + if ((!has_prefix(query, len, "SELECT", 6) && thread_running_ctl_mode == 0) || --不再是逐个字符判断 + has_prefix(query, len, "COMMIT", 6) || + has_prefix(query, len, "ROLLBACK", 8)) + DBUG_RETURN(FALSE); + + /* confirm again*/ + if (tr > tr_high && get_thread_running() > tr_high) + { + __sync_add_and_fetch(&thread_rejected, 1); + DBUG_RETURN(TRUE); + } + + DBUG_RETURN(FALSE); +} + +static my_bool thread_running_control_low_enter(THD *thd) +{ + int res= 0; + int32 tr_low; + my_bool ret= FALSE; + my_bool slept= FALSE; + struct timespec timeout; + Thread_conc_queue *queue; + DBUG_ENTER("thread_running_control_low_enter"); + + /* update global status */ + __sync_add_and_fetch(&thread_active, 1); + + tr_low= (int32)queue_tr_low_watermark; + queue= thread_conc_queues + thd->query_id % N_THREAD_CONC_QUEUE; + + queue->lock();--问1:在进行低水位判断前,先锁定FIFO,避免低水位验证失败时无法获取FIFO锁进而不能放入FIFO; + +retry: + + if ((!tr_low || queue->thread_active < tr_low) || + (thd->lex->sql_command != SQLCOM_SELECT && thread_running_ctl_mode == 0) || + (!slept && (thd->transaction.is_active() || + thd->security_ctx->master_access & SUPER_ACL || thd->slave_thread))) + { + queue->thread_active++; --判断是否满足进入FIFO条件,如不满足则立即更新thread_active++,解锁queue并退出; + queue->unlock(); + DBUG_RETURN(ret); + } + + if (!slept) + { + queue->unlock(); + + /* sleep for 500 us */ + my_sleep(500); + slept= TRUE; + queue->lock(); + + goto retry; + } + + /* get a free wait-slot */ + Thread_wait_slot *slot= queue->pop_free(); + + /* can't find a free wait slot, must let the query enter */ + if (!slot)-- 当FIFO都满了,即无法把当前线程放入,则必须放行让该sql正常执行 + { + queue->thread_active++; + queue->unlock(); + DBUG_RETURN(ret); + } + + slot->signaled= false; + slot->wait_ended= false; + + /* put slot into waiting queue. */ + queue->push_back_wait(slot); + queue->thread_wait++; + + queue->unlock(); + + /* update global status */ + thd_proc_info(thd, "waiting in server fifo"); + __sync_sub_and_fetch(&thread_active, 1); + __sync_add_and_fetch(&thread_wait, 1); + + /* cond-wait for at most thread_running_wait_timeout(ms). */ + set_timespec_nsec(timeout, thread_running_wait_timeout_ns); + + mysql_mutex_lock(&slot->mutex); + while (!slot->signaled) + { + res= mysql_cond_timedwait(&slot->cond, &slot->mutex, &timeout); + /* no need to signal if cond-wait timedout */ + slot->signaled= true; + } + mysql_mutex_unlock(&slot->mutex); + + queue->lock(); + queue->thread_wait--; + queue->thread_active++; + + /* remove slot from waiting queue. */ + queue->remove_wait(slot); + /* put slot into the free queue for reuse. */ + queue->push_back_free(slot); + + queue->unlock(); + + /* update global status */ + __sync_sub_and_fetch(&thread_wait, 1); + __sync_add_and_fetch(&thread_active, 1); + thd_proc_info(thd, 0); + + if (res == ETIMEDOUT || res == ETIME) + { + ret= TRUE; // indicate that query is rejected. + __sync_add_and_fetch(&thread_rejected, 1); + } + + DBUG_RETURN(ret); +} 到此,相信大家对“mysql的thread_running数量分析”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习! (编辑:92站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |