Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/main/java/rs117/hd/overlays/FrameTimerOverlay.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,6 @@ public Dimension render(Graphics2D g) {
.right(String.valueOf(subSceneCount))
.build());


children.add(LineComponent.builder()
.left("Streaming Zones:")
.right(String.valueOf(jobSystem.getWorkQueueSize()))
.build());

if (frameTimingsRecorder.isCapturingSnapshot())
children.add(LineComponent.builder()
.leftFont(boldFont)
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/rs117/hd/renderer/zone/AsyncCachedModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,8 @@ public synchronized void queue(
texIndices3.cache(model, model.getTexIndices3());
}

@Override
protected boolean canStart() {
if (processing.get()) // Work has been stolen, so pop it off the queue
return true;

return
boolean canStart() {
return !processing.get() &&
verticesX.cached && verticesY.cached && verticesZ.cached &&
faceIndices1.cached && faceIndices2.cached && faceIndices3.cached &&
faceColors3.cached;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rs117/hd/renderer/zone/SceneManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ public synchronized void loadScene(WorldView worldView, Scene scene) {
sorted.zone.uploadJob = ZoneUploadJob
.build(ctx, nextSceneContext, newZone, false, sorted.x, sorted.z);
sorted.zone.uploadJob.revealAfterTimestampMs =
timeMs + ceil(clamp(sorted.dist / 15.0f, 0.25f, 1.5f) * 1000.0f);
timeMs + 100 + ceil(clamp(sorted.dist / 15.0f, 0.25f, 1.5f) * 1000.0f);
} else {
nextZones[sorted.x][sorted.z] = newZone;
ZoneUploadJob
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/rs117/hd/utils/jobs/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,6 @@ public final <T extends Job> T queue(Job... dependencies) {

protected abstract void onRun() throws InterruptedException;

protected boolean canStart() { return true; }

protected void onCompletion() {}

protected void onCancel() {}
Expand Down
15 changes: 2 additions & 13 deletions src/main/java/rs117/hd/utils/jobs/JobHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ synchronized void setCompleted() throws InterruptedException {
if (VALIDATE)
log.debug("Handle [{}] Completed", this);

int queuedWork = 0;
JobHandle dep;
while ((dep = dependants.poll()) != null) {
if (wasCancelled) {
Expand All @@ -166,19 +165,9 @@ synchronized void setCompleted() throws InterruptedException {
dep.setInQueue();
if (VALIDATE)
log.debug("Handle [{}] Adding: [{}] to queue", this, dep);

if (dep.isHighPriority()) {
worker.localWorkQueue.addFirst(dep);
} else {
worker.localWorkQueue.addLast(dep);
}

queuedWork++;
JOB_SYSTEM.pushWork(dep);
}
}

if (queuedWork > 1)
JOB_SYSTEM.signalWorkAvailable(queuedWork - 1);
}

private void setJobState(int newState) {
Expand Down Expand Up @@ -223,7 +212,7 @@ void cancel(boolean block) throws InterruptedException {

if (VALIDATE) log.debug("Cancelling [{}] state: [{}]", this, STATE_NAMES[prevState]);

if (prevState == STATE_NONE || (prevState == STATE_QUEUED && JOB_SYSTEM.workQueue.remove(this))) {
if (prevState == STATE_NONE || prevState == STATE_QUEUED) {
setCompleted();
return;
}
Expand Down
109 changes: 53 additions & 56 deletions src/main/java/rs117/hd/utils/jobs/JobSystem.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package rs117.hd.utils.jobs;

import com.google.inject.Injector;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.Getter;
Expand Down Expand Up @@ -42,20 +44,16 @@ public final class JobSystem {

private int workerCount;

final ConcurrentLinkedDeque<JobHandle> workQueue = new ConcurrentLinkedDeque<>();
private final ConcurrentLinkedDeque<ClientCallbackJob> clientCallbacks = new ConcurrentLinkedDeque<>();

private final ArrayDeque<ClientCallbackJob> clientCallbacks = new ArrayDeque<>();
private final HashMap<Thread, Worker> threadToWorker = new HashMap<>();

private boolean clientInvokeScheduled;
private final AtomicInteger roundRobinIdx = new AtomicInteger();
private final AtomicBoolean clientInvokeScheduled = new AtomicBoolean();

Worker[] workers;
Semaphore workerSemaphore;

public void startUp(CpuUsageLimit cpuUsageLimit) {
workerCount = max(1, ceil((PROCESSOR_COUNT - 1) * cpuUsageLimit.threadRatio));
workers = new Worker[workerCount];
workerSemaphore = new Semaphore(workerCount);
active = true;

for (int i = 0; i < workerCount; i++) {
Expand All @@ -75,20 +73,33 @@ public void startUp(CpuUsageLimit cpuUsageLimit) {
log.debug("Initialized JobSystem with {} workers", workerCount);
}

void signalWorkAvailable(int workCount) {
int availPermits = workerSemaphore.availablePermits();
if (availPermits >= workCount)
return;
workerSemaphore.release(min(workCount, workCount - availPermits));
}
void pushWork(JobHandle handle) {
// Simple RR with randomization fallback for contention spreading
int idx = Math.floorMod(
roundRobinIdx.getAndIncrement() + ThreadLocalRandom.current().nextInt(workerCount),
workerCount
);

public int getWorkQueueSize() {
return workQueue.size();
Worker best = workers[idx];
int bestDepth = best.queueDepth.get();

// Small scan window for lower contention + decent balancing
for (int i = 1; i < min(workerCount, 3); i++) {
Worker worker = workers[(idx + i) % workerCount];
int depth = worker.queueDepth.get();

if (depth < bestDepth) {
best = worker;
bestDepth = depth;
}
}

best.push(handle);
}

private void cancelAllWork(ConcurrentLinkedDeque<JobHandle> queue) {
private void cancelAllWork(ArrayDeque<JobHandle> queue) {
JobHandle handle;
while ((handle = queue.poll()) != null) {
while ((handle = queue.pollFirst()) != null) {
try {
handle.cancel(false);
handle.setCompleted();
Expand All @@ -101,7 +112,6 @@ private void cancelAllWork(ConcurrentLinkedDeque<JobHandle> queue) {

public void shutDown() {
active = false;
cancelAllWork(workQueue);

for (Worker worker : workers) {
cancelAllWork(worker.localWorkQueue);
Expand Down Expand Up @@ -148,16 +158,7 @@ public boolean isWorker() {
return threadToWorker.containsKey(Thread.currentThread());
}

public boolean hasIdleWorkers() {
for (Worker worker : workers) {
if (!worker.inflight.get())
return true;
}
return false;
}

public void printWorkersState() {
log.debug("WorkQueue Size: {}", workQueue.size());
for (Worker worker : workers)
worker.printState();
}
Expand Down Expand Up @@ -200,15 +201,9 @@ void queue(Job item, boolean highPriority, Job... dependencies) {

if (shouldQueue) {
newHandle.setInQueue();
if (VALIDATE) log.debug("Handle [{}] Added to queue (Dep Count: {{}})", newHandle, dependencies);
if (highPriority) {
workQueue.addFirst(newHandle);
} else {
workQueue.addLast(newHandle);
}
if (VALIDATE) log.debug("Handle [{}] Added to queue", newHandle);
pushWork(newHandle);
}

signalWorkAvailable(1);
}

void invokeClientCallback(Runnable callback) throws InterruptedException {
Expand All @@ -221,38 +216,40 @@ void invokeClientCallback(Runnable callback) throws InterruptedException {
final ClientCallbackJob clientCallback = ClientCallbackJob.current();
clientCallback.callback = callback;

clientCallbacks.add(clientCallback);
synchronized (clientCallbacks) {
clientCallbacks.add(clientCallback);
}

if (!clientInvokeScheduled) {
clientInvokeScheduled = true;
if (clientInvokeScheduled.compareAndSet(false, true)) {
clientThread.invoke(() -> {
clientInvokeScheduled = false;
processPendingClientCallbacks();
clientInvokeScheduled.set(false);
});
}

try {
clientCallback.semaphore.acquire();
} catch (InterruptedException e) {
clientCallbacks.remove(clientCallback);
throw new InterruptedException();
synchronized (clientCallbacks) {
clientCallbacks.remove(clientCallback);
throw new InterruptedException();
}
}
}

public void processPendingClientCallbacks() {
int size = clientCallbacks.size();
if (size == 0)
return;

ClientCallbackJob pair;
while (size-- > 0 && (pair = clientCallbacks.poll()) != null) {
try {
pair.callback.run();
} catch (Throwable ex) {
log.warn("Encountered exception whilst processing client callback", ex);
} finally {
pair.semaphore.release();
synchronized (clientCallbacks) {
ClientCallbackJob pair;
while ((pair = clientCallbacks.poll()) != null) {
try {
pair.callback.run();
} catch (Throwable ex) {
log.warn("Encountered exception whilst processing client callback", ex);
} finally {
pair.semaphore.release();
}
}
}

}
}
}
Loading