diff --git a/.github/workflows/sanitizers.yaml b/.github/workflows/sanitizers.yaml index 84a1080aed..15fe013d5b 100644 --- a/.github/workflows/sanitizers.yaml +++ b/.github/workflows/sanitizers.yaml @@ -108,7 +108,7 @@ jobs: - name: Set CGO flags run: | { - echo "CGO_CFLAGS=$CFLAGS -I${PWD}/watcher/target/include -DFRANKENPHP_TEST $(php-config --includes)" + echo "CGO_CFLAGS=$CFLAGS -I${PWD}/watcher/target/include $(php-config --includes)" echo "CGO_LDFLAGS=$LDFLAGS $(php-config --ldflags) $(php-config --libs)" } >> "$GITHUB_ENV" - name: Run tests diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index d98b61ac18..51d59caf4a 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -62,7 +62,7 @@ jobs: # TODO: remove this workaround when fixed upstream run: sudo apt-get install --reinstall -y libbrotli-dev - name: Set CGO flags - run: echo "CGO_CFLAGS=-I${PWD}/watcher/target/include -DFRANKENPHP_TEST $(php-config --includes)" >> "${GITHUB_ENV}" + run: echo "CGO_CFLAGS=-I${PWD}/watcher/target/include $(php-config --includes)" >> "${GITHUB_ENV}" - name: Build run: go build - name: Build testcli binary @@ -138,7 +138,7 @@ jobs: echo "GEN_STUB_SCRIPT=${PWD}/php-${PHP_VERSION}/build/gen_stub.php" >> "${GITHUB_ENV}" - name: Set CGO flags run: | - echo "CGO_CFLAGS=-DFRANKENPHP_TEST $(php-config --includes)" >> "${GITHUB_ENV}" + echo "CGO_CFLAGS=$(php-config --includes)" >> "${GITHUB_ENV}" echo "CGO_LDFLAGS=$(php-config --ldflags) $(php-config --libs)" >> "${GITHUB_ENV}" - name: Install gotestsum run: go install gotest.tools/gotestsum@latest @@ -175,7 +175,7 @@ jobs: - name: Set CGO flags run: | { - echo "CGO_CFLAGS=-I/opt/homebrew/include/ -DFRANKENPHP_TEST $(php-config --includes)" + echo "CGO_CFLAGS=-I/opt/homebrew/include/ $(php-config --includes)" echo "CGO_LDFLAGS=-L/opt/homebrew/lib/ $(php-config --ldflags) $(php-config --libs)" } >> "${GITHUB_ENV}" - name: Build diff --git a/bgworker.go b/bgworker.go new file mode 100644 index 0000000000..7381900197 --- /dev/null +++ b/bgworker.go @@ -0,0 +1,522 @@ +package frankenphp + +// #include +// #include "frankenphp.h" +import "C" +import ( + "errors" + "fmt" + "log/slog" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + "unsafe" +) + +const ( + // defaultMaxBackgroundWorkers is the default safety cap for catch-all + // background workers when the user doesn't set max_threads. Caps the + // number of distinct lazy-started instances from a single catch-all. + defaultMaxBackgroundWorkers = 16 + + // defaultEnsureTimeout is the default deadline applied when ensure() is + // called without an explicit timeout. + defaultEnsureTimeout = 30 * time.Second +) + +// bootFailureInfo is the boot-phase crash metadata surfaced by ensure(). +type bootFailureInfo struct { + entrypoint string + exitStatus int + failureCount int + phpError string +} + +// Scope is an opaque per-php_server isolation boundary; the +// zero value is the global/embed scope. Obtain values via +// NextScope. +type Scope uint64 + +var scopeCounter atomic.Uint64 + +// NextScope returns a fresh scope value. Each php_server +// block should call this once during provisioning. +func NextScope() Scope { + return Scope(scopeCounter.Add(1)) +} + +// scopeLabels maps Scope -> human-readable label registered by the embedder +// (e.g. the Caddy module). Read by ScopeLabel; written by SetScopeLabel. +var scopeLabels sync.Map + +// SetScopeLabel attaches a human-readable label to a scope so future +// metric/log emitters can render it as e.g. server="api.example.com" +// instead of an opaque numeric id. Empty labels are ignored. Embedders +// (Caddy module, custom hosts) own the labelling policy. +func SetScopeLabel(s Scope, label string) { + if label == "" { + return + } + scopeLabels.Store(s, label) +} + +// ScopeLabel returns the label registered for s. When none is set +// (including the zero/global scope), it returns the numeric id so +// callers always get a non-empty value. +func ScopeLabel(s Scope) string { + if v, ok := scopeLabels.Load(s); ok { + return v.(string) + } + return strconv.FormatUint(uint64(s), 10) +} + +// backgroundLookups maps scope -> lookup. Scope 0 is the global/embed scope. +var backgroundLookups map[Scope]*backgroundWorkerLookup + +// backgroundWorkerLookup resolves a user-facing worker name to its *worker; +// catchAll is the fallback when byName misses. +type backgroundWorkerLookup struct { + byName map[string]*worker + catchAll *worker +} + +func newBackgroundWorkerLookup() *backgroundWorkerLookup { + return &backgroundWorkerLookup{ + byName: make(map[string]*worker), + } +} + +// resolve returns the worker for the given name, falling back to catchAll. +func (l *backgroundWorkerLookup) resolve(name string) *worker { + if w, ok := l.byName[name]; ok { + return w + } + return l.catchAll +} + +// buildBackgroundWorkerLookups maps each declared bg worker into its scope's +// lookup. Per-scope name collisions are caught here because bg workers +// intentionally skip the global workersByName map (so two scopes can share +// a user-facing name). +func buildBackgroundWorkerLookups(workers []*worker, opts []workerOpt) (map[Scope]*backgroundWorkerLookup, error) { + lookups := make(map[Scope]*backgroundWorkerLookup) + + for i, o := range opts { + if !o.isBackgroundWorker { + continue + } + + scope := o.scope + lookup, ok := lookups[scope] + if !ok { + lookup = newBackgroundWorkerLookup() + lookups[scope] = lookup + } + + w := workers[i] + w.scope = scope + // Named workers and a catch-all's eager num>0 pool both wait on + // the template's slot; lazy-spawned catch-all instances each get + // their own in catchAllNames. + if w.backgroundWorker == nil { + w.backgroundWorker = newBackgroundWorkerState() + } + + phpName := strings.TrimPrefix(w.name, "m#") + if phpName != "" && phpName != w.fileName { + if _, exists := lookup.byName[phpName]; exists { + return nil, fmt.Errorf("duplicate background worker name %q in the same scope", phpName) + } + lookup.byName[phpName] = w + } else { + if lookup.catchAll != nil { + return nil, fmt.Errorf("duplicate catch-all background worker in the same scope") + } + cap := defaultMaxBackgroundWorkers + if o.maxThreads > 0 { + cap = o.maxThreads + } + w.catchAllCap = cap + w.catchAllNames = make(map[string]*backgroundWorkerState) + lookup.catchAll = w + } + } + + if len(lookups) == 0 { + return nil, nil + } + return lookups, nil +} + +// reserveBackgroundWorkerThreads resolves max_threads defaults and +// returns the thread budget to add to the pool. Mutates opt.workers +// in place and pre-registers totalWorkers so a bg-only deployment +// has the metric initialised. +func reserveBackgroundWorkerThreads(opt *opt) int { + reserved := 0 + for i, w := range opt.workers { + if !w.isBackgroundWorker { + continue + } + phpName := strings.TrimPrefix(w.name, "m#") + isCatchAll := phpName == "" || phpName == w.fileName + + if w.maxThreads == 0 { + switch { + case isCatchAll: + // Lazy cap default for any catch-all. + opt.workers[i].maxThreads = defaultMaxBackgroundWorkers + case w.num == 0: + // Single-thread budget for a lazy named worker. + opt.workers[i].maxThreads = 1 + } + } + + var extra int + if isCatchAll { + // eager pool + lazy cap (independent budgets) + extra = w.num + opt.workers[i].maxThreads + } else { + extra = w.num + if opt.workers[i].maxThreads > extra { + extra = opt.workers[i].maxThreads + } + } + if extra < 1 { + extra = 1 + } + reserved += extra + metrics.TotalWorkers(w.name, extra) + } + return reserved +} + +// getLookup returns the background-worker lookup for the calling thread, +// resolving the scope via worker handler -> request context -> global. A +// scope with no declared workers falls through to scope 0 so embed-mode +// workers stay reachable; declared scopes stay strictly isolated. +func getLookup(thread *phpThread) *backgroundWorkerLookup { + if backgroundLookups == nil { + return nil + } + var scope Scope + if thread != nil { + // Both *workerThread and *backgroundWorkerThread expose their *worker + // via this anonymous interface; classic-mode threads don't and fall + // through to the request-context path. + if h, ok := thread.handler.(interface{ scopedWorker() *worker }); ok { + scope = h.scopedWorker().scope + } else if fc, ok := fromContext(thread.context()); ok { + scope = fc.scope + } + } + if scope != 0 { + if l := backgroundLookups[scope]; l != nil { + return l + } + } + return backgroundLookups[0] +} + +// startBackgroundWorker resolves `name` via lookup.byName / lookup.catchAll, +// lazy-starting the thread if needed, and returns the per-instance state +// slot the caller should wait on. Safe to call concurrently. +func startBackgroundWorker(thread *phpThread, bgWorkerName string) (*backgroundWorkerState, error) { + if bgWorkerName == "" { + return nil, fmt.Errorf("background worker name must not be empty") + } + lookup := getLookup(thread) + if lookup == nil { + return nil, fmt.Errorf("no background worker configured") + } + + // byName is keyed by the user-facing (m#-stripped) name. + if w, ok := lookup.byName[bgWorkerName]; ok { + sk, err := lazyStartNamedWorker(w) + if err != nil { + return nil, err + } + return sk, nil + } + + catchAll := lookup.catchAll + if catchAll == nil { + return nil, fmt.Errorf("no background worker configured for name %q", bgWorkerName) + } + + // Hold catchAllMu across thread reservation + entry publication so a + // failed allocation can't leave a phantom registration visible to + // concurrent callers. + catchAll.catchAllMu.Lock() + + if sk, ok := catchAll.catchAllNames[bgWorkerName]; ok { + catchAll.catchAllMu.Unlock() + return sk, nil + } + + if catchAll.catchAllCap > 0 && len(catchAll.catchAllNames) >= catchAll.catchAllCap { + catchAll.catchAllMu.Unlock() + return nil, fmt.Errorf("cannot start background worker %q: limit of %d reached (increase max threads or declare it as a named worker)", bgWorkerName, catchAll.catchAllCap) + } + + t := getInactivePHPThread() + if t == nil { + catchAll.catchAllMu.Unlock() + return nil, fmt.Errorf("no available PHP thread for background worker (increase max threads)") + } + + sk := newBackgroundWorkerState() + catchAll.catchAllNames[bgWorkerName] = sk + catchAll.catchAllMu.Unlock() + + convertToBackgroundWorkerThread(t, catchAll, bgWorkerName, sk) + + if globalLogger.Enabled(globalCtx, slog.LevelInfo) { + globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "background worker started", + slog.String("name", bgWorkerName)) + } + + return sk, nil +} + +// lazyStartNamedWorker returns the readiness slot the caller should +// wait on. For num=0 workers it spawns the first thread under +// bgLazyStartMu (idempotent); the snapshot captured under the lock +// stays consistent with any concurrent invalidateBackgroundEntry. +func lazyStartNamedWorker(w *worker) (*backgroundWorkerState, error) { + if w.num > 0 { + return w.backgroundWorker, nil + } + w.bgLazyStartMu.Lock() + defer w.bgLazyStartMu.Unlock() + if w.bgLazyStarted { + return w.backgroundWorker, nil + } + t := getInactivePHPThread() + if t == nil { + return nil, fmt.Errorf("no available PHP thread for background worker (increase max threads)") + } + sk := w.backgroundWorker + convertToBackgroundWorkerThread(t, w, w.name, sk) + w.bgLazyStarted = true + return sk, nil +} + +// isBootstrapEnsure reports whether the calling thread is inside an HTTP +// worker's boot phase. Bootstrap callers take the fail-fast path; runtime +// callers (bg workers, classic requests) take the tolerant lazy-start path. +func isBootstrapEnsure(thread *phpThread) bool { + handler, ok := thread.handler.(*workerThread) + return ok && handler.isBootingScript +} + +// formatBackgroundWorkerTimeoutError produces the timeout-error message +// for an ensure() that didn't reach combined readiness in time. Mentions +// the boot failure if one was recorded; otherwise reports which half of +// the readiness signal the worker missed. +func formatBackgroundWorkerTimeoutError(name string, sk *backgroundWorkerState, timeout time.Duration) string { + if info := sk.bootFailure.Load(); info != nil { + msg := fmt.Sprintf("background worker %q did not become ready within %s; last attempt %d failed (exit status %d, entrypoint %s)", + name, timeout, info.failureCount, info.exitStatus, info.entrypoint) + if info.phpError != "" { + msg += ": " + info.phpError + } + return msg + } + missing := []string{} + if !sk.hasHandle.Load() { + missing = append(missing, "frankenphp_get_worker_handle()") + } + if !sk.hasVars.Load() { + missing = append(missing, "frankenphp_set_vars()") + } + if len(missing) == 0 { + return fmt.Sprintf("background worker %q did not become ready within %s", name, timeout) + } + return fmt.Sprintf("background worker %q did not call %s within %s", name, strings.Join(missing, " and "), timeout) +} + +// errBackgroundWorkerNotInsideBgThread is returned by set_vars when called +// outside a bg-worker thread. Package-level so tests can match it. +var errBackgroundWorkerNotInsideBgThread = errors.New("frankenphp_set_vars() can only be called from a background worker") + +// go_frankenphp_ensure_background_worker lazy-starts each named bg worker +// (C side has validated names are non-empty + unique) and blocks until +// each reaches combined readiness (get_worker_handle + set_vars), aborts, +// or timeoutMs elapses (<=0 = default). Bootstrap callers (HTTP worker +// pre-handle_request) fail fast on boot failures; runtime callers wait +// out the restart/backoff cycle. +// +//export go_frankenphp_ensure_background_worker +func go_frankenphp_ensure_background_worker(threadIndex C.uintptr_t, names **C.char, nameLens *C.size_t, nameCount C.int, timeoutMs C.int64_t) *C.char { + thread := phpThreads[threadIndex] + timeout := time.Duration(int64(timeoutMs)) * time.Millisecond + if timeout <= 0 { + timeout = defaultEnsureTimeout + } + + n := int(nameCount) + nameSlice := unsafe.Slice(names, n) + nameLenSlice := unsafe.Slice(nameLens, n) + bootstrap := isBootstrapEnsure(thread) + + // Start each named worker first. Reserve their states so a shared + // deadline applies across the whole group (the caller gets one + // timeout value, not one per worker). + sks := make([]*backgroundWorkerState, n) + goNames := make([]string, n) + for i := 0; i < n; i++ { + goNames[i] = C.GoStringN(nameSlice[i], C.int(nameLenSlice[i])) + sk, err := startBackgroundWorker(thread, goNames[i]) + if err != nil { + return C.CString(err.Error()) + } + sks[i] = sk + } + + deadline := time.After(timeout) + if bootstrap { + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + for i, sk := range sks { + wait: + for { + select { + case <-sk.ready: + break wait + case <-sk.aborted: + return C.CString(sk.abortErr.Error()) + case <-deadline: + return C.CString(formatBackgroundWorkerTimeoutError(goNames[i], sk, timeout)) + case <-globalCtx.Done(): + return C.CString("frankenphp is shutting down") + case <-ticker.C: + if sk.bootFailure.Load() != nil { + return C.CString(formatBackgroundWorkerTimeoutError(goNames[i], sk, timeout)) + } + } + } + } + return nil + } + + for i, sk := range sks { + select { + case <-sk.ready: + case <-sk.aborted: + return C.CString(sk.abortErr.Error()) + case <-deadline: + return C.CString(formatBackgroundWorkerTimeoutError(goNames[i], sk, timeout)) + case <-globalCtx.Done(): + return C.CString("frankenphp is shutting down") + } + } + return nil +} + +// go_frankenphp_worker_ready marks the handle half of the combined +// readiness signal on the per-thread state slot. The slot's ready channel +// closes only once both handle and set_vars have fired. Idempotent. +// +//export go_frankenphp_worker_ready +func go_frankenphp_worker_ready(threadIndex C.uintptr_t) { + thread := phpThreads[threadIndex] + if thread == nil { + return + } + handler, ok := thread.handler.(*backgroundWorkerThread) + if !ok || handler == nil { + return + } + if sk := handler.backgroundWorker; sk != nil { + sk.markHandle() + } +} + +// go_frankenphp_set_vars is called from PHP when a background worker +// publishes its shared vars. The caller has already deep-copied the vars +// into persistent memory; here we swap the pointer under the state lock +// and hand back the old pointer so the C side can free it after the call. +// +//export go_frankenphp_set_vars +func go_frankenphp_set_vars(threadIndex C.uintptr_t, varsPtr unsafe.Pointer, oldPtr *unsafe.Pointer) *C.char { + thread := phpThreads[threadIndex] + + bgHandler, ok := thread.handler.(*backgroundWorkerThread) + if !ok || bgHandler.backgroundWorker == nil { + return C.CString(errBackgroundWorkerNotInsideBgThread.Error()) + } + + sk := bgHandler.backgroundWorker + + sk.mu.Lock() + *oldPtr = sk.varsPtr + sk.varsPtr = varsPtr + sk.varsVersion.Add(1) + sk.mu.Unlock() + + bgHandler.markBackgroundReady() + + return nil +} + +// go_frankenphp_get_vars resolves the named worker through the lookup +// (named or catch-all), checks sk.ready without starting the worker, and +// copies its vars into the return value. If the caller hasn't called +// ensure() first, this returns a "not ready" error. +// +// callerVersion / outVersion implement a per-request cache: +// - If callerVersion is non-nil and equals the current varsVersion, +// the copy is skipped; outVersion is still set so the C side can +// reuse its cached zval with pointer equality. +// - Otherwise returnValue receives a fresh deep copy and outVersion +// reports the version that copy corresponds to. +// +//export go_frankenphp_get_vars +func go_frankenphp_get_vars(threadIndex C.uintptr_t, name *C.char, nameLen C.size_t, returnValue *C.zval, callerVersion *C.uint64_t, outVersion *C.uint64_t) *C.char { + thread := phpThreads[threadIndex] + lookup := getLookup(thread) + if lookup == nil { + return C.CString("no background worker configured") + } + + goName := C.GoStringN(name, C.int(nameLen)) + var sk *backgroundWorkerState + if w, ok := lookup.byName[goName]; ok { + sk = w.backgroundWorker + } else if ca := lookup.catchAll; ca != nil { + ca.catchAllMu.Lock() + sk = ca.catchAllNames[goName] + ca.catchAllMu.Unlock() + } + if sk == nil { + return C.CString("background worker not found: " + goName + " (call frankenphp_ensure_background_worker first)") + } + + select { + case <-sk.ready: + default: + return C.CString("background worker not ready: " + goName + " (no set_vars call yet)") + } + + // Fast path: caller's cached version matches current. Skip the copy; + // the caller will reuse its cached zval. + if callerVersion != nil && outVersion != nil { + v := sk.varsVersion.Load() + *outVersion = C.uint64_t(v) + if uint64(*callerVersion) == v { + return nil + } + } + + sk.mu.RLock() + C.frankenphp_copy_persistent_vars(returnValue, sk.varsPtr) + if outVersion != nil { + *outVersion = C.uint64_t(sk.varsVersion.Load()) + } + sk.mu.RUnlock() + + return nil +} diff --git a/bgworker_test.go b/bgworker_test.go new file mode 100644 index 0000000000..331f88950e --- /dev/null +++ b/bgworker_test.go @@ -0,0 +1,54 @@ +package frankenphp_test + +import ( + "strings" + "testing" + "time" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" +) + +// TestBackgroundWorker drives the minimal set_vars/get_vars path end-to-end: +// a background worker publishes three values, then an HTTP request on a +// separate thread reads them back by name. +func TestBackgroundWorker(t *testing.T) { + testDataDir := setupFrankenPHP(t, + frankenphp.WithWorkers("bg-basic", "testdata/bgworker/basic.php", 1, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(2), + ) + + // Give the background worker time to boot, publish, and park on the + // stop stream. set_vars is synchronous but the first scheduling of the + // bg worker thread is a race with Init returning, so a short wait is + // cheaper than a ready-channel hook for this test. + deadline := time.Now().Add(3 * time.Second) + var out string + for time.Now().Before(deadline) { + out = serveBody(t, testDataDir, "bgworker/reader.php") + if !strings.Contains(out, "MISSING") && strings.Contains(out, "has-ready-at=1") { + break + } + time.Sleep(50 * time.Millisecond) + } + + assert.Contains(t, out, "message=hello from background worker") + assert.Contains(t, out, "count=42") + assert.Contains(t, out, "has-ready-at=1") +} + +// TestBackgroundWorkerErrorPaths covers the misuse errors that don't need +// a running worker: get_vars on a nonexistent name, set_vars from outside +// a background worker, and get_worker_handle from outside a background +// worker. Runs as a non-worker request so none of the calls happen on a +// bg-worker thread. +func TestBackgroundWorkerErrorPaths(t *testing.T) { + testDataDir := setupFrankenPHP(t, frankenphp.WithNumThreads(2)) + + body := serveBody(t, testDataDir, "bgworker/errors.php") + assert.NotContains(t, body, "FAIL", "error-path script reported a failure:\n"+body) + assert.Contains(t, body, "OK missing:") + assert.Contains(t, body, "OK reject-non-bg:") + assert.Contains(t, body, "OK reject-handle:") +} diff --git a/bgworkerbatch_test.go b/bgworkerbatch_test.go new file mode 100644 index 0000000000..e7add54153 --- /dev/null +++ b/bgworkerbatch_test.go @@ -0,0 +1,111 @@ +package frankenphp_test + +import ( + "os" + "testing" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestEnsureBackgroundWorkerBatch ensures multiple workers in one call, +// each publishing its own identity. Verifies the batch path (array arg) +// shares one deadline across all workers. +func TestEnsureBackgroundWorkerBatch(t *testing.T) { + testDataDir := setupFrankenPHP(t, + frankenphp.WithWorkers("worker-a", "testdata/bgworker/named.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithWorkers("worker-b", "testdata/bgworker/named.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithWorkers("worker-c", "testdata/bgworker/named.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(6), + ) + + body := serveBody(t, testDataDir, "bgworker/batch-ensure.php") + assert.NotContains(t, body, "MISSING", "batch ensure should have started and published all workers:\n"+body) + assert.Contains(t, body, "worker-a=worker-a") + assert.Contains(t, body, "worker-b=worker-b") + assert.Contains(t, body, "worker-c=worker-c") +} + +// TestEnsureBackgroundWorkerBatchEmpty verifies that an empty array is +// rejected with a clear error rather than silently succeeding. +func TestEnsureBackgroundWorkerBatchEmpty(t *testing.T) { + testDataDir := setupFrankenPHP(t, + frankenphp.WithWorkers("bg", "testdata/bgworker/named.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(3), + ) + + php := `getMessage(); +}` + tmp := testDataDir + "bg-batch-empty.php" + require.NoError(t, os.WriteFile(tmp, []byte(php), 0644)) + t.Cleanup(func() { _ = os.Remove(tmp) }) + + body := serveBody(t, testDataDir, "bg-batch-empty.php") + assert.Contains(t, body, "OK ") + assert.Contains(t, body, "must not be empty") + assert.NotContains(t, body, "FAIL") +} + +// TestEnsureBackgroundWorkerBatchNonString verifies array-entry type +// validation: non-string elements produce a TypeError. +func TestEnsureBackgroundWorkerBatchNonString(t *testing.T) { + testDataDir := setupFrankenPHP(t, + frankenphp.WithWorkers("bg", "testdata/bgworker/named.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(3), + ) + + php := `getMessage(); +}` + tmp := testDataDir + "bg-batch-nonstring.php" + require.NoError(t, os.WriteFile(tmp, []byte(php), 0644)) + t.Cleanup(func() { _ = os.Remove(tmp) }) + + body := serveBody(t, testDataDir, "bg-batch-nonstring.php") + assert.Contains(t, body, "OK ") + assert.Contains(t, body, "must contain only strings") + assert.NotContains(t, body, "FAIL") +} + +// TestBackgroundWorkerServerFlag confirms that a bg worker sees +// FRANKENPHP_WORKER_BACKGROUND=true alongside FRANKENPHP_WORKER in +// $_SERVER, so scripts can branch without checking every function +// independently. +func TestBackgroundWorkerServerFlag(t *testing.T) { + testDataDir := setupFrankenPHP(t, + frankenphp.WithWorkers("flag-worker", "testdata/bgworker/bg-flag.php", 1, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(3), + ) + + // ensure() removes the race between Init returning and the eager + // bg-worker thread reaching its first set_vars. + php := `getMessage(); + return; + } catch (\RuntimeException $e) { + usleep(10000); + } +} +echo 'TIMEOUT'; +` + out := serveInlinePHP(t, testDataDir, "bg-enum-missing-reader.php", php) + + assert.NotContains(t, out, "NO_ERROR", "enum should not have materialized:\n"+out) + assert.NotContains(t, out, "TIMEOUT", "worker never published:\n"+out) + assert.Contains(t, out, "LogicException") + assert.Contains(t, out, "WorkerOnlyEnum", "missing class name must appear in the error:\n"+out) +} + +// TestBackgroundWorkerSignalingStreamResource confirms that the value +// returned by frankenphp_get_worker_handle() is a real PHP stream +// resource. Complements the bounded-wall-clock force-kill test: that +// one proves the pipe closes on shutdown, this one proves the handle +// is a proper resource in the first place (not null, not an int, not +// a user object). +func TestBackgroundWorkerSignalingStreamResource(t *testing.T) { + testDataDir := setupFrankenPHP(t, + frankenphp.WithWorkers("stream-worker", "testdata/bgworker/stream-probe.php", 1, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(3), + ) + + php := `getMessage(); +}` + tmp := testDataDir + "bg-ensure-undeclared.php" + require.NoError(t, os.WriteFile(tmp, []byte(php), 0644)) + t.Cleanup(func() { _ = os.Remove(tmp) }) + + body := serveBody(t, testDataDir, "bg-ensure-undeclared.php") + assert.Contains(t, body, "OK no background worker configured for name", "ensure of undeclared name should error:\n"+body) + assert.NotContains(t, body, "FAIL") +} + +// TestBackgroundWorkerBootFailureError confirms that an entrypoint which +// throws during boot surfaces the captured details through ensure()'s +// timeout error message: entrypoint path, attempt count, and the PHP +// RuntimeException message. Runs as a non-worker request so ensure uses +// the tolerant lazy-start path (no fail-fast). +func TestBackgroundWorkerBootFailureError(t *testing.T) { + testDataDir := setupFrankenPHP(t, + frankenphp.WithWorkers("boot-fail-worker", "testdata/bgworker/boot-fail.php", 0, + frankenphp.WithWorkerBackground()), + frankenphp.WithNumThreads(3), + ) + + php := `getMessage(); +}` + tmp := testDataDir + "bg-boot-fail.php" + require.NoError(t, os.WriteFile(tmp, []byte(php), 0644)) + t.Cleanup(func() { _ = os.Remove(tmp) }) + + body := serveBody(t, testDataDir, "bg-boot-fail.php") + assert.NotContains(t, body, "FAIL", "ensure should have thrown:\n"+body) + assert.Contains(t, body, `"boot-fail-worker"`) + assert.Contains(t, body, "bgworker/boot-fail.php", "entrypoint path must appear in the error:\n"+body) + assert.Contains(t, body, "attempt", "attempt count must appear:\n"+body) + assert.Contains(t, body, "intentional boot failure for test", "PHP exception message must be captured:\n"+body) +} diff --git a/bgworkerhelpers_test.go b/bgworkerhelpers_test.go new file mode 100644 index 0000000000..18fc3267c0 --- /dev/null +++ b/bgworkerhelpers_test.go @@ -0,0 +1,69 @@ +package frankenphp_test + +import ( + "io" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/require" +) + +// requireFileEventually asserts that `path` appears on disk before the +// deadline. Wraps require.Eventually so call sites stay short. +func requireFileEventually(t testing.TB, path string, msgAndArgs ...any) { + t.Helper() + require.Eventually(t, func() bool { + _, err := os.Stat(path) + return err == nil + }, 5*time.Second, 25*time.Millisecond, msgAndArgs...) +} + +// serveInlinePHP writes a small PHP fixture under testDataDir, serves it +// via ServeHTTP, removes it on cleanup, and returns the response body. +// The file name should not exist already; it is registered for +// t.Cleanup-driven removal. +func serveInlinePHP(t *testing.T, testDataDir, name, php string) string { + t.Helper() + path := testDataDir + name + require.NoError(t, os.WriteFile(path, []byte(php), 0644)) + t.Cleanup(func() { _ = os.Remove(path) }) + return serveBody(t, testDataDir, name) +} + +// setupFrankenPHP boots FrankenPHP with the given options, registers +// Shutdown as a t.Cleanup, and returns the absolute path to the testdata +// directory. Saves the boilerplate every bg-worker test repeats. +func setupFrankenPHP(t *testing.T, opts ...frankenphp.Option) (testDataDir string) { + t.Helper() + cwd, err := os.Getwd() + require.NoError(t, err) + testDataDir = cwd + "/testdata/" + require.NoError(t, frankenphp.Init(opts...)) + t.Cleanup(frankenphp.Shutdown) + return +} + +// serveBody runs `script` (relative to testDataDir, may include a query +// string) through FrankenPHP and returns the response body. ErrRejected is +// treated as a non-fatal outcome so worker-mode quirks don't fail tests +// that only care about the script's stdout. +func serveBody(t *testing.T, testDataDir, scriptAndQuery string, opts ...frankenphp.RequestOption) string { + t.Helper() + req := httptest.NewRequest("GET", "http://example.com/"+scriptAndQuery, nil) + reqOpts := append([]frankenphp.RequestOption{ + frankenphp.WithRequestDocumentRoot(testDataDir, false), + }, opts...) + fr, err := frankenphp.NewRequestWithContext(req, reqOpts...) + require.NoError(t, err) + + w := httptest.NewRecorder() + if err := frankenphp.ServeHTTP(w, fr); err != nil { + require.ErrorAs(t, err, &frankenphp.ErrRejected{}) + } + body, err := io.ReadAll(w.Result().Body) + require.NoError(t, err) + return string(body) +} diff --git a/bgworkerinternal_test.go b/bgworkerinternal_test.go new file mode 100644 index 0000000000..4aeaaf326d --- /dev/null +++ b/bgworkerinternal_test.go @@ -0,0 +1,195 @@ +package frankenphp + +import ( + "io" + "net/http/httptest" + "os" + "runtime" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestBackgroundWorkerRestartForceKillsStuckThread actually exercises the +// force-kill path: the fixture sleep()s without watching the stop pipe, +// so handler.drain() cannot wake it. RestartWorkers must go through the +// grace-period timeout and the force-kill primitive (pthread_kill on +// Linux/FreeBSD) to finish within the budget. Skips platforms where +// force-kill cannot interrupt a blocking syscall (macOS has no realtime +// signals, Windows non-alertable Sleep stays uninterruptible). +func TestBackgroundWorkerRestartForceKillsStuckThread(t *testing.T) { + if runtime.GOOS != "linux" && runtime.GOOS != "freebsd" { + t.Skipf("force-kill cannot interrupt blocking syscalls on %s", runtime.GOOS) + } + + prev := drainGracePeriod + drainGracePeriod = 2 * time.Second + t.Cleanup(func() { drainGracePeriod = prev }) + + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + + require.NoError(t, Init( + WithWorkers("bg-stuck", testDataDir+"bgworker/stuck.php", 1, + WithWorkerBackground()), + WithNumThreads(2), + )) + t.Cleanup(Shutdown) + + // Wait until the bg worker published 'ready' (the line right before + // sleep(60)) so we know it is actually parked in the blocking + // syscall when the drain fires - that's the only way to prove the + // force-kill code path was exercised, not the stop-pipe EOF path. + readerPHP := `getMessage(); +}` + tmp := testDataDir + "bg-stuck-reader.php" + require.NoError(t, os.WriteFile(tmp, []byte(readerPHP), 0644)) + t.Cleanup(func() { _ = os.Remove(tmp) }) + + require.Eventually(t, func() bool { + req := httptest.NewRequest("GET", "http://example.com/bg-stuck-reader.php", nil) + fr, err := NewRequestWithContext(req, WithRequestDocumentRoot(testDataDir, false)) + if err != nil { + return false + } + w := httptest.NewRecorder() + _ = ServeHTTP(w, fr) + body, _ := io.ReadAll(w.Result().Body) + return strings.Contains(string(body), "ready=1") + }, 5*time.Second, 25*time.Millisecond, "bg worker never entered sleep()") + + start := time.Now() + RestartWorkers() + // Drain budget = grace period (2s) + slack for signal dispatch and + // drain completion. + assert.WithinDuration(t, start, time.Now(), 5*time.Second, + "drain must force-kill the stuck bg worker within the grace period") +} + +// TestEnsureBackgroundWorkerCatchAllNumPlusLazy declares a catch-all with +// an eager pool (num=1) and no explicit max_threads. The previous +// reservation logic only counted max(num, max_threads) and would +// undercount when num>0 with max_threads unset, leaving lazy ensure() +// callers without inactive thread slots even though the cap (16) said +// they were allowed. With the fix in place, all four lazy ensure()s +// must succeed alongside the eager pool thread. +func TestEnsureBackgroundWorkerCatchAllNumPlusLazy(t *testing.T) { + tmp := t.TempDir() + setupBgWorker(t, + WithWorkers("", "testdata/bgworker/named.php", 1, + WithWorkerBackground(), + WithWorkerEnv(map[string]string{"BG_SENTINEL_DIR": tmp}), + ), + WithNumThreads(2), + ) + + for _, name := range []string{"a", "b", "c", "d"} { + require.NoError(t, ensureBackgroundWorker(nil, name, 5*time.Second), "ensure(%s)", name) + } + for _, name := range []string{"a", "b", "c", "d"} { + requireSentinelEventually(t, filepath.Join(tmp, name), + "lazy catch-all instance %q should have written its sentinel", name) + } +} + +// TestBackgroundWorkerCatchAllEagerInstance proves that the eager +// num>0 pool of a catch-all worker actually boots and runs to +// readiness, alongside any lazy ensure() instances. The previous +// reservation logic was sufficient for the eager thread to land in a +// PHP slot but did not assert it actually executed; this guards +// against a regression where catch-all eager num is silently treated +// as a no-op. +func TestBackgroundWorkerCatchAllEagerInstance(t *testing.T) { + tmp := t.TempDir() + sentinel := filepath.Join(tmp, "eager") + setupBgWorker(t, + WithWorkers("", "testdata/bgworker/eager-catchall.php", 1, + WithWorkerBackground(), + WithWorkerEnv(map[string]string{"BG_EAGER_SENTINEL": sentinel}), + ), + WithNumThreads(2), + ) + requireSentinelEventually(t, sentinel, + "eager catch-all instance must reach readiness and write its sentinel") +} + +// TestEnsureBackgroundWorkerPostBootCrashLoopAborts proves that a worker +// which reaches readiness once (closing r.ready) and then crashes +// repeatedly post-readiness eventually surfaces the failure to ensure() +// callers via r.aborted, instead of leaving them with a stale "ready" +// signal pointing at a dead worker. Regression test for the silent- +// failure mode in the cap branch of afterScriptExecution. +func TestEnsureBackgroundWorkerPostBootCrashLoopAborts(t *testing.T) { + setupBgWorker(t, + WithWorkers("flapper", "testdata/bgworker/readiness-then-crash.php", 0, + WithWorkerBackground(), + WithWorkerMaxFailures(2), + ), + WithNumThreads(2), + ) + + // First call returns once the worker reaches readiness — the + // fixture's frankenphp_get_worker_handle() fires markReady before + // the script exit(1)s. + require.NoError(t, ensureBackgroundWorker(nil, "flapper", 5*time.Second)) + + // After at most max_consecutive_failures = 2 post-readiness crashes + // (each <1s with quadratic backoff capped at 1s), the cap branch + // fires abort() + invalidates the registry slot. Subsequent + // ensure() calls either see the prior abort, or lazy-spawn a fresh + // thread that ALSO crashes and aborts. Either way: error, not nil. + require.Eventually(t, func() bool { + return ensureBackgroundWorker(nil, "flapper", 1*time.Second) != nil + }, 10*time.Second, 100*time.Millisecond, + "ensure() must surface post-boot crash-loop instead of silently returning nil") +} + +// TestEnsureBackgroundWorkerCatchAllRespawnAfterCap proves that +// invalidateBackgroundEntry actually reclaims the catch-all slot after +// the cap fires: a fixture that crashes its first N boots and succeeds +// on boot N+1 must end up reaching readiness via a retried ensure(), +// rather than being permanently stuck behind the cap. +func TestEnsureBackgroundWorkerCatchAllRespawnAfterCap(t *testing.T) { + tmp := t.TempDir() + setupBgWorker(t, + WithWorkers("", "testdata/bgworker/fail-then-succeed.php", 0, + WithWorkerBackground(), + WithWorkerMaxThreads(2), + WithWorkerMaxFailures(2), + WithWorkerEnv(map[string]string{ + "BG_MARKER_DIR": tmp, + // Fail boots 1..3, succeed boot 4+. afterScriptExecution + // in threadbackgroundworker.go checks failureCount >= max + // before the failureCount++ at the bottom, so max=2 means + // the cap fires on the 3rd crash. + "BG_FAIL_UNTIL": "3", + }), + ), + WithNumThreads(3), + ) + + // First ensure() drives boots 1..3 (all crash, cap hits, abort + // fires + catch-all entry invalidated). + err := ensureBackgroundWorker(nil, "respawn", 5*time.Second) + require.Error(t, err, "first ensure() must hit the cap") + + // A retry must lazy-spawn a fresh thread (boot 4) that reaches + // readiness. Without invalidate, catchAllNames["respawn"] would + // still hold the prior aborted r and ensure() would short-circuit + // with the stale error. + require.Eventually(t, func() bool { + return ensureBackgroundWorker(nil, "respawn", 5*time.Second) == nil + }, 15*time.Second, 100*time.Millisecond, + "ensure() must lazy-spawn a fresh thread after the cap invalidates the slot") + + requireSentinelEventually(t, filepath.Join(tmp, "ready"), + "recovered worker must reach readiness") +} diff --git a/bgworkerinternalhelpers_test.go b/bgworkerinternalhelpers_test.go new file mode 100644 index 0000000000..cd028a84ba --- /dev/null +++ b/bgworkerinternalhelpers_test.go @@ -0,0 +1,32 @@ +package frankenphp + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// setupBgWorker boots FrankenPHP with the given options (internal-package +// variant), registers Shutdown as a t.Cleanup, and returns the absolute +// path to the testdata directory. +func setupBgWorker(t *testing.T, opts ...Option) (testDataDir string) { + t.Helper() + cwd, err := os.Getwd() + require.NoError(t, err) + testDataDir = cwd + "/testdata/" + require.NoError(t, Init(opts...)) + t.Cleanup(Shutdown) + return +} + +// requireSentinelEventually asserts that `path` appears on disk before the +// deadline. Wraps require.Eventually so call sites stay short. +func requireSentinelEventually(t testing.TB, path string, msgAndArgs ...any) { + t.Helper() + require.Eventually(t, func() bool { + _, err := os.Stat(path) + return err == nil + }, 5*time.Second, 10*time.Millisecond, msgAndArgs...) +} diff --git a/bgworkerpool_test.go b/bgworkerpool_test.go new file mode 100644 index 0000000000..591ce84965 --- /dev/null +++ b/bgworkerpool_test.go @@ -0,0 +1,70 @@ +package frankenphp_test + +import ( + "os" + "testing" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestBackgroundWorkerPool declares a named bg worker with num=3 (pool +// of three threads). All three threads should boot, share the same +// registered backgroundWorkerState, and the reader can see the pool's +// vars. This covers the lifted num>1 + max_threads>1 constraint. +func TestBackgroundWorkerPool(t *testing.T) { + testDataDir := setupFrankenPHP(t, + frankenphp.WithWorkers("pool-worker", "testdata/bgworker/pool.php", 3, + frankenphp.WithWorkerBackground(), + frankenphp.WithWorkerMaxThreads(3)), + frankenphp.WithNumThreads(6), + ) + + // Read the pool worker's vars via get_vars; all three threads share + // the same state so we don't need to target a specific one. ensure() + // waits for at least one pool thread's first set_vars so the eager + // start can't race the reader. + php := ` 0 { + return srv.Listen[0] + } + return "" +} + +// findHostInRoutes walks routes (recursing into Subroute handlers) to +// locate the route that contains target, then returns the first host of +// that route's host matcher. Returns "" if no enclosing route or no host +// matcher is found. +func findHostInRoutes(routes caddyhttp.RouteList, target caddyhttp.MiddlewareHandler) string { + for _, route := range routes { + if !routeContainsHandler(route, target) { + continue + } + for _, mset := range route.MatcherSets { + for _, m := range mset { + hp, ok := m.(*caddyhttp.MatchHost) + if !ok || hp == nil || len(*hp) == 0 { + continue + } + return (*hp)[0] + } + } + } + return "" +} + +func routeContainsHandler(route caddyhttp.Route, target caddyhttp.MiddlewareHandler) bool { + for _, h := range route.Handlers { + if h == target { + return true + } + if sub, ok := h.(*caddyhttp.Subroute); ok { + for _, r := range sub.Routes { + if routeContainsHandler(r, target) { + return true + } + } + } + } + return false +} + +// isAutoServerName reports whether name is one of Caddy's auto-assigned +// server names (srv0, srv1, ...). Anything else is treated as user-set. +func isAutoServerName(name string) bool { + if !strings.HasPrefix(name, "srv") || len(name) <= 3 { + return false + } + for _, c := range name[3:] { + if c < '0' || c > '9' { + return false + } + } + return true +} diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index c50f0d0688..2f51618d29 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -41,6 +41,8 @@ type workerConfig struct { MatchPath []string `json:"match_path,omitempty"` // MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick) MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"` + // Background marks this worker as a background (non-HTTP) worker. + Background bool `json:"background,omitempty"` options []frankenphp.WorkerOption requestOptions []frankenphp.RequestOption @@ -145,8 +147,10 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { } wc.MaxConsecutiveFailures = v + case "background": + wc.Background = true default: - return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads", v) + return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads, background", v) } } @@ -154,6 +158,16 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { return wc, d.Err(`the "file" argument must be specified`) } + if wc.Background { + // Named bg workers: num is the pool size; max_threads currently has + // no effect (no auto-scaling for bg workers in this build). Catch-all + // bg workers: num is unused at the declaration level, max_threads + // caps how many distinct names can be lazy-started via ensure(). + if len(wc.MatchPath) != 0 { + return wc, d.Err(`"match" is not supported for background workers`) + } + } + if frankenphp.EmbeddedAppPath != "" && filepath.IsLocal(wc.FileName) { wc.FileName = filepath.Join(frankenphp.EmbeddedAppPath, wc.FileName) } diff --git a/cgi.go b/cgi.go index d4d6a32d37..5a2ab36c17 100644 --- a/cgi.go +++ b/cgi.go @@ -44,7 +44,7 @@ var cStringHTTPMethods = map[string]*C.char{ // // TODO: handle this case https://github.com/caddyserver/caddy/issues/3718 // Inspired by https://github.com/caddyserver/caddy/blob/master/modules/caddyhttp/reverseproxy/fastcgi/fastcgi.go -func addKnownVariablesToServer(fc *frankenPHPContext, trackVarsArray *C.zval) { +func addKnownVariablesToServer(thread *phpThread, fc *frankenPHPContext, trackVarsArray *C.zval) { request := fc.request // Separate remote IP and port; more lenient than net.SplitHostPort var ip, port string @@ -113,6 +113,21 @@ func addKnownVariablesToServer(fc *frankenPHPContext, trackVarsArray *C.zval) { phpSelf := fc.scriptName + fc.pathInfo + // Worker identity for $_SERVER. Bg-worker threads override fc.worker + // because catch-all threads share the catch-all template *worker + // (whose name is the entrypoint path); the per-thread runtimeName is + // the actual ensure() argument. m# is stripped so PHP sees the user- + // facing name regardless of how the worker was registered. + var workerName string + var isBackgroundWorker bool + if bgHandler, ok := thread.handler.(*backgroundWorkerThread); ok { + workerName = strings.TrimPrefix(bgHandler.runtimeName, "m#") + isBackgroundWorker = true + } else if fc.worker != nil { + workerName = strings.TrimPrefix(fc.worker.name, "m#") + isBackgroundWorker = fc.worker.isBackgroundWorker + } + C.frankenphp_register_server_vars(trackVarsArray, C.frankenphp_server_vars{ // approximate total length to avoid array re-hashing: // 28 CGI vars + headers + environment @@ -156,6 +171,11 @@ func addKnownVariablesToServer(fc *frankenPHPContext, trackVarsArray *C.zval) { request_scheme: rs, // "http" or "https" ssl_protocol: sslProtocol, // values from tlsProtocol https: https, // "on" or empty + + // Worker identity (empty for non-worker requests) + worker_name: toUnsafeChar(workerName), + worker_name_len: C.size_t(len(workerName)), + is_background_worker: C._Bool(isBackgroundWorker), }) } @@ -188,7 +208,7 @@ func go_register_server_variables(threadIndex C.uintptr_t, trackVarsArray *C.zva fc := thread.frankenPHPContext() if fc.request != nil { - addKnownVariablesToServer(fc, trackVarsArray) + addKnownVariablesToServer(thread, fc, trackVarsArray) addHeadersToServer(thread.context(), fc.request, trackVarsArray) } diff --git a/context.go b/context.go index add8eff7bd..7eeef2fe5a 100644 --- a/context.go +++ b/context.go @@ -38,6 +38,10 @@ type frankenPHPContext struct { handlerParameters any handlerReturn any + // scope selects the per-php_server isolation boundary used by ensure() + // / get_vars calls made from this request. Zero is the global scope. + scope Scope + done chan any startedAt time.Time } diff --git a/docs/background-workers.md b/docs/background-workers.md new file mode 100644 index 0000000000..87c80417b2 --- /dev/null +++ b/docs/background-workers.md @@ -0,0 +1,269 @@ +# Background Workers + +Background workers are long-running PHP scripts that run outside the HTTP request cycle. +They observe their environment and publish variables that HTTP threads (both [workers](worker.md) and classic requests) can read in real time. + +## How It Works + +1. A background worker runs its own event loop (subscribe to Redis, watch files, poll an API, etc.). +2. It calls `frankenphp_set_vars()` to publish a snapshot of key-value pairs. +3. HTTP threads call `frankenphp_ensure_background_worker()` to declare a dependency and make sure the worker is running (lazy-started if needed, blocks until it has published at least once). +4. HTTP threads then call `frankenphp_get_vars()` to read the latest snapshot (pure read, no blocking, identical zval across repeated reads in one request). + +## Configuration + +Add `worker` directives with `background` to your [`php_server` or `php` block](config.md#caddyfile-config): + +```caddyfile +example.com { + php_server { + # Named background workers + worker /app/bin/console { + background + name config-watcher + } + worker /app/bin/console { + background + name feature-flags + } + + # Catch-all: handles any unlisted name via ensure_background_worker() + worker /app/bin/console { + background + } + } +} +``` + +- **Named** (with `name`): lazy-started on first `ensure_background_worker()` call. If `num` is set to a positive integer, that many threads start eagerly at boot (pool mode); with `num 0` (default) the first `ensure()` starts one thread. +- **Catch-all** (no `name`): lazy-started on demand for any name not matched by a `name` directive. `max_threads` caps the number of distinct names it can lazy-start (default 16). Without a catch-all, only declared names can be ensured. +- Each `php_server` block has its own isolated scope: two blocks can use the same worker names without conflict. +- `max_consecutive_failures`, `env`, and `watch` work the same as for HTTP workers. + +## PHP API + +### `frankenphp_ensure_background_worker(string|array $name, ?float $timeout = null): void` + +Declares a dependency on one or more background workers. Pass a single name or an array of names for batch dependency declaration; the timeout applies across all names in one call. `null` (the default) falls back to FrankenPHP's internal default deadline; a value `<= 0` raises `ValueError`. Behaviour depends on the caller context: + +- **In an HTTP worker script, before `frankenphp_handle_request()` (bootstrap)**: lazy-starts the worker (at-most-once) if not already running and blocks until it has called `set_vars()` at least once. Fails fast on boot failure (no exponential-backoff tolerance): if the first boot attempts fail, the exception is thrown right away with the captured details. Use this to declare dependencies up front so broken deps visibly fail the HTTP worker rather than let it serve degraded traffic. +- **Everywhere else (inside `frankenphp_handle_request()`, or classic request-per-process)**: lazy-starts the worker and waits up to `$timeout`, tolerating transient boot failures via exponential backoff. The first caller pays the startup cost; subsequent callers in the same FrankenPHP process see the worker already reserved and return almost immediately. This supports the common pattern of library code loaded after bootstrap declaring its own dependencies lazily. + +```php +// HTTP worker, bootstrap phase +frankenphp_ensure_background_worker('redis-watcher'); // fail-fast + +while (frankenphp_handle_request(function () { + $cfg = frankenphp_get_vars('redis-watcher'); // pure read +})) { gc_collect_cycles(); } + +// Non-worker mode, every request +frankenphp_ensure_background_worker('redis-watcher'); // tolerant +$cfg = frankenphp_get_vars('redis-watcher'); + +// Batch form, shared deadline across workers +frankenphp_ensure_background_worker(['redis-watcher', 'config-watcher'], 5.0); +``` + +- Throws `RuntimeException` on timeout, missing entrypoint, or boot failure. The exception contains the captured failure details when available: resolved entrypoint path, exit status, number of attempts, and the last PHP error (message, file, line). +- Pick a short `$timeout` (e.g. `1.0`) to fail fast; pick a longer one to tolerate slow/flaky startups. +- `ValueError` is raised for an empty names array; `TypeError` is raised if the array contains non-strings. + +### `frankenphp_get_vars(string $name): array` + +Pure read: returns the latest published vars from a running background worker. Does not start workers or wait for readiness. + +```php +$redis = frankenphp_get_vars('redis-watcher'); +// ['MASTER_HOST' => '10.0.0.1', 'MASTER_PORT' => 6379] +``` + +- Throws `RuntimeException` if the worker isn't running or hasn't called `set_vars()` yet. Call `frankenphp_ensure_background_worker()` first to ensure readiness. +- Within a single HTTP request, repeated calls with the same name return the **same** cached array: `$a === $b` holds, and the lookup is O(1) after the first call. +- Works in both worker and non-worker mode. + +### `frankenphp_set_vars(array $vars): void` + +Publishes vars from inside a background worker. Each call **replaces** the entire vars array atomically. + +Allowed value types: `null`, scalars (`bool`, `int`, `float`, `string`), nested `array`s whose values are also allowed types, and **enum** instances. Objects (other than enum cases), resources, and references are rejected. + +- Throws `RuntimeException` if not called from a background worker thread. +- Throws `ValueError` if values contain unsupported types. + +### `frankenphp_get_worker_handle(): resource` + +Returns a readable stream for receiving signals from FrankenPHP. On shutdown or restart the write end of the underlying pipe is closed, so `fgets()` returns `false` (EOF). Use `stream_select()` to wait between iterations instead of `sleep()`: + +```php +function background_worker_should_stop(float $timeout = 0): bool +{ + static $stream; + $stream ??= frankenphp_get_worker_handle(); + $s = (int) $timeout; + + return match (@stream_select(...[[$stream], [], [], $s, (int) (($timeout - $s) * 1e6)])) { + 0 => false, // timeout, keep going + false => true, // error, stop + default => false === fgets($stream), // EOF = stop + }; +} +``` + +> [!WARNING] +> Avoid `sleep()` or `usleep()` in background workers: they block at the C level and cannot be interrupted cleanly. Use `stream_select()` with the signaling stream instead. If a worker ignores the signal, FrankenPHP force-kills it on Linux, FreeBSD and Windows after a 30-second grace period (see `Runtime Behaviour`). + +## Examples + +### Simple polling worker + +```php + run_config_watcher(), + 'feature-flags' => run_feature_flags(), + default => throw new \RuntimeException("Unknown background worker: $command"), +}; + +function run_config_watcher(): void +{ + $redis = new Redis(); + $redis->pconnect('127.0.0.1'); + + do { + frankenphp_set_vars([ + 'maintenance' => (bool) $redis->get('maintenance_mode'), + 'feature_flags' => json_decode($redis->get('features'), true), + ]); + } while (!background_worker_should_stop(5.0)); // check every 5s +} +``` + +### Event-driven worker + +For real-time subscriptions (Redis pub/sub, SSE, WebSocket), use an async library and register the signaling stream on the event loop: + +```php +function run_redis_watcher(): void +{ + $signalingStream = frankenphp_get_worker_handle(); + $sentinel = Amp\Redis\createRedisClient('tcp://sentinel-host:26379'); + + $subscription = $sentinel->subscribe('+switch-master'); + + Amp\async(function () use ($subscription) { + foreach ($subscription as $message) { + [$name, $oldIp, $oldPort, $newIp, $newPort] = explode(' ', $message); + frankenphp_set_vars([ + 'MASTER_HOST' => $newIp, + 'MASTER_PORT' => (int) $newPort, + ]); + } + }); + + $master = $sentinel->rawCommand('SENTINEL', 'get-master-addr-by-name', 'mymaster'); + frankenphp_set_vars([ + 'MASTER_HOST' => $master[0], + 'MASTER_PORT' => (int) $master[1], + ]); + + Amp\EventLoop::onReadable($signalingStream, function ($id) use ($signalingStream) { + if (false === fgets($signalingStream)) { + Amp\EventLoop::cancel($id); // EOF = stop + } + }); + Amp\EventLoop::run(); +} +``` + +### HTTP worker depending on a background worker + +```php +boot(); + +// Declare dependencies once at bootstrap (fail-fast) +frankenphp_ensure_background_worker(['config-watcher', 'feature-flags']); + +while (frankenphp_handle_request(function () use ($app) { + $config = frankenphp_get_vars('config-watcher'); // pure read + + $_SERVER += [ + 'APP_REDIS_HOST' => $config['MASTER_HOST'], + 'APP_REDIS_PORT' => $config['MASTER_PORT'], + ]; + $app->handle($_GET, $_POST, $_COOKIE, $_FILES, $_SERVER); +})) { + gc_collect_cycles(); +} +``` + +### Non-worker mode + +```php + getenv('REDIS_HOST') ?: '127.0.0.1']; +} +``` + +## Runtime Behaviour + +- Background workers get dedicated threads: they do not reduce HTTP capacity. +- `max_execution_time` is automatically disabled for background workers. +- `$_SERVER['FRANKENPHP_WORKER']` carries the worker's declared (or catch-all-resolved) name. Pre-existing user code that only checks `isset($_SERVER['FRANKENPHP_WORKER'])` keeps working. +- `$_SERVER['FRANKENPHP_WORKER_BACKGROUND']` is `true` for background workers. +- `$_SERVER['argv'] = [$entrypoint, $name]` in background workers (for `bin/console`-style dispatching). +- Crash recovery: workers are automatically restarted with exponential backoff. During the restart window, `get_vars()` returns the last published data (stale but available) because vars are held in persistent memory across crashes. A warning is logged on crash. +- On shutdown/restart the signaling stream is closed (EOF). Well-behaved workers that check the stream exit within the 30-second grace period. Stuck workers are force-killed on Linux, FreeBSD, and Windows. + +## Readiness + +`ensure()` blocks until the worker has reached its main loop, which means **both**: + +1. The worker called `frankenphp_get_worker_handle()` (it has installed its drain signal and is parked in `stream_select`). +2. The worker called `frankenphp_set_vars()` at least once (it has published its initial state). + +Both halves are needed: a worker that registers the handle but never publishes vars hasn't actually finished bootstrapping, and a worker that publishes vars without holding the handle can't be drained gracefully. Each instance carries one combined-ready channel that closes exactly once when both halves have fired; `ensure()` waits on it (alongside an abort channel for `max_consecutive_failures` exhaustion and the per-call deadline). + +Readiness is sticky across crash-restarts: once a worker has announced "ready" once, the channel stays closed for any future `ensure()` caller, even after the script crashes and respawns. This is the right semantics for a long-lived dependency — callers don't pay the startup cost again if the worker is just briefly missing. + +If the worker crashes before reaching readiness, the boot-failure metadata (entrypoint, exit status, attempt count, and the captured `last_error_message`) is recorded on the readiness slot, so a timing-out `ensure()` raises a self-teaching `RuntimeException` with those details rather than a generic "did not call `frankenphp_get_worker_handle()` and `frankenphp_set_vars()` within Xs". + +For catch-all workers, each lazy-spawned name has its own readiness slot, so a stuck `foo` doesn't keep `ensure('bar')` waiting. For named pools (`num > 1`), the threads share one slot and the first to satisfy both halves wins. + +## Scoping + +Each `php_server` block gets its own isolated background-worker scope, so workers declared with the same `name` in different blocks do not collide. Resolution rules for `ensure()` / `get_vars()`: + +- A request inside a `php_server` block resolves first against that block's own declarations. If the block declares any background workers of its own, that lookup is authoritative and scope-isolated from every other block. +- A request inside a `php_server` block that declares **no** background workers falls back to the global/embed scope (workers declared at the top-level `frankenphp` directive or via the Go library). This makes a single globally-declared worker reachable from all otherwise-unconfigured blocks. +- Requests made outside any `php_server` block (e.g. when embedding FrankenPHP as a library) always resolve to the global/embed scope. + +## Limits + +- Named background workers with `num > 1` spin up a pool of threads that share the same published vars; `get_vars()` sees one consistent snapshot. +- Multiple named background workers in the same block can share the same entrypoint file. Each declaration keeps its own `env`, `watch`, and failure policy. +- Calling `ensure()` on a name that isn't declared and isn't covered by a catch-all raises `RuntimeException`. diff --git a/frankenphp.c b/frankenphp.c index 5e6ecb0f37..da69480173 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -16,6 +16,7 @@ #else #include #endif +#include
#include #include #include @@ -37,12 +38,7 @@ #include "_cgo_export.h" #include "frankenphp_arginfo.h" -#ifdef FRANKENPHP_TEST -/* The persistent_zval helpers are only compiled in when a consumer needs - * them. The step that lands the first real caller (background workers) - * will drop this guard. */ #include "zval.h" -#endif #if defined(PHP_WIN32) && defined(ZTS) ZEND_TSRMLS_CACHE_DEFINE() @@ -91,8 +87,37 @@ HashTable *main_thread_env = NULL; __thread uintptr_t thread_index; __thread bool is_worker_thread = false; +__thread bool is_background_worker = false; +__thread int worker_stop_fds[2] = {-1, -1}; +__thread php_stream *worker_signaling_stream = NULL; +__thread char *captured_last_php_error = NULL; __thread HashTable *sandboxed_env = NULL; +/* Per-thread cache for frankenphp_get_vars results. Maps worker name to + * { version, cached_zval }. When the Go side reports the version hasn't + * changed, the cached zval is returned with a refcount bump, giving the + * PHP caller the same HashTable pointer so repeated reads within a + * request run at O(1) without walking persistent memory every time. */ +typedef struct { + uint64_t version; + zval value; +} bg_vars_cache_entry; +__thread HashTable *bg_vars_cache = NULL; + +static void bg_vars_cache_dtor(zval *zv) { + bg_vars_cache_entry *entry = Z_PTR_P(zv); + zval_ptr_dtor(&entry->value); + free(entry); +} + +static void bg_vars_cache_reset(void) { + if (bg_vars_cache) { + zend_hash_destroy(bg_vars_cache); + free(bg_vars_cache); + bg_vars_cache = NULL; + } +} + #ifndef PHP_WIN32 static bool is_forked_child = false; static void frankenphp_fork_child(void) { is_forked_child = true; } @@ -203,13 +228,140 @@ void frankenphp_release_thread_for_kill(force_kill_slot slot) { #endif } +static void frankenphp_worker_close_stop_fds(void); + void frankenphp_update_local_thread_context(bool is_worker) { + /* A thread that previously ran as a bg worker can be recycled into an + * HTTP worker or a regular request thread; reset the bg-worker TLS so + * frankenphp_handle_request() and friends don't reject the caller. The + * cached worker_signaling_stream is already dangling here (its + * zend_resource was freed by php_request_shutdown's EG(regular_list) + * teardown), so just NULL it without dereferencing. */ + bool was_background_worker = is_background_worker; + is_worker_thread = is_worker; + is_background_worker = false; + + if (was_background_worker) { + frankenphp_worker_close_stop_fds(); + worker_signaling_stream = NULL; + } /* workers should keep running if the user aborts the connection */ PG(ignore_user_abort) = is_worker ? 1 : original_user_abort_setting; } +/* Background worker stop-pipe: anonymous pipe whose read end is exposed to + * the PHP script via frankenphp_get_worker_handle. When the Go side closes + * the write end (on drain), the read end reaches EOF so the script can + * return from stream_select and exit its loop. */ +static int frankenphp_worker_open_stop_pipe(void) { +#ifdef PHP_WIN32 + return _pipe(worker_stop_fds, 4096, _O_BINARY); +#else + return pipe(worker_stop_fds); +#endif +} + +static void frankenphp_worker_close_stop_fds(void) { + for (int i = 0; i < 2; i++) { + if (worker_stop_fds[i] >= 0) { +#ifdef PHP_WIN32 + _close(worker_stop_fds[i]); +#else + close(worker_stop_fds[i]); +#endif + worker_stop_fds[i] = -1; + } + } +} + +void frankenphp_set_background_worker(bool background) { + is_background_worker = background; + if (!background) { + return; + } + /* Drop the dangling stream pointer (see + * frankenphp_update_local_thread_context for why). */ + worker_signaling_stream = NULL; + /* Bg workers don't enforce max_execution_time; disarm any lingering timer. */ + zend_unset_timeout(); + + frankenphp_worker_close_stop_fds(); + if (frankenphp_worker_open_stop_pipe() != 0) { + worker_stop_fds[0] = -1; + worker_stop_fds[1] = -1; + } +} + +int frankenphp_worker_get_stop_fd_write(void) { + /* Transfer ownership to Go. Clearing the TLS slot prevents + * frankenphp_worker_close_stop_fds() from double-closing on recycle. */ + int fd = worker_stop_fds[1]; + worker_stop_fds[1] = -1; + return fd; +} + +void frankenphp_worker_close_fd(int fd) { + if (fd < 0) { + return; + } + /* Closing the write end of the stop pipe lands as EOF on the read end, + * so the PHP side's stream_select returns promptly. */ +#ifdef PHP_WIN32 + _close(fd); +#else + close(fd); +#endif +} + +static int frankenphp_worker_dup_fd(int fd) { +#ifdef PHP_WIN32 + return _dup(fd); +#else + return dup(fd); +#endif +} + +void frankenphp_copy_persistent_vars(zval *dst, void *persistent_ht) { + zval src; + ZVAL_ARR(&src, (HashTable *)persistent_ht); + persistent_zval_to_request(dst, &src); +} + +/* Capture PG(last_error_*) into the thread-local captured_last_php_error. + * Called before php_request_shutdown, which clears PG(last_error_*). + * Format: " in on line ". */ +static void frankenphp_capture_last_php_error(void) { + if (captured_last_php_error != NULL) { + free(captured_last_php_error); + captured_last_php_error = NULL; + } + if (PG(last_error_message) == NULL) { + return; + } + const char *msg = ZSTR_VAL(PG(last_error_message)); + size_t msg_len = ZSTR_LEN(PG(last_error_message)); + const char *file = + PG(last_error_file) ? ZSTR_VAL(PG(last_error_file)) : "unknown"; + size_t file_len = PG(last_error_file) ? ZSTR_LEN(PG(last_error_file)) : 7; + int line = PG(last_error_lineno); + size_t buf_len = msg_len + file_len + 32; + captured_last_php_error = malloc(buf_len); + if (captured_last_php_error != NULL) { + snprintf(captured_last_php_error, buf_len, "%.*s in %.*s on line %d", + (int)msg_len, msg, (int)file_len, file, line); + } +} + +/* Return and take ownership of the captured error; caller frees with + * C.free. NULL if nothing was captured. */ +char *frankenphp_get_last_php_error(void) { + char *s = captured_last_php_error; + captured_last_php_error = NULL; + return s; +} + static void frankenphp_update_request_context() { /* the server context is stored on the go side, still SG(server_context) needs * to not be NULL */ @@ -823,41 +975,280 @@ PHP_FUNCTION(frankenphp_log) { } } -#ifdef FRANKENPHP_TEST -/* Test-only entry point that exercises zval.h end-to-end: - * validate -> persist (request -> persistent memory) -> - * to_request (persistent -> fresh request memory) -> free persistent copy. - * Compiled only when FRANKENPHP_TEST is defined; never registered - * in production builds. */ -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX( - arginfo_frankenphp_test_persist_roundtrip, 0, 1, IS_MIXED, 0) -ZEND_ARG_TYPE_INFO(0, value, IS_MIXED, 0) -ZEND_END_ARG_INFO() +PHP_FUNCTION(frankenphp_set_vars) { + zval *vars = NULL; -PHP_FUNCTION(frankenphp_test_persist_roundtrip) { - zval *input; - ZEND_PARSE_PARAMETERS_START(1, 1) - Z_PARAM_ZVAL(input) + ZEND_PARSE_PARAMETERS_START(1, 1); + Z_PARAM_ARRAY(vars); ZEND_PARSE_PARAMETERS_END(); - if (!persistent_zval_validate(input)) { - zend_throw_exception(spl_ce_LogicException, - "persistent_zval: value type not supported " - "(only scalars, arrays, and enums are allowed)", - 0); - RETURN_THROWS(); + /* Validate every value up front so allocation only happens for a tree + * we can fully round-trip. */ + zval *val; + ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(vars), val) { + if (!persistent_zval_validate(val)) { + zend_value_error( + "frankenphp_set_vars(): values must be null, scalars, arrays, or " + "enums; objects (other than enums) and resources are not allowed"); + RETURN_THROWS(); + } } + ZEND_HASH_FOREACH_END(); zval persistent; - persistent_zval_persist(&persistent, input); - persistent_zval_to_request(return_value, &persistent); - persistent_zval_free(&persistent); + persistent_zval_persist(&persistent, vars); + + void *old = NULL; + char *error = + go_frankenphp_set_vars(thread_index, Z_ARRVAL(persistent), &old); + if (error) { + persistent_zval_free(&persistent); + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } + if (old != NULL) { + zval old_zv; + ZVAL_ARR(&old_zv, (HashTable *)old); + persistent_zval_free(&old_zv); + } } -static const zend_function_entry frankenphp_test_hook_functions[] = { - PHP_FE(frankenphp_test_persist_roundtrip, - arginfo_frankenphp_test_persist_roundtrip) PHP_FE_END}; -#endif +PHP_FUNCTION(frankenphp_get_vars) { + zend_string *name = NULL; + + ZEND_PARSE_PARAMETERS_START(1, 1); + Z_PARAM_STR(name); + ZEND_PARSE_PARAMETERS_END(); + + /* Look up a cached entry first so Go can short-circuit the copy when + * the background worker has not changed its vars since we last read + * them. On a cache hit we reuse the same zval, so $vars === $prev_vars + * holds across repeated reads within one request. */ + uint64_t caller_version = 0; + uint64_t out_version = 0; + bg_vars_cache_entry *cached = NULL; + if (bg_vars_cache) { + zval *entry_zv = zend_hash_find(bg_vars_cache, name); + if (entry_zv) { + cached = Z_PTR_P(entry_zv); + caller_version = cached->version; + } + } + + char *error = go_frankenphp_get_vars( + thread_index, (char *)ZSTR_VAL(name), ZSTR_LEN(name), return_value, + cached ? &caller_version : NULL, &out_version); + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } + + if (cached && out_version == caller_version) { + /* Cache hit: Go skipped the copy. Return the cached zval. */ + ZVAL_COPY(return_value, &cached->value); + return; + } + + /* Cache miss: store the fresh copy so subsequent reads within this + * request can be served without walking persistent memory. */ + if (!bg_vars_cache) { + bg_vars_cache = malloc(sizeof(HashTable)); + zend_hash_init(bg_vars_cache, 4, NULL, bg_vars_cache_dtor, 1); + } + bg_vars_cache_entry *entry = malloc(sizeof(*entry)); + entry->version = out_version; + ZVAL_COPY(&entry->value, return_value); + zval entry_zv; + ZVAL_PTR(&entry_zv, entry); + zend_hash_update(bg_vars_cache, name, &entry_zv); +} + +PHP_FUNCTION(frankenphp_ensure_background_worker) { + zval *names_zv; + double timeout = 0.0; + bool timeout_is_null = true; + + ZEND_PARSE_PARAMETERS_START(1, 2); + Z_PARAM_ZVAL(names_zv); + Z_PARAM_OPTIONAL + Z_PARAM_DOUBLE_OR_NULL(timeout, timeout_is_null); + ZEND_PARSE_PARAMETERS_END(); + + if (!timeout_is_null && timeout <= 0.0) { + zend_value_error("frankenphp_ensure_background_worker(): timeout must " + "be > 0 (pass null to use the default)"); + RETURN_THROWS(); + } + + /* Accept either a single string or an array of strings. For a single + * string we avoid the heap allocation by pointing at ZSTR fields + * directly via the on-stack one-element arrays. */ + char *single_name_ptr = NULL; + size_t single_name_len = 0; + char **name_ptrs = NULL; + size_t *name_lens = NULL; + int name_count = 0; + bool heap_allocated = false; + + if (Z_TYPE_P(names_zv) == IS_STRING) { + if (Z_STRLEN_P(names_zv) == 0) { + zend_value_error("frankenphp_ensure_background_worker(): name " + "must not be empty"); + RETURN_THROWS(); + } + /* Reject embedded NUL bytes: PHP's zend_string is length-tagged so + * arbitrary bytes are technically legal, but the bg-worker name flows + * through C-string-aware paths (logging, $_SERVER export) where an + * embedded NUL would silently truncate. Fail loudly instead. */ + if (memchr(Z_STRVAL_P(names_zv), '\0', Z_STRLEN_P(names_zv)) != NULL) { + zend_value_error("frankenphp_ensure_background_worker(): name " + "must not contain null bytes"); + RETURN_THROWS(); + } + single_name_ptr = Z_STRVAL_P(names_zv); + single_name_len = Z_STRLEN_P(names_zv); + name_ptrs = &single_name_ptr; + name_lens = &single_name_len; + name_count = 1; + } else if (Z_TYPE_P(names_zv) == IS_ARRAY) { + HashTable *ht = Z_ARRVAL_P(names_zv); + name_count = zend_hash_num_elements(ht); + if (name_count == 0) { + zend_value_error("frankenphp_ensure_background_worker(): names array " + "must not be empty"); + RETURN_THROWS(); + } + name_ptrs = emalloc(name_count * sizeof(*name_ptrs)); + name_lens = emalloc(name_count * sizeof(*name_lens)); + heap_allocated = true; + int idx = 0; + zval *v; + ZEND_HASH_FOREACH_VAL(ht, v) { + if (Z_TYPE_P(v) != IS_STRING) { + efree(name_ptrs); + efree(name_lens); + zend_type_error("frankenphp_ensure_background_worker(): names array " + "must contain only strings"); + RETURN_THROWS(); + } + if (Z_STRLEN_P(v) == 0) { + efree(name_ptrs); + efree(name_lens); + zend_value_error("frankenphp_ensure_background_worker(): names array " + "must not contain empty strings"); + RETURN_THROWS(); + } + /* Reject embedded NUL bytes: see single-string form above for the + * reasoning. Apply per-element so a mixed array is fully validated + * before any worker is started. */ + if (memchr(Z_STRVAL_P(v), '\0', Z_STRLEN_P(v)) != NULL) { + efree(name_ptrs); + efree(name_lens); + zend_value_error("frankenphp_ensure_background_worker(): names array " + "must not contain names with null bytes"); + RETURN_THROWS(); + } + /* Reject duplicates: O(n^2) is fine for the small batch sizes we + * expect here. */ + for (int j = 0; j < idx; j++) { + if (name_lens[j] == (size_t)Z_STRLEN_P(v) && + memcmp(name_ptrs[j], Z_STRVAL_P(v), name_lens[j]) == 0) { + efree(name_ptrs); + efree(name_lens); + zend_value_error( + "frankenphp_ensure_background_worker(): duplicate name %s", + Z_STRVAL_P(v)); + RETURN_THROWS(); + } + } + name_ptrs[idx] = Z_STRVAL_P(v); + name_lens[idx] = Z_STRLEN_P(v); + idx++; + } + ZEND_HASH_FOREACH_END(); + } else { + zend_type_error("frankenphp_ensure_background_worker(): name must be a " + "string or an array of strings"); + RETURN_THROWS(); + } + + /* Seconds (PHP) -> ms (Go time.Duration). null = use the Go-side + * default; <=0 was rejected upfront. */ + int64_t timeout_ms = 0; + if (!timeout_is_null) { + timeout_ms = (int64_t)(timeout * 1000.0); + } + char *error = go_frankenphp_ensure_background_worker( + thread_index, name_ptrs, name_lens, name_count, timeout_ms); + if (heap_allocated) { + efree(name_ptrs); + efree(name_lens); + } + + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } +} + +PHP_FUNCTION(frankenphp_get_worker_handle) { + ZEND_PARSE_PARAMETERS_NONE(); + + if (!is_background_worker) { + zend_throw_exception(spl_ce_RuntimeException, + "frankenphp_get_worker_handle() can only be called " + "from a background worker", + 0); + RETURN_THROWS(); + } + + /* Return the cached stream on repeat calls. The first call (below) is + * the readiness boundary that ensure() blocks on; subsequent calls just + * hand back the same stream. */ + if (worker_signaling_stream != NULL) { + php_stream_to_zval(worker_signaling_stream, return_value); + GC_ADDREF(Z_COUNTED_P(return_value)); + return; + } + + if (worker_stop_fds[0] < 0) { + zend_throw_exception(spl_ce_RuntimeException, + "failed to create background worker stop pipe", 0); + RETURN_THROWS(); + } + + /* DUP so closing the PHP stream doesn't affect worker_stop_fds[0]; the + * original stays owned by the C side for cleanup at worker restart. */ + int fd = frankenphp_worker_dup_fd(worker_stop_fds[0]); + if (fd < 0) { + zend_throw_exception(spl_ce_RuntimeException, + "failed to dup background worker stop fd", 0); + RETURN_THROWS(); + } + + php_stream *stream = php_stream_fopen_from_fd(fd, "rb", NULL); + if (!stream) { + frankenphp_worker_close_fd(fd); + zend_throw_exception(spl_ce_RuntimeException, + "failed to create stream from stop fd", 0); + RETURN_THROWS(); + } + + worker_signaling_stream = stream; + php_stream_to_zval(stream, return_value); + + /* Extra ref so PHP can't destroy the stream while TLS caches the pointer. */ + GC_ADDREF(Z_COUNTED_P(return_value)); + + /* Signal that this worker has reached its main loop. ensure() callers + * waiting on this worker unblock here. Idempotent on the Go side + * (sync.Once-guarded), so a worker that crashed and respawned signals + * again on each fresh boot without harm. */ + go_frankenphp_worker_ready(thread_index); +} PHP_MINIT_FUNCTION(frankenphp) { register_frankenphp_symbols(module_number); @@ -865,13 +1256,6 @@ PHP_MINIT_FUNCTION(frankenphp) { pthread_atfork(NULL, NULL, frankenphp_fork_child); #endif -#ifdef FRANKENPHP_TEST - if (zend_register_functions(NULL, frankenphp_test_hook_functions, NULL, - MODULE_PERSISTENT) == FAILURE) { - return FAILURE; - } -#endif - zend_function *func; // Override putenv @@ -1035,6 +1419,41 @@ void frankenphp_register_server_vars(zval *track_vars_array, ZVAL_EMPTY_STRING(&zv); zend_hash_update_ind(ht, frankenphp_strings.auth_type, &zv); zend_hash_update_ind(ht, frankenphp_strings.remote_ident, &zv); + + /* Worker identity in $_SERVER (HTTP and bg workers alike). The value is + * the user-facing worker name; pre-existing user code that only checks + * isset($_SERVER['FRANKENPHP_WORKER']) keeps working. */ + if (vars.worker_name && vars.worker_name_len > 0) { + zval name_zv; + ZVAL_STRINGL(&name_zv, vars.worker_name, vars.worker_name_len); + zend_hash_str_update(ht, "FRANKENPHP_WORKER", + sizeof("FRANKENPHP_WORKER") - 1, &name_zv); + } + if (vars.is_background_worker) { + zval bg_zv; + ZVAL_TRUE(&bg_zv); + zend_hash_str_update(ht, "FRANKENPHP_WORKER_BACKGROUND", + sizeof("FRANKENPHP_WORKER_BACKGROUND") - 1, &bg_zv); + + /* Bg workers have no real CLI argv; emulate one of the form + * [script_filename, worker_name] so user code can read the worker + * name via $argv[1] like a CLI script. Skipped when worker_name is + * empty (declared catch-all template threads with no identity). */ + if (vars.worker_name && vars.worker_name_len > 0 && vars.script_filename) { + zval argv_array; + array_init(&argv_array); + add_next_index_stringl(&argv_array, vars.script_filename, + vars.script_filename_len); + add_next_index_stringl(&argv_array, vars.worker_name, + vars.worker_name_len); + + zval argc_zv; + ZVAL_LONG(&argc_zv, 2); + + zend_hash_str_update(ht, "argv", sizeof("argv") - 1, &argv_array); + zend_hash_str_update(ht, "argc", sizeof("argc") - 1, &argc_zv); + } + } } /** Create an immutable zend_string that lasts for the whole process **/ @@ -1264,6 +1683,12 @@ static void *php_thread(void *arg) { zend_bailout(); } + /* php_request_startup re-arms max_execution_time; bg workers + * never enforce it, disarm again. */ + if (is_background_worker) { + zend_unset_timeout(); + } + zend_file_handle file_handle; zend_stream_init_filename(&file_handle, scriptName); @@ -1286,6 +1711,15 @@ static void *php_thread(void *arg) { has_attempted_shutdown = true; + /* Capture the last PHP error before php_request_shutdown clears it, + * so background-worker boot failures can surface the cause. */ + frankenphp_capture_last_php_error(); + + /* Invalidate the per-request get_vars cache before php_request_shutdown + * tears down request memory: the cached zvals live in request memory + * and can't be freed after shutdown runs. */ + bg_vars_cache_reset(); + /* shutdown the request, potential bailout to zend_catch */ php_request_shutdown((void *)0); frankenphp_free_request_context(); @@ -1305,6 +1739,7 @@ static void *php_thread(void *arg) { if (!has_attempted_shutdown) { /* php_request_shutdown() was not called, force a shutdown now */ reset_sandboxed_environment(); + bg_vars_cache_reset(); zend_try { php_request_shutdown((void *)0); } zend_catch {} zend_end_try(); diff --git a/frankenphp.go b/frankenphp.go index 52246d01c7..412c640f75 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -157,8 +157,16 @@ func Config() PHPConfig { func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { maxProcs := runtime.GOMAXPROCS(0) * 2 maxThreadsFromWorkers := 0 + // Bg workers reserve thread budget separately from HTTP workers so + // they don't double-count against the HTTP-worker admission check + // further down. + reservedThreads := reserveBackgroundWorkerThreads(opt) for i, w := range opt.workers { + if w.isBackgroundWorker { + continue + } + if w.num <= 0 { // https://github.com/php/frankenphp/issues/126 opt.workers[i].num = maxProcs @@ -200,7 +208,9 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return 0, fmt.Errorf("num_threads (%d) must be greater than the number of worker threads (%d)", opt.numThreads, numWorkers) } - return numWorkers, nil + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads + return numWorkers + reservedThreads, nil } if maxThreadsIsSet && !numThreadsIsSet { @@ -209,7 +219,9 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return 0, fmt.Errorf("max_threads (%d) must be greater than the number of worker threads (%d)", opt.maxThreads, numWorkers) } - return numWorkers, nil + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads + return numWorkers + reservedThreads, nil } if !numThreadsIsSet { @@ -221,7 +233,9 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { } opt.maxThreads = opt.numThreads - return numWorkers, nil + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads + return numWorkers + reservedThreads, nil } // both num_threads and max_threads are set @@ -233,7 +247,9 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return 0, fmt.Errorf("max_threads (%d) must be greater than or equal to num_threads (%d)", opt.maxThreads, opt.numThreads) } - return numWorkers, nil + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads + return numWorkers + reservedThreads, nil } // Init starts the PHP runtime and the configured workers. diff --git a/frankenphp.h b/frankenphp.h index 31df007f18..fd69fdff7b 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -116,6 +116,12 @@ typedef struct frankenphp_server_vars { zend_string *request_scheme; zend_string *ssl_protocol; zend_string *https; + + /* Worker identity for $_SERVER. worker_name is m#-stripped Go-side, + * empty for non-worker requests. */ + char *worker_name; + size_t worker_name_len; + bool is_background_worker; } frankenphp_server_vars; /** @@ -226,6 +232,13 @@ size_t frankenphp_get_thread_memory_usage(uintptr_t thread_index); void frankenphp_force_kill_thread(force_kill_slot slot); void frankenphp_release_thread_for_kill(force_kill_slot slot); +/* Background worker primitives. */ +void frankenphp_set_background_worker(bool background); +int frankenphp_worker_get_stop_fd_write(void); +void frankenphp_worker_close_fd(int fd); +void frankenphp_copy_persistent_vars(zval *dst, void *persistent_ht); +char *frankenphp_get_last_php_error(void); + void register_extensions(zend_module_entry **m, int len); #endif diff --git a/frankenphp.stub.php b/frankenphp.stub.php index d6c85aa05f..7ed159f403 100644 --- a/frankenphp.stub.php +++ b/frankenphp.stub.php @@ -54,3 +54,46 @@ function mercure_publish(string|array $topics, string $data = '', bool $private * array $context Values of the array will be converted to the corresponding Go type (if supported by FrankenPHP) and added to the context of the structured logs using https://pkg.go.dev/log/slog#Attr */ function frankenphp_log(string $message, int $level = 0, array $context = []): void {} + +/** + * Declares a dependency on one or more background workers. Lazy-starts each + * worker that isn't already running, then blocks until every named worker + * has reached combined readiness — meaning it has called both + * frankenphp_get_worker_handle() AND frankenphp_set_vars() at least once + * — aborts after exhausting its max_consecutive_failures cap, or the + * shared timeout expires. Throws RuntimeException if no background + * worker is configured for any given name, if a worker fails to reach + * readiness within the timeout, or if a worker aborts during boot. + * + * The array form rejects empty arrays (ValueError), non-string elements + * (TypeError), empty strings, and duplicate names (ValueError) before + * any worker is started. + * + * @param string|string[] $name + * @param float|null $timeout deadline in seconds. null (the default) falls + * back to FrankenPHP's internal default + * timeout. A value <= 0 raises ValueError. + */ +function frankenphp_ensure_background_worker(string|array $name, ?float $timeout = null): void {} + +/** + * Publishes the given vars from a background worker. Only callable from a + * worker started with the `background` flag. Values must be null, scalars, + * arrays of allowed values, or enum cases. + */ +function frankenphp_set_vars(array $vars): void {} + +/** + * Reads the shared vars published by the named background worker. Throws if + * the worker is not declared, not running, or has not yet called set_vars. + */ +function frankenphp_get_vars(string $name): array {} + +/** + * Returns the stop-signal stream for the current background worker. The + * stream closes when FrankenPHP is draining the worker so the script can + * exit its loop gracefully. Only callable from inside a background worker. + * + * @return resource + */ +function frankenphp_get_worker_handle(): mixed {} diff --git a/frankenphp_arginfo.h b/frankenphp_arginfo.h index 4f2707cbca..d32063862d 100644 --- a/frankenphp_arginfo.h +++ b/frankenphp_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 60f0d27c04f94d7b24c052e91ef294595a2bc421 */ + * Stub hash: bc8623068235bd40c5eb8a2675e59ad928fc6ea9 */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_handle_request, 0, 1, _IS_BOOL, 0) ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0) @@ -41,6 +41,21 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_log, 0, 1, IS_VOID, 0 ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, context, IS_ARRAY, 0, "[]") ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_ensure_background_worker, 0, 1, IS_VOID, 0) + ZEND_ARG_TYPE_MASK(0, name, MAY_BE_STRING|MAY_BE_ARRAY, NULL) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, timeout, IS_DOUBLE, 1, "null") +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_set_vars, 0, 1, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, vars, IS_ARRAY, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_get_vars, 0, 1, IS_ARRAY, 0) + ZEND_ARG_TYPE_INFO(0, name, IS_STRING, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_get_worker_handle, 0, 0, IS_MIXED, 0) +ZEND_END_ARG_INFO() ZEND_FUNCTION(frankenphp_handle_request); ZEND_FUNCTION(headers_send); @@ -49,7 +64,10 @@ ZEND_FUNCTION(frankenphp_request_headers); ZEND_FUNCTION(frankenphp_response_headers); ZEND_FUNCTION(mercure_publish); ZEND_FUNCTION(frankenphp_log); - +ZEND_FUNCTION(frankenphp_ensure_background_worker); +ZEND_FUNCTION(frankenphp_set_vars); +ZEND_FUNCTION(frankenphp_get_vars); +ZEND_FUNCTION(frankenphp_get_worker_handle); static const zend_function_entry ext_functions[] = { ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request) @@ -63,6 +81,10 @@ static const zend_function_entry ext_functions[] = { ZEND_FALIAS(apache_response_headers, frankenphp_response_headers, arginfo_apache_response_headers) ZEND_FE(mercure_publish, arginfo_mercure_publish) ZEND_FE(frankenphp_log, arginfo_frankenphp_log) + ZEND_FE(frankenphp_ensure_background_worker, arginfo_frankenphp_ensure_background_worker) + ZEND_FE(frankenphp_set_vars, arginfo_frankenphp_set_vars) + ZEND_FE(frankenphp_get_vars, arginfo_frankenphp_get_vars) + ZEND_FE(frankenphp_get_worker_handle, arginfo_frankenphp_get_worker_handle) ZEND_FE_END }; diff --git a/options.go b/options.go index a9cd2a2630..1f413dbddc 100644 --- a/options.go +++ b/options.go @@ -50,6 +50,8 @@ type workerOpt struct { onThreadShutdown func(int) onServerStartup func() onServerShutdown func() + isBackgroundWorker bool + scope Scope } // WithContext sets the main context to use. @@ -258,6 +260,31 @@ func WithWorkerOnServerShutdown(f func()) WorkerOption { } } +// EXPERIMENTAL: WithWorkerBackground marks this worker as a background +// (non-HTTP) worker. Background workers run outside the request cycle and +// publish shared variables via frankenphp_set_vars for HTTP threads to read +// via frankenphp_get_vars. +func WithWorkerBackground() WorkerOption { + return func(w *workerOpt) error { + w.isBackgroundWorker = true + + return nil + } +} + +// EXPERIMENTAL: WithWorkerScope assigns this worker to a given scope. +// Workers in the same scope share a background lookup; each php_server +// block gets its own scope so workers with the same name in different +// blocks don't collide. The zero value is the global/embed scope and is +// the default. +func WithWorkerScope(scope Scope) WorkerOption { + return func(w *workerOpt) error { + w.scope = scope + + return nil + } +} + func withExtensionWorkers(w *extensionWorkers) WorkerOption { return func(wo *workerOpt) error { wo.extensionWorkers = w diff --git a/phpthread.go b/phpthread.go index 5d94bd62f6..5c85e68918 100644 --- a/phpthread.go +++ b/phpthread.go @@ -105,6 +105,9 @@ func (thread *phpThread) shutdown() { return } + // Wake up handlers parked in a blocking C call (background workers' + // stream_select on the stop pipe). No-op for regular/worker handlers. + thread.handler.drain() close(thread.drainChan) // Arm force-kill after the grace period to wake any thread stuck in diff --git a/requestoptions.go b/requestoptions.go index 42cc3cf7c0..038898ccd9 100644 --- a/requestoptions.go +++ b/requestoptions.go @@ -154,6 +154,18 @@ func WithRequestLogger(logger *slog.Logger) RequestOption { } } +// WithRequestScope selects the scope for ensure / get_vars calls made +// from this request. Web-server integrations (such as the Caddy module) +// can call NextScope per independently-configured block to isolate its +// set of background workers. +func WithRequestScope(scope Scope) RequestOption { + return func(o *frankenPHPContext) error { + o.scope = scope + + return nil + } +} + // WithWorkerName sets the worker that should handle the request func WithWorkerName(name string) RequestOption { return func(o *frankenPHPContext) error { diff --git a/testdata/_executor.php b/testdata/_executor.php index 61a5319f11..31b87c79cb 100644 --- a/testdata/_executor.php +++ b/testdata/_executor.php @@ -1,7 +1,7 @@ 'cached-value', +]); + +$stream = frankenphp_get_worker_handle(); +if ($stream !== null) { + $read = [$stream]; + $write = null; + $except = null; + stream_select($read, $write, $except, null); +} diff --git a/testdata/background-worker-cache-identity.php b/testdata/background-worker-cache-identity.php new file mode 100644 index 0000000000..6200dd2072 --- /dev/null +++ b/testdata/background-worker-cache-identity.php @@ -0,0 +1,14 @@ + 'hello from background worker', + 'count' => 42, + 'ready_at' => microtime(true), +]); + +$stream = frankenphp_get_worker_handle(); +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); diff --git a/testdata/bgworker/batch-ensure.php b/testdata/bgworker/batch-ensure.php new file mode 100644 index 0000000000..70e7903664 --- /dev/null +++ b/testdata/bgworker/batch-ensure.php @@ -0,0 +1,9 @@ +getMessage(), "\n"; +} diff --git a/testdata/bgworker/bg-flag.php b/testdata/bgworker/bg-flag.php new file mode 100644 index 0000000000..f83182f746 --- /dev/null +++ b/testdata/bgworker/bg-flag.php @@ -0,0 +1,20 @@ + $_SERVER['FRANKENPHP_WORKER'] ?? 'MISSING', + 'is_background' => $_SERVER['FRANKENPHP_WORKER_BACKGROUND'] ?? 'MISSING', +]); + +$stream = frankenphp_get_worker_handle(); +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); diff --git a/testdata/bgworker/boot-fail.php b/testdata/bgworker/boot-fail.php new file mode 100644 index 0000000000..ab6d43d0bb --- /dev/null +++ b/testdata/bgworker/boot-fail.php @@ -0,0 +1,6 @@ + 1, 'phase' => 'pre-crash']); + file_put_contents($marker, '1'); + exit(1); +} + +frankenphp_set_vars(['count' => 2, 'phase' => 'post-restart']); + +$stream = frankenphp_get_worker_handle(); +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); +@unlink($marker); diff --git a/testdata/bgworker/eager-catchall.php b/testdata/bgworker/eager-catchall.php new file mode 100644 index 0000000000..4a32c942ec --- /dev/null +++ b/testdata/bgworker/eager-catchall.php @@ -0,0 +1,16 @@ + marker so the script counts attempts; success path writes a +// "ready" sentinel. +set_time_limit(0); + +$markerDir = $_SERVER['BG_MARKER_DIR'] ?? ''; +if ($markerDir === '') { + exit(2); +} + +$attempt = 1; +while (file_exists($markerDir . '/boot' . $attempt)) { + $attempt++; +} +@touch($markerDir . '/boot' . $attempt); + +$failUntil = (int)($_SERVER['BG_FAIL_UNTIL'] ?? '2'); +if ($attempt <= $failUntil) { + exit(1); +} + +@touch($markerDir . '/ready'); +$stream = frankenphp_get_worker_handle(); +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); diff --git a/testdata/bgworker/named.php b/testdata/bgworker/named.php new file mode 100644 index 0000000000..f392f316e3 --- /dev/null +++ b/testdata/bgworker/named.php @@ -0,0 +1,25 @@ + $name, + 'count' => 1, +]); + +$stream = frankenphp_get_worker_handle(); +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); diff --git a/testdata/bgworker/no-handle.php b/testdata/bgworker/no-handle.php new file mode 100644 index 0000000000..a21029a1bc --- /dev/null +++ b/testdata/bgworker/no-handle.php @@ -0,0 +1,9 @@ + 1 means multiple threads share this worker name. +// Each thread runs in its own ZTS context but shares a single +// backgroundWorkerState (via the worker name), so set_vars writes from +// any pool thread are visible to readers. +frankenphp_set_vars([ + 'name' => $_SERVER['FRANKENPHP_WORKER'] ?? 'unknown', + 'pid' => getmypid(), +]); + +// Per-thread sentinel (random suffix) lets tests count distinct booted +// threads, which set_vars alone cannot show since the namespace is +// per-worker-name not per-thread. +if (!empty($_SERVER['BG_SENTINEL_DIR'])) { + $path = $_SERVER['BG_SENTINEL_DIR'] . '/pool-' . bin2hex(random_bytes(8)); + @touch($path); +} + +$stream = frankenphp_get_worker_handle(); +$read = [$stream]; +$write = null; +$except = null; +stream_select($read, $write, $except, null); diff --git a/testdata/bgworker/readiness-then-crash.php b/testdata/bgworker/readiness-then-crash.php new file mode 100644 index 0000000000..c012d51ffe --- /dev/null +++ b/testdata/bgworker/readiness-then-crash.php @@ -0,0 +1,8 @@ + 1]); + +// sleep() is interruptible by SIGRTMIN+3 on Linux/FreeBSD and by +// alertable-wait APCs on Windows; the test skips platforms where +// neither mechanism can interrupt it. +sleep(60); diff --git a/threadbackgroundworker.go b/threadbackgroundworker.go new file mode 100644 index 0000000000..77e196ce64 --- /dev/null +++ b/threadbackgroundworker.go @@ -0,0 +1,350 @@ +package frankenphp + +// #include "frankenphp.h" +import "C" +import ( + "context" + "fmt" + "log/slog" + "path/filepath" + "sync" + "sync/atomic" + "time" + "unsafe" + + "github.com/dunglas/frankenphp/internal/state" +) + +// backgroundWorkerState carries a bg-worker instance's shared vars and +// combined readiness signal. ready closes only when BOTH markHandle (from +// frankenphp_get_worker_handle) and markVars (from frankenphp_set_vars) +// have fired. mu serialises writer/reader access to varsPtr so the C side +// can pfree the old pointer safely after set_vars returns. +type backgroundWorkerState struct { + varsPtr unsafe.Pointer // *C.HashTable in persistent memory + mu sync.RWMutex + varsVersion atomic.Uint64 + + ready chan struct{} + closeReady sync.Once + hasHandle atomic.Bool + hasVars atomic.Bool + + // aborted unblocks ensure() waiters when the start sequence is + // abandoned before reaching ready. + aborted chan struct{} + abortOnce sync.Once + abortErr error + + // bootFailure carries a pre-readiness crash so a timing-out ensure() + // can surface it. + bootFailure atomic.Pointer[bootFailureInfo] +} + +func newBackgroundWorkerState() *backgroundWorkerState { + return &backgroundWorkerState{ + ready: make(chan struct{}), + aborted: make(chan struct{}), + } +} + +// markHandle / markVars fire on the first frankenphp_get_worker_handle() +// and frankenphp_set_vars() calls respectively. ready closes once both +// have fired. Both are idempotent. +func (sk *backgroundWorkerState) markHandle() { + sk.hasHandle.Store(true) + sk.tryCloseReady() +} + +func (sk *backgroundWorkerState) markVars() { + sk.hasVars.Store(true) + sk.tryCloseReady() +} + +func (sk *backgroundWorkerState) tryCloseReady() { + if sk.hasHandle.Load() && sk.hasVars.Load() { + sk.closeReady.Do(func() { close(sk.ready) }) + } +} + +// abort wakes ensure() waiters when the start sequence is abandoned. +// Idempotent. +func (sk *backgroundWorkerState) abort(err error) { + sk.abortOnce.Do(func() { + sk.abortErr = err + close(sk.aborted) + }) +} + +// backgroundWorkerThread handles background worker scripts. Owns its own +// lifecycle: boot, publish vars, loop, optionally crash-restart with +// exponential backoff. +type backgroundWorkerThread struct { + state *state.ThreadState + thread *phpThread + worker *worker + dummyFrankenPHPContext *frankenPHPContext + dummyContext context.Context + isBootingScript bool + failureCount int + + // runtimeName is the worker identity (kept m#-prefixed for metric + // label consistency); setupScript trims it at the PHP boundary. + runtimeName string + // backgroundWorker is this thread's state slot (combined readiness + + // shared vars): worker.backgroundWorker for named workers, + // worker.catchAllNames[runtimeName] for catch-all threads (so + // boot-failure / abort / markHandle / markVars stay per-name). + backgroundWorker *backgroundWorkerState + + // stopFdWrite is this thread's stop-pipe write end (per-thread so pool + // workers drain independently); read end is exposed to PHP via + // frankenphp_get_worker_handle(). + stopFdWrite atomic.Int32 +} + +func convertToBackgroundWorkerThread(thread *phpThread, worker *worker, runtimeName string, sk *backgroundWorkerState) { + handler := &backgroundWorkerThread{ + state: thread.state, + thread: thread, + worker: worker, + runtimeName: runtimeName, + backgroundWorker: sk, + } + handler.stopFdWrite.Store(-1) + thread.setHandler(handler) + worker.attachThread(thread) +} + +func (handler *backgroundWorkerThread) scopedWorker() *worker { return handler.worker } + +func (handler *backgroundWorkerThread) name() string { + if handler.runtimeName != "" && handler.runtimeName != handler.worker.name { + return "Background Worker PHP Thread - " + handler.worker.fileName + " (" + handler.runtimeName + ")" + } + return "Background Worker PHP Thread - " + handler.worker.fileName +} + +// isPostBoot samples the combined readiness channel (handle + set_vars) +// non-blockingly so afterScriptExecution can tell a boot crash from a +// post-boot crash. +func (handler *backgroundWorkerThread) isPostBoot() bool { + if handler.backgroundWorker == nil { + return false + } + select { + case <-handler.backgroundWorker.ready: + return true + default: + return false + } +} + +func (handler *backgroundWorkerThread) frankenPHPContext() *frankenPHPContext { + return handler.dummyFrankenPHPContext +} + +func (handler *backgroundWorkerThread) context() context.Context { + if handler.dummyContext != nil { + return handler.dummyContext + } + return globalCtx +} + +// drain is called by drainWorkerThreads (and thread.shutdown) right before +// drainChan is closed. We close the stop-pipe's write end so the PHP worker +// script, which is typically parked in stream_select on the read end, wakes +// up and can finish its loop gracefully. Per-thread fd so pool workers +// drain their threads independently. +func (handler *backgroundWorkerThread) drain() { + if fd := handler.stopFdWrite.Swap(-1); fd >= 0 { + C.frankenphp_worker_close_fd(C.int(fd)) + } +} + +func (handler *backgroundWorkerThread) beforeScriptExecution() string { + switch handler.state.Get() { + case state.TransitionRequested: + if handler.worker.onThreadShutdown != nil { + handler.worker.onThreadShutdown(handler.thread.threadIndex) + } + handler.worker.detachThread(handler.thread) + return handler.thread.transitionToNewHandler() + case state.Restarting: + if handler.worker.onThreadShutdown != nil { + handler.worker.onThreadShutdown(handler.thread.threadIndex) + } + handler.state.Set(state.Yielding) + handler.state.WaitFor(state.Ready, state.ShuttingDown) + return handler.beforeScriptExecution() + case state.Ready, state.TransitionComplete: + handler.thread.updateContext(true) + if handler.worker.onThreadReady != nil { + handler.worker.onThreadReady(handler.thread.threadIndex) + } + + handler.setupScript() + + return handler.worker.fileName + case state.ShuttingDown: + if handler.worker.onThreadShutdown != nil { + handler.worker.onThreadShutdown(handler.thread.threadIndex) + } + handler.worker.detachThread(handler.thread) + return "" + } + + panic("unexpected state: " + handler.state.Name()) +} + +func (handler *backgroundWorkerThread) setupScript() { + if handler.runtimeName == "" { + handler.runtimeName = handler.worker.name + } + if handler.backgroundWorker == nil { + handler.backgroundWorker = handler.worker.backgroundWorker + } + metrics.StartWorker(handler.runtimeName) + + opts := append([]RequestOption(nil), handler.worker.requestOptions...) + C.frankenphp_set_background_worker(C._Bool(true)) + handler.stopFdWrite.Store(int32(C.frankenphp_worker_get_stop_fd_write())) + + fc, err := newDummyContext( + filepath.Base(handler.worker.fileName), + opts..., + ) + if err != nil { + panic(err) + } + + ctx := context.WithValue(globalCtx, contextKey, fc) + + fc.worker = handler.worker + handler.dummyFrankenPHPContext = fc + handler.dummyContext = ctx + handler.isBootingScript = true + + if globalLogger.Enabled(ctx, slog.LevelDebug) { + globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting background worker", slog.String("worker", handler.runtimeName), slog.Int("thread", handler.thread.threadIndex)) + } + + handler.thread.state.Set(state.Ready) + fc.scriptFilename = handler.worker.fileName +} + +func (handler *backgroundWorkerThread) afterScriptExecution(exitStatus int) { + // frankenphp_worker_get_stop_fd_write transferred fd ownership to Go, + // so we must close on every exit path to avoid a leak. + if fd := handler.stopFdWrite.Swap(-1); fd >= 0 { + C.frankenphp_worker_close_fd(C.int(fd)) + } + worker := handler.worker + runtimeName := handler.runtimeName + if runtimeName == "" { + runtimeName = worker.name + } + handler.dummyFrankenPHPContext = nil + handler.dummyContext = nil + + // During Shutdown the drain's force-kill is armed against one slot; a + // freshly spawned pthread would re-enter the script and never exit. + if mainThread != nil && mainThread.state.Is(state.ShuttingDown) { + handler.thread.state.Set(state.ShuttingDown) + return + } + + // Cooperative exit: re-run. + if exitStatus == 0 && !handler.isBootingScript { + metrics.StopWorker(runtimeName, StopReasonRestart) + + if globalLogger.Enabled(globalCtx, slog.LevelDebug) { + globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "restarting background worker", slog.String("worker", runtimeName), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus)) + } + + return + } + + if handler.isBootingScript { + metrics.StopWorker(runtimeName, StopReasonBootFailure) + } else { + metrics.StopWorker(runtimeName, StopReasonCrash) + } + + if !handler.isBootingScript { + if globalLogger.Enabled(globalCtx, slog.LevelWarn) { + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "background worker crashed, restarting", slog.String("worker", runtimeName), slog.Int("thread", handler.thread.threadIndex), slog.Int("exit_status", exitStatus)) + } + + return + } + + // Pre-readiness crash: capture metadata (including PG(last_error_*) + // grabbed C-side before php_request_shutdown clears it) for a timing- + // out ensure(). + if !handler.isPostBoot() && handler.backgroundWorker != nil { + var phpError string + if cErr := C.frankenphp_get_last_php_error(); cErr != nil { + phpError = C.GoString(cErr) + C.free(unsafe.Pointer(cErr)) + } + handler.backgroundWorker.bootFailure.Store(&bootFailureInfo{ + entrypoint: worker.fileName, + exitStatus: exitStatus, + failureCount: handler.failureCount + 1, + phpError: phpError, + }) + } + + // max_consecutive_failures cap. For single-instance bg workers, + // abort and release the slot so a future ensure() can lazy-spawn a + // fresh thread. Eager pools skip this: other pool threads may still + // be alive. + if worker.maxConsecutiveFailures >= 0 && handler.failureCount >= worker.maxConsecutiveFailures { + isSingleInstance := worker.catchAllNames != nil || worker.num == 0 + if isSingleInstance && handler.backgroundWorker != nil { + handler.backgroundWorker.abort(fmt.Errorf("background worker %s exceeded max_consecutive_failures (%d, last exit status %d)", worker.fileName, worker.maxConsecutiveFailures, exitStatus)) + worker.invalidateBackgroundEntry(runtimeName) + } + if startupFailChan != nil && !watcherIsEnabled { + startupFailChan <- fmt.Errorf("too many consecutive failures: background worker %s has not reached frankenphp_set_vars()", worker.fileName) + handler.thread.state.Set(state.ShuttingDown) + return + } + if globalLogger.Enabled(globalCtx, slog.LevelWarn) { + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "background worker exceeded max_consecutive_failures, stopping respawn", slog.String("worker", runtimeName), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.failureCount)) + } + handler.thread.state.Set(state.ShuttingDown) + return + } + + if globalLogger.Enabled(globalCtx, slog.LevelWarn) { + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "background worker boot failed, restarting", slog.String("worker", runtimeName), slog.Int("thread", handler.thread.threadIndex), slog.Int("failures", handler.failureCount), slog.Int("exit_status", exitStatus)) + } + + backoffDuration := time.Duration(handler.failureCount*handler.failureCount*100) * time.Millisecond + if backoffDuration > time.Second { + backoffDuration = time.Second + } + handler.failureCount++ + time.Sleep(backoffDuration) +} + +// markBackgroundReady fires on the first set_vars after each (re)boot: +// resets the failure counter, clears the boot-failure record, and marks +// the vars half of the combined readiness signal. Idempotent within a +// boot. +func (handler *backgroundWorkerThread) markBackgroundReady() { + if !handler.isBootingScript { + return + } + + handler.failureCount = 0 + handler.isBootingScript = false + if handler.backgroundWorker != nil { + handler.backgroundWorker.bootFailure.Store(nil) + handler.backgroundWorker.markVars() + } + + metrics.ReadyWorker(handler.runtimeName) +} diff --git a/threadworker.go b/threadworker.go index 21e1034805..f35a952d3d 100644 --- a/threadworker.go +++ b/threadworker.go @@ -105,6 +105,8 @@ func (handler *workerThread) name() string { return "Worker PHP Thread - " + handler.worker.fileName } +func (handler *workerThread) scopedWorker() *worker { return handler.worker } + func (handler *workerThread) drain() {} func setupWorkerScript(handler *workerThread, worker *worker) { diff --git a/worker.go b/worker.go index fdc3098da6..26183157d0 100644 --- a/worker.go +++ b/worker.go @@ -33,6 +33,24 @@ type worker struct { onThreadReady func(int) onThreadShutdown func(int) queuedRequests atomic.Int32 + isBackgroundWorker bool + scope Scope + // backgroundWorker is shared state (vars + readiness) for named workers + // and a catch-all's eager pool. Lazy-spawned catch-all instances have + // their own slot in catchAllNames instead. + backgroundWorker *backgroundWorkerState + + // catchAllNames != nil marks this *worker as a scope's catch-all + // template; lazy-spawned threads register their per-name slot here, + // up to catchAllCap, under catchAllMu. + catchAllCap int + catchAllMu sync.Mutex + catchAllNames map[string]*backgroundWorkerState + + // bgLazyStartMu/bgLazyStarted gate the first thread spawn for a num=0 + // named bg worker. Unused for eager (num > 0) or catch-all templates. + bgLazyStartMu sync.Mutex + bgLazyStarted bool } var ( @@ -57,26 +75,47 @@ func initWorkers(opt []workerOpt) error { workersByName = make(map[string]*worker, len(opt)) workersByPath = make(map[string]*worker, len(opt)) - for _, o := range opt { + declared := make([]*worker, len(opt)) + for i, o := range opt { w, err := newWorker(o) if err != nil { return err } totalThreadsToStart += w.num + declared[i] = w workers = append(workers, w) - workersByName[w.name] = w + // Background workers are resolved per-scope via backgroundLookups + // so the same user-facing name can appear in multiple scopes + // without colliding in the global workersByName map. + if !w.isBackgroundWorker { + workersByName[w.name] = w + } if w.allowPathMatching { workersByPath[w.fileName] = w } } + // Build the per-scope lookups (named + catch-all per scope). Each + // php_server block gets its own scope; the global/embed scope is 0. + // Each declaration registers in its scope's lookup so lazy-start + // siblings inherit the template options. + var err error + backgroundLookups, err = buildBackgroundWorkerLookups(declared, opt) + if err != nil { + return err + } + startupFailChan = make(chan error, totalThreadsToStart) for _, w := range workers { for i := 0; i < w.num; i++ { thread := getInactivePHPThread() - convertToWorkerThread(thread, w) + if w.isBackgroundWorker { + convertToBackgroundWorkerThread(thread, w, w.name, w.backgroundWorker) + } else { + convertToWorkerThread(thread, w) + } workersReady.Go(func() { thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Done) @@ -121,21 +160,32 @@ func newWorker(o workerOpt) (*worker, error) { } // workers that have a name starting with "m#" are module workers - // they can only be matched by their name, not by their path - allowPathMatching := !strings.HasPrefix(o.name, "m#") - - if w := workersByPath[absFileName]; w != nil && allowPathMatching { - return w, fmt.Errorf("two workers cannot have the same filename: %q", absFileName) - } - if w := workersByName[o.name]; w != nil { - return w, fmt.Errorf("two workers cannot have the same name: %q", o.name) + // they can only be matched by their name, not by their path. + // Background workers are matched only by name, never by path, since + // they don't handle HTTP requests. + allowPathMatching := !strings.HasPrefix(o.name, "m#") && !o.isBackgroundWorker + + // Background workers are matched only by name, never by path. Multiple + // named bg workers can share an entrypoint file; uniqueness is enforced + // per-scope via backgroundLookups, not at the global path level. + if !o.isBackgroundWorker { + if w := workersByPath[absFileName]; w != nil && allowPathMatching { + return w, fmt.Errorf("two workers cannot have the same filename: %q", absFileName) + } + // Background workers are resolved through per-scope lookups, not the + // global workersByName map; the same user-facing name can appear in + // multiple php_server scopes without collision. + if w := workersByName[o.name]; w != nil { + return w, fmt.Errorf("two workers cannot have the same name: %q", o.name) + } } if o.env == nil { - o.env = make(PreparedEnv, 1) + o.env = make(PreparedEnv) } - o.env["FRANKENPHP_WORKER\x00"] = "1" + // $_SERVER['FRANKENPHP_WORKER'] is populated downstream from + // fc.worker.name via frankenphp_register_server_vars. w := &worker{ name: o.name, fileName: absFileName, @@ -148,10 +198,19 @@ func newWorker(o workerOpt) (*worker, error) { maxConsecutiveFailures: o.maxConsecutiveFailures, onThreadReady: o.onThreadReady, onThreadShutdown: o.onThreadShutdown, + isBackgroundWorker: o.isBackgroundWorker, + scope: o.scope, } + // backgroundWorker state is reserved lazily via the registry at + // thread-setup time, not here; lazy-start callers set it directly + // and eager inits go through setupScript's sync.Once. + w.configureMercure(&o) + // o.env is shared by reference across instances; treat it as + // read-only after init. Deep-clone if per-instance env is ever + // needed. w.requestOptions = append( w.requestOptions, WithRequestDocumentRoot(filepath.Dir(o.fileName), false), @@ -172,9 +231,16 @@ var drainGracePeriod = 30 * time.Second // Blocks until every drained thread yields. Force-kill is armed after a // grace period to wake threads parked in blocking syscalls (sleep, I/O). func DrainWorkers() { + // Defensive RLock paired with RestartWorkers' scalingMu.Lock to + // guard against future runtime mutators of the workers slice. + scalingMu.RLock() + defer scalingMu.RUnlock() _ = drainWorkerThreads() } +// drainWorkerThreads walks the global workers slice. The caller must +// ensure no concurrent mutation of the slice (either hold scalingMu or +// be called from a context where scaling is paused). func drainWorkerThreads() (drainedThreads []*phpThread) { var ready sync.WaitGroup @@ -255,6 +321,26 @@ func (worker *worker) attachThread(thread *phpThread) { worker.threadMutex.Unlock() } +// invalidateBackgroundEntry releases the registry slot held by a bg +// worker exiting on max_consecutive_failures so a future ensure() can +// lazy-spawn a fresh thread. Single-instance only (catch-all instances, +// lazy named). +func (worker *worker) invalidateBackgroundEntry(name string) { + if worker.catchAllNames != nil { + worker.catchAllMu.Lock() + delete(worker.catchAllNames, name) + worker.catchAllMu.Unlock() + return + } + if worker.num == 0 { + // Fresh slot so the retry isn't poisoned by the prior abort. + worker.bgLazyStartMu.Lock() + worker.backgroundWorker = newBackgroundWorkerState() + worker.bgLazyStarted = false + worker.bgLazyStartMu.Unlock() + } +} + func (worker *worker) detachThread(thread *phpThread) { worker.threadMutex.Lock() for i, t := range worker.threads {