Skip to content

MDEV-39492 Parallel Query: Study how to create worker threads#5099

Open
mariadb-RexJohnston wants to merge 1 commit into
mainfrom
13.0-MDEV-39492-2
Open

MDEV-39492 Parallel Query: Study how to create worker threads#5099
mariadb-RexJohnston wants to merge 1 commit into
mainfrom
13.0-MDEV-39492-2

Conversation

@mariadb-RexJohnston
Copy link
Copy Markdown
Member

Introduces parallel_worker_threads variable to control the number of worker threads created by a parallel execution query.

2 new files, sql_parallel_workers.h sql_parallel_workers.cc which contain structures for the creation, management and deletion of parallel worker threads (pwt_ in the name). Main management class created in the stack in JOIN::exec, implemented for the top level select.

Current parallel_worker_thread_func sleeps for 10 seconds, generates a warning, signals the main thread, sleeps 10 seconds, signals the main thread again, sets it's finished flag and cleans it's THD.

The main thread loops through worker threads, looking for finished thread and cleans them up if they have finished.
It then waits for a signal, then processes it's message queue.

The thread management data is allocated on the stack in JOIN::exec. Everything else is allocated using my_malloc() and my_free().

Threads are registed in server_threads, so are visible in information_schema.processlist and the show processlist command.

We check that a kill query on a parallel worker is passed onto it's manager and the query is properly aborted, and that a kill connection is handled properly in parallel_worker.test.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements a framework for parallel query execution, introducing worker thread management, background thread creation, and an error propagation system. It adds Performance Schema instrumentation and a new system variable, parallel_worker_threads, to control the number of workers. The code review identified several critical issues, including a race condition in worker status checks, missing synchronization when accessing the global thread list, and potential double-free and memory leak scenarios during error handling. Suggestions were also made to use monotonic time for timeouts and to adopt more specific naming for global macros.

Comment thread sql/sql_parallel_workers.cc
Comment thread sql/sql_parallel_workers.cc Outdated
Comment thread sql/sql_parallel_workers.cc Outdated
Comment thread sql/sql_parallel_workers.h
Comment thread include/my_pthread.h
Comment on lines +627 to +637
#ifdef _WIN32
#define MAX_THREAD_NAME 256
#elif defined(__linux__)
#define MAX_THREAD_NAME 16
#elif defined(__FreeBSD__) || defined(__OpenBSD__)
#define MAX_THREAD_NAME 19
#include <pthread_np.h>
#elif defined(__apple_build_version__)
#include <sys/proc_info.h>
#define MAX_THREAD_NAME MAXTHREADNAMESIZE
#endif
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The macro name MAX_THREAD_NAME is quite generic and could potentially conflict with definitions in system headers or other libraries. It is recommended to use a more specific prefix, such as MY_MAX_THREAD_NAME or MARIADB_MAX_THREAD_NAME.

Comment thread sql/sql_parallel_workers.cc Outdated
Comment thread sql/sql_parallel_workers.cc
@mariadb-RexJohnston
Copy link
Copy Markdown
Member Author

/gemini review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a framework for Parallel Worker Threads (PWT), adding the parallel_worker_threads system variable and management logic for worker threads. Feedback focuses on replacing placeholder instrumentation macros with specific Performance Schema keys for memory and synchronization. The reviewer also suggested providing more descriptive file headers, using server-consistent time functions, and correcting style violations regarding spacing in function signatures and class declarations.

Comment thread sql/sql_parallel_workers.cc Outdated
Comment thread sql/sql_parallel_workers.cc Outdated
Comment thread sql/sql_parallel_workers.cc Outdated
Comment thread sql/sql_parallel_workers.cc Outdated
START: in lieu of work, wait 10 seconds, push out an error or a warning,
wait another 10 seconds then exit
*/
abs_timeout.tv_sec= time(0)+10;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In the MariaDB server core, it is preferred to use my_time() instead of time(0) for consistency and to ensure the use of the server's internal time representation. Additionally, the 10-second sleep is noted as placeholder logic for this study.

  abs_timeout.tv_sec= my_time() + 10;

Comment thread sql/sql_parallel_workers.cc Outdated
Comment thread sql/sql_parallel_workers.h Outdated
Comment thread sql/sql_parallel_workers.h Outdated
@mariadb-RexJohnston
Copy link
Copy Markdown
Member Author

