Threading stuff, locking stuff, hopefully fix for section tracker data race??

This commit is contained in:
mcrcortex
2025-05-04 22:23:10 +10:00
parent da36c6abd1
commit 636b680c87
11 changed files with 195 additions and 136 deletions

View File

@@ -36,8 +36,12 @@ public class VoxyConfig implements OptionStorage<VoxyConfig> {
if (Files.exists(path)) {
try (FileReader reader = new FileReader(path.toFile())) {
var conf = GSON.fromJson(reader, VoxyConfig.class);
if (conf != null) {
conf.save();
return conf;
} else {
Logger.error("Failed to load voxy config, resetting");
}
} catch (IOException e) {
Logger.error("Could not parse config",e);
}

View File

@@ -2,6 +2,7 @@ package me.cortex.voxy.client.core.model;
import it.unimi.dsi.fastutil.ints.IntLinkedOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import me.cortex.voxy.client.TimingStatistics;
import me.cortex.voxy.client.core.gl.GlFramebuffer;
import me.cortex.voxy.client.core.rendering.building.BuiltSection;
@@ -16,6 +17,7 @@ import java.lang.invoke.VarHandle;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.StampedLock;
import static org.lwjgl.opengl.ARBFramebufferObject.GL_COLOR_ATTACHMENT0;
import static org.lwjgl.opengl.GL11.GL_COLOR_BUFFER_BIT;
@@ -103,7 +105,16 @@ public class ModelBakerySubsystem {
this.storage.free();
}
//This is on this side only and done like this as only worker threads call this code
private final StampedLock seenIdsLock = new StampedLock();
private final IntOpenHashSet seenIds = new IntOpenHashSet(6000);
public void requestBlockBake(int blockId) {
long stamp = this.seenIdsLock.writeLock();
if (!this.seenIds.add(blockId)) {
this.seenIdsLock.unlockWrite(stamp);
return;
}
this.seenIdsLock.unlockWrite(stamp);
this.blockIdQueue.add(blockId);
this.blockIdCount.incrementAndGet();
}

View File

@@ -80,7 +80,7 @@ public class RenderService<T extends AbstractSectionRenderer<J, ?>, J extends Vi
this.viewportSelector = new ViewportSelector<>(this.sectionRenderer::createViewport);
this.renderGen = new RenderGenerationService(world, this.modelService, serviceThreadPool,
this.geometryUpdateQueue::push, this.sectionRenderer.getGeometryManager() instanceof IUsesMeshlets,
()->this.geometryUpdateQueue.count()<1000 && this.modelService.getProcessingCount()< 1000);
()->this.geometryUpdateQueue.count()<1000);
router.setCallbacks(this.renderGen::enqueueTask, section -> {
section.acquire();

View File

@@ -30,6 +30,7 @@ import java.util.function.Supplier;
//TODO: to add remove functionallity add a "defunked" variable to the build task and set it to true on remove
// and process accordingly
public class RenderGenerationService {
private static final int MAX_HOLDING_SECTION_COUNT = 1000;
private static final AtomicInteger COUNTER = new AtomicInteger();
private static final class BuildTask {
WorldSection section;
@@ -51,6 +52,8 @@ public class RenderGenerationService {
}
}
private final AtomicInteger holdingSectionCount = new AtomicInteger();//Used to limit section holding
private final AtomicInteger taskQueueCount = new AtomicInteger();
private final PriorityBlockingQueue<BuildTask> taskQueue = new PriorityBlockingQueue<>(320000, (a,b)-> Long.compareUnsigned(a.priority, b.priority));
private final StampedLock taskMapLock = new StampedLock();
@@ -181,6 +184,9 @@ public class RenderGenerationService {
if (task.hasDoneModelRequestOuter) {
other.hasDoneModelRequestOuter = true;
}
if (task.section != null) {
this.holdingSectionCount.decrementAndGet();
}
task.section = null;
shouldFreeSection = true;
task = null;
@@ -227,8 +233,15 @@ public class RenderGenerationService {
}
//Keep the lock on the section, and attach it to the task, this prevents needing to re-aquire it later
if (task.section == null) {
if (this.holdingSectionCount.get() < MAX_HOLDING_SECTION_COUNT) {
this.holdingSectionCount.incrementAndGet();
task.section = section;
shouldFreeSection = false;
}
} else {
shouldFreeSection = false;
}
task.updatePriority();
this.taskQueue.add(task);
@@ -241,6 +254,9 @@ public class RenderGenerationService {
}
if (shouldFreeSection) {
if (task != null && task.section != null) {
this.holdingSectionCount.decrementAndGet();
}
section.release();
}
@@ -285,6 +301,7 @@ public class RenderGenerationService {
var task = this.taskQueue.remove();
if (task.section != null) {
task.section.release();
this.holdingSectionCount.decrementAndGet();
}
if (this.taskMap.remove(task.position) != task) {
throw new IllegalStateException();
@@ -304,6 +321,7 @@ public class RenderGenerationService {
this.taskQueueCount.decrementAndGet();
if (task.section != null) {
task.section.release();
this.holdingSectionCount.decrementAndGet();
}
long stamp = this.taskMapLock.writeLock();

View File

@@ -6,7 +6,6 @@ public abstract class ScanMesher2D {
private static final int MAX_SIZE = 16;
// is much faster if implemented inline into parent
private final long[] rowData = new long[32];
private final int[] rowLength = new int[32];//How long down does a row entry go
@@ -28,7 +27,7 @@ public abstract class ScanMesher2D {
//If the previous data is not zero, that means it was not merge-able, so emit it at the pos
if (this.currentData!=0) {
if ((this.rowBitset&(1<<31))!=0) {
emitQuad(31, ((this.currentIndex-1)>>5)-1, this.rowLength[31], this.rowDepth[31], this.rowData[31]);
this.emitQuad(31, ((this.currentIndex-1)>>5)-1, this.rowLength[31], this.rowDepth[31], this.rowData[31]);
}
this.rowBitset |= 1<<31;
this.rowLength[31] = this.currentSum;
@@ -84,14 +83,14 @@ public abstract class ScanMesher2D {
private void emitRanged(int msk) {
{//Emit quads that cover the previous indices
int rowSet = this.rowBitset&msk;
this.rowBitset &= ~msk;
while (rowSet!=0) {//Need to emit quads that would have skipped, note that this does not include the current index
int index = Integer.numberOfTrailingZeros(rowSet);
rowSet &= ~Integer.lowestOneBit(rowSet);
//Emit the quad, dont need to clear the data since it not existing in the bitmask is implicit no data
this.emitQuad(index, ((this.currentIndex-1)>>5)-1, this.rowLength[index], this.rowDepth[index], this.rowData[index]);
this.emitQuad(index, (this.currentIndex>>5)-1, this.rowLength[index], this.rowDepth[index], this.rowData[index]);
}
this.rowBitset &= ~msk;
}
}
@@ -107,10 +106,14 @@ public abstract class ScanMesher2D {
this.currentIndex += count;
*/
if (count == 0) return;
if (this.currentData!=0) {
this.putNext(0);
if (1<count) {
this.emitRanged(((1 << (Math.min(count, 32)-1)) - 1) << (this.currentIndex & 31));
this.currentIndex += count - 1;
count--;
}
if (0<count) {
int msk = (int) ((1L<<Math.min(32, count))-1) << (this.currentIndex & 31);
this.emitRanged(msk);
this.currentIndex += count;
}
}

View File

@@ -1,6 +1,7 @@
package me.cortex.voxy.client.mixin.minecraft;
import me.cortex.voxy.client.GPUSelectorWindows2;
import me.cortex.voxy.common.util.ThreadUtils;
import net.minecraft.client.WindowEventHandler;
import net.minecraft.client.WindowSettings;
import net.minecraft.client.util.MonitorTracker;
@@ -19,5 +20,8 @@ public class MixinWindow {
if (!prop.equals("NO")) {
GPUSelectorWindows2.doSelector(Integer.parseInt(prop));
}
//Force the current thread priority to be realtime
ThreadUtils.SetSelfThreadPriorityWin32(ThreadUtils.WIN32_THREAD_PRIORITY_TIME_CRITICAL);
}
}

View File

@@ -2,6 +2,7 @@ package me.cortex.voxy.common.thread;
import me.cortex.voxy.common.Logger;
import me.cortex.voxy.common.util.Pair;
import me.cortex.voxy.common.util.ThreadUtils;
import java.lang.invoke.VarHandle;
import java.lang.management.ManagementFactory;
@@ -17,12 +18,6 @@ import java.util.function.Supplier;
//TODO: could also probably replace all of this with just VirtualThreads and a Executors.newThreadPerTaskExecutor with a fixed thread pool
// it is probably better anyway
public class ServiceThreadPool {
private static final ThreadMXBean THREAD_BEAN = ManagementFactory.getThreadMXBean();
static {
THREAD_BEAN.setThreadCpuTimeEnabled(true);
}
private volatile boolean running = true;
private volatile boolean releaseNow = false;
private Thread[] workers = new Thread[0];
@@ -138,20 +133,25 @@ public class ServiceThreadPool {
}
private void worker(int threadId) {
long seed = 1234342;
int revolvingSelector = 0;
ThreadUtils.SetSelfThreadPriorityWin32(ThreadUtils.WIN32_THREAD_PRIORITY_LOWEST);
//ThreadUtils.SetSelfThreadPriorityWin32(ThreadUtils.WIN32_THREAD_MODE_BACKGROUND_BEGIN);
long[] seed = new long[]{1234342^(threadId*124987198651981L+215987981111L)};
int[] revolvingSelector = new int[1];
double rollRuntimeRatio = 0;
double rollCpuTimeDelta = 0;
while (true) {
this.jobCounter.acquireUninterruptibly();
if (!this.running) {
break;
}
//This is because of JIT moment (it cant really replace methods while they are executing afak)
this.worker_work(threadId, seed, revolvingSelector);
}
}
private void worker_work(int threadId, long[] seedIO, int[] revolvingSelectorIO) {
final int ATTEMPT_COUNT = 50;
int attempts = ATTEMPT_COUNT;
outer:
while (true) {
if (attempts < ATTEMPT_COUNT-2) {
try {
@@ -184,9 +184,10 @@ public class ServiceThreadPool {
this.jobCounter.release();//Release the job we acquired
break;
}
seed = (seed ^ seed >>> 30) * -4658895280553007687L;
seed = (seed ^ seed >>> 27) * -7723592293110705685L;
long seed = seedIO[0]*1984691871L+1497210975L;
seed = (seed ^ (seed >>> 30)) * -4658895280553007687L;
seed = (seed ^ (seed >>> 27)) * -7723592293110705685L;
seedIO[0] = seed;
long clamped = seed&((1L<<63)-1);
long weight = this.totalJobWeight.get();
if (weight == 0) {
@@ -194,16 +195,28 @@ public class ServiceThreadPool {
break;
}
ServiceSlice service = ref[0];
ServiceSlice service = null;
for (int i = 0; i < ref.length; i++) {
service = ref[(int) ((clamped+i) % ref.length)];
if (service.workConditionMet()) {
var service2 = ref[(int) ((clamped+i) % ref.length)];
if (service2.hasJobs() && service2.workConditionMet()) {
service = service2;
break;
}
}
if (service == null) {
Logger.warn("No available jobs, sleeping releasing returning");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.jobCounter.release();
break;
}
//1 in 64 chance just to pick a service that has a task, in a cycling manor, this is to keep at least one service from overloading all services constantly
if (((seed>>10)&63) == 0) {
int revolvingSelector = revolvingSelectorIO[0];
for (int i = 0; i < ref.length; i++) {
int idx = (i+revolvingSelector)%ref.length;
var slice = ref[idx];
@@ -213,7 +226,7 @@ public class ServiceThreadPool {
break;
}
}
revolvingSelectorIO[0] = revolvingSelector;
} else {
long chosenNumber = clamped % weight;
for (var slice : ref) {
@@ -224,40 +237,12 @@ public class ServiceThreadPool {
}
}
}
/*
VarHandle.fullFence();
long realTimeStart = System.nanoTime();
long cpuTimeStart = THREAD_BEAN.getCurrentThreadCpuTime();
VarHandle.fullFence();
*/
//Run the job
if (!service.doRun(threadId)) {
//Didnt consume the job, find a new job
continue;
}
/*
VarHandle.fullFence();
long cpuTimeEnd = THREAD_BEAN.getCurrentThreadCpuTime();
long realTimeEnd = System.nanoTime();
VarHandle.fullFence();
long realTimeDelta = realTimeEnd - realTimeStart;
long cpuTimeDelta = cpuTimeEnd - cpuTimeStart;
//Realtime should always be bigger or equal to cpu time
double runtimeRatio = ((double)cpuTimeDelta)/((double)realTimeDelta);
rollRuntimeRatio = (rollRuntimeRatio*0.95)+runtimeRatio*0.05;
rollCpuTimeDelta = (rollCpuTimeDelta*0.95)+cpuTimeDelta*0.05;
//Attempt to self balance cpu load
VarHandle.fullFence();
try {
if (rollRuntimeRatio > 0.8) {
Thread.sleep(Math.max((long) ((rollRuntimeRatio - 0.5) * (rollCpuTimeDelta / (1000 * 1000))), 1));
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}*/
//Consumed a job from the service, decrease weight by the amount
if (this.totalJobWeight.addAndGet(-service.weightPerJob)<0) {
@@ -266,7 +251,6 @@ public class ServiceThreadPool {
break;
}
}
}
private void handleUncaughtException(Thread thread, Throwable throwable) {
System.err.println("Service worker thread has exploded unexpectedly! this is really not good very very bad.");

View File

@@ -0,0 +1,32 @@
package me.cortex.voxy.common.util;
import org.lwjgl.system.JNI;
import org.lwjgl.system.Platform;
import org.lwjgl.system.windows.Kernel32;
//Platform specific code to assist in thread utilities
public class ThreadUtils {
public static final int WIN32_THREAD_PRIORITY_TIME_CRITICAL = 15;
public static final int WIN32_THREAD_PRIORITY_LOWEST = -2;
public static final int WIN32_THREAD_MODE_BACKGROUND_BEGIN = 0x00010000;
public static final int WIN32_THREAD_MODE_BACKGROUND_END = 0x00020000;
private static final boolean isWindows = Platform.get() == Platform.WINDOWS;
private static final long SetThreadPriority;
static {
if (isWindows) {
SetThreadPriority = Kernel32.getLibrary().getFunctionAddress("SetThreadPriority");
} else {
SetThreadPriority = 0;
}
}
public static boolean SetSelfThreadPriorityWin32(int priority) {
if (SetThreadPriority == 0 || !isWindows) {
return false;
}
if (JNI.callPI(Kernel32.GetCurrentThread(), priority, SetThreadPriority)==0) {
throw new IllegalStateException("Operation failed");
}
return true;
}
}

View File

@@ -125,14 +125,18 @@ public class ActiveSectionTracker {
}
section.acquire();
VarHandle.fullFence();//Do not reorder setting this object
holder.obj = section;
VarHandle.fullFence();
if (nullOnEmpty && status == 1) {//If its air return null as stated, release the section aswell
section.release();
return null;
}
return section;
} else {
VarHandle.fullFence();
while ((section = holder.obj) == null) {
VarHandle.fullFence();
Thread.onSpinWait();
Thread.yield();
}

View File

@@ -118,7 +118,7 @@ public final class WorldSection {
}
public int acquire(int count) {
int state =((int) ATOMIC_STATE_HANDLE.getAndAdd(this, count<<1)) + (count<<1);
int state = ((int) ATOMIC_STATE_HANDLE.getAndAdd(this, count<<1)) + (count<<1);
if ((state & 1) == 0) {
throw new IllegalStateException("Tried to acquire unloaded section");
}

View File

@@ -37,8 +37,7 @@ public class VoxyInstance {
public void addDebug(List<String> debug) {
debug.add("Voxy Core: " + VoxyCommon.MOD_VERSION);
debug.add("MemoryBuffer, Count/Size (mb): " + MemoryBuffer.getCount() + "/" + (MemoryBuffer.getTotalSize()/1_000_000));
debug.add("I/S: " + this.ingestService.getTaskCount() + "/" + this.savingService.getTaskCount());
debug.add("AWSC: [" + this.activeWorlds.stream().map(a->""+a.getActiveSectionCount()).collect(Collectors.joining(", ")) + "]");//Active world section count
debug.add("I/S/AWSC: " + this.ingestService.getTaskCount() + "/" + this.savingService.getTaskCount() + "/[" + this.activeWorlds.stream().map(a->""+a.getActiveSectionCount()).collect(Collectors.joining(", ")) + "]");//Active world section count
}
public void shutdown() {