From 014e7d3b8207f4571a2d02cba49b18382c2fcce0 Mon Sep 17 00:00:00 2001 From: Ruffled <105522716+RuffledPlume@users.noreply.github.com> Date: Wed, 20 May 2026 00:01:07 +0100 Subject: [PATCH 1/3] Balance Workers Sleeping Weight Sleeping time based on how long we spend Idle, this means instead of the fixed sleeping time, it'll be more accurate to the Systems Performance --- .../java/rs117/hd/utils/jobs/JobSystem.java | 2 +- src/main/java/rs117/hd/utils/jobs/Worker.java | 38 +++++++++++++++---- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/main/java/rs117/hd/utils/jobs/JobSystem.java b/src/main/java/rs117/hd/utils/jobs/JobSystem.java index 8c2fa91303..c96cf728aa 100644 --- a/src/main/java/rs117/hd/utils/jobs/JobSystem.java +++ b/src/main/java/rs117/hd/utils/jobs/JobSystem.java @@ -150,7 +150,7 @@ public boolean isWorker() { public boolean hasIdleWorkers() { for (Worker worker : workers) { - if (!worker.inflight.get()) + if (!worker.processing.get()) return true; } return false; diff --git a/src/main/java/rs117/hd/utils/jobs/Worker.java b/src/main/java/rs117/hd/utils/jobs/Worker.java index 697478c182..7922e67942 100644 --- a/src/main/java/rs117/hd/utils/jobs/Worker.java +++ b/src/main/java/rs117/hd/utils/jobs/Worker.java @@ -14,7 +14,10 @@ @Slf4j @RequiredArgsConstructor public final class Worker { - private static final long SLEEP_TIME_NANOS = TimeUnit.MICROSECONDS.convert(1, TimeUnit.NANOSECONDS); + static final long MIN_SLEEP_NANOS = 1_000; + static final long MAX_SLEEP_NANOS = 1_000_000; // 1ms + + static final double SLEEP_ALPHA = 0.1; String name, pausedName; Thread thread; @@ -25,16 +28,17 @@ public final class Worker { final int workerIdx; final ConcurrentLinkedDeque localWorkQueue = new ConcurrentLinkedDeque<>(); final ArrayDeque localStalledWork = new ArrayDeque<>(); - final AtomicBoolean inflight = new AtomicBoolean(); + final AtomicBoolean processing = new AtomicBoolean(); boolean findNextStealTarget() { // Find the best target to steal 
work from int nextVictimIdx = -1; int nextVictimWorkCount = -1; for (int i = 0; i < jobSystem.workers.length; i++) { - if (i == workerIdx || !jobSystem.workers[i].inflight.get()) + final Worker worker = jobSystem.workers[i]; + if (i == workerIdx || !worker.processing.get() || worker.localWorkQueue.isEmpty()) continue; // Don't query ourselves or a worker that is idle - int workCount = jobSystem.workers[i].localWorkQueue.size(); + int workCount = worker.localWorkQueue.size(); if (workCount > nextVictimWorkCount) { nextVictimIdx = i; nextVictimWorkCount = workCount; @@ -48,11 +52,14 @@ boolean findNextStealTarget() { void run() { name = thread.getName(); pausedName = name + " [Paused]"; + + long spinWaitNanos = TimeUnit.MICROSECONDS.convert(1, TimeUnit.NANOSECONDS); while (jobSystem.active) { // Check local work queue handle = (localStalledWork.isEmpty() ? localWorkQueue : localStalledWork).poll(); - long waitStart = handle == null ? System.nanoTime() : 0; + long idleStart = handle == null ? System.nanoTime() : 0; + long idleTime = 0; while (handle == null) { if (stealTargetIdx >= 0) { final Worker victim = jobSystem.workers[stealTargetIdx]; @@ -76,7 +83,8 @@ void run() { handle = localStalledWork.isEmpty() ? 
jobSystem.workQueue.poll() : localStalledWork.poll(); } - if (handle == null && !findNextStealTarget() && System.nanoTime() - waitStart > SLEEP_TIME_NANOS) { + idleTime = System.nanoTime() - idleStart; + if (handle == null && !findNextStealTarget() && idleTime > spinWaitNanos) { // Wait for a signal that there is work to be had try { jobSystem.workerSemaphore.acquire(); @@ -91,16 +99,33 @@ void run() { } } + if(handle == null) + Thread.onSpinWait(); + if (!jobSystem.active) { log.trace("Shutdown"); return; } } + if (idleStart != 0) { + spinWaitNanos = clamp( + (long) ( + spinWaitNanos * (1.0 - SLEEP_ALPHA) + + idleTime * SLEEP_ALPHA + ), + MIN_SLEEP_NANOS, + MAX_SLEEP_NANOS + ); + } + try { + processing.set(true); processHandle(); } catch (InterruptedException ignored) { thread.isInterrupted(); // Consume the interrupt to prevent it from cancelling the next job + } finally { + processing.set(false); } } log.trace("Shutdown"); @@ -114,7 +139,6 @@ void processHandle() throws InterruptedException { if (handle.item != null) { if (handle.item.canStart()) { if (handle.setRunning(this)) { - inflight.set(true); handle.item.onRun(); handle.item.ranToCompletion.set(true); } From 04f928666c99394e9d60b223a84e86a0954530c3 Mon Sep 17 00:00:00 2001 From: Ruffled <105522716+RuffledPlume@users.noreply.github.com> Date: Wed, 20 May 2026 12:18:18 +0100 Subject: [PATCH 2/3] JNI Platform Bindings Added SetAffinity to hint to the Platform Kernel that workers should be pinned to specific cores Verify that CPU Pinning is working, log if its not Determine Support for each binding instead of needing all to pass --- .../java/rs117/hd/utils/jobs/JobSystem.java | 2 +- src/main/java/rs117/hd/utils/jobs/Worker.java | 23 ++++ .../hd/utils/platform/DummyBindings.java | 5 + .../hd/utils/platform/LinuxBindings.java | 42 +++++++ .../rs117/hd/utils/platform/MacBindings.java | 80 +++++++++++++ .../hd/utils/platform/PlatformBindings.java | 108 ++++++++++++++++++ 
.../hd/utils/platform/WindowsBindings.java | 36 ++++++ 7 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 src/main/java/rs117/hd/utils/platform/DummyBindings.java create mode 100644 src/main/java/rs117/hd/utils/platform/LinuxBindings.java create mode 100644 src/main/java/rs117/hd/utils/platform/MacBindings.java create mode 100644 src/main/java/rs117/hd/utils/platform/PlatformBindings.java create mode 100644 src/main/java/rs117/hd/utils/platform/WindowsBindings.java diff --git a/src/main/java/rs117/hd/utils/jobs/JobSystem.java b/src/main/java/rs117/hd/utils/jobs/JobSystem.java index c96cf728aa..a97a15d466 100644 --- a/src/main/java/rs117/hd/utils/jobs/JobSystem.java +++ b/src/main/java/rs117/hd/utils/jobs/JobSystem.java @@ -61,7 +61,7 @@ public void startUp(CpuUsageLimit cpuUsageLimit) { for (int i = 0; i < workerCount; i++) { Worker worker = workers[i] = new Worker(this, i); worker.thread = new Thread(worker::run); - worker.thread.setPriority(Thread.NORM_PRIORITY + 1); + worker.thread.setPriority(Thread.NORM_PRIORITY + 3); worker.thread.setName("117HD - Worker " + i); threadToWorker.put(worker.thread, worker); } diff --git a/src/main/java/rs117/hd/utils/jobs/Worker.java b/src/main/java/rs117/hd/utils/jobs/Worker.java index 7922e67942..3f1b629220 100644 --- a/src/main/java/rs117/hd/utils/jobs/Worker.java +++ b/src/main/java/rs117/hd/utils/jobs/Worker.java @@ -6,6 +6,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import rs117.hd.utils.Props; +import rs117.hd.utils.platform.PlatformBindings; import static rs117.hd.utils.HDUtils.getThreadStackTrace; import static rs117.hd.utils.MathUtils.*; @@ -53,11 +55,32 @@ void run() { name = thread.getName(); pausedName = name + " [Paused]"; + int affinityCore = workerIdx + 1; + if(PlatformBindings.setAffinity(1L << affinityCore)) + log.trace("Set worker {} affinity to {}", workerIdx, affinityCore); + long spinWaitNanos = 
TimeUnit.MICROSECONDS.convert(1, TimeUnit.NANOSECONDS); + long nextCPUCoreCheck = Props.DEVELOPMENT ? 0 : -1; + while (jobSystem.active) { // Check local work queue handle = (localStalledWork.isEmpty() ? localWorkQueue : localStalledWork).poll(); + // Check if the worker is on the correct core, helps determine if CPU Pinning is working correctly on platforms + if(nextCPUCoreCheck >= 0 && System.nanoTime() > nextCPUCoreCheck) { + int cpu = PlatformBindings.getCpu(); + if (cpu != -1) { + if (cpu != affinityCore) { + log.warn("Expected worker {} to be on core {}, but it is on core {}", workerIdx, affinityCore, cpu); + nextCPUCoreCheck = -1L; + } else { + nextCPUCoreCheck = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + } + } else { + nextCPUCoreCheck = -1L; + } + } + long idleStart = handle == null ? System.nanoTime() : 0; long idleTime = 0; while (handle == null) { diff --git a/src/main/java/rs117/hd/utils/platform/DummyBindings.java b/src/main/java/rs117/hd/utils/platform/DummyBindings.java new file mode 100644 index 0000000000..e5d9140fa1 --- /dev/null +++ b/src/main/java/rs117/hd/utils/platform/DummyBindings.java @@ -0,0 +1,5 @@ +package rs117.hd.utils.platform; + +public class DummyBindings extends PlatformBindings { + DummyBindings() { super(""); } +} diff --git a/src/main/java/rs117/hd/utils/platform/LinuxBindings.java b/src/main/java/rs117/hd/utils/platform/LinuxBindings.java new file mode 100644 index 0000000000..6a1a888526 --- /dev/null +++ b/src/main/java/rs117/hd/utils/platform/LinuxBindings.java @@ -0,0 +1,42 @@ +package rs117.hd.utils.platform; + +import lombok.extern.slf4j.Slf4j; +import org.lwjgl.system.MemoryStack; + +import static org.lwjgl.system.JNI.invokeI; +import static org.lwjgl.system.JNI.invokeP; +import static org.lwjgl.system.JNI.invokePPPI; +import static org.lwjgl.system.MemoryUtil.memPutLong; +import static org.lwjgl.system.Pointer.POINTER_SIZE; + +@Slf4j +public final class LinuxBindings extends PlatformBindings { + private static 
final int CPU_SET_BYTES = 128; + + private final long pthread_self = findFunction("pthread_self"); + private final long pthread_setaffinity_np = findFunction("pthread_setaffinity_np"); + private final long sched_getcpu = findFunction("sched_getcpu"); + + // https://man7.org/linux/man-pages + LinuxBindings() { super("libc.so.6"); } + + @Override + boolean supportsSetAffinity() { return pthread_self != 0 && pthread_setaffinity_np != 0; } + + @Override + boolean setAffinityImpl(long mask) { + try (MemoryStack stack = MemoryStack.stackPush()) { + long cpuset = stack.ncalloc(POINTER_SIZE, CPU_SET_BYTES, 1); + memPutLong(cpuset, mask); + + long thread = invokeP(pthread_self); + return invokePPPI(thread, CPU_SET_BYTES, cpuset, pthread_setaffinity_np) == 0; + } + } + + @Override + boolean supportsGetCpu() { return sched_getcpu != 0; } + + @Override + int getCpuImpl() { return invokeI(sched_getcpu); } +} \ No newline at end of file diff --git a/src/main/java/rs117/hd/utils/platform/MacBindings.java b/src/main/java/rs117/hd/utils/platform/MacBindings.java new file mode 100644 index 0000000000..5bd223b18d --- /dev/null +++ b/src/main/java/rs117/hd/utils/platform/MacBindings.java @@ -0,0 +1,80 @@ +package rs117.hd.utils.platform; + +import java.nio.ByteBuffer; +import lombok.extern.slf4j.Slf4j; +import org.lwjgl.BufferUtils; +import org.lwjgl.PointerBuffer; +import org.lwjgl.system.MemoryStack; +import org.lwjgl.system.libffi.FFICIF; + +import static org.lwjgl.system.JNI.invokeP; +import static org.lwjgl.system.JNI.invokePI; +import static org.lwjgl.system.libffi.LibFFI.FFI_DEFAULT_ABI; +import static org.lwjgl.system.libffi.LibFFI.FFI_OK; +import static org.lwjgl.system.libffi.LibFFI.ffi_call; +import static org.lwjgl.system.libffi.LibFFI.ffi_prep_cif; +import static org.lwjgl.system.libffi.LibFFI.ffi_type_pointer; +import static org.lwjgl.system.libffi.LibFFI.ffi_type_sint32; + +@Slf4j +public final class MacBindings extends PlatformBindings { + private static final int 
THREAD_AFFINITY_POLICY = 4; + + private final long pthread_self = findFunction("pthread_self"); + private final long pthread_mach_thread_np = findFunction("pthread_mach_thread_np"); + private final long thread_policy_set = findFunction("thread_policy_set"); + + private FFICIF CIF; + + // https://developer.apple.com/library/archive/documentation/Darwin/Conceptual/KernelProgramming + MacBindings() { super("/usr/lib/libSystem.B.dylib"); } + + @Override + boolean init() { + CIF = FFICIF.malloc(); + + PointerBuffer args = BufferUtils.createPointerBuffer(4); + args.put(ffi_type_sint32); + args.put(ffi_type_sint32); + args.put(ffi_type_pointer); + args.put(ffi_type_sint32); + args.flip(); + + int status = ffi_prep_cif( + CIF, + FFI_DEFAULT_ABI, + ffi_type_sint32, + args + ); + + if (status != FFI_OK) { + log.error("ffi_prep_cif failed: {}", status); + return false; + } + + return true; + } + + @Override + boolean supportsSetAffinity() { return pthread_self != 0 && pthread_mach_thread_np != 0 && thread_policy_set != 0; } + + @Override + boolean setAffinityImpl(long mask) { + try (MemoryStack stack = MemoryStack.stackPush()) { + long pthread = invokeP(pthread_self); + int machThread = invokePI(pthread, pthread_mach_thread_np); + int policyValue = (int) (mask & 0x7FFFFFFF); + + PointerBuffer args = stack.mallocPointer(4); + args.put(0, stack.ints(machThread)); + args.put(1, stack.ints(THREAD_AFFINITY_POLICY)); + args.put(2, stack.mallocPointer(1).put(0, stack.ints(policyValue))); // pointer argument: libffi reads the pointer from this slot, so the slot must hold the address of the policy data + args.put(3, stack.ints(1)); + + ByteBuffer result = stack.malloc(8); // libffi widens integral return values to ffi_arg, which is wider than an int on 64-bit targets + ffi_call(CIF, thread_policy_set, result, args); + + return result.getInt(0) == 0; + } + } +} diff --git a/src/main/java/rs117/hd/utils/platform/PlatformBindings.java b/src/main/java/rs117/hd/utils/platform/PlatformBindings.java new file mode 100644 index 0000000000..73348f9b88 --- /dev/null +++ b/src/main/java/rs117/hd/utils/platform/PlatformBindings.java @@ -0,0 +1,108 @@ +package rs117.hd.utils.platform; + +import lombok.extern.slf4j.Slf4j; +import
net.runelite.client.util.OSType; +import org.lwjgl.system.SharedLibrary; + +import static org.lwjgl.system.APIUtil.apiCreateLibrary; +import static org.lwjgl.system.APIUtil.apiGetFunctionAddress; + +@Slf4j +public abstract class PlatformBindings { + private static PlatformBindings BINDINGS; + + static synchronized PlatformBindings createBindings() { + if (BINDINGS != null) + return BINDINGS; + + final OSType osType = OSType.getOSType(); + if(osType != OSType.Windows && osType != OSType.Linux && osType != OSType.MacOS) { + BINDINGS = new DummyBindings(); + log.debug("Unknown Platform, falling back to dummy bindings:"); + return BINDINGS; + } + + try { + final PlatformBindings candidate; + switch (OSType.getOSType()) { + case Windows: + candidate = new WindowsBindings(); + break; + case Linux: + candidate = new LinuxBindings(); + break; + case MacOS: + candidate = new MacBindings(); + break; + default: + candidate = new DummyBindings(); + break; + } + + if(candidate.init()) { + BINDINGS = candidate; + log.info("Initialized platform bindings"); + } + } catch (Throwable t) { + log.error(t.toString()); + } + + if(BINDINGS == null) { + BINDINGS = new DummyBindings(); + log.warn("Failed to initialize platform bindings, falling back to dummy bindings"); + } + + return BINDINGS; + } + + private final String libraryName; + private SharedLibrary library; + + PlatformBindings(String libraryName) { + this.libraryName = libraryName; + } + + boolean init() { return true; } + + protected long findFunction(String name) { + if(library == null) + library = apiCreateLibrary(libraryName); + return apiGetFunctionAddress(library, name); + } + + public static boolean setAffinity(long mask) { + if (BINDINGS == null) + createBindings(); + + if(BINDINGS.supportsSetAffinity()) { + try { + return BINDINGS.setAffinityImpl(mask); + } catch (Throwable t) { + log.error("Failed to set affinity", t); + } + } + + return false; + } + + boolean supportsSetAffinity() { return false; } + boolean 
setAffinityImpl(long mask) { return false; } + + public static int getCpu() { + if (BINDINGS == null) + createBindings(); + + if(BINDINGS.supportsGetCpu()) { + try { + return BINDINGS.getCpuImpl(); + } catch (Throwable t) { + log.error("Failed to get Cpu", t); + } + } + + return -1; + } + + boolean supportsGetCpu() { return false; } + int getCpuImpl() { return -1; } +} diff --git a/src/main/java/rs117/hd/utils/platform/WindowsBindings.java b/src/main/java/rs117/hd/utils/platform/WindowsBindings.java new file mode 100644 index 0000000000..486fd3a554 --- /dev/null +++ b/src/main/java/rs117/hd/utils/platform/WindowsBindings.java @@ -0,0 +1,36 @@ +package rs117.hd.utils.platform; + +import lombok.extern.slf4j.Slf4j; +import org.lwjgl.system.Pointer; + +import static org.lwjgl.system.JNI.invokeI; +import static org.lwjgl.system.JNI.invokeP; +import static org.lwjgl.system.JNI.invokePP; + +@Slf4j +public final class WindowsBindings extends PlatformBindings { + private final long GetCurrentThread = findFunction("GetCurrentThread"); + private final long SetThreadAffinityMask = findFunction("SetThreadAffinityMask"); + private final long GetCurrentProcessorNumber = findFunction("GetCurrentProcessorNumber"); + + // https://learn.microsoft.com/en-us/windows/win32/ + WindowsBindings() { super("kernel32"); } + + @Override + boolean supportsSetAffinity() { return GetCurrentThread != 0L && SetThreadAffinityMask != 0L; } + + @Override + boolean setAffinityImpl(long mask) { + if (Pointer.POINTER_SIZE == 4 && (mask >>> 32) != 0) + throw new IllegalArgumentException("Mask exceeds 32-bit width"); + + long thread = invokeP(GetCurrentThread); + return invokePP(thread, mask, SetThreadAffinityMask) != 0; // SetThreadAffinityMask returns the previous affinity mask on success and 0 on failure + } + + @Override + boolean supportsGetCpu() { return GetCurrentProcessorNumber != 0; } + + @Override + int getCpuImpl() { return invokeI(GetCurrentProcessorNumber); } +} From 6db924d245eec2bdaeac57dcf428bfd9eb9abcfa Mon Sep 17 00:00:00 2001 From: Ruffled
<105522716+RuffledPlume@users.noreply.github.com> Date: Wed, 20 May 2026 23:02:39 +0100 Subject: [PATCH 3/3] Simplify JobSystem Worker Queue * Move away from CAS due to being allocation heavy * Removed the Global Work Queue, in favour of round robining work to different workers local queues instead * Removed Stalled Work queue since it was complicating the work queue Current performance bottleneck of workers is actually being provided work, workers are finishing work fast enough to be parked due to not finding work to steal --- .../rs117/hd/overlays/FrameTimerOverlay.java | 6 - .../hd/renderer/zone/AsyncCachedModel.java | 8 +- .../rs117/hd/renderer/zone/SceneManager.java | 2 +- src/main/java/rs117/hd/utils/jobs/Job.java | 2 - .../java/rs117/hd/utils/jobs/JobHandle.java | 15 +- .../java/rs117/hd/utils/jobs/JobSystem.java | 111 +++++---- src/main/java/rs117/hd/utils/jobs/Worker.java | 225 +++++++++--------- 7 files changed, 173 insertions(+), 196 deletions(-) diff --git a/src/main/java/rs117/hd/overlays/FrameTimerOverlay.java b/src/main/java/rs117/hd/overlays/FrameTimerOverlay.java index d596fc9efb..d2b62a1227 100644 --- a/src/main/java/rs117/hd/overlays/FrameTimerOverlay.java +++ b/src/main/java/rs117/hd/overlays/FrameTimerOverlay.java @@ -243,12 +243,6 @@ public Dimension render(Graphics2D g) { .right(String.valueOf(subSceneCount)) .build()); - - children.add(LineComponent.builder() - .left("Streaming Zones:") - .right(String.valueOf(jobSystem.getWorkQueueSize())) - .build()); - if (frameTimingsRecorder.isCapturingSnapshot()) children.add(LineComponent.builder() .leftFont(boldFont) diff --git a/src/main/java/rs117/hd/renderer/zone/AsyncCachedModel.java b/src/main/java/rs117/hd/renderer/zone/AsyncCachedModel.java index 30f20ae832..67937d4ea5 100644 --- a/src/main/java/rs117/hd/renderer/zone/AsyncCachedModel.java +++ b/src/main/java/rs117/hd/renderer/zone/AsyncCachedModel.java @@ -297,12 +297,8 @@ public synchronized void queue( texIndices3.cache(model, 
model.getTexIndices3()); } - @Override - protected boolean canStart() { - if (processing.get()) // Work has been stolen, so pop it off the queue - return true; - - return + boolean canStart() { + return !processing.get() && verticesX.cached && verticesY.cached && verticesZ.cached && faceIndices1.cached && faceIndices2.cached && faceIndices3.cached && faceColors3.cached; diff --git a/src/main/java/rs117/hd/renderer/zone/SceneManager.java b/src/main/java/rs117/hd/renderer/zone/SceneManager.java index f7258f3620..a1794d5a16 100644 --- a/src/main/java/rs117/hd/renderer/zone/SceneManager.java +++ b/src/main/java/rs117/hd/renderer/zone/SceneManager.java @@ -599,7 +599,7 @@ public synchronized void loadScene(WorldView worldView, Scene scene) { sorted.zone.uploadJob = ZoneUploadJob .build(ctx, nextSceneContext, newZone, false, sorted.x, sorted.z); sorted.zone.uploadJob.revealAfterTimestampMs = - timeMs + ceil(clamp(sorted.dist / 15.0f, 0.25f, 1.5f) * 1000.0f); + timeMs + 100 + ceil(clamp(sorted.dist / 15.0f, 0.25f, 1.5f) * 1000.0f); } else { nextZones[sorted.x][sorted.z] = newZone; ZoneUploadJob diff --git a/src/main/java/rs117/hd/utils/jobs/Job.java b/src/main/java/rs117/hd/utils/jobs/Job.java index af2ef9575b..6305d7db2d 100644 --- a/src/main/java/rs117/hd/utils/jobs/Job.java +++ b/src/main/java/rs117/hd/utils/jobs/Job.java @@ -153,8 +153,6 @@ public final T queue(Job... 
dependencies) { protected abstract void onRun() throws InterruptedException; - protected boolean canStart() { return true; } - protected void onCompletion() {} protected void onCancel() {} diff --git a/src/main/java/rs117/hd/utils/jobs/JobHandle.java b/src/main/java/rs117/hd/utils/jobs/JobHandle.java index 9d38fb5740..a03f010a4f 100644 --- a/src/main/java/rs117/hd/utils/jobs/JobHandle.java +++ b/src/main/java/rs117/hd/utils/jobs/JobHandle.java @@ -154,7 +154,6 @@ synchronized void setCompleted() throws InterruptedException { if (VALIDATE) log.debug("Handle [{}] Completed", this); - int queuedWork = 0; JobHandle dep; while ((dep = dependants.poll()) != null) { if (wasCancelled) { @@ -166,19 +165,9 @@ synchronized void setCompleted() throws InterruptedException { dep.setInQueue(); if (VALIDATE) log.debug("Handle [{}] Adding: [{}] to queue", this, dep); - - if (dep.isHighPriority()) { - worker.localWorkQueue.addFirst(dep); - } else { - worker.localWorkQueue.addLast(dep); - } - - queuedWork++; + JOB_SYSTEM.pushWork(dep); } } - - if (queuedWork > 1) - JOB_SYSTEM.signalWorkAvailable(queuedWork - 1); } private void setJobState(int newState) { @@ -223,7 +212,7 @@ void cancel(boolean block) throws InterruptedException { if (VALIDATE) log.debug("Cancelling [{}] state: [{}]", this, STATE_NAMES[prevState]); - if (prevState == STATE_NONE || (prevState == STATE_QUEUED && JOB_SYSTEM.workQueue.remove(this))) { + if (prevState == STATE_NONE || prevState == STATE_QUEUED) { setCompleted(); return; } diff --git a/src/main/java/rs117/hd/utils/jobs/JobSystem.java b/src/main/java/rs117/hd/utils/jobs/JobSystem.java index a97a15d466..4cb2d468c4 100644 --- a/src/main/java/rs117/hd/utils/jobs/JobSystem.java +++ b/src/main/java/rs117/hd/utils/jobs/JobSystem.java @@ -1,9 +1,11 @@ package rs117.hd.utils.jobs; import com.google.inject.Injector; +import java.util.ArrayDeque; import java.util.HashMap; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Semaphore; 
+import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.inject.Inject; import javax.inject.Singleton; import lombok.Getter; @@ -42,26 +44,22 @@ public final class JobSystem { private int workerCount; - final ConcurrentLinkedDeque workQueue = new ConcurrentLinkedDeque<>(); - private final ConcurrentLinkedDeque clientCallbacks = new ConcurrentLinkedDeque<>(); - + private final ArrayDeque clientCallbacks = new ArrayDeque<>(); private final HashMap threadToWorker = new HashMap<>(); - - private boolean clientInvokeScheduled; + private final AtomicInteger roundRobinIdx = new AtomicInteger(); + private final AtomicBoolean clientInvokeScheduled = new AtomicBoolean(); Worker[] workers; - Semaphore workerSemaphore; public void startUp(CpuUsageLimit cpuUsageLimit) { workerCount = max(1, ceil((PROCESSOR_COUNT - 1) * cpuUsageLimit.threadRatio)); workers = new Worker[workerCount]; - workerSemaphore = new Semaphore(workerCount); active = true; for (int i = 0; i < workerCount; i++) { Worker worker = workers[i] = new Worker(this, i); worker.thread = new Thread(worker::run); - worker.thread.setPriority(Thread.NORM_PRIORITY + 3); + worker.thread.setPriority(Thread.NORM_PRIORITY + 1); worker.thread.setName("117HD - Worker " + i); threadToWorker.put(worker.thread, worker); } @@ -75,20 +73,33 @@ public void startUp(CpuUsageLimit cpuUsageLimit) { log.debug("Initialized JobSystem with {} workers", workerCount); } - void signalWorkAvailable(int workCount) { - int availPermits = workerSemaphore.availablePermits(); - if (availPermits >= workCount) - return; - workerSemaphore.release(min(workCount, workCount - availPermits)); - } + void pushWork(JobHandle handle) { + // Simple RR with randomization fallback for contention spreading + int idx = Math.floorMod( + roundRobinIdx.getAndIncrement() + ThreadLocalRandom.current().nextInt(workerCount), + workerCount + ); - public int 
getWorkQueueSize() { - return workQueue.size(); + Worker best = workers[idx]; + int bestDepth = best.queueDepth.get(); + + // Small scan window for lower contention + decent balancing + for (int i = 1; i < min(workerCount, 3); i++) { + Worker worker = workers[(idx + i) % workerCount]; + int depth = worker.queueDepth.get(); + + if (depth < bestDepth) { + best = worker; + bestDepth = depth; + } + } + + best.push(handle); } - private void cancelAllWork(ConcurrentLinkedDeque queue) { + private void cancelAllWork(ArrayDeque queue) { JobHandle handle; - while ((handle = queue.poll()) != null) { + while ((handle = queue.pollFirst()) != null) { try { handle.cancel(false); handle.setCompleted(); @@ -101,7 +112,6 @@ private void cancelAllWork(ConcurrentLinkedDeque queue) { public void shutDown() { active = false; - cancelAllWork(workQueue); for (Worker worker : workers) { cancelAllWork(worker.localWorkQueue); @@ -148,16 +158,7 @@ public boolean isWorker() { return threadToWorker.containsKey(Thread.currentThread()); } - public boolean hasIdleWorkers() { - for (Worker worker : workers) { - if (!worker.processing.get()) - return true; - } - return false; - } - public void printWorkersState() { - log.debug("WorkQueue Size: {}", workQueue.size()); for (Worker worker : workers) worker.printState(); } @@ -200,15 +201,9 @@ void queue(Job item, boolean highPriority, Job... 
dependencies) { if (shouldQueue) { newHandle.setInQueue(); - if (VALIDATE) log.debug("Handle [{}] Added to queue (Dep Count: {{}})", newHandle, dependencies); - if (highPriority) { - workQueue.addFirst(newHandle); - } else { - workQueue.addLast(newHandle); - } + if (VALIDATE) log.debug("Handle [{}] Added to queue", newHandle); + pushWork(newHandle); } - - signalWorkAvailable(1); } void invokeClientCallback(Runnable callback) throws InterruptedException { @@ -221,38 +216,40 @@ void invokeClientCallback(Runnable callback) throws InterruptedException { final ClientCallbackJob clientCallback = ClientCallbackJob.current(); clientCallback.callback = callback; - clientCallbacks.add(clientCallback); + synchronized (clientCallbacks) { + clientCallbacks.add(clientCallback); + } - if (!clientInvokeScheduled) { - clientInvokeScheduled = true; + if (clientInvokeScheduled.compareAndSet(false, true)) { clientThread.invoke(() -> { - clientInvokeScheduled = false; processPendingClientCallbacks(); + clientInvokeScheduled.set(false); }); } try { clientCallback.semaphore.acquire(); } catch (InterruptedException e) { - clientCallbacks.remove(clientCallback); - throw new InterruptedException(); + synchronized (clientCallbacks) { + clientCallbacks.remove(clientCallback); + throw new InterruptedException(); + } } } public void processPendingClientCallbacks() { - int size = clientCallbacks.size(); - if (size == 0) - return; - - ClientCallbackJob pair; - while (size-- > 0 && (pair = clientCallbacks.poll()) != null) { - try { - pair.callback.run(); - } catch (Throwable ex) { - log.warn("Encountered exception whilst processing client callback", ex); - } finally { - pair.semaphore.release(); + synchronized (clientCallbacks) { + ClientCallbackJob pair; + while ((pair = clientCallbacks.poll()) != null) { + try { + pair.callback.run(); + } catch (Throwable ex) { + log.warn("Encountered exception whilst processing client callback", ex); + } finally { + pair.semaphore.release(); + } } } + } -} +} \ 
No newline at end of file diff --git a/src/main/java/rs117/hd/utils/jobs/Worker.java b/src/main/java/rs117/hd/utils/jobs/Worker.java index 3f1b629220..bba980e71c 100644 --- a/src/main/java/rs117/hd/utils/jobs/Worker.java +++ b/src/main/java/rs117/hd/utils/jobs/Worker.java @@ -1,12 +1,10 @@ package rs117.hd.utils.jobs; import java.util.ArrayDeque; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import rs117.hd.utils.Props; import rs117.hd.utils.platform.PlatformBindings; import static rs117.hd.utils.HDUtils.getThreadStackTrace; @@ -16,11 +14,6 @@ @Slf4j @RequiredArgsConstructor public final class Worker { - static final long MIN_SLEEP_NANOS = 1_000; - static final long MAX_SLEEP_NANOS = 1_000_000; // 1ms - - static final double SLEEP_ALPHA = 0.1; - String name, pausedName; Thread thread; JobHandle handle; @@ -28,147 +21,157 @@ public final class Worker { final JobSystem jobSystem; final int workerIdx; - final ConcurrentLinkedDeque localWorkQueue = new ConcurrentLinkedDeque<>(); - final ArrayDeque localStalledWork = new ArrayDeque<>(); - final AtomicBoolean processing = new AtomicBoolean(); + + final ArrayDeque localWorkQueue = new ArrayDeque<>(); + + final AtomicInteger queueDepth = new AtomicInteger(); + final AtomicInteger unparkedStamp = new AtomicInteger(); + final AtomicInteger parkedStamp = new AtomicInteger(); boolean findNextStealTarget() { - // Find the best target to steal work from int nextVictimIdx = -1; int nextVictimWorkCount = -1; - for (int i = 0; i < jobSystem.workers.length; i++) { - final Worker worker = jobSystem.workers[i]; - if (i == workerIdx || !worker.processing.get() || worker.localWorkQueue.isEmpty()) - continue; // Don't query ourselves or a worker that is idle - int workCount 
= worker.localWorkQueue.size(); + + for (int i = 0; i < jobSystem.workers.length; i++) { // i is the victim's real index; it is stored in stealTargetIdx and later used to index jobSystem.workers + final Worker worker = jobSystem.workers[i]; + if (i == workerIdx) + continue; // Never pick ourselves as the steal target + + int workCount = worker.queueDepth.get(); + if (workCount <= 1) + continue; + if (workCount > nextVictimWorkCount) { nextVictimIdx = i; nextVictimWorkCount = workCount; } } + stealTargetIdx = nextVictimIdx; return nextVictimWorkCount > 0; } + void push(JobHandle handle) { + synchronized (localWorkQueue) { + if (handle.highPriority) { + localWorkQueue.addFirst(handle); + } else { + localWorkQueue.addLast(handle); + } + } + + queueDepth.incrementAndGet(); + if(unparkedStamp.compareAndSet(unparkedStamp.get(), parkedStamp.get())) + LockSupport.unpark(thread); + } + @SuppressWarnings("ResultOfMethodCallIgnored") void run() { + init(); + + while (jobSystem.active) { + if(!acquireHandle()) + break; + + try { + processHandle(); + } catch (InterruptedException ignored) { + thread.isInterrupted(); + } + } + + log.trace("Shutdown"); + } + + private void init() { name = thread.getName(); pausedName = name + " [Paused]"; int affinityCore = workerIdx + 1; - if(PlatformBindings.setAffinity(1L << affinityCore)) + if(PlatformBindings.setAffinity(1L << affinityCore)) { log.trace("Set worker {} affinity to {}", workerIdx, affinityCore); + Thread.yield(); // Yield to ensure affinity is set + } - long spinWaitNanos = TimeUnit.MICROSECONDS.convert(1, TimeUnit.NANOSECONDS); - long nextCPUCoreCheck = Props.DEVELOPMENT ? 0 : -1; + int cpu = PlatformBindings.getCpu(); + if (cpu != -1 && affinityCore != cpu) { + log.warn( + "Expected worker {} to be on core {}, but it is on core {}", + workerIdx, + affinityCore, + cpu + ); + } + } - while (jobSystem.active) { - // Check local work queue - handle = (localStalledWork.isEmpty() ?
localWorkQueue : localStalledWork).poll(); - - // Check if the worker is on the correct core, helps determine if CPU Pinning is working correctly on platforms - if(nextCPUCoreCheck >= 0 && System.nanoTime() > nextCPUCoreCheck) { - int cpu = PlatformBindings.getCpu(); - if (cpu != -1) { - if (cpu != affinityCore) { - log.warn("Expected worker {} to be on core {}, but it is on core {}", workerIdx, affinityCore, cpu); - nextCPUCoreCheck = -1L; - } else { - nextCPUCoreCheck = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); - } - } else { - nextCPUCoreCheck = -1L; - } - } + JobHandle pollFirst() { + if(localWorkQueue.isEmpty()) + return null; - long idleStart = handle == null ? System.nanoTime() : 0; - long idleTime = 0; - while (handle == null) { - if (stealTargetIdx >= 0) { - final Worker victim = jobSystem.workers[stealTargetIdx]; - int stealCount = max(1, victim.localWorkQueue.size() / jobSystem.workers.length); - - JobHandle stolenHandle; - while (stealCount-- > 0 && (stolenHandle = victim.localWorkQueue.poll()) != null) { - if (handle == null) { - handle = stolenHandle; - } else { - if (handle.highPriority) - localWorkQueue.addFirst(stolenHandle); - else - localWorkQueue.addLast(stolenHandle); - } - } - } + synchronized (localWorkQueue) { + return localWorkQueue.pollFirst(); + } + } - if (handle == null) { - // Check if any work is in the main queue before attempting to steal again - handle = localStalledWork.isEmpty() ? 
jobSystem.workQueue.poll() : localStalledWork.poll(); - } + JobHandle pollLast() { + if(localWorkQueue.isEmpty()) + return null; - idleTime = System.nanoTime() - idleStart; - if (handle == null && !findNextStealTarget() && idleTime > spinWaitNanos) { - // Wait for a signal that there is work to be had - try { - jobSystem.workerSemaphore.acquire(); - } catch (InterruptedException ignored) { - // Interrupts are used to signal that the worker should shutdown, we'll pick this up and shutdown - thread.isInterrupted(); // Consume the interrupt to prevent it from cancelling the next job - } + synchronized (localWorkQueue) { + return localWorkQueue.pollLast(); + } + } + + private boolean acquireHandle() { + handle = pollFirst(); + while (handle == null) { + if (stealTargetIdx >= 0) { + final Worker victim = jobSystem.workers[stealTargetIdx]; + + int stealCount = max(1, victim.queueDepth.get() / jobSystem.workers.length); + JobHandle stolenHandle; + + while (stealCount-- > 0 && (stolenHandle = victim.pollLast()) != null) { + victim.queueDepth.decrementAndGet(); if (handle == null) { - // We've been signaled that there is work to be had, try the main queue again - handle = jobSystem.workQueue.poll(); + handle = stolenHandle; + } else { + push(stolenHandle); } } + } - if(handle == null) - Thread.onSpinWait(); + if (handle == null && !findNextStealTarget()) { + parkedStamp.incrementAndGet(); + LockSupport.park(this); if (!jobSystem.active) { log.trace("Shutdown"); - return; + return false; } - } - if (idleStart != 0) { - spinWaitNanos = clamp( - (long) ( - spinWaitNanos * (1.0 - SLEEP_ALPHA) + - idleTime * SLEEP_ALPHA - ), - MIN_SLEEP_NANOS, - MAX_SLEEP_NANOS - ); + handle = pollFirst(); } - try { - processing.set(true); - processHandle(); - } catch (InterruptedException ignored) { - thread.isInterrupted(); // Consume the interrupt to prevent it from cancelling the next job - } finally { - processing.set(false); + if (!jobSystem.active) { + log.trace("Shutdown"); + return 
false; } } - log.trace("Shutdown"); + + queueDepth.decrementAndGet(); + return true; } - void processHandle() throws InterruptedException { - boolean requeued = false; + private void processHandle() throws InterruptedException { try { workerHandleCancel(); if (handle.item != null) { - if (handle.item.canStart()) { - if (handle.setRunning(this)) { - handle.item.onRun(); - handle.item.ranToCompletion.set(true); - } - } else { - // Requeue into stalled work queue, since adding to ConcurrentLinkedDeque continuously is costly - localStalledWork.addLast(handle); - requeued = true; + if (handle.setRunning(this)) { + handle.item.onRun(); + handle.item.ranToCompletion.set(true); } } } catch (InterruptedException e) { @@ -179,15 +182,15 @@ void processHandle() throws InterruptedException { } else { log.warn("Encountered an error whilst processing: {}", handle.hashCode(), ex); } + handle.item.encounteredError.set(true); handle.cancel(false); } finally { - if (!requeued) { - if (handle.item != null && handle.item.wasCancelled.get()) - handle.item.onCancel(); - handle.setCompleted(); - handle.worker = null; - } + if (handle.item != null && handle.item.wasCancelled.get()) + handle.item.onCancel(); + + handle.setCompleted(); + handle.worker = null; handle = null; } }