Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.papermc.paper.util.concurrent;

public interface TickBoundTask {
long getNextRun();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package io.papermc.paper.util.concurrent;

import org.jetbrains.annotations.NotNull;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/**
* This class schedules tasks in ticks and executes them efficiently using a circular array (the wheel).
* Each slot in the wheel represents a specific tick modulo the wheel size.
* Tasks are placed into slots based on their target execution tick.
* On each tick, the wheel checks the current slot and runs any tasks whose execute tick has been reached.
*
* O(1) task scheduling and retrieval within a single wheel rotation.
* We are using power of 2 for faster operations than modulo.
*
*/
public class TimingWheel<T extends TickBoundTask> implements Iterable<T> {
private final int wheelSize;
private final long mask;
private final ArrayDeque<T>[] wheel;

@SuppressWarnings("unchecked")
public TimingWheel(int exponent) {
this.wheelSize = 1 << exponent;
this.mask = wheelSize - 1L;

this.wheel = (ArrayDeque<T>[]) new ArrayDeque[wheelSize];
for (int i = 0; i < wheelSize; i++) {
wheel[i] = new ArrayDeque<>();
}
}

public void add(T task) {
int slot = (int) (task.getNextRun() & mask);
wheel[slot].add(task);
}

public void addAll(Collection<? extends T> tasks) {
for (T task : tasks) {
this.add(task);
}
}

public @NotNull List<T> popValid(long currentTick) {
int slot = (int) (currentTick & mask);
ArrayDeque<T> bucket = wheel[slot];
if (bucket.isEmpty()) return Collections.emptyList();

Iterator<T> iter = bucket.iterator();
List<T> list = new ArrayList<>();
while (iter.hasNext()) {
T task = iter.next();

if (task.getNextRun() <= currentTick) {
iter.remove();
list.add(task);
}
}

return list;
}

public boolean isReady(long currentTick) {
int slot = (int) (currentTick & mask);
ArrayDeque<T> bucket = wheel[slot];
if (bucket.isEmpty()) return false;

for (final T task : bucket) {
if (task.getNextRun() <= currentTick) {
return true;
}
}

return false;
}

public void removeIf(Predicate<T> apply) {
Iterator<T> itr = iterator();
while (itr.hasNext()) {
T next = itr.next();
if (apply.test(next)) {
itr.remove();
}
}
}

@SuppressWarnings("unchecked")
private class Itr implements Iterator<T> {
private int index = 0;
private Iterator<T> current = Collections.emptyIterator();
private Iterator<T> lastIterator = null;

@Override
public boolean hasNext() {
if (current.hasNext()) {
return true;
}

for (int i = index; i < wheelSize; i++) {
if (!wheel[i].isEmpty()) {
return true;
}
}

return false;
}

@Override
public T next() {
while (true) {
if (current.hasNext()) {
lastIterator = current;
return current.next();
}

if (index >= wheelSize) {
throw new NoSuchElementException();
}

current = wheel[index++].iterator();
}
}

@Override
public void remove() {
if (lastIterator == null) {
throw new NoSuchElementException();
}

lastIterator.remove();
lastIterator = null;
}
}


@Override
public @NotNull Iterator<T> iterator() {
return new Itr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,18 @@ public void mainThreadHeartbeat() {

private synchronized void runTasks(int currentTick) {
parsePending();
while (!this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick) {
CraftTask task = this.pending.remove();
List<CraftTask> tasks = this.pending.popValid(currentTick);
for (CraftTask task : tasks) {
if (executeTask(task)) {
final long period = task.getPeriod();
if (period > 0) {
task.setNextRun(currentTick + period);
temp.add(task);
}
}
parsePending();
}
parsePending();

this.pending.addAll(temp);
temp.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
import java.util.logging.Level;
import io.papermc.paper.util.concurrent.TimingWheel;
import org.bukkit.plugin.IllegalPluginAccessException;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitRunnable;
Expand Down Expand Up @@ -75,16 +76,7 @@ public class CraftScheduler implements BukkitScheduler {
/**
* Main thread logic only
*/
final PriorityQueue<CraftTask> pending = new PriorityQueue<CraftTask>(10, // Paper
new Comparator<CraftTask>() {
@Override
public int compare(final CraftTask o1, final CraftTask o2) {
int value = Long.compare(o1.getNextRun(), o2.getNextRun());

// If the tasks should run on the same tick they should be run FIFO
return value != 0 ? value : Long.compare(o1.getCreatedAt(), o2.getCreatedAt());
}
});
final TimingWheel<CraftTask> pending = new TimingWheel<>(12);
/**
* Main thread logic only
*/
Expand Down Expand Up @@ -459,8 +451,8 @@ public void mainThreadHeartbeat() {
// Paper end
final List<CraftTask> temp = this.temp;
this.parsePending();
while (this.isReady(this.currentTick)) {
final CraftTask task = this.pending.remove();
final List<CraftTask> tasks = this.pending.popValid(this.currentTick);
for (CraftTask task : tasks) {
if (task.getPeriod() < CraftTask.NO_REPEATING) {
if (task.isSync()) {
this.runners.remove(task.getTaskId(), task);
Expand Down Expand Up @@ -564,7 +556,7 @@ void parsePending() { // Paper
}

private boolean isReady(final int currentTick) {
return !this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick;
return this.pending.isReady(currentTick);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import java.util.function.Consumer;

import io.papermc.paper.util.concurrent.TickBoundTask;
import org.bukkit.Bukkit;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitTask;

public class CraftTask implements BukkitTask, Runnable { // Spigot
public class CraftTask implements BukkitTask, Runnable, TickBoundTask {

private volatile CraftTask next = null;
public static final int ERROR = 0;
Expand Down Expand Up @@ -93,7 +94,7 @@ void setPeriod(long period) {
this.period = period;
}

long getNextRun() {
public long getNextRun() {
return this.nextRun;
}

Expand Down
Loading