/gemini review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a framework for parallel worker threads (PWT), including the parallel_worker_threads system variable and logic for thread management, error handling, and synchronization. The review identifies several critical issues regarding thread safety, specifically the need to hold LOCK_thread_count when modifying the global server_threads list. Additionally, feedback highlights potential memory corruption from bypassing constructors when using my_malloc for objects inheriting from ilink, and suggests using more idiomatic practices such as THD::awake() for thread signaling and ensuring proper initialization of worker structures.

Comment on lines +93 to +95
*event= (pwt_queued_event*) my_malloc(key_memory_pwt_queued_event,
sizeof(pwt_queued_event),
MYF(0));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The pwt_queued_event class inherits from ilink, which has a constructor that initializes its internal pointers. Using my_malloc bypasses this constructor, leaving the ilink part of the object uninitialized. This can lead to memory corruption or crashes when the object is inserted into an I_List. It is safer to use new (std::nothrow) or call the constructor via placement new.

  *event= new (std::nothrow) pwt_queued_event();

on a copy of our THD pointer
*/
thd_detach_thd(save);
server_threads.erase(thd);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Accessing the global server_threads list requires holding the LOCK_thread_count mutex to prevent race conditions with other threads (e.g., SHOW PROCESSLIST or other thread creations/destructions).

  mysql_mutex_lock(&LOCK_thread_count);
  server_threads.erase(thd);
  mysql_mutex_unlock(&LOCK_thread_count);

workers[i].thd->pwt_worker_info= workers+i;
workers[i].finished= workers[i].joined= false;
workers[i].killed= NOT_KILLED;
server_threads.insert(workers[i].thd); // show processlist
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Accessing the global server_threads list requires holding the LOCK_thread_count mutex to ensure thread safety.

      mysql_mutex_lock(&LOCK_thread_count);
      server_threads.insert(workers[i].thd);               // show processlist
      mysql_mutex_unlock(&LOCK_thread_count);

