diff --git a/src/main/java/rs117/hd/overlays/FrameTimerOverlay.java b/src/main/java/rs117/hd/overlays/FrameTimerOverlay.java index d596fc9efb..d2b62a1227 100644 --- a/src/main/java/rs117/hd/overlays/FrameTimerOverlay.java +++ b/src/main/java/rs117/hd/overlays/FrameTimerOverlay.java @@ -243,12 +243,6 @@ public Dimension render(Graphics2D g) { .right(String.valueOf(subSceneCount)) .build()); - - children.add(LineComponent.builder() - .left("Streaming Zones:") - .right(String.valueOf(jobSystem.getWorkQueueSize())) - .build()); - if (frameTimingsRecorder.isCapturingSnapshot()) children.add(LineComponent.builder() .leftFont(boldFont) diff --git a/src/main/java/rs117/hd/renderer/zone/AsyncCachedModel.java b/src/main/java/rs117/hd/renderer/zone/AsyncCachedModel.java index 30f20ae832..67937d4ea5 100644 --- a/src/main/java/rs117/hd/renderer/zone/AsyncCachedModel.java +++ b/src/main/java/rs117/hd/renderer/zone/AsyncCachedModel.java @@ -297,12 +297,8 @@ public synchronized void queue( texIndices3.cache(model, model.getTexIndices3()); } - @Override - protected boolean canStart() { - if (processing.get()) // Work has been stolen, so pop it off the queue - return true; - - return + boolean canStart() { + return !processing.get() && verticesX.cached && verticesY.cached && verticesZ.cached && faceIndices1.cached && faceIndices2.cached && faceIndices3.cached && faceColors3.cached; diff --git a/src/main/java/rs117/hd/renderer/zone/SceneManager.java b/src/main/java/rs117/hd/renderer/zone/SceneManager.java index f7258f3620..a1794d5a16 100644 --- a/src/main/java/rs117/hd/renderer/zone/SceneManager.java +++ b/src/main/java/rs117/hd/renderer/zone/SceneManager.java @@ -599,7 +599,7 @@ public synchronized void loadScene(WorldView worldView, Scene scene) { sorted.zone.uploadJob = ZoneUploadJob .build(ctx, nextSceneContext, newZone, false, sorted.x, sorted.z); sorted.zone.uploadJob.revealAfterTimestampMs = - timeMs + ceil(clamp(sorted.dist / 15.0f, 0.25f, 1.5f) * 1000.0f); + timeMs + 100 + ceil(clamp(sorted.dist / 15.0f, 0.25f, 1.5f) * 1000.0f); } else { nextZones[sorted.x][sorted.z] = newZone; ZoneUploadJob diff --git a/src/main/java/rs117/hd/utils/jobs/Job.java b/src/main/java/rs117/hd/utils/jobs/Job.java index af2ef9575b..6305d7db2d 100644 --- a/src/main/java/rs117/hd/utils/jobs/Job.java +++ b/src/main/java/rs117/hd/utils/jobs/Job.java @@ -153,8 +153,6 @@ public final T queue(Job... dependencies) { protected abstract void onRun() throws InterruptedException; - protected boolean canStart() { return true; } - protected void onCompletion() {} protected void onCancel() {} diff --git a/src/main/java/rs117/hd/utils/jobs/JobHandle.java b/src/main/java/rs117/hd/utils/jobs/JobHandle.java index 9d38fb5740..a03f010a4f 100644 --- a/src/main/java/rs117/hd/utils/jobs/JobHandle.java +++ b/src/main/java/rs117/hd/utils/jobs/JobHandle.java @@ -154,7 +154,6 @@ synchronized void setCompleted() throws InterruptedException { if (VALIDATE) log.debug("Handle [{}] Completed", this); - int queuedWork = 0; JobHandle dep; while ((dep = dependants.poll()) != null) { if (wasCancelled) { @@ -166,19 +165,9 @@ synchronized void setCompleted() throws InterruptedException { dep.setInQueue(); if (VALIDATE) log.debug("Handle [{}] Adding: [{}] to queue", this, dep); - - if (dep.isHighPriority()) { - worker.localWorkQueue.addFirst(dep); - } else { - worker.localWorkQueue.addLast(dep); - } - - queuedWork++; + JOB_SYSTEM.pushWork(dep); } } - - if (queuedWork > 1) - JOB_SYSTEM.signalWorkAvailable(queuedWork - 1); } private void setJobState(int newState) { @@ -223,7 +212,7 @@ void cancel(boolean block) throws InterruptedException { if (VALIDATE) log.debug("Cancelling [{}] state: [{}]", this, STATE_NAMES[prevState]); - if (prevState == STATE_NONE || (prevState == STATE_QUEUED && JOB_SYSTEM.workQueue.remove(this))) { + if (prevState == STATE_NONE || prevState == STATE_QUEUED) { setCompleted(); return; } diff --git a/src/main/java/rs117/hd/utils/jobs/JobSystem.java b/src/main/java/rs117/hd/utils/jobs/JobSystem.java index 8c2fa91303..4cb2d468c4 100644 --- a/src/main/java/rs117/hd/utils/jobs/JobSystem.java +++ b/src/main/java/rs117/hd/utils/jobs/JobSystem.java @@ -1,9 +1,11 @@ package rs117.hd.utils.jobs; import com.google.inject.Injector; +import java.util.ArrayDeque; import java.util.HashMap; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.inject.Inject; import javax.inject.Singleton; import lombok.Getter; @@ -42,20 +44,16 @@ public final class JobSystem { private int workerCount; - final ConcurrentLinkedDeque workQueue = new ConcurrentLinkedDeque<>(); - private final ConcurrentLinkedDeque clientCallbacks = new ConcurrentLinkedDeque<>(); - + private final ArrayDeque clientCallbacks = new ArrayDeque<>(); private final HashMap threadToWorker = new HashMap<>(); - - private boolean clientInvokeScheduled; + private final AtomicInteger roundRobinIdx = new AtomicInteger(); + private final AtomicBoolean clientInvokeScheduled = new AtomicBoolean(); Worker[] workers; - Semaphore workerSemaphore; public void startUp(CpuUsageLimit cpuUsageLimit) { workerCount = max(1, ceil((PROCESSOR_COUNT - 1) * cpuUsageLimit.threadRatio)); workers = new Worker[workerCount]; - workerSemaphore = new Semaphore(workerCount); active = true; for (int i = 0; i < workerCount; i++) { @@ -75,20 +73,33 @@ public void startUp(CpuUsageLimit cpuUsageLimit) { log.debug("Initialized JobSystem with {} workers", workerCount); } - void signalWorkAvailable(int workCount) { - int availPermits = workerSemaphore.availablePermits(); - if (availPermits >= workCount) - return; - workerSemaphore.release(min(workCount, workCount - availPermits)); - } + void pushWork(JobHandle handle) { + // Simple RR with randomization fallback for contention spreading + int idx = Math.floorMod( + roundRobinIdx.getAndIncrement() + ThreadLocalRandom.current().nextInt(workerCount), + workerCount + ); - public int getWorkQueueSize() { - return workQueue.size(); + Worker best = workers[idx]; + int bestDepth = best.queueDepth.get(); + + // Small scan window for lower contention + decent balancing + for (int i = 1; i < min(workerCount, 3); i++) { + Worker worker = workers[(idx + i) % workerCount]; + int depth = worker.queueDepth.get(); + + if (depth < bestDepth) { + best = worker; + bestDepth = depth; + } + } + + best.push(handle); } - private void cancelAllWork(ConcurrentLinkedDeque queue) { + private void cancelAllWork(ArrayDeque queue) { JobHandle handle; - while ((handle = queue.poll()) != null) { + while ((handle = queue.pollFirst()) != null) { try { handle.cancel(false); handle.setCompleted(); @@ -101,7 +112,6 @@ private void cancelAllWork(ConcurrentLinkedDeque queue) { public void shutDown() { active = false; - cancelAllWork(workQueue); for (Worker worker : workers) { cancelAllWork(worker.localWorkQueue); @@ -148,16 +158,7 @@ public boolean isWorker() { return threadToWorker.containsKey(Thread.currentThread()); } - public boolean hasIdleWorkers() { - for (Worker worker : workers) { - if (!worker.inflight.get()) - return true; - } - return false; - } - public void printWorkersState() { - log.debug("WorkQueue Size: {}", workQueue.size()); for (Worker worker : workers) worker.printState(); } @@ -200,15 +201,9 @@ void queue(Job item, boolean highPriority, Job... dependencies) { if (shouldQueue) { newHandle.setInQueue(); - if (VALIDATE) log.debug("Handle [{}] Added to queue (Dep Count: {{}})", newHandle, dependencies); - if (highPriority) { - workQueue.addFirst(newHandle); - } else { - workQueue.addLast(newHandle); - } + if (VALIDATE) log.debug("Handle [{}] Added to queue", newHandle); + pushWork(newHandle); } - - signalWorkAvailable(1); } void invokeClientCallback(Runnable callback) throws InterruptedException { @@ -221,38 +216,40 @@ void invokeClientCallback(Runnable callback) throws InterruptedException { final ClientCallbackJob clientCallback = ClientCallbackJob.current(); clientCallback.callback = callback; - clientCallbacks.add(clientCallback); + synchronized (clientCallbacks) { + clientCallbacks.add(clientCallback); + } - if (!clientInvokeScheduled) { - clientInvokeScheduled = true; + if (clientInvokeScheduled.compareAndSet(false, true)) { clientThread.invoke(() -> { - clientInvokeScheduled = false; processPendingClientCallbacks(); + clientInvokeScheduled.set(false); }); } try { clientCallback.semaphore.acquire(); } catch (InterruptedException e) { - clientCallbacks.remove(clientCallback); - throw new InterruptedException(); + synchronized (clientCallbacks) { + clientCallbacks.remove(clientCallback); + throw new InterruptedException(); + } } } public void processPendingClientCallbacks() { - int size = clientCallbacks.size(); - if (size == 0) - return; - - ClientCallbackJob pair; - while (size-- > 0 && (pair = clientCallbacks.poll()) != null) { - try { - pair.callback.run(); - } catch (Throwable ex) { - log.warn("Encountered exception whilst processing client callback", ex); - } finally { - pair.semaphore.release(); + synchronized (clientCallbacks) { + ClientCallbackJob pair; + while ((pair = clientCallbacks.poll()) != null) { + try { + pair.callback.run(); + } catch (Throwable ex) { + log.warn("Encountered exception whilst processing client callback", ex); + } finally { + pair.semaphore.release(); + } } } + } -} +} \ No newline at end of file diff --git a/src/main/java/rs117/hd/utils/jobs/Worker.java b/src/main/java/rs117/hd/utils/jobs/Worker.java index 697478c182..bba980e71c 100644 --- a/src/main/java/rs117/hd/utils/jobs/Worker.java +++ b/src/main/java/rs117/hd/utils/jobs/Worker.java @@ -1,11 +1,11 @@ package rs117.hd.utils.jobs; import java.util.ArrayDeque; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import rs117.hd.utils.platform.PlatformBindings; import static rs117.hd.utils.HDUtils.getThreadStackTrace; import static rs117.hd.utils.MathUtils.*; @@ -14,8 +14,6 @@ @Slf4j @RequiredArgsConstructor public final class Worker { - private static final long SLEEP_TIME_NANOS = TimeUnit.MICROSECONDS.convert(1, TimeUnit.NANOSECONDS); - String name, pausedName; Thread thread; JobHandle handle; @@ -23,105 +21,157 @@ public final class Worker { final JobSystem jobSystem; final int workerIdx; - final ConcurrentLinkedDeque localWorkQueue = new ConcurrentLinkedDeque<>(); - final ArrayDeque localStalledWork = new ArrayDeque<>(); - final AtomicBoolean inflight = new AtomicBoolean(); + + final ArrayDeque localWorkQueue = new ArrayDeque<>(); + + final AtomicInteger queueDepth = new AtomicInteger(); + final AtomicInteger unparkedStamp = new AtomicInteger(); + final AtomicInteger parkedStamp = new AtomicInteger(); boolean findNextStealTarget() { - // Find the best target to steal work from int nextVictimIdx = -1; int nextVictimWorkCount = -1; - for (int i = 0; i < jobSystem.workers.length; i++) { - if (i == workerIdx || !jobSystem.workers[i].inflight.get()) - continue; // Don't query ourselves or a worker that is idle - int workCount = jobSystem.workers[i].localWorkQueue.size(); + + for (int i = 1; i < jobSystem.workers.length; i++) { + final Worker worker = jobSystem.workers[(workerIdx + i) % jobSystem.workers.length]; + if (i == workerIdx) + continue; + + int workCount = worker.queueDepth.get(); + if (workCount <= 1) + continue; + if (workCount > nextVictimWorkCount) { nextVictimIdx = i; nextVictimWorkCount = workCount; } } + stealTargetIdx = nextVictimIdx; return nextVictimWorkCount > 0; } + void push(JobHandle handle) { + synchronized (localWorkQueue) { + if (handle.highPriority) { + localWorkQueue.addFirst(handle); + } else { + localWorkQueue.addLast(handle); + } + } + + queueDepth.incrementAndGet(); + if(unparkedStamp.compareAndSet(unparkedStamp.get(), parkedStamp.get())) + LockSupport.unpark(thread); + } + @SuppressWarnings("ResultOfMethodCallIgnored") void run() { + init(); + + while (jobSystem.active) { + if(!acquireHandle()) + break; + + try { + processHandle(); + } catch (InterruptedException ignored) { + thread.isInterrupted(); + } + } + + log.trace("Shutdown"); + } + + private void init() { name = thread.getName(); pausedName = name + " [Paused]"; - while (jobSystem.active) { - // Check local work queue - handle = (localStalledWork.isEmpty() ? localWorkQueue : localStalledWork).poll(); - - long waitStart = handle == null ? System.nanoTime() : 0; - while (handle == null) { - if (stealTargetIdx >= 0) { - final Worker victim = jobSystem.workers[stealTargetIdx]; - int stealCount = max(1, victim.localWorkQueue.size() / jobSystem.workers.length); - - JobHandle stolenHandle; - while (stealCount-- > 0 && (stolenHandle = victim.localWorkQueue.poll()) != null) { - if (handle == null) { - handle = stolenHandle; - } else { - if (handle.highPriority) - localWorkQueue.addFirst(stolenHandle); - else - localWorkQueue.addLast(stolenHandle); - } - } - } - if (handle == null) { - // Check if any work is in the main queue before attempting to steal again - handle = localStalledWork.isEmpty() ? jobSystem.workQueue.poll() : localStalledWork.poll(); - } + int affinityCore = workerIdx + 1; + if(PlatformBindings.setAffinity(1L << affinityCore)) { + log.trace("Set worker {} affinity to {}", workerIdx, affinityCore); + Thread.yield(); // Yield to ensure affinity is set + } - if (handle == null && !findNextStealTarget() && System.nanoTime() - waitStart > SLEEP_TIME_NANOS) { - // Wait for a signal that there is work to be had - try { - jobSystem.workerSemaphore.acquire(); - } catch (InterruptedException ignored) { - // Interrupts are used to signal that the worker should shutdown, we'll pick this up and shutdown - thread.isInterrupted(); // Consume the interrupt to prevent it from cancelling the next job - } + int cpu = PlatformBindings.getCpu(); + if (cpu != -1 && affinityCore != cpu) { + log.warn( + "Expected worker {} to be on core {}, but it is on core {}", + workerIdx, + affinityCore, + cpu + ); + } + } + + JobHandle pollFirst() { + if(localWorkQueue.isEmpty()) + return null; + + synchronized (localWorkQueue) { + return localWorkQueue.pollFirst(); + } + } + + JobHandle pollLast() { + if(localWorkQueue.isEmpty()) + return null; + + synchronized (localWorkQueue) { + return localWorkQueue.pollLast(); + } + } + + private boolean acquireHandle() { + handle = pollFirst(); + while (handle == null) { + if (stealTargetIdx >= 0) { + final Worker victim = jobSystem.workers[stealTargetIdx]; + + int stealCount = max(1, victim.queueDepth.get() / jobSystem.workers.length); + JobHandle stolenHandle; + + while (stealCount-- > 0 && (stolenHandle = victim.pollLast()) != null) { + victim.queueDepth.decrementAndGet(); if (handle == null) { - // We've been signaled that there is work to be had, try the main queue again - handle = jobSystem.workQueue.poll(); + handle = stolenHandle; + } else { + push(stolenHandle); } } + } + + if (handle == null && !findNextStealTarget()) { + parkedStamp.incrementAndGet(); + LockSupport.park(this); if (!jobSystem.active) { log.trace("Shutdown"); - return; + return false; } + + handle = pollFirst(); } - try { - processHandle(); - } catch (InterruptedException ignored) { - thread.isInterrupted(); // Consume the interrupt to prevent it from cancelling the next job + if (!jobSystem.active) { + log.trace("Shutdown"); + return false; } } - log.trace("Shutdown"); + + queueDepth.decrementAndGet(); + return true; } - void processHandle() throws InterruptedException { - boolean requeued = false; + private void processHandle() throws InterruptedException { try { workerHandleCancel(); if (handle.item != null) { - if (handle.item.canStart()) { - if (handle.setRunning(this)) { - inflight.set(true); - handle.item.onRun(); - handle.item.ranToCompletion.set(true); - } - } else { - // Requeue into stalled work queue, since adding to ConcurrentLinkedDeque continuously is costly - localStalledWork.addLast(handle); - requeued = true; + if (handle.setRunning(this)) { + handle.item.onRun(); + handle.item.ranToCompletion.set(true); } } } catch (InterruptedException e) { @@ -132,15 +182,15 @@ void processHandle() throws InterruptedException { } else { log.warn("Encountered an error whilst processing: {}", handle.hashCode(), ex); } + handle.item.encounteredError.set(true); handle.cancel(false); } finally { - if (!requeued) { - if (handle.item != null && handle.item.wasCancelled.get()) - handle.item.onCancel(); - handle.setCompleted(); - handle.worker = null; - } + if (handle.item != null && handle.item.wasCancelled.get()) + handle.item.onCancel(); + + handle.setCompleted(); + handle.worker = null; handle = null; } } diff --git a/src/main/java/rs117/hd/utils/platform/DummyBindings.java b/src/main/java/rs117/hd/utils/platform/DummyBindings.java new file mode 100644 index 0000000000..e5d9140fa1 --- /dev/null +++ b/src/main/java/rs117/hd/utils/platform/DummyBindings.java @@ -0,0 +1,5 @@ +package rs117.hd.utils.platform; + +public class DummyBindings extends PlatformBindings { + DummyBindings() { super(""); } +} diff --git a/src/main/java/rs117/hd/utils/platform/LinuxBindings.java b/src/main/java/rs117/hd/utils/platform/LinuxBindings.java new file mode 100644 index 0000000000..6a1a888526 --- /dev/null +++ b/src/main/java/rs117/hd/utils/platform/LinuxBindings.java @@ -0,0 +1,42 @@ +package rs117.hd.utils.platform; + +import lombok.extern.slf4j.Slf4j; +import org.lwjgl.system.MemoryStack; + +import static org.lwjgl.system.JNI.invokeI; +import static org.lwjgl.system.JNI.invokeP; +import static org.lwjgl.system.JNI.invokePPPI; +import static org.lwjgl.system.MemoryUtil.memPutLong; +import static org.lwjgl.system.Pointer.POINTER_SIZE; + +@Slf4j +public final class LinuxBindings extends PlatformBindings { + private static final int CPU_SET_BYTES = 128; + + private final long pthread_self = findFunction("pthread_self"); + private final long pthread_setaffinity_np = findFunction("pthread_setaffinity_np"); + private final long sched_getcpu = findFunction("sched_getcpu"); + + // https://man7.org/linux/man-pages + LinuxBindings() { super("libc.so.6"); } + + @Override + boolean supportsSetAffinity() { return pthread_self != 0 && pthread_setaffinity_np != 0; } + + @Override + boolean setAffinityImpl(long mask) { + try (MemoryStack stack = MemoryStack.stackPush()) { + long cpuset = stack.ncalloc(POINTER_SIZE, CPU_SET_BYTES, 1); + memPutLong(cpuset, mask); + + long thread = invokeP(pthread_self); + return invokePPPI(thread, CPU_SET_BYTES, cpuset, pthread_setaffinity_np) == 0; + } + } + + @Override + boolean supportsGetCpu() { return sched_getcpu != 0; } + + @Override + int getCpuImpl() { return invokeI(sched_getcpu); } +} \ No newline at end of file diff --git a/src/main/java/rs117/hd/utils/platform/MacBindings.java b/src/main/java/rs117/hd/utils/platform/MacBindings.java new file mode 100644 index 0000000000..5bd223b18d --- /dev/null +++ b/src/main/java/rs117/hd/utils/platform/MacBindings.java @@ -0,0 +1,80 @@ +package rs117.hd.utils.platform; + +import java.nio.ByteBuffer; +import lombok.extern.slf4j.Slf4j; +import org.lwjgl.BufferUtils; +import org.lwjgl.PointerBuffer; +import org.lwjgl.system.MemoryStack; +import org.lwjgl.system.libffi.FFICIF; + +import static org.lwjgl.system.JNI.invokeP; +import static org.lwjgl.system.JNI.invokePI; +import static org.lwjgl.system.libffi.LibFFI.FFI_DEFAULT_ABI; +import static org.lwjgl.system.libffi.LibFFI.FFI_OK; +import static org.lwjgl.system.libffi.LibFFI.ffi_call; +import static org.lwjgl.system.libffi.LibFFI.ffi_prep_cif; +import static org.lwjgl.system.libffi.LibFFI.ffi_type_pointer; +import static org.lwjgl.system.libffi.LibFFI.ffi_type_sint32; + +@Slf4j +public final class MacBindings extends PlatformBindings { + private static final int THREAD_AFFINITY_POLICY = 4; + + private final long pthread_self = findFunction("pthread_self"); + private final long pthread_mach_thread_np = findFunction("pthread_mach_thread_np"); + private final long thread_policy_set = findFunction("thread_policy_set"); + + private FFICIF CIF; + + // https://developer.apple.com/library/archive/documentation/Darwin/Conceptual/KernelProgramming + MacBindings() { super("/usr/lib/libSystem.B.dylib"); } + + @Override + boolean init() { + CIF = FFICIF.malloc(); + + PointerBuffer args = BufferUtils.createPointerBuffer(4); + args.put(ffi_type_sint32); + args.put(ffi_type_sint32); + args.put(ffi_type_pointer); + args.put(ffi_type_sint32); + args.flip(); + + int status = ffi_prep_cif( + CIF, + FFI_DEFAULT_ABI, + ffi_type_sint32, + args + ); + + if (status != FFI_OK) { + log.error("ffi_prep_cif failed: {}", status); + return false; + } + + return true; + } + + @Override + boolean supportsSetAffinity() { return pthread_self != 0 && pthread_mach_thread_np != 0 && thread_policy_set != 0; } + + @Override + boolean setAffinityImpl(long mask) { + try (MemoryStack stack = MemoryStack.stackPush()) { + long pthread = invokeP(pthread_self); + int machThread = invokePI(pthread, pthread_mach_thread_np); + int policyValue = (int) (mask & 0x7FFFFFFF); + + PointerBuffer args = stack.mallocPointer(4); + args.put(0, stack.ints(machThread)); + args.put(1, stack.ints(THREAD_AFFINITY_POLICY)); + args.put(2, stack.ints(policyValue)); + args.put(3, stack.ints(1)); + + ByteBuffer result = stack.malloc(4); + ffi_call(CIF, thread_policy_set, result, args); + + return result.getInt(0) == 0; + } + } +} diff --git a/src/main/java/rs117/hd/utils/platform/PlatformBindings.java b/src/main/java/rs117/hd/utils/platform/PlatformBindings.java new file mode 100644 index 0000000000..73348f9b88 --- /dev/null +++ b/src/main/java/rs117/hd/utils/platform/PlatformBindings.java @@ -0,0 +1,108 @@ +package rs117.hd.utils.platform; + +import lombok.extern.slf4j.Slf4j; +import net.runelite.client.util.OSType; +import org.lwjgl.system.SharedLibrary; + +import static org.lwjgl.system.APIUtil.apiCreateLibrary; +import static org.lwjgl.system.APIUtil.apiGetFunctionAddress; + +@Slf4j +public abstract class PlatformBindings { + private static PlatformBindings BINDINGS; + + static synchronized PlatformBindings createBindings() { + if (BINDINGS != null) + return BINDINGS; + + final OSType osType = OSType.getOSType(); + if(osType != OSType.Windows && osType != OSType.Linux && osType != OSType.MacOS) { + BINDINGS = new DummyBindings(); + log.debug("Unknown Platform, falling back to dummy bindings:"); + return BINDINGS; + } + + try { + final PlatformBindings candidate; + switch (OSType.getOSType()) { + case Windows: + candidate = new WindowsBindings(); + break; + case Linux: + candidate = new LinuxBindings(); + break; + case MacOS: + candidate = new MacBindings(); + break; + default: + candidate = new DummyBindings(); + break; + } + + if(candidate.init()) { + BINDINGS = candidate; + log.info("Initialized platform bindings"); + } + } catch (Throwable t) { + log.error(t.toString()); + } + + if(BINDINGS == null) { + BINDINGS = new DummyBindings(); + log.warn("Failed to initialize platform bindings, falling back to dummy bindings"); + } + + return BINDINGS; + } + + private final String libraryName; + private SharedLibrary library; + + PlatformBindings(String libraryName) { + this.libraryName = libraryName; + } + + boolean init() { return true; } + + protected long findFunction(String name) { + if(library == null) + library = apiCreateLibrary(libraryName); + return apiGetFunctionAddress(library, name); + } + + public static boolean setAffinity(long mask) { + if (BINDINGS == null) + createBindings(); + + if(BINDINGS.supportsSetAffinity()) { + try { + return BINDINGS.setAffinityImpl(mask); + } catch (Throwable t) { + log.error("Failed to set affinity", t); + } + } + + return false; + } + + boolean supportsSetAffinity() { return false; } + boolean setAffinityImpl(long mask) { return false; } + + public static int getCpu() { + if (BINDINGS == null) + createBindings(); + + if(BINDINGS.supportsGetCpu()) { + try { + return BINDINGS.getCpuImpl(); + } catch (Throwable t) { + log.error("Failed to get Cpu", t); + } + } + + return -1; + } + + boolean supportsGetCpu() { return false; } + int getCpuImpl() { return -1; } +} diff --git a/src/main/java/rs117/hd/utils/platform/WindowsBindings.java b/src/main/java/rs117/hd/utils/platform/WindowsBindings.java new file mode 100644 index 0000000000..486fd3a554 --- /dev/null +++ b/src/main/java/rs117/hd/utils/platform/WindowsBindings.java @@ -0,0 +1,36 @@ +package rs117.hd.utils.platform; + +import lombok.extern.slf4j.Slf4j; +import org.lwjgl.system.Pointer; + +import static org.lwjgl.system.JNI.invokeI; +import static org.lwjgl.system.JNI.invokeP; +import static org.lwjgl.system.JNI.invokePP; + +@Slf4j +public final class WindowsBindings extends PlatformBindings { + private final long GetCurrentThread = findFunction("GetCurrentThread"); + private final long SetThreadAffinityMask = findFunction("SetThreadAffinityMask"); + private final long GetCurrentProcessorNumber = findFunction("GetCurrentProcessorNumber"); + + // https://learn.microsoft.com/en-us/windows/win32/ + WindowsBindings() { super("kernel32"); } + + @Override + boolean supportsSetAffinity() { return GetCurrentThread != 0L && SetThreadAffinityMask != 0L; } + + @Override + boolean setAffinityImpl(long mask) { + if (Pointer.POINTER_SIZE == 4 && (mask >>> 32) != 0) + throw new IllegalArgumentException("Mask exceeds 32-bit width"); + + long thread = invokeP(GetCurrentThread); + return invokePP(thread, mask, SetThreadAffinityMask) == 0; + } + + @Override + boolean supportsGetCpu() { return GetCurrentProcessorNumber != 0; } + + @Override + int getCpuImpl() { return invokeI(GetCurrentProcessorNumber); } +}