Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Include/internal/pycore_dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ _PyDict_NotifyEvent(PyDict_WatchEvent event,
PyObject *value)
{
assert(Py_REFCNT((PyObject*)mp) > 0);
int watcher_bits = mp->_ma_watcher_tag & DICT_WATCHER_MASK;
uint64_t tag = FT_ATOMIC_LOAD_UINT64_RELAXED(mp->_ma_watcher_tag);
int watcher_bits = tag & DICT_WATCHER_MASK;
if (watcher_bits) {
RARE_EVENT_STAT_INC(watched_dict_modification);
_PyDict_SendEvent(watcher_bits, event, mp, key, value);
Expand Down Expand Up @@ -364,7 +365,8 @@ PyDictObject *_PyObject_MaterializeManagedDict_LockHeld(PyObject *);
static inline Py_ssize_t
_PyDict_UniqueId(PyDictObject *mp)
{
return (Py_ssize_t)(mp->_ma_watcher_tag >> DICT_UNIQUE_ID_SHIFT);
uint64_t tag = FT_ATOMIC_LOAD_UINT64_RELAXED(mp->_ma_watcher_tag);
return (Py_ssize_t)(tag >> DICT_UNIQUE_ID_SHIFT);
}

static inline void
Expand Down
3 changes: 3 additions & 0 deletions Include/internal/pycore_dict_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ extern "C" {
# error "this header requires Py_BUILD_CORE define"
#endif

#include "pycore_lock.h" // PyMutex

Comment thread
yoney marked this conversation as resolved.
Outdated
#define DICT_MAX_WATCHERS 8
#define DICT_WATCHED_MUTATION_BITS 4

struct _Py_dict_state {
uint32_t next_keys_version;
PyDict_WatchCallback watchers[DICT_MAX_WATCHERS];
PyMutex watcher_mutex; // Protects the watchers array (free-threaded builds)
Comment thread
yoney marked this conversation as resolved.
Outdated
};

#define _dict_state_INIT \
Expand Down
6 changes: 6 additions & 0 deletions Include/internal/pycore_pyatomic_ft_wrappers.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ extern "C" {
_Py_atomic_load_uint16_relaxed(&value)
#define FT_ATOMIC_LOAD_UINT32_RELAXED(value) \
_Py_atomic_load_uint32_relaxed(&value)
#define FT_ATOMIC_LOAD_UINT64_RELAXED(value) \
_Py_atomic_load_uint64_relaxed(&value)
#define FT_ATOMIC_LOAD_ULONG_RELAXED(value) \
_Py_atomic_load_ulong_relaxed(&value)
#define FT_ATOMIC_STORE_PTR_RELAXED(value, new_value) \
Expand Down Expand Up @@ -125,6 +127,8 @@ extern "C" {
_Py_atomic_load_ullong_relaxed(&value)
#define FT_ATOMIC_ADD_SSIZE(value, new_value) \
(void)_Py_atomic_add_ssize(&value, new_value)
#define FT_ATOMIC_ADD_UINT64(value, new_value) \
(void)_Py_atomic_add_uint64(&value, new_value)
#define FT_MUTEX_LOCK(lock) PyMutex_Lock(lock)
#define FT_MUTEX_UNLOCK(lock) PyMutex_Unlock(lock)

Expand All @@ -144,6 +148,7 @@ extern "C" {
#define FT_ATOMIC_LOAD_UINT8_RELAXED(value) value
#define FT_ATOMIC_LOAD_UINT16_RELAXED(value) value
#define FT_ATOMIC_LOAD_UINT32_RELAXED(value) value
#define FT_ATOMIC_LOAD_UINT64_RELAXED(value) value
#define FT_ATOMIC_LOAD_ULONG_RELAXED(value) value
#define FT_ATOMIC_STORE_PTR_RELAXED(value, new_value) value = new_value
#define FT_ATOMIC_STORE_PTR_RELEASE(value, new_value) value = new_value
Expand Down Expand Up @@ -182,6 +187,7 @@ extern "C" {
#define FT_ATOMIC_LOAD_ULLONG_RELAXED(value) value
#define FT_ATOMIC_STORE_ULLONG_RELAXED(value, new_value) value = new_value
#define FT_ATOMIC_ADD_SSIZE(value, new_value) (void)(value += new_value)
#define FT_ATOMIC_ADD_UINT64(value, new_value) (void)(value += new_value)
#define FT_MUTEX_LOCK(lock) do {} while (0)
#define FT_MUTEX_UNLOCK(lock) do {} while (0)

Expand Down
89 changes: 89 additions & 0 deletions Lib/test/test_free_threading/test_dict_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import unittest

from test.support import import_helper, threading_helper

_testcapi = import_helper.import_module("_testcapi")

ITERS = 100
NTHREADS = 20


@threading_helper.requires_working_threading()
class TestDictWatcherThreadSafety(unittest.TestCase):
# Watcher kinds from _testcapi
EVENTS = 0 # appends dict events as strings to global event list

def test_concurrent_add_clear_watchers(self):
"""Race AddWatcher and ClearWatcher from multiple threads.

Uses more threads than available watcher slots (5 user slots out
of DICT_MAX_WATCHERS=8).
"""
results = []

def worker():
for _ in range(ITERS):
try:
wid = _testcapi.add_dict_watcher(self.EVENTS)
except RuntimeError:
continue # All slots taken
self.assertGreaterEqual(wid, 0)
results.append(wid)
_testcapi.clear_dict_watcher(wid)

threading_helper.run_concurrently(worker, NTHREADS)

# Verify at least some watchers were successfully added
self.assertGreater(len(results), 0)

def test_concurrent_watch_unwatch(self):
"""Race Watch and Unwatch on the same dict from multiple threads."""
wid = _testcapi.add_dict_watcher(self.EVENTS)
dicts = [{} for _ in range(10)]

def worker():
for _ in range(ITERS):
for d in dicts:
_testcapi.watch_dict(wid, d)
for d in dicts:
_testcapi.unwatch_dict(wid, d)

try:
threading_helper.run_concurrently(worker, NTHREADS)

# Verify watching still works after concurrent watch/unwatch
_testcapi.watch_dict(wid, dicts[0])
dicts[0]["key"] = "value"
events = _testcapi.get_dict_watcher_events()
self.assertIn("new:key:value", events)
finally:
_testcapi.clear_dict_watcher(wid)

def test_concurrent_modify_watched_dict(self):
"""Race dict mutations (triggering callbacks) with watch/unwatch."""
wid = _testcapi.add_dict_watcher(self.EVENTS)
d = {}
_testcapi.watch_dict(wid, d)

def mutator():
for i in range(ITERS):
d[f"key_{i}"] = i
d.pop(f"key_{i}", None)

def toggler():
for i in range(ITERS):
_testcapi.watch_dict(wid, d)
d[f"toggler_{i}"] = i
_testcapi.unwatch_dict(wid, d)

workers = [mutator, toggler] * (NTHREADS // 2)
try:
threading_helper.run_concurrently(workers)
events = _testcapi.get_dict_watcher_events()
self.assertGreater(len(events), 0)
finally:
_testcapi.clear_dict_watcher(wid)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Made :c:func:`PyDict_AddWatcher`, :c:func:`PyDict_ClearWatcher`,
:c:func:`PyDict_Watch`, and :c:func:`PyDict_Unwatch` thread-safe on the
:term:`free threaded <free threading>` build.
26 changes: 15 additions & 11 deletions Modules/_testcapi/watchers.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "pycore_function.h" // FUNC_MAX_WATCHERS
#include "pycore_interp_structs.h" // CODE_MAX_WATCHERS
#include "pycore_context.h" // CONTEXT_MAX_WATCHERS
#include "pycore_lock.h" // _PyOnceFlag

/*[clinic input]
module _testcapi
Expand All @@ -18,6 +19,14 @@ module _testcapi
// Test dict watching
static PyObject *g_dict_watch_events = NULL;
static int g_dict_watchers_installed = 0;
static _PyOnceFlag g_dict_watch_once = {0};

static int
_init_dict_watch_events(void *arg)
{
g_dict_watch_events = PyList_New(0);
return g_dict_watch_events ? 0 : -1;
}

static int
dict_watch_callback(PyDict_WatchEvent event,
Expand Down Expand Up @@ -106,13 +115,10 @@ add_dict_watcher(PyObject *self, PyObject *kind)
if (watcher_id < 0) {
return NULL;
}
if (!g_dict_watchers_installed) {
assert(!g_dict_watch_events);
if (!(g_dict_watch_events = PyList_New(0))) {
return NULL;
}
if (_PyOnceFlag_CallOnce(&g_dict_watch_once, _init_dict_watch_events, NULL) < 0) {
return NULL;
}
g_dict_watchers_installed++;
_Py_atomic_add_int(&g_dict_watchers_installed, 1);
return PyLong_FromLong(watcher_id);
}

Expand All @@ -122,10 +128,8 @@ clear_dict_watcher(PyObject *self, PyObject *watcher_id)
if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
return NULL;
}
g_dict_watchers_installed--;
if (!g_dict_watchers_installed) {
assert(g_dict_watch_events);
Py_CLEAR(g_dict_watch_events);
if (_Py_atomic_add_int(&g_dict_watchers_installed, -1) == 1) {
PyList_Clear(g_dict_watch_events);
}
Py_RETURN_NONE;
}
Expand Down Expand Up @@ -164,7 +168,7 @@ _testcapi_unwatch_dict_impl(PyObject *module, int watcher_id, PyObject *dict)
static PyObject *
get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
{
if (!g_dict_watch_events) {
if (_Py_atomic_load_int(&g_dict_watchers_installed) <= 0) {
PyErr_SetString(PyExc_RuntimeError, "no watchers active");
return NULL;
}
Expand Down
42 changes: 31 additions & 11 deletions Objects/dictobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -7842,13 +7842,19 @@ validate_watcher_id(PyInterpreterState *interp, int watcher_id)
PyErr_Format(PyExc_ValueError, "Invalid dict watcher ID %d", watcher_id);
return -1;
}
if (!interp->dict_state.watchers[watcher_id]) {
PyDict_WatchCallback cb = FT_ATOMIC_LOAD_PTR_RELAXED(
interp->dict_state.watchers[watcher_id]);
Comment thread
yoney marked this conversation as resolved.
if (cb == NULL) {
PyErr_Format(PyExc_ValueError, "No dict watcher set for ID %d", watcher_id);
return -1;
}
return 0;
}

// In free-threaded builds, Add/Clear serialize on watcher_mutex and publish
// callbacks with release stores. SendEvent reads them lock-free using
// acquire loads.

int
PyDict_Watch(int watcher_id, PyObject* dict)
{
Expand All @@ -7860,7 +7866,9 @@ PyDict_Watch(int watcher_id, PyObject* dict)
if (validate_watcher_id(interp, watcher_id)) {
return -1;
}
((PyDictObject*)dict)->_ma_watcher_tag |= (1LL << watcher_id);
Py_BEGIN_CRITICAL_SECTION(dict);
((PyDictObject*)dict)->_ma_watcher_tag |= (1ULL << watcher_id);
Py_END_CRITICAL_SECTION();
Comment thread
yoney marked this conversation as resolved.
Outdated
return 0;
}

Expand All @@ -7875,36 +7883,47 @@ PyDict_Unwatch(int watcher_id, PyObject* dict)
if (validate_watcher_id(interp, watcher_id)) {
return -1;
}
((PyDictObject*)dict)->_ma_watcher_tag &= ~(1LL << watcher_id);
Py_BEGIN_CRITICAL_SECTION(dict);
((PyDictObject*)dict)->_ma_watcher_tag &= ~(1ULL << watcher_id);
Comment thread
yoney marked this conversation as resolved.
Outdated
Py_END_CRITICAL_SECTION();
return 0;
}

int
PyDict_AddWatcher(PyDict_WatchCallback callback)
{
int watcher_id = -1;
PyInterpreterState *interp = _PyInterpreterState_GET();

FT_MUTEX_LOCK(&interp->dict_state.watcher_mutex);
Comment thread
yoney marked this conversation as resolved.
Outdated
/* Some watchers are reserved for CPython, start at the first available one */
for (int i = FIRST_AVAILABLE_WATCHER; i < DICT_MAX_WATCHERS; i++) {
if (!interp->dict_state.watchers[i]) {
interp->dict_state.watchers[i] = callback;
return i;
FT_ATOMIC_STORE_PTR_RELEASE(interp->dict_state.watchers[i], callback);
watcher_id = i;
goto done;
}
}

PyErr_SetString(PyExc_RuntimeError, "no more dict watcher IDs available");
return -1;
done:
FT_MUTEX_UNLOCK(&interp->dict_state.watcher_mutex);
return watcher_id;
}

int
PyDict_ClearWatcher(int watcher_id)
{
int res = 0;
PyInterpreterState *interp = _PyInterpreterState_GET();
FT_MUTEX_LOCK(&interp->dict_state.watcher_mutex);
if (validate_watcher_id(interp, watcher_id)) {
return -1;
res = -1;
goto done;
}
interp->dict_state.watchers[watcher_id] = NULL;
return 0;
FT_ATOMIC_STORE_PTR_RELEASE(interp->dict_state.watchers[watcher_id], NULL);
done:
FT_MUTEX_UNLOCK(&interp->dict_state.watcher_mutex);
return res;
}

static const char *
Expand All @@ -7929,7 +7948,8 @@ _PyDict_SendEvent(int watcher_bits,
PyInterpreterState *interp = _PyInterpreterState_GET();
for (int i = 0; i < DICT_MAX_WATCHERS; i++) {
if (watcher_bits & 1) {
PyDict_WatchCallback cb = interp->dict_state.watchers[i];
PyDict_WatchCallback cb = FT_ATOMIC_LOAD_PTR_ACQUIRE(
interp->dict_state.watchers[i]);
if (cb && (cb(event, (PyObject*)mp, key, value) < 0)) {
// We don't want to resurrect the dict by potentially having an
// unraisablehook keep a reference to it, so we don't pass the
Expand Down
12 changes: 8 additions & 4 deletions Python/optimizer_analysis.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "pycore_opcode_metadata.h"
#include "pycore_opcode_utils.h"
#include "pycore_pystate.h" // _PyInterpreterState_GET()
#include "pycore_pyatomic_ft_wrappers.h" // FT_MUTEX_LOCK/UNLOCK
#include "pycore_tstate.h" // _PyThreadStateImpl
#include "pycore_uop_metadata.h"
#include "pycore_long.h"
Expand Down Expand Up @@ -117,14 +118,15 @@ static int
get_mutations(PyObject* dict) {
assert(PyDict_CheckExact(dict));
PyDictObject *d = (PyDictObject *)dict;
return (d->_ma_watcher_tag >> DICT_MAX_WATCHERS) & ((1 << DICT_WATCHED_MUTATION_BITS)-1);
uint64_t tag = FT_ATOMIC_LOAD_UINT64_RELAXED(d->_ma_watcher_tag);
return (tag >> DICT_MAX_WATCHERS) & ((1 << DICT_WATCHED_MUTATION_BITS) - 1);
}

static void
increment_mutations(PyObject* dict) {
assert(PyDict_CheckExact(dict));
PyDictObject *d = (PyDictObject *)dict;
d->_ma_watcher_tag += (1 << DICT_MAX_WATCHERS);
FT_ATOMIC_ADD_UINT64(d->_ma_watcher_tag, 1ULL << DICT_MAX_WATCHERS);
}

/* The first two dict watcher IDs are reserved for CPython,
Expand Down Expand Up @@ -467,8 +469,10 @@ optimize_uops(

// Make sure that watchers are set up
PyInterpreterState *interp = _PyInterpreterState_GET();
if (interp->dict_state.watchers[GLOBALS_WATCHER_ID] == NULL) {
interp->dict_state.watchers[GLOBALS_WATCHER_ID] = globals_watcher_callback;
if (FT_ATOMIC_LOAD_PTR_RELAXED(interp->dict_state.watchers[GLOBALS_WATCHER_ID]) == NULL) {
Comment thread
colesbury marked this conversation as resolved.
Outdated
FT_ATOMIC_STORE_PTR_RELEASE(
interp->dict_state.watchers[GLOBALS_WATCHER_ID],
globals_watcher_callback);
interp->type_watchers[TYPE_WATCHER_ID] = type_watcher_callback;
}

Expand Down
1 change: 1 addition & 0 deletions Python/pystate.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ _Py_COMP_DIAG_POP
&(runtime)->allocators.mutex, \
&(runtime)->_main_interpreter.types.mutex, \
&(runtime)->_main_interpreter.code_state.mutex, \
&(runtime)->_main_interpreter.dict_state.watcher_mutex, \
}

static void
Expand Down
1 change: 1 addition & 0 deletions Tools/c-analyzer/cpython/ignored.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ Modules/_testcapi/object.c - MyObject_dealloc_called -
Modules/_testcapi/object.c - MyType -
Modules/_testcapi/structmember.c - test_structmembersType_OldAPI -
Modules/_testcapi/watchers.c - g_dict_watch_events -
Modules/_testcapi/watchers.c - g_dict_watch_once -
Modules/_testcapi/watchers.c - g_dict_watchers_installed -
Modules/_testcapi/watchers.c - g_type_modified_events -
Modules/_testcapi/watchers.c - g_type_watchers_installed -
Expand Down
Loading