Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Include/internal/pycore_dict_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ extern "C" {

struct _Py_dict_state {
uint32_t next_keys_version;
PyMutex watcher_mutex; // Protects the watchers array (free-threaded builds)
_PyOnceFlag watcher_setup_once; // One-time optimizer watcher setup
PyDict_WatchCallback watchers[DICT_MAX_WATCHERS];
};

Expand Down
2 changes: 2 additions & 0 deletions Include/internal/pycore_pyatomic_ft_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ extern "C" {
#define FT_ATOMIC_ADD_SSIZE(value, new_value) \
(void)_Py_atomic_add_ssize(&value, new_value)
#define FT_MUTEX_LOCK(lock) PyMutex_Lock(lock)
#define FT_MUTEX_LOCK_FLAGS(lock, flags) PyMutex_LockFlags(lock, flags)
#define FT_MUTEX_UNLOCK(lock) PyMutex_Unlock(lock)

#else
Expand Down Expand Up @@ -201,6 +202,7 @@ extern "C" {
#define FT_ATOMIC_STORE_ULLONG_RELAXED(value, new_value) value = new_value
#define FT_ATOMIC_ADD_SSIZE(value, new_value) (void)(value += new_value)
#define FT_MUTEX_LOCK(lock) do {} while (0)
#define FT_MUTEX_LOCK_FLAGS(lock, flags) do {} while (0)
#define FT_MUTEX_UNLOCK(lock) do {} while (0)

#endif
Expand Down
89 changes: 89 additions & 0 deletions Lib/test/test_free_threading/test_dict_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import unittest

from test.support import import_helper, threading_helper

_testcapi = import_helper.import_module("_testcapi")

ITERS = 100
NTHREADS = 20


@threading_helper.requires_working_threading()
class TestDictWatcherThreadSafety(unittest.TestCase):
# Watcher kinds from _testcapi
EVENTS = 0 # appends dict events as strings to global event list

def test_concurrent_add_clear_watchers(self):
"""Race AddWatcher and ClearWatcher from multiple threads.

Uses more threads than available watcher slots (5 user slots out
of DICT_MAX_WATCHERS=8).
"""
results = []

def worker():
for _ in range(ITERS):
try:
wid = _testcapi.add_dict_watcher(self.EVENTS)
except RuntimeError:
continue # All slots taken
self.assertGreaterEqual(wid, 0)
results.append(wid)
_testcapi.clear_dict_watcher(wid)

threading_helper.run_concurrently(worker, NTHREADS)

# Verify at least some watchers were successfully added
self.assertGreater(len(results), 0)

def test_concurrent_watch_unwatch(self):
"""Race Watch and Unwatch on the same dict from multiple threads."""
wid = _testcapi.add_dict_watcher(self.EVENTS)
dicts = [{} for _ in range(10)]

def worker():
for _ in range(ITERS):
for d in dicts:
_testcapi.watch_dict(wid, d)
for d in dicts:
_testcapi.unwatch_dict(wid, d)

try:
threading_helper.run_concurrently(worker, NTHREADS)

# Verify watching still works after concurrent watch/unwatch
_testcapi.watch_dict(wid, dicts[0])
dicts[0]["key"] = "value"
events = _testcapi.get_dict_watcher_events()
self.assertIn("new:key:value", events)
finally:
_testcapi.clear_dict_watcher(wid)

def test_concurrent_modify_watched_dict(self):
"""Race dict mutations (triggering callbacks) with watch/unwatch."""
wid = _testcapi.add_dict_watcher(self.EVENTS)
d = {}
_testcapi.watch_dict(wid, d)

def mutator():
for i in range(ITERS):
d[f"key_{i}"] = i
d.pop(f"key_{i}", None)

def toggler():
for i in range(ITERS):
_testcapi.watch_dict(wid, d)
d[f"toggler_{i}"] = i
_testcapi.unwatch_dict(wid, d)

workers = [mutator, toggler] * (NTHREADS // 2)
try:
threading_helper.run_concurrently(workers)
events = _testcapi.get_dict_watcher_events()
self.assertGreater(len(events), 0)
finally:
_testcapi.clear_dict_watcher(wid)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Made :c:func:`PyDict_AddWatcher`, :c:func:`PyDict_ClearWatcher`,
:c:func:`PyDict_Watch`, and :c:func:`PyDict_Unwatch` thread-safe on the
:term:`free threaded <free threading>` build.
26 changes: 15 additions & 11 deletions Modules/_testcapi/watchers.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "pycore_function.h" // FUNC_MAX_WATCHERS
#include "pycore_interp_structs.h" // CODE_MAX_WATCHERS
#include "pycore_context.h" // CONTEXT_MAX_WATCHERS
#include "pycore_lock.h" // _PyOnceFlag

/*[clinic input]
module _testcapi
Expand All @@ -18,6 +19,14 @@ module _testcapi
// Test dict watching
static PyObject *g_dict_watch_events = NULL;
static int g_dict_watchers_installed = 0;
static _PyOnceFlag g_dict_watch_once = {0};

static int
_init_dict_watch_events(void *arg)
{
g_dict_watch_events = PyList_New(0);
return g_dict_watch_events ? 0 : -1;
}

static int
dict_watch_callback(PyDict_WatchEvent event,
Expand Down Expand Up @@ -106,13 +115,10 @@ add_dict_watcher(PyObject *self, PyObject *kind)
if (watcher_id < 0) {
return NULL;
}
if (!g_dict_watchers_installed) {
assert(!g_dict_watch_events);
if (!(g_dict_watch_events = PyList_New(0))) {
return NULL;
}
if (_PyOnceFlag_CallOnce(&g_dict_watch_once, _init_dict_watch_events, NULL) < 0) {
return NULL;
}
g_dict_watchers_installed++;
_Py_atomic_add_int(&g_dict_watchers_installed, 1);
return PyLong_FromLong(watcher_id);
}

Expand All @@ -122,10 +128,8 @@ clear_dict_watcher(PyObject *self, PyObject *watcher_id)
if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
return NULL;
}
g_dict_watchers_installed--;
if (!g_dict_watchers_installed) {
assert(g_dict_watch_events);
Py_CLEAR(g_dict_watch_events);
if (_Py_atomic_add_int(&g_dict_watchers_installed, -1) == 1) {
PyList_Clear(g_dict_watch_events);
}
Py_RETURN_NONE;
}
Expand Down Expand Up @@ -164,7 +168,7 @@ _testcapi_unwatch_dict_impl(PyObject *module, int watcher_id, PyObject *dict)
static PyObject *
get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
{
if (!g_dict_watch_events) {
if (_Py_atomic_load_int(&g_dict_watchers_installed) <= 0) {
PyErr_SetString(PyExc_RuntimeError, "no watchers active");
return NULL;
}
Expand Down
42 changes: 31 additions & 11 deletions Objects/dictobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -8015,13 +8015,19 @@ validate_watcher_id(PyInterpreterState *interp, int watcher_id)
PyErr_Format(PyExc_ValueError, "Invalid dict watcher ID %d", watcher_id);
return -1;
}
if (!interp->dict_state.watchers[watcher_id]) {
PyDict_WatchCallback cb = FT_ATOMIC_LOAD_PTR_RELAXED(
interp->dict_state.watchers[watcher_id]);
Comment thread
yoney marked this conversation as resolved.
if (cb == NULL) {
PyErr_Format(PyExc_ValueError, "No dict watcher set for ID %d", watcher_id);
return -1;
}
return 0;
}

// In free-threaded builds, Add/Clear serialize on watcher_mutex and publish
// callbacks with release stores. SendEvent reads them lock-free using
// acquire loads.

int
PyDict_Watch(int watcher_id, PyObject* dict)
{
Expand All @@ -8033,7 +8039,8 @@ PyDict_Watch(int watcher_id, PyObject* dict)
if (validate_watcher_id(interp, watcher_id)) {
return -1;
}
FT_ATOMIC_OR_UINT64(((PyDictObject*)dict)->_ma_watcher_tag, (1LL << watcher_id));
FT_ATOMIC_OR_UINT64(((PyDictObject*)dict)->_ma_watcher_tag,
1ULL << watcher_id);
return 0;
}

Expand All @@ -8048,36 +8055,48 @@ PyDict_Unwatch(int watcher_id, PyObject* dict)
if (validate_watcher_id(interp, watcher_id)) {
return -1;
}
FT_ATOMIC_AND_UINT64(((PyDictObject*)dict)->_ma_watcher_tag, ~(1LL << watcher_id));
FT_ATOMIC_AND_UINT64(((PyDictObject*)dict)->_ma_watcher_tag,
~(1ULL << watcher_id));
return 0;
}

int
PyDict_AddWatcher(PyDict_WatchCallback callback)
{
int watcher_id = -1;
PyInterpreterState *interp = _PyInterpreterState_GET();

FT_MUTEX_LOCK_FLAGS(&interp->dict_state.watcher_mutex,
_Py_LOCK_DONT_DETACH);
/* Some watchers are reserved for CPython, start at the first available one */
for (int i = FIRST_AVAILABLE_WATCHER; i < DICT_MAX_WATCHERS; i++) {
if (!interp->dict_state.watchers[i]) {
interp->dict_state.watchers[i] = callback;
return i;
FT_ATOMIC_STORE_PTR_RELEASE(interp->dict_state.watchers[i], callback);
watcher_id = i;
goto done;
}
}

PyErr_SetString(PyExc_RuntimeError, "no more dict watcher IDs available");
return -1;
done:
FT_MUTEX_UNLOCK(&interp->dict_state.watcher_mutex);
return watcher_id;
}

int
PyDict_ClearWatcher(int watcher_id)
{
int res = 0;
PyInterpreterState *interp = _PyInterpreterState_GET();
FT_MUTEX_LOCK_FLAGS(&interp->dict_state.watcher_mutex,
_Py_LOCK_DONT_DETACH);
if (validate_watcher_id(interp, watcher_id)) {
return -1;
res = -1;
goto done;
}
interp->dict_state.watchers[watcher_id] = NULL;
return 0;
FT_ATOMIC_STORE_PTR_RELEASE(interp->dict_state.watchers[watcher_id], NULL);
done:
FT_MUTEX_UNLOCK(&interp->dict_state.watcher_mutex);
return res;
}

static const char *
Expand All @@ -8102,7 +8121,8 @@ _PyDict_SendEvent(int watcher_bits,
PyInterpreterState *interp = _PyInterpreterState_GET();
for (int i = 0; i < DICT_MAX_WATCHERS; i++) {
if (watcher_bits & 1) {
PyDict_WatchCallback cb = interp->dict_state.watchers[i];
PyDict_WatchCallback cb = FT_ATOMIC_LOAD_PTR_ACQUIRE(
interp->dict_state.watchers[i]);
if (cb && (cb(event, (PyObject*)mp, key, value) < 0)) {
// We don't want to resurrect the dict by potentially having an
// unraisablehook keep a reference to it, so we don't pass the
Expand Down
20 changes: 15 additions & 5 deletions Python/optimizer_analysis.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "pycore_opcode_metadata.h"
#include "pycore_opcode_utils.h"
#include "pycore_pystate.h" // _PyInterpreterState_GET()
#include "pycore_pyatomic_ft_wrappers.h" // FT_ATOMIC_*
#include "pycore_tstate.h" // _PyThreadStateImpl
#include "pycore_uop_metadata.h"
#include "pycore_long.h"
Expand Down Expand Up @@ -127,7 +128,7 @@ static void
increment_mutations(PyObject* dict) {
assert(PyDict_CheckExact(dict));
PyDictObject *d = (PyDictObject *)dict;
FT_ATOMIC_ADD_UINT64(d->_ma_watcher_tag, (1 << DICT_MAX_WATCHERS));
FT_ATOMIC_ADD_UINT64(d->_ma_watcher_tag, 1ULL << DICT_MAX_WATCHERS);
}

/* The first two dict watcher IDs are reserved for CPython,
Expand Down Expand Up @@ -156,6 +157,17 @@ type_watcher_callback(PyTypeObject* type)
return 0;
}

static int
_setup_optimizer_watchers(void *Py_UNUSED(arg))
{
PyInterpreterState *interp = _PyInterpreterState_GET();
FT_ATOMIC_STORE_PTR_RELEASE(
interp->dict_state.watchers[GLOBALS_WATCHER_ID],
globals_watcher_callback);
interp->type_watchers[TYPE_WATCHER_ID] = type_watcher_callback;
return 0;
}

static void
watch_type(PyTypeObject *type, _PyBloomFilter *filter)
{
Expand Down Expand Up @@ -580,10 +592,8 @@ optimize_uops(

// Make sure that watchers are set up
PyInterpreterState *interp = _PyInterpreterState_GET();
if (interp->dict_state.watchers[GLOBALS_WATCHER_ID] == NULL) {
interp->dict_state.watchers[GLOBALS_WATCHER_ID] = globals_watcher_callback;
interp->type_watchers[TYPE_WATCHER_ID] = type_watcher_callback;
}
_PyOnceFlag_CallOnce(&interp->dict_state.watcher_setup_once,
_setup_optimizer_watchers, NULL);

_Py_uop_abstractcontext_init(ctx, dependencies);
_Py_UOpsAbstractFrame *frame = _Py_uop_frame_new(ctx, (PyCodeObject *)func->func_code, NULL, 0);
Expand Down
1 change: 1 addition & 0 deletions Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ _Py_COMP_DIAG_POP
&(runtime)->allocators.mutex, \
&(runtime)->_main_interpreter.types.mutex, \
&(runtime)->_main_interpreter.code_state.mutex, \
&(runtime)->_main_interpreter.dict_state.watcher_mutex, \
}

static void
Expand Down
1 change: 1 addition & 0 deletions Tools/c-analyzer/cpython/ignored.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ Modules/_testcapi/object.c - MyObject_dealloc_called -
Modules/_testcapi/object.c - MyType -
Modules/_testcapi/structmember.c - test_structmembersType_OldAPI -
Modules/_testcapi/watchers.c - g_dict_watch_events -
Modules/_testcapi/watchers.c - g_dict_watch_once -
Modules/_testcapi/watchers.c - g_dict_watchers_installed -
Modules/_testcapi/watchers.c - g_type_modified_events -
Modules/_testcapi/watchers.c - g_type_watchers_installed -
Expand Down
Loading