Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.papermc.paper.util.concurrent;

public interface ScheduledTask {
long getNextRun();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package io.papermc.paper.util.concurrent;

import org.jetbrains.annotations.NotNull;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

/**
* This class schedules tasks in ticks and executes them efficiently using a circular array (the wheel).
* Each slot in the wheel represents a specific tick modulo the wheel size.
* Tasks are placed into slots based on their target execution tick.
* On each tick, the wheel checks the current slot and runs any tasks whose execute tick has been reached.
*
* O(1) task scheduling and retrieval within a single wheel rotation.
* We are using power of 2 for faster operations than modulo.
*
*/
public class TimingWheel<T extends ScheduledTask> implements Iterable<T> {
private final int wheelSize;
private final long mask;
private final ArrayDeque<T>[] wheel;

@SuppressWarnings("unchecked")
public TimingWheel(int exponent) {
this.wheelSize = 1 << exponent;
this.mask = wheelSize - 1L;

this.wheel = (ArrayDeque<T>[]) new ArrayDeque[wheelSize];
for (int i = 0; i < wheelSize; i++) {
wheel[i] = new ArrayDeque<>();
}
}

public void add(T task) {
int slot = (int) (task.getNextRun() & mask);
wheel[slot].add(task);
}

public void addAll(Collection<? extends T> tasks) {
for (T task : tasks) {
this.add(task);
}
}

public @NotNull List<T> popValid(long currentTick) {
int slot = (int) (currentTick & mask);
ArrayDeque<T> bucket = wheel[slot];
if (bucket.isEmpty()) return Collections.emptyList();

Iterator<T> iter = bucket.iterator();
List<T> list = new ArrayList<>();
while (iter.hasNext()) {
T task = iter.next();

if (task.getNextRun() <= currentTick) {
iter.remove();
list.add(task);
}
}

return list;
}

public boolean isReady(long currentTick) {
int slot = (int) (currentTick & mask);
ArrayDeque<T> bucket = wheel[slot];
if (bucket.isEmpty()) return false;

for (final T task : bucket) {
if (task.getNextRun() <= currentTick) {
return true;
}
}

return false;
}

public void removeIf(Predicate<T> apply) {
Iterator<T> itr = iterator();
while (itr.hasNext()) {
T next = itr.next();
if (apply.test(next)) {
itr.remove();
}
}
}

@SuppressWarnings("unchecked")
private class Itr implements Iterator<T> {
private int index = 0;
private Iterator<T> current = Collections.emptyIterator();
private Iterator<T> lastIterator = null;

@Override
public boolean hasNext() {
if (current.hasNext()) {
return true;
}

for (int i = index; i < wheelSize; i++) {
if (!wheel[i].isEmpty()) {
return true;
}
}

return false;
}

@Override
public T next() {
while (true) {
if (current.hasNext()) {
lastIterator = current;
return current.next();
}

if (index >= wheelSize) {
throw new NoSuchElementException();
}

current = wheel[index++].iterator();
}
}

@Override
public void remove() {
if (lastIterator == null) {
throw new NoSuchElementException();
}

lastIterator.remove();
lastIterator = null;
}
}


@Override
public @NotNull Iterator<T> iterator() {
return new Itr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,20 @@ public void mainThreadHeartbeat() {

private synchronized void runTasks(int currentTick) {
parsePending();
while (!this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick) {
CraftTask task = this.pending.remove();
// Paper start - Timing Wheel
List<CraftTask> tasks = this.pending.popValid(currentTick);
for (CraftTask task : tasks) {
// Paper end - Timing Wheel
if (executeTask(task)) {
final long period = task.getPeriod();
if (period > 0) {
task.setNextRun(currentTick + period);
temp.add(task);
}
}
parsePending();
}
parsePending();

this.pending.addAll(temp);
temp.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,7 @@ public class CraftScheduler implements BukkitScheduler {
/**
* Main thread logic only
*/
final PriorityQueue<CraftTask> pending = new PriorityQueue<CraftTask>(10, // Paper
new Comparator<CraftTask>() {
@Override
public int compare(final CraftTask o1, final CraftTask o2) {
int value = Long.compare(o1.getNextRun(), o2.getNextRun());

// If the tasks should run on the same tick they should be run FIFO
return value != 0 ? value : Long.compare(o1.getCreatedAt(), o2.getCreatedAt());
}
});
final io.papermc.paper.util.concurrent.TimingWheel<CraftTask> pending = new io.papermc.paper.util.concurrent.TimingWheel<>(12); // Paper - Timing wheel
/**
* Main thread logic only
*/
Expand Down Expand Up @@ -459,8 +450,10 @@ public void mainThreadHeartbeat() {
// Paper end
final List<CraftTask> temp = this.temp;
this.parsePending();
while (this.isReady(this.currentTick)) {
final CraftTask task = this.pending.remove();
// Paper start - Timing Wheel
final List<CraftTask> tasks = this.pending.popValid(this.currentTick);
for (CraftTask task : tasks) {
// Paper end - Timing Wheel
if (task.getPeriod() < CraftTask.NO_REPEATING) {
if (task.isSync()) {
this.runners.remove(task.getTaskId(), task);
Expand Down Expand Up @@ -564,7 +557,8 @@ void parsePending() { // Paper
}

private boolean isReady(final int currentTick) {
return !this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick;
// return !this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick;
return this.pending.isReady(currentTick); // Paper - Timing wheel
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitTask;

public class CraftTask implements BukkitTask, Runnable { // Spigot
// Paper - Timing Wheel
public class CraftTask implements BukkitTask, Runnable, io.papermc.paper.util.concurrent.ScheduledTask { // Spigot

private volatile CraftTask next = null;
public static final int ERROR = 0;
Expand Down Expand Up @@ -93,7 +94,7 @@ void setPeriod(long period) {
this.period = period;
}

long getNextRun() {
public long getNextRun() { // Paper - Timing Wheel
return this.nextRun;
}

Expand Down
Loading