if (mysql_thread_create(key_thread_pwt, &workers[i].pthread, nullptr,
parallel_worker_thread_func, &workers[i]))
{
server_threads.erase(workers[i].thd);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Accessing the global server_threads list requires holding the LOCK_thread_count mutex.

        mysql_mutex_lock(&LOCK_thread_count);
        server_threads.erase(workers[i].thd);
        mysql_mutex_unlock(&LOCK_thread_count);


mysql_mutex_lock(&LOCK_pwt_thread);
pwt_queued_event *event;
while ((event= parallel_messages.get()))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

I_List::get() returns a pointer to the base class ilink. You should cast it to pwt_queued_event* to avoid compilation warnings or errors depending on the compiler settings.

  while ((event= (pwt_queued_event*) parallel_messages.get()))

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I_List<pwt_queued_event> parallel_messages;

template
class I_List :private base_ilist
...
inline T* get() ...

so no.

Comment thread sql/sql_parallel_workers.cc Outdated
Comment on lines +302 to +304
workers= (pwt_worker *) my_malloc(key_memory_pwt_workers,
n * sizeof(pwt_worker),
MYF(0));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The pwt_worker structure contains pointers and other members (like parallel_scan_job) that are not explicitly initialized. Using my_malloc without MY_ZEROFILL leaves these members with garbage values, which could cause issues if they are accessed before being set. Consider using MY_ZEROFILL to ensure a clean state.

    workers= (pwt_worker *) my_malloc(key_memory_pwt_workers,
                                      n * sizeof(pwt_worker),
                                      MYF(MY_WME | MY_ZEROFILL));

Comment on lines +380 to +382
workers[i].thd->query_string= CSET_STRING(workers[i].info,
strlen(workers[i].info),
workers[i].thd->query_charset());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

THD::query_string is a LEX_CSTRING, which consists of only two fields: str and length. The CSET_STRING macro is typically used for structures that include metadata (like charset) and will likely cause a compilation error here. You should assign the string and length directly.

      workers[i].thd->query_string.str= workers[i].info;
      workers[i].thd->query_string.length= strlen(workers[i].info);

Comment on lines +498 to +500
mysql_mutex_lock(&workers[i].thd->LOCK_thd_kill);
workers[i].thd->killed= thd->killed;
mysql_mutex_unlock(&workers[i].thd->LOCK_thd_kill);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instead of manually locking LOCK_thd_kill and setting the killed state, it is more idiomatic and robust to use the THD::awake() method, which handles the internal state and signaling correctly.

          workers[i].thd->awake(thd->killed);

// process queue
mysql_mutex_lock(&LOCK_pwt_thread);
pwt_queued_event *event;
while ((event= parallel_messages.get()))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

I_List::get() returns ilink*. A cast to pwt_queued_event* is required.

      while ((event= (pwt_queued_event*) parallel_messages.get()))

Introduces parallel_worker_threads variable to control the number
of worker threads created by a parallel execution query.

2 new files, sql_parallel_workers.h sql_parallel_workers.cc which
contain structures for the creation, management and deletion of
parallel worker threads (pwt_ in the name).  Main management
class created in the stack in JOIN::exec, implemented for the
top level select.

Current parallel_worker_thread_func sleeps for 10 seconds, generates
a warning, signals the main thread, sleeps 10 seconds, signals the
main thread again, sets it's finished flag and cleans it's THD.

The main thread loops through worker threads, looking for finished
thread and cleans them up if they have finished.
It then waits for a signal, then processes it's message queue.

The thread management data is allocated on the stack in JOIN::exec.
Everything else is allocated using my_malloc() and my_free().

Threads are registed in server_threads, so are visible in
information_schema.processlist and the show processlist command.

We check that a kill query on a parallel worker is passed onto it's
manager and the query is properly aborted, and that a kill connection
is handled properly in parallel_worker.test.
@spetrunia
Copy link
Copy Markdown
Member

There is a compile failure:
https://buildbot.mariadb.org/#/builders/906/builds/9506/steps/8/logs/stdio .
Others I looked at were irrelevant test failures.

Register our new threads in server_threads.

Called from the management thread for applicable queries at the top level.
*/
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the convention and return true when there was an error, return false when everything went ok.

workers[i].thd->pwt_worker_info= workers+i;
workers[i].finished= workers[i].joined= false;
workers[i].killed= NOT_KILLED;
server_threads.insert(workers[i].thd); // show processlist
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've called create_background_thd and the comment there says:

/**
  Create a THD that only has auxiliary functions
  It will never be added to the global connection list
  server_threads. It does not represent any client connection.

Adjust that comment?

wait_max.tv_sec= time(0)+1; // wait 1s
mysql_mutex_lock(&LOCK_pwt_manager);
thd->ENTER_COND(&COND_pwt_new_message, &LOCK_pwt_manager,
&stage_waiting_for_work_from_sql_thread, &old_stage);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is Replication system's stage. This is confusing, let's add a new stage.

Comment thread sql/sql_select.cc
ANALYZE_START_TRACKING(thd, &explain->time_tracker);
res= exec_inner();
ANALYZE_STOP_TRACKING(thd, &explain->time_tracker);

if (pwt.workers)
pwt.join_parallel_workers(thd);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out this is executed after the query result is sent to the client.
One can see that the query has returned the result (and so there is no way to return anything any new messages from the workers (e.g. errors) to the client anymore).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will provide a patch for this.

@spetrunia
Copy link
Copy Markdown
Member

Please check and take this into your commit:

commit c2e2deea25f2774694fa3e2ff07f1bd9305b47cc (HEAD -> 13.0-MDEV-39492-2-review-input, origin/13.0-MDEV-39492-2-review-input)
Author: Sergei Petrunia <[email protected]>
Date:   Thu May 21 14:14:34 2026 +0300

    MDEV-39492 Parallel Query: Review input 1: cleanup earlier
    
    Do cleanup before we've finished sending the result to the client.
    This way, one can see the errors (and eventually warnings) marshalled
    back to the main thread and returned to the user:
    
    MariaDB [test]> set parallel_worker_threads=10;
    Query OK, 0 rows affected (0.001 sec)
    
    MariaDB [test]> select seq from seq_1_to_10;
    ERROR 4103 (HY000): Argument to the worker_busted_function() function does not belong to the range [0,1]

@spetrunia
Copy link
Copy Markdown
Member

Looking at I_S.PROCESSLIST, I can see:

MariaDB [test]> select * from information_schema.processlist;
+----+------+-----------+------+---------+------+----------------------------------+----------------------------------------------+-----------+-------+-----------+----------+-------------+-----------------+---------------+-----------+----------+----------------------------------------------+--------+----------------+
| ID | USER | HOST      | DB   | COMMAND | TIME | STATE                            | INFO                                         | TIME_MS   | STAGE | MAX_STAGE | PROGRESS | MEMORY_USED | MAX_MEMORY_USED | EXAMINED_ROWS | SENT_ROWS | QUERY_ID | INFO_BINARY                                  | TID    | TMP_SPACE_USED |
+----+------+-----------+------+---------+------+----------------------------------+----------------------------------------------+-----------+-------+-----------+----------+-------------+-----------------+---------------+-----------+----------+----------------------------------------------+--------+----------------+
| 15 | root | localhost | test | Query   |   15 | Sending data                     | parallel worker 5                            | 15944.933 |     0 |         0 |    0.000 |       32328 |           32328 |             0 |         0 |        0 | parallel worker 5                            | 458277 |              0 |
| 14 | root | localhost | test | Query   |   15 | Sending data                     | parallel worker 5                            | 15944.933 |     0 |         0 |    0.000 |       32328 |           32328 |             0 |         0 |        0 | parallel worker 5                            | 458276 |              0 |
| 13 | root | localhost | test | Query   |   15 | Sending data                     | parallel worker 5                            | 15944.933 |     0 |         0 |    0.000 |       32328 |           32328 |             0 |         0 |        0 | parallel worker 5                            | 458275 |              0 |
| 12 | root | localhost | test | Query   |   15 | Sending data                     | parallel worker 5                            | 15944.933 |     0 |         0 |    0.000 |       32328 |           32328 |             0 |         0 |        0 | parallel worker 5                            | 458274 |              0 |
| 11 | root | localhost | test | Query   |   15 | Sending data                     | parallel worker 5                            | 15944.933 |     0 |         0 |    0.000 |       32328 |           32328 |             0 |         0 |        0 | parallel worker 5                            | 458273 |              0 |
| 10 | root | localhost | test | Query   |   15 | Sending data                     | parallel worker 5                            | 15944.933 |     0 |         0 |    0.000 |       32328 |           32328 |             0 |         0 |        0 | parallel worker 5                            | 458272 |              0 |
|  9 | root | localhost | test | Query   |   15 | Sending data                     | parallel worker 5                            | 15944.933 |     0 |         0 |    0.000 |       32328 |           32328 |             0 |         0 |        0 | parallel worker 5                            | 458271 |              0 |
|  8 | root | localhost | test | Query   |   15 | Sending data                     | parallel worker 5                            | 15944.933 |     0 |         0 |    0.000 |       32328 |           32328 |             0 |         0 |        0 | parallel worker 5                            | 458270 |              0 |
|  7 | root | localhost | test | Query   |   15 | Sending data                     | parallel worker 5                            | 15944.933 |     0 |         0 |    0.000 |       32328 |           32328 |             0 |         0 |        0 | parallel worker 5                            | 458269 |              0 |
|  6 | root | localhost | test | Query   |   15 | Sending data                     | parallel worker 5                            | 15944.933 |     0 |         0 |    0.000 |       32328 |           32328 |             0 |         0 |        0 | parallel worker 5                            | 458268 |              0 |
|  5 | root | localhost | test | Query   |   15 | Waiting for work from SQL thread | select seq from seq_1_to_10                  | 15944.933 |     0 |         0 |    0.000 |      103264 |          103264 |            10 |        10 |        5 | select seq from seq_1_to_10                  | 458263 |              0 |

It's nice that one can see workers here.
All workers are "parallel worker 5" which is confusing. Please make them Thread X: parallel worker Y .

@spetrunia
Copy link
Copy Markdown
Member

  // signal manager there is something in the queue,
  mysql_cond_signal(&worker->manager->COND_pwt_new_message);

This looks like signaling while not owning the mutex that is used to wait on the condition?

At the first glance, looks wrong. Gemini tells this when I ask it "What is the problem of using mysql_cond_signal without locking associated mutex?" What is the problem of using mysql_cond_signal wi....pdf

Please clarify.

@mariadb-RexJohnston
Copy link
Copy Markdown
Member Author

This looks like signaling while not owning the mutex that is used to wait on the condition?

I was trying to avoid using mutex's where not strictly required. The state transitions in the manager mean that if we loose the signal, we pick it up on the next iteration. Here is Claude's explanation.

There is no predicate-loop, and the wait is unconditional. So the "lost wakeup" race looks like:

  1. Worker pushes event under LOCK_pwt_thread, releases it (line 142).
  2. Manager has just finished a pass, re-locks LOCK_pwt_manager, calls ENTER_COND, but has not yet
    entered cond_timedwait.
  3. Worker calls mysql_cond_signal at line 199 → signal is lost (no waiter parked yet).
  4. Manager finally parks in cond_timedwait and only wakes via the 1-second timeout.

Correctness is preserved by the 1 s timeout — but you eat up to a second of latency per missed
signal. Line 228 has the same shape, and it can fire on the last worker finishing, so the whole
query can stall up to a second at shutdown.

The state changes themselves are fine — parallel_messages is pushed under LOCK_pwt_thread, finished
is set under LOCK_worker, and the manager rechecks both under the right mutexes after wakeup. The
only thing the mutex around the signal buys you is not losing the wakeup.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Development

Successfully merging this pull request may close these issues.

3 participants