Rewrite the service threading system tobe thread "pool indepenent" aswell as a system for allowing unified thread pools

This commit is contained in:
mcrcortex
2025-10-25 16:36:17 +10:00
parent cb084e116e
commit 146cffc8d9
16 changed files with 884 additions and 76 deletions

View File

@@ -27,7 +27,10 @@ public class VoxyClientInstance extends VoxyInstance {
private final Path basePath; private final Path basePath;
private final boolean noIngestOverride; private final boolean noIngestOverride;
public VoxyClientInstance() { public VoxyClientInstance() {
super(VoxyConfig.CONFIG.serviceThreads); super();
this.threadPool.setNumThreads(VoxyConfig.CONFIG.serviceThreads);
var path = FlashbackCompat.getReplayStoragePath(); var path = FlashbackCompat.getReplayStoragePath();
this.noIngestOverride = path != null; this.noIngestOverride = path != null;
if (path == null) { if (path == null) {

View File

@@ -103,7 +103,7 @@ public class VoxyCommands {
var engine = WorldIdentifier.ofEngine(MinecraftClient.getInstance().world); var engine = WorldIdentifier.ofEngine(MinecraftClient.getInstance().world);
if (engine==null)return 1; if (engine==null)return 1;
return instance.getImportManager().makeAndRunIfNone(engine, ()-> return instance.getImportManager().makeAndRunIfNone(engine, ()->
new DHImporter(dbFile_, engine, MinecraftClient.getInstance().world, instance.getThreadPool(), instance.savingServiceRateLimiter))?0:1; new DHImporter(dbFile_, engine, MinecraftClient.getInstance().world, instance.getServiceManager(), instance.savingServiceRateLimiter))?0:1;
} }
private static boolean fileBasedImporter(File directory) { private static boolean fileBasedImporter(File directory) {
@@ -115,7 +115,7 @@ public class VoxyCommands {
var engine = WorldIdentifier.ofEngine(MinecraftClient.getInstance().world); var engine = WorldIdentifier.ofEngine(MinecraftClient.getInstance().world);
if (engine==null) return false; if (engine==null) return false;
return instance.getImportManager().makeAndRunIfNone(engine, ()->{ return instance.getImportManager().makeAndRunIfNone(engine, ()->{
var importer = new WorldImporter(engine, MinecraftClient.getInstance().world, instance.getThreadPool(), instance.savingServiceRateLimiter); var importer = new WorldImporter(engine, MinecraftClient.getInstance().world, instance.getServiceManager(), instance.savingServiceRateLimiter);
importer.importRegionDirectoryAsync(directory); importer.importRegionDirectoryAsync(directory);
return importer; return importer;
}); });
@@ -224,7 +224,7 @@ public class VoxyCommands {
var engine = WorldIdentifier.ofEngine(MinecraftClient.getInstance().world); var engine = WorldIdentifier.ofEngine(MinecraftClient.getInstance().world);
if (engine != null) { if (engine != null) {
return instance.getImportManager().makeAndRunIfNone(engine, () -> { return instance.getImportManager().makeAndRunIfNone(engine, () -> {
var importer = new WorldImporter(engine, MinecraftClient.getInstance().world, instance.getThreadPool(), instance.savingServiceRateLimiter); var importer = new WorldImporter(engine, MinecraftClient.getInstance().world, instance.getServiceManager(), instance.savingServiceRateLimiter);
importer.importZippedRegionDirectoryAsync(zip, finalInnerDir); importer.importZippedRegionDirectoryAsync(zip, finalInnerDir);
return importer; return importer;
}) ? 0 : 1; }) ? 0 : 1;

View File

@@ -29,6 +29,7 @@ import me.cortex.voxy.client.core.util.GPUTiming;
import me.cortex.voxy.client.core.util.IrisUtil; import me.cortex.voxy.client.core.util.IrisUtil;
import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.Logger;
import me.cortex.voxy.common.thread.ServiceThreadPool; import me.cortex.voxy.common.thread.ServiceThreadPool;
import me.cortex.voxy.common.thread3.ServiceManager;
import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.WorldEngine;
import me.cortex.voxy.commonImpl.VoxyCommon; import me.cortex.voxy.commonImpl.VoxyCommon;
import net.caffeinemc.mods.sodium.client.render.chunk.ChunkRenderMatrices; import net.caffeinemc.mods.sodium.client.render.chunk.ChunkRenderMatrices;
@@ -73,7 +74,7 @@ public class VoxyRenderSystem {
return new MDICSectionRenderer(pipeline, modelStore, (BasicSectionGeometryData) geometryData);//We only have MDIC backend... for now return new MDICSectionRenderer(pipeline, modelStore, (BasicSectionGeometryData) geometryData);//We only have MDIC backend... for now
} }
public VoxyRenderSystem(WorldEngine world, ServiceThreadPool threadPool) { public VoxyRenderSystem(WorldEngine world, ServiceManager sm) {
//Keep the world loaded, NOTE: this is done FIRST, to keep and ensure that even if the rest of loading takes more //Keep the world loaded, NOTE: this is done FIRST, to keep and ensure that even if the rest of loading takes more
// than timeout, we keep the world acquired // than timeout, we keep the world acquired
world.acquireRef(); world.acquireRef();
@@ -96,7 +97,7 @@ public class VoxyRenderSystem {
this.modelService = new ModelBakerySubsystem(world.getMapper()); this.modelService = new ModelBakerySubsystem(world.getMapper());
this.renderGen = new RenderGenerationService(world, this.modelService, threadPool, false, () -> true); this.renderGen = new RenderGenerationService(world, this.modelService, sm, false);
this.geometryData = new BasicSectionGeometryData(1 << 20, geometryCapacity); this.geometryData = new BasicSectionGeometryData(1 << 20, geometryCapacity);

View File

@@ -5,7 +5,8 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import me.cortex.voxy.client.core.model.IdNotYetComputedException; import me.cortex.voxy.client.core.model.IdNotYetComputedException;
import me.cortex.voxy.client.core.model.ModelBakerySubsystem; import me.cortex.voxy.client.core.model.ModelBakerySubsystem;
import me.cortex.voxy.common.thread.ServiceSlice; import me.cortex.voxy.common.thread.ServiceSlice;
import me.cortex.voxy.common.thread.ServiceThreadPool; import me.cortex.voxy.common.thread3.Service;
import me.cortex.voxy.common.thread3.ServiceManager;
import me.cortex.voxy.common.util.Pair; import me.cortex.voxy.common.util.Pair;
import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.WorldEngine;
import me.cortex.voxy.common.world.WorldSection; import me.cortex.voxy.common.world.WorldSection;
@@ -15,7 +16,6 @@ import java.util.List;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.StampedLock; import java.util.concurrent.locks.StampedLock;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer; import java.util.function.Consumer;
//TODO: Add a render cache //TODO: Add a render cache
@@ -60,26 +60,27 @@ public class RenderGenerationService {
private Consumer<BuiltSection> resultConsumer; private Consumer<BuiltSection> resultConsumer;
private final boolean emitMeshlets; private final boolean emitMeshlets;
private final ServiceSlice threads; private final Service service;
public RenderGenerationService(WorldEngine world, ModelBakerySubsystem modelBakery, ServiceThreadPool serviceThreadPool, boolean emitMeshlets) { /*
this(world, modelBakery, serviceThreadPool, emitMeshlets, ()->true); public RenderGenerationService(WorldEngine world, ModelBakerySubsystem modelBakery, ServiceManager sm, boolean emitMeshlets) {
} this(world, modelBakery, sm, emitMeshlets, ()->true);
}*/
public RenderGenerationService(WorldEngine world, ModelBakerySubsystem modelBakery, ServiceThreadPool serviceThreadPool, boolean emitMeshlets, BooleanSupplier taskLimiter) { public RenderGenerationService(WorldEngine world, ModelBakerySubsystem modelBakery, ServiceManager sm, boolean emitMeshlets) {
this.emitMeshlets = emitMeshlets; this.emitMeshlets = emitMeshlets;
this.world = world; this.world = world;
this.modelBakery = modelBakery; this.modelBakery = modelBakery;
this.threads = serviceThreadPool.createService("Section mesh generation service", 100, ()->{ this.service = sm.createService(()->{
//Thread local instance of the factory //Thread local instance of the factory
var factory = new RenderDataFactory(this.world, this.modelBakery.factory, this.emitMeshlets); var factory = new RenderDataFactory(this.world, this.modelBakery.factory, this.emitMeshlets);
IntOpenHashSet seenMissed = new IntOpenHashSet(128); IntOpenHashSet seenMissed = new IntOpenHashSet(128);
return new Pair<>(() -> { return new Pair<>(() -> {
this.processJob(factory, seenMissed); this.processJob(factory, seenMissed);
}, factory::free); }, factory::free);
}, taskLimiter); }, 100, "Section mesh generation service");
} }
public void setResultConsumer(Consumer<BuiltSection> consumer) { public void setResultConsumer(Consumer<BuiltSection> consumer) {
@@ -258,8 +259,8 @@ public class RenderGenerationService {
this.taskQueue.add(task); this.taskQueue.add(task);
this.taskQueueCount.incrementAndGet(); this.taskQueueCount.incrementAndGet();
if (this.threads.isAlive()) {//Only execute if were not dead if (this.service.isLive()) {//Only execute if were not dead
this.threads.execute();//Since we put in queue, release permit this.service.execute();//Since we put in queue, release permit
} }
} }
} }
@@ -282,7 +283,7 @@ public class RenderGenerationService {
public void enqueueTask(long pos) { public void enqueueTask(long pos) {
if (!this.threads.isAlive()) { if (!this.service.isLive()) {
return; return;
} }
boolean[] isOurs = new boolean[1]; boolean[] isOurs = new boolean[1];
@@ -298,7 +299,7 @@ public class RenderGenerationService {
task.updatePriority(); task.updatePriority();
this.taskQueue.add(task); this.taskQueue.add(task);
this.taskQueueCount.incrementAndGet(); this.taskQueueCount.incrementAndGet();
this.threads.execute(); this.service.execute();
} }
} }
@@ -310,8 +311,8 @@ public class RenderGenerationService {
public void shutdown() { public void shutdown() {
//Steal and free as much work as possible //Steal and free as much work as possible
while (this.threads.hasJobs()) { while (this.service.numJobs() != 0) {
int i = this.threads.drain(); int i = this.service.drain();
if (i == 0) break; if (i == 0) break;
{ {
long stamp = this.taskMapLock.writeLock(); long stamp = this.taskMapLock.writeLock();
@@ -331,7 +332,7 @@ public class RenderGenerationService {
} }
//Shutdown the threads //Shutdown the threads
this.threads.shutdown(); this.service.shutdown();
//Cleanup any remaining data //Cleanup any remaining data
while (!this.taskQueue.isEmpty()) { while (!this.taskQueue.isEmpty()) {

View File

@@ -80,7 +80,7 @@ public abstract class MixinWorldRenderer implements IGetVoxyRenderSystem {
return; return;
} }
try { try {
this.renderer = new VoxyRenderSystem(world, instance.getThreadPool()); this.renderer = new VoxyRenderSystem(world, instance.getServiceManager());
} catch (RuntimeException e) { } catch (RuntimeException e) {
if (IrisUtil.irisShaderPackEnabled()) { if (IrisUtil.irisShaderPackEnabled()) {
IrisUtil.disableIrisShaders(); IrisUtil.disableIrisShaders();

View File

@@ -0,0 +1,100 @@
package me.cortex.voxy.common.thread3;
import me.cortex.voxy.common.util.TrackedObject;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
//Basiclly acts as a priority based mutlti semaphore
// allows the pooling of multiple threadpools together while prioritizing the work the original was ment for
public class MultiThreadPrioritySemaphore {
public static final class Block extends TrackedObject {
private final Semaphore blockSemaphore = new Semaphore(0);//The work pool semaphore
private final Semaphore localSemaphore = new Semaphore(0);//The local semaphore
//private final AtomicInteger debt = new AtomicInteger();//the debt of the work pool semphore with respect to the usage
private final MultiThreadPrioritySemaphore man;
Block(MultiThreadPrioritySemaphore man) {
this.man = man;
}
public void release(int permits) {
//release local then block to prevent race conditions
this.localSemaphore.release(permits);
this.blockSemaphore.release(permits);
}
public void acquire() {//Block until a permit for this block is availbe, other jobs maybe executed while we wait
while (true) {
this.blockSemaphore.acquireUninterruptibly();//Block on all
if (this.localSemaphore.tryAcquire()) {//We prioritize locals first
return;
}
//It wasnt a local job so run
this.man.tryRun(this);
}
}
public void free() {
this.man.freeBlock(this);
this.free0();
}
}
private final Semaphore pooledSemaphore = new Semaphore(0);
private final Runnable executor;
private volatile Block[] blocks = new Block[0];
public MultiThreadPrioritySemaphore(Runnable executor) {
this.executor = executor;
}
public synchronized Block createBlock() {
var block = new Block(this);
var blocks = Arrays.copyOf(this.blocks, this.blocks.length+1);
blocks[blocks.length-1] = block;
this.blocks = blocks;
return block;
}
private synchronized void freeBlock(Block block) {
var ob = this.blocks;
var blocks = new Block[ob.length-1];
int j = 0;
for (int i = 0; i <= blocks.length; i++) {
if (ob[i] != block) {
blocks[j++] = ob[i];
}
}
if (j != blocks.length) {
throw new IllegalStateException("Could not find the service in the services array");
}
this.blocks = blocks;
}
public void pooledRelease(int permits) {
this.pooledSemaphore.release(permits);
for (var block : this.blocks) {
block.blockSemaphore.release(permits);
}
}
private boolean tryRun(Block block) {
if (!this.pooledSemaphore.tryAcquire()) {//No jobs for the unified pool
return false;
}
/*
for (var otherBlock : this.blocks) {
if (otherBlock != block) {
block.debt.incrementAndGet();
}
}*/
//Run the pooled job
this.executor.run();
return true;
}
}

View File

@@ -0,0 +1,161 @@
package me.cortex.voxy.common.thread3;
import me.cortex.voxy.common.Logger;
import me.cortex.voxy.common.util.Pair;
import me.cortex.voxy.common.util.TrackedObject;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
public class PerThreadContextExecutor extends TrackedObject {
private static final class ThreadContext {
private final Runnable execute;
private final Runnable cleanup;
private ThreadContext(Pair<Runnable, Runnable> wrap) {
this(wrap.left(), wrap.right());
}
private ThreadContext(Runnable execute, Runnable cleanup) {
this.execute = execute;
this.cleanup = cleanup;
}
}
private static record ThreadObj(long id) implements LongSupplier {
private static final AtomicLong IDENTIFIER = new AtomicLong();
public ThreadObj() {
this(IDENTIFIER.getAndIncrement());
}
@Override
public long getAsLong() {
return this.id;
}
}
private static final ThreadLocal<ThreadObj> THREAD_CTX = ThreadLocal.withInitial(ThreadObj::new);
private final WeakConcurrentCleanableHashMap<ThreadObj, ThreadContext> contexts = new WeakConcurrentCleanableHashMap<>(this::ctxCleaner); //TODO: a custom weak concurrent hashmap that can enqueue values when the value is purged
private final Supplier<ThreadContext> contextFactory;
private final Consumer<Exception> exceptionHandler;
private final AtomicInteger currentRunning = new AtomicInteger();
private volatile boolean isLive = true;
PerThreadContextExecutor(Supplier<Pair<Runnable, Runnable>> ctxFactory) {
this(ctxFactory, (e)->{
Logger.error("Executor had the following exception",e);
});
}
PerThreadContextExecutor(Supplier<Pair<Runnable, Runnable>> ctxFactory, Consumer<Exception> exceptionHandler) {
this.contextFactory = ()->new ThreadContext(ctxFactory.get());
this.exceptionHandler = exceptionHandler;
}
private void ctxCleaner(ThreadContext ctx) {
try {
ctx.cleanup.run();
} catch (Exception e) {
this.exceptionHandler.accept(e);
}
}
boolean run() {
this.currentRunning.incrementAndGet();
if (!this.isLive) {
this.currentRunning.decrementAndGet();
this.exceptionHandler.accept(new IllegalStateException("Executor is in shutdown"));
return false;
}
var ctx = this.contexts.computeIfAbsent(THREAD_CTX.get(), this.contextFactory);
try {
ctx.execute.run();
} catch (Exception e) {
this.exceptionHandler.accept(e);
}
this.currentRunning.decrementAndGet();
return true;
}
public void shutdown() {
if (!this.isLive) {
throw new IllegalStateException("Tried shutting down a executor twice");
}
this.isLive = false;
while (this.currentRunning.get() != 0) {
Thread.onSpinWait();//TODO: maybe add a sleep or something
}
for (var ctx : this.contexts.clear()) {
ctx.cleanup.run();
}
this.free0();
}
@Override
public void free() {
this.shutdown();
}
public boolean isLive() {
return this.isLive;
}
private static void inner(PerThreadContextExecutor s) throws InterruptedException {
Thread[] t = new Thread[1<<8];
Random r = new Random(19874396);
for (int i = 0; i<t.length; i++) {
long rs = r.nextLong();
t[i] = new Thread(()->{
s.run();
Random lr = new Random(rs);
while (lr.nextFloat()<0.9) {
s.run();
try {
Thread.sleep((long) (100*lr.nextFloat()));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t[i].start();
}
for (var tt : t) {
tt.join();
}
}
public static void main(String[] args) throws InterruptedException {
AtomicInteger cc = new AtomicInteger();
var s = new PerThreadContextExecutor(()->{
AtomicBoolean cleaned = new AtomicBoolean();
int[] a = new int[1];
return new Pair<>(()->{
if (cleaned.get()) {
System.err.println("TRIED EXECUTING CLEANED CTX");
} else {
a[0]++;
}
}, ()->{
if (cleaned.getAndSet(true)) {
System.err.println("TRIED DOUBLE CLEANING A VALUE");
} else {
System.out.println("Cleaned ref, exec: " + a[0]);
cc.incrementAndGet();
}
});
});
inner(s);
System.gc();
s.shutdown();
System.err.println(cc.get());
}
}

View File

@@ -0,0 +1,101 @@
package me.cortex.voxy.common.thread3;
import me.cortex.voxy.common.Logger;
import me.cortex.voxy.common.util.Pair;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
public class Service {
private final PerThreadContextExecutor executor;
private final ServiceManager sm;
final long weight;
final String name;
final BooleanSupplier limiter;
private final Semaphore tasks = new Semaphore(0);
private volatile boolean isLive = true;
private volatile boolean isStopping = false;
Service(Supplier<Pair<Runnable, Runnable>> ctxSupplier, ServiceManager sm, long weight, String name, BooleanSupplier limiter) {
this.sm = sm;
this.weight = weight;
this.name = name;
this.limiter = limiter;
this.executor = new PerThreadContextExecutor(ctxSupplier, e->sm.handleException(this, e));
}
public void execute() {
if (this.isStopping) {
Logger.error("Tried executing on a dead service");
return;
}
this.tasks.release();
this.sm.execute(this);
}
boolean runJob() {
if (this.isStopping||!this.isLive) {
return false;
}
if (!this.tasks.tryAcquire()) {
//Failed to get the job, probably due to a race condition
return false;
}
if (!this.executor.run()) {//Run the job
throw new IllegalStateException("Executor failed to run");
}
return true;
}
public boolean isLive() {
return this.isLive&&!this.isStopping;
}
public int numJobs() {
return this.tasks.availablePermits();
}
public void blockTillEmpty() {
while (this.isLive() && this.numJobs() != 0) {
Thread.yield();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public int shutdown() {
if (this.isStopping) {
throw new IllegalStateException("Service not live");
}
this.isStopping = true;//First mark the service as stopping
this.sm.removeService(this);//Remove the service this is so that new jobs are never executed
this.executor.shutdown();//Await shutdown of all running jobs
int remaining = this.tasks.drainPermits();//Drain the remaining tasks to 0
this.isLive = false;//Mark the service as dead
this.sm.remJobs(remaining);
return remaining;
}
public boolean steal() {
if (!this.tasks.tryAcquire()) {
return false;
}
this.sm.remJobs(1);
return true;
}
public int drain() {
int tasks = this.tasks.drainPermits();
if (tasks != 0) {
this.sm.remJobs(tasks);
}
return tasks;
}
}

View File

@@ -0,0 +1,180 @@
package me.cortex.voxy.common.thread3;
import it.unimi.dsi.fastutil.HashCommon;
import me.cortex.voxy.common.Logger;
import me.cortex.voxy.common.util.Pair;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
public class ServiceManager {
private static final class ThreadCtx {
int shiftFactor = 0;
long seed;//Random seed used for selecting service
ThreadCtx() {
this.seed = HashCommon.murmurHash3(System.nanoTime()^System.identityHashCode(this));
}
long rand(long size) {
return (this.seed = HashCommon.mix(this.seed))%size;
}
}
private final IntConsumer jobRelease;
private final ThreadLocal<ThreadCtx> accelerationContext = ThreadLocal.withInitial(ThreadCtx::new);
private final AtomicInteger totalJobs = new AtomicInteger();
private volatile Service[] services = new Service[0];
private volatile boolean isShutdown = false;
public ServiceManager(IntConsumer jobRelease) {
this.jobRelease = jobRelease;
}
public Service createServiceNoCleanup(Supplier<Runnable> ctxFactory, long weight) {
return this.createService(()->new Pair<>(ctxFactory.get(), ()->{}), weight, "");
}
public Service createServiceNoCleanup(Supplier<Runnable> ctxFactory, long weight, String name) {
return this.createService(()->new Pair<>(ctxFactory.get(), ()->{}), weight, name);
}
public Service createService(Supplier<Pair<Runnable, Runnable>> ctxFactory, long weight) {
return this.createService(ctxFactory, weight, "");
}
public Service createService(Supplier<Pair<Runnable, Runnable>> ctxFactory, long weight, String name) {
return this.createService(ctxFactory, weight, name, null);
}
public synchronized Service createService(Supplier<Pair<Runnable, Runnable>> ctxFactory, long weight, String name, BooleanSupplier limiter) {
Service newService = new Service(ctxFactory, this, weight, name, limiter);
var newServices = Arrays.copyOf(this.services, this.services.length+1);
newServices[newServices.length-1] = newService;
this.services = newServices;
return newService;
}
public boolean runAJob() {//Executes a single job on the current thread
while (true) {
if (this.services.length == 0) return false;
if (this.runAJob0()) return true;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private boolean runAJob0() {//Executes a single job on the current thread
if (this.services.length == 0) return false;
var ctx = this.accelerationContext.get();
outer:
while (true) {
var services = this.services;//Capture the current services array
if (services.length == 0) return false;
if (this.totalJobs.get()==0) return false;
long totalWeight = 0;
int shiftFactor = (ctx.shiftFactor++)&Integer.MAX_VALUE;//We cycle and shift the starting service when choosing to prevent bias
int c = shiftFactor;
Service selectedService = null;
for (var service:services) {
if (!service.isLive()) {
Thread.yield();
continue outer;//We need to refetch the array and start over
}
boolean sc = c--<=0;
if (service.limiter!=null && !service.limiter.getAsBoolean()) continue;
long jc = service.numJobs();
if (sc&&jc!=0&&selectedService==null) selectedService=service;
totalWeight += jc * service.weight;
}
if (totalWeight == 0) return false;
long sample = ctx.rand(totalWeight);//Random number
for (int i = 0; i < services.length; i++) {
var service = services[(i+shiftFactor)%services.length];
if (service.limiter!=null && !service.limiter.getAsBoolean()) continue;
sample -= service.numJobs() * service.weight;
if (sample<=0) {
selectedService = service;
break;
}
}
if (selectedService == null) {
return false;
}
if (!selectedService.isLive()) {
continue;//Failed to select a live service, try again
}
if (!selectedService.runJob()) {
//We failed to run the service, try again
continue;
}
if (this.totalJobs.decrementAndGet() < 0) {
throw new IllegalStateException("Job count <0");
}
break;
}
return true;
}
public void shutdown() {
if (this.isShutdown) {
throw new IllegalStateException("Service manager already shutdown");
}
this.isShutdown = true;
while (this.services.length != 0) {
Thread.yield();
synchronized (this) {
for (var s : this.services) {
if (s.isLive()) {
throw new IllegalStateException("Service '" + s.name + "' was not in shutdown when manager shutdown");
}
}
}
}
while (this.totalJobs.get()!=0) {
Thread.yield();
}
}
synchronized void removeService(Service service) {
var services = this.services;
var newServices = new Service[services.length-1];
int j = 0;
for (int i = 0; i < services.length; i++) {
if (services[i] != service) {
newServices[j++] = services[i];
}
}
if (j != newServices.length) {
throw new IllegalStateException("Could not find the service in the services array");
}
this.services = newServices;
}
void execute(Service service) {
this.totalJobs.incrementAndGet();
this.jobRelease.accept(1);
}
void remJobs(int remaining) {
if (this.totalJobs.addAndGet(-remaining)<0) {
throw new IllegalStateException("total jobs <0");
}
}
void handleException(Service service, Exception exception) {
Logger.error("Service '"+service.name+"' on thread '"+Thread.currentThread().getName()+"' had an exception", exception);
}
}

View File

@@ -0,0 +1,121 @@
package me.cortex.voxy.common.thread3;
import me.cortex.voxy.common.util.Pair;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class UnifiedServiceThreadPool {
public final ServiceManager serviceManager;
private final MultiThreadPrioritySemaphore groupSemaphore;
private final MultiThreadPrioritySemaphore.Block selfBlock;
private final ThreadGroup dedicatedPool;
private final List<Thread> threads = new ArrayList<>();
public UnifiedServiceThreadPool() {
this.dedicatedPool = new ThreadGroup("Voxy Dedicated Service");
this.serviceManager = new ServiceManager(this::release);
this.groupSemaphore = new MultiThreadPrioritySemaphore(this.serviceManager::runAJob);
this.selfBlock = this.groupSemaphore.createBlock();
}
private final void release(int i) {this.groupSemaphore.pooledRelease(i);}
public void setNumThreads(int threads) {
synchronized (this.threads) {
int diff = threads - this.threads.size();
if (diff==0) return;//Already correct
if (diff<0) {//Remove threads
this.selfBlock.release(-diff);
} else {//Add threads
for (int i = 0; i < diff; i++) {
var t = new Thread(this.dedicatedPool, this::workerThread);
t.setDaemon(true);
this.threads.add(t);
t.start();
}
}
}
while (true) {
synchronized (this.threads) {
if (this.threads.size() == threads) return;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private void workerThread() {
this.selfBlock.acquire();//This is stupid but it works
//We are exiting, remove self from list of threads
synchronized (this.threads) {
this.threads.remove(Thread.currentThread());
}
}
public void shutdown() {
this.serviceManager.shutdown();
this.selfBlock.release(10000);
while (true) {
synchronized (this.threads) {
if (this.threads.isEmpty()) {
break;
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
this.selfBlock.free();
}
public static void main(String[] args) {
var ustp = new UnifiedServiceThreadPool();
AtomicInteger cc = new AtomicInteger();
AtomicInteger cnt = new AtomicInteger();
var s1 = ustp.serviceManager.createService(()->{
AtomicBoolean cleaned = new AtomicBoolean();
AtomicInteger a = new AtomicInteger();
return new Pair<>(()->{
if (cleaned.get()) {
System.err.println("TRIED EXECUTING CLEANED CTX");
} else {
a.incrementAndGet();
cnt.incrementAndGet();
}
}, ()->{
if (cleaned.getAndSet(true)) {
System.err.println("TRIED DOUBLE CLEANING A VALUE");
} else {
System.out.println("Cleaned ref, exec: " + a.get());
cc.incrementAndGet();
}
});
}, 1);
for (int i = 0; i < 1000; i++) {
s1.execute();
}
ustp.setNumThreads(1);
ustp.setNumThreads(10);
ustp.setNumThreads(0);
ustp.setNumThreads(1);
s1.blockTillEmpty();
s1.shutdown();
ustp.shutdown();
System.out.println(cnt);
}
}

View File

@@ -0,0 +1,131 @@
package me.cortex.voxy.common.thread3;
import it.unimi.dsi.fastutil.HashCommon;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
public class WeakConcurrentCleanableHashMap<K extends LongSupplier, V> {
//TODO could move to a Cleanable style system possibly?
private final Consumer<V> valueCleaner;
private final ReferenceQueue<K> cleanupQueue = new ReferenceQueue<>();
private final ReentrantLock k2iLock = new ReentrantLock();
private final Object2LongOpenHashMap<WeakReference<K>> k2i = new Object2LongOpenHashMap<>();
{
this.k2i.defaultReturnValue(-1);
}
private final Long2ObjectOpenHashMap<V>[] i2v = new Long2ObjectOpenHashMap[1<<4];
private final ReentrantLock[] i2vLocks = new ReentrantLock[this.i2v.length];
{
for (int i = 0; i < this.i2v.length; i++) {
this.i2v[i] = new Long2ObjectOpenHashMap<>();
this.i2vLocks[i] = new ReentrantLock();
}
}
private final AtomicInteger count = new AtomicInteger();
public WeakConcurrentCleanableHashMap(Consumer<V> cleanupConsumer) {
this.valueCleaner = cleanupConsumer;
}
private static int Id2Seg(long id, int MSK) {
return HashCommon.mix((int)id)&MSK;
}
public V computeIfAbsent(K key, Supplier<V> valueOnAbsent) {
this.cleanup();
long id = key.getAsLong();
int bucket = Id2Seg(id, this.i2v.length-1);
var i2v = this.i2v[bucket];
var lock = this.i2vLocks[bucket];
lock.lock();
if (i2v.containsKey(id)) {
lock.unlock();
return i2v.get(id);
} else {
var v = valueOnAbsent.get();
i2v.put(id, v);
this.k2iLock.lock();
lock.unlock();
this.k2i.put(new WeakReference<>(key, this.cleanupQueue), id);
this.k2iLock.unlock();
this.count.incrementAndGet();
return v;
}
}
public void cleanup() {
WeakReference<K> ref = (WeakReference<K>) this.cleanupQueue.poll();
if (ref != null) {
LongArrayFIFOQueue ids = new LongArrayFIFOQueue();
this.k2iLock.lock();
do {
long id = this.k2i.removeLong(ref);
if (id < 0) continue;
ids.enqueue(id);
} while ((ref = (WeakReference<K>) this.cleanupQueue.poll()) != null);
this.k2iLock.unlock();
if (ids.isEmpty()) return;
int count = ids.size();
while (!ids.isEmpty()) {
long id = ids.dequeueLong();
int bucket = Id2Seg(id, this.i2v.length - 1);
var lock = this.i2vLocks[bucket];
lock.lock();
var val = this.i2v[bucket].remove(id);
lock.unlock();
if (val != null) {
this.valueCleaner.accept(val);
} else {
count--;
}
}
if (this.count.addAndGet(-count)<0) {
throw new IllegalStateException();
}
}
}
public List<V> clear() {
this.cleanup();
List<V> values = new ArrayList<>(this.size());
//lock everything
for (var lock : this.i2vLocks) {
lock.lock();
}
this.k2iLock.lock();
this.k2i.clear();//Clear here while its safe to do so
for (var i2v : this.i2v) {
values.addAll(i2v.values());
i2v.clear();
}
this.count.set(0);
this.k2iLock.unlock();
for (var lock : this.i2vLocks) {
lock.unlock();
}
return values;
}
public int size() {
return this.count.get();
}
}

View File

@@ -3,6 +3,8 @@ package me.cortex.voxy.common.world.service;
import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.Logger;
import me.cortex.voxy.common.thread.ServiceSlice; import me.cortex.voxy.common.thread.ServiceSlice;
import me.cortex.voxy.common.thread.ServiceThreadPool; import me.cortex.voxy.common.thread.ServiceThreadPool;
import me.cortex.voxy.common.thread3.Service;
import me.cortex.voxy.common.thread3.ServiceManager;
import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.WorldEngine;
import me.cortex.voxy.common.world.WorldSection; import me.cortex.voxy.common.world.WorldSection;
@@ -12,12 +14,12 @@ import java.util.concurrent.ConcurrentLinkedDeque;
// save to the db, this can be useful for just reducing the amount of thread pools in total // save to the db, this can be useful for just reducing the amount of thread pools in total
// might have some issues with threading if the same section is saved from multiple threads? // might have some issues with threading if the same section is saved from multiple threads?
public class SectionSavingService { public class SectionSavingService {
private final ServiceSlice threads; private final Service service;
private record SaveEntry(WorldEngine engine, WorldSection section) {} private record SaveEntry(WorldEngine engine, WorldSection section) {}
private final ConcurrentLinkedDeque<SaveEntry> saveQueue = new ConcurrentLinkedDeque<>(); private final ConcurrentLinkedDeque<SaveEntry> saveQueue = new ConcurrentLinkedDeque<>();
public SectionSavingService(ServiceThreadPool threadPool) { public SectionSavingService(ServiceManager sm) {
this.threads = threadPool.createServiceNoCleanup("Section saving service", 100, () -> this::processJob); this.service = sm.createServiceNoCleanup(() -> this::processJob, 100, "Section saving service");
} }
private void processJob() { private void processJob() {
@@ -58,8 +60,8 @@ public class SectionSavingService {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
//If we are still full, process entries in the queue ourselves instead of waiting for the service //If we are still full, process entries in the queue ourselves instead of waiting for the service
while (this.getTaskCount() > 5_000 && this.threads.isAlive()) { while (this.getTaskCount() > 5_000 && this.service.isLive()) {
if (!this.threads.steal()) { if (!this.service.steal()) {
break; break;
} }
this.processJob(); this.processJob();
@@ -67,18 +69,16 @@ public class SectionSavingService {
} }
this.saveQueue.add(new SaveEntry(in, section)); this.saveQueue.add(new SaveEntry(in, section));
this.threads.execute(); this.service.execute();
} }
} }
public void shutdown() { public void shutdown() {
if (this.threads.getJobCount() != 0) { if (this.service.numJobs() != 0) {
Logger.error("Voxy section saving still in progress, estimated " + this.threads.getJobCount() + " sections remaining."); Logger.error("Voxy section saving still in progress, estimated " + this.service.numJobs() + " sections remaining.");
while (this.threads.getJobCount() != 0) { this.service.blockTillEmpty();
Thread.onSpinWait();
} }
} this.service.shutdown();
this.threads.shutdown();
//Manually save any remaining entries //Manually save any remaining entries
while (!this.saveQueue.isEmpty()) { while (!this.saveQueue.isEmpty()) {
this.processJob(); this.processJob();
@@ -86,6 +86,6 @@ public class SectionSavingService {
} }
public int getTaskCount() { public int getTaskCount() {
return this.threads.getJobCount(); return this.service.numJobs();
} }
} }

View File

@@ -1,8 +1,8 @@
package me.cortex.voxy.common.world.service; package me.cortex.voxy.common.world.service;
import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.Logger;
import me.cortex.voxy.common.thread.ServiceSlice; import me.cortex.voxy.common.thread3.Service;
import me.cortex.voxy.common.thread.ServiceThreadPool; import me.cortex.voxy.common.thread3.ServiceManager;
import me.cortex.voxy.common.voxelization.ILightingSupplier; import me.cortex.voxy.common.voxelization.ILightingSupplier;
import me.cortex.voxy.common.voxelization.VoxelizedSection; import me.cortex.voxy.common.voxelization.VoxelizedSection;
import me.cortex.voxy.common.voxelization.WorldConversionFactory; import me.cortex.voxy.common.voxelization.WorldConversionFactory;
@@ -22,12 +22,12 @@ import java.util.concurrent.ConcurrentLinkedDeque;
public class VoxelIngestService { public class VoxelIngestService {
private static final ThreadLocal<VoxelizedSection> SECTION_CACHE = ThreadLocal.withInitial(VoxelizedSection::createEmpty); private static final ThreadLocal<VoxelizedSection> SECTION_CACHE = ThreadLocal.withInitial(VoxelizedSection::createEmpty);
private final ServiceSlice threads; private final Service service;
private record IngestSection(int cx, int cy, int cz, WorldEngine world, ChunkSection section, ChunkNibbleArray blockLight, ChunkNibbleArray skyLight){} private record IngestSection(int cx, int cy, int cz, WorldEngine world, ChunkSection section, ChunkNibbleArray blockLight, ChunkNibbleArray skyLight){}
private final ConcurrentLinkedDeque<IngestSection> ingestQueue = new ConcurrentLinkedDeque<>(); private final ConcurrentLinkedDeque<IngestSection> ingestQueue = new ConcurrentLinkedDeque<>();
public VoxelIngestService(ServiceThreadPool pool) { public VoxelIngestService(ServiceManager pool) {
this.threads = pool.createServiceNoCleanup("Ingest service", 5000, ()-> this::processJob); this.service = pool.createServiceNoCleanup(()->this::processJob, 5000, "Ingest service");
} }
private void processJob() { private void processJob() {
@@ -86,7 +86,7 @@ public class VoxelIngestService {
} }
public boolean enqueueIngest(WorldEngine engine, WorldChunk chunk) { public boolean enqueueIngest(WorldEngine engine, WorldChunk chunk) {
if (!this.threads.isAlive()) { if (!this.service.isLive()) {
return false; return false;
} }
if (!engine.isLive()) { if (!engine.isLive()) {
@@ -116,7 +116,7 @@ public class VoxelIngestService {
if (section == null || !shouldIngestSection(section, chunk.getPos().x, i, chunk.getPos().z)) continue; if (section == null || !shouldIngestSection(section, chunk.getPos().x, i, chunk.getPos().z)) continue;
this.ingestQueue.add(new IngestSection(chunk.getPos().x, i, chunk.getPos().z, engine, section, null, null)); this.ingestQueue.add(new IngestSection(chunk.getPos().x, i, chunk.getPos().z, engine, section, null, null));
try { try {
this.threads.execute(); this.service.execute();
} catch (Exception e) { } catch (Exception e) {
Logger.error("Executing had an error: assume shutting down, aborting",e); Logger.error("Executing had an error: assume shutting down, aborting",e);
break; break;
@@ -156,7 +156,7 @@ public class VoxelIngestService {
this.ingestQueue.add(new IngestSection(chunk.getPos().x, i, chunk.getPos().z, engine, section, bl, sl));//TODO: fixme, this is technically not safe todo on the chunk load ingest, we need to copy the section data so it cant be modified while being read this.ingestQueue.add(new IngestSection(chunk.getPos().x, i, chunk.getPos().z, engine, section, bl, sl));//TODO: fixme, this is technically not safe todo on the chunk load ingest, we need to copy the section data so it cant be modified while being read
try { try {
this.threads.execute(); this.service.execute();
} catch (Exception e) { } catch (Exception e) {
Logger.error("Executing had an error: assume shutting down, aborting",e); Logger.error("Executing had an error: assume shutting down, aborting",e);
break; break;
@@ -166,11 +166,11 @@ public class VoxelIngestService {
} }
public int getTaskCount() { public int getTaskCount() {
return this.threads.getJobCount(); return this.service.numJobs();
} }
public void shutdown() { public void shutdown() {
this.threads.shutdown(); this.service.shutdown();
} }
//Utility method to ingest a chunk into the given WorldIdentifier or world //Utility method to ingest a chunk into the given WorldIdentifier or world
@@ -192,7 +192,7 @@ public class VoxelIngestService {
private boolean rawIngest0(WorldEngine engine, ChunkSection section, int x, int y, int z, ChunkNibbleArray bl, ChunkNibbleArray sl) { private boolean rawIngest0(WorldEngine engine, ChunkSection section, int x, int y, int z, ChunkNibbleArray bl, ChunkNibbleArray sl) {
this.ingestQueue.add(new IngestSection(x, y, z, engine, section, bl, sl)); this.ingestQueue.add(new IngestSection(x, y, z, engine, section, bl, sl));
try { try {
this.threads.execute(); this.service.execute();
return true; return true;
} catch (Exception e) { } catch (Exception e) {
Logger.error("Executing had an error: assume shutting down, aborting",e); Logger.error("Executing had an error: assume shutting down, aborting",e);

View File

@@ -3,6 +3,8 @@ package me.cortex.voxy.commonImpl;
import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.Logger;
import me.cortex.voxy.common.config.section.SectionStorage; import me.cortex.voxy.common.config.section.SectionStorage;
import me.cortex.voxy.common.thread.ServiceThreadPool; import me.cortex.voxy.common.thread.ServiceThreadPool;
import me.cortex.voxy.common.thread3.ServiceManager;
import me.cortex.voxy.common.thread3.UnifiedServiceThreadPool;
import me.cortex.voxy.common.util.MemoryBuffer; import me.cortex.voxy.common.util.MemoryBuffer;
import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.WorldEngine;
import me.cortex.voxy.common.world.service.SectionSavingService; import me.cortex.voxy.common.world.service.SectionSavingService;
@@ -21,7 +23,7 @@ public abstract class VoxyInstance {
private volatile boolean isRunning = true; private volatile boolean isRunning = true;
private final Thread worldCleaner; private final Thread worldCleaner;
public final BooleanSupplier savingServiceRateLimiter;//Can run if this returns true public final BooleanSupplier savingServiceRateLimiter;//Can run if this returns true
protected final ServiceThreadPool threadPool; protected final UnifiedServiceThreadPool threadPool;
protected final SectionSavingService savingService; protected final SectionSavingService savingService;
protected final VoxelIngestService ingestService; protected final VoxelIngestService ingestService;
@@ -30,11 +32,11 @@ public abstract class VoxyInstance {
protected final ImportManager importManager; protected final ImportManager importManager;
public VoxyInstance(int threadCount) { public VoxyInstance() {
Logger.info("Initializing voxy instance"); Logger.info("Initializing voxy instance");
this.threadPool = new ServiceThreadPool(threadCount); this.threadPool = new UnifiedServiceThreadPool();
this.savingService = new SectionSavingService(this.threadPool); this.savingService = new SectionSavingService(this.getServiceManager());
this.ingestService = new VoxelIngestService(this.threadPool); this.ingestService = new VoxelIngestService(this.getServiceManager());
this.importManager = this.createImportManager(); this.importManager = this.createImportManager();
this.savingServiceRateLimiter = ()->this.savingService.getTaskCount()<1200; this.savingServiceRateLimiter = ()->this.savingService.getTaskCount()<1200;
this.worldCleaner = new Thread(()->{ this.worldCleaner = new Thread(()->{
@@ -60,8 +62,8 @@ public abstract class VoxyInstance {
return new ImportManager(); return new ImportManager();
} }
public ServiceThreadPool getThreadPool() { public ServiceManager getServiceManager() {
return this.threadPool; return this.threadPool.serviceManager;
} }
public VoxelIngestService getIngestService() { public VoxelIngestService getIngestService() {
return this.ingestService; return this.ingestService;

View File

@@ -2,7 +2,8 @@ package me.cortex.voxy.commonImpl.importers;
import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.Logger;
import me.cortex.voxy.common.thread.ServiceSlice; import me.cortex.voxy.common.thread.ServiceSlice;
import me.cortex.voxy.common.thread.ServiceThreadPool; import me.cortex.voxy.common.thread3.Service;
import me.cortex.voxy.common.thread3.ServiceManager;
import me.cortex.voxy.common.util.Pair; import me.cortex.voxy.common.util.Pair;
import me.cortex.voxy.common.voxelization.VoxelizedSection; import me.cortex.voxy.common.voxelization.VoxelizedSection;
import me.cortex.voxy.common.voxelization.WorldConversionFactory; import me.cortex.voxy.common.voxelization.WorldConversionFactory;
@@ -43,7 +44,7 @@ import java.util.function.BooleanSupplier;
public class DHImporter implements IDataImporter { public class DHImporter implements IDataImporter {
private final Connection db; private final Connection db;
private final WorldEngine engine; private final WorldEngine engine;
private final ServiceSlice threadPool; private final Service service;
private final World world; private final World world;
private final int bottomOfWorld; private final int bottomOfWorld;
private final int worldHeightSections; private final int worldHeightSections;
@@ -68,7 +69,7 @@ public class DHImporter implements IDataImporter {
} }
} }
public DHImporter(File file, WorldEngine worldEngine, World mcWorld, ServiceThreadPool servicePool, BooleanSupplier rateLimiter) { public DHImporter(File file, WorldEngine worldEngine, World mcWorld, ServiceManager servicePool, BooleanSupplier rateLimiter) {
this.engine = worldEngine; this.engine = worldEngine;
this.world = mcWorld; this.world = mcWorld;
this.biomeRegistry = mcWorld.getRegistryManager().getOrThrow(RegistryKeys.BIOME); this.biomeRegistry = mcWorld.getRegistryManager().getOrThrow(RegistryKeys.BIOME);
@@ -85,7 +86,7 @@ public class DHImporter implements IDataImporter {
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.threadPool = servicePool.createService("DH Importer", 1, ()->{ this.service = servicePool.createService(()->{
try { try {
var dataFetchStmt = this.db.prepareStatement("SELECT Data,ColumnGenerationStep,Mapping FROM FullData WHERE DetailLevel = 0 AND PosX = ? AND PosZ = ?;"); var dataFetchStmt = this.db.prepareStatement("SELECT Data,ColumnGenerationStep,Mapping FROM FullData WHERE DetailLevel = 0 AND PosX = ? AND PosZ = ?;");
var ctx = new WorkCTX(dataFetchStmt, this.worldHeightSections*16); var ctx = new WorkCTX(dataFetchStmt, this.worldHeightSections*16);
@@ -101,7 +102,7 @@ public class DHImporter implements IDataImporter {
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}, rateLimiter); }, 10, "DH Importer", rateLimiter);
} }
public void runImport(IUpdateCallback updateCallback, ICompletionCallback completionCallback) { public void runImport(IUpdateCallback updateCallback, ICompletionCallback completionCallback) {
@@ -139,7 +140,7 @@ public class DHImporter implements IDataImporter {
while (this.isRunning&&!taskQ.isEmpty()) { while (this.isRunning&&!taskQ.isEmpty()) {
this.tasks.add(taskQ.poll()); this.tasks.add(taskQ.poll());
this.threadPool.execute(); this.service.execute();
while (this.tasks.size() > 100 && this.isRunning) { while (this.tasks.size() > 100 && this.isRunning) {
try { try {
@@ -369,7 +370,7 @@ public class DHImporter implements IDataImporter {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.threadPool.shutdown(); this.service.shutdown();
this.engine.releaseRef(); this.engine.releaseRef();
try { try {
this.db.close(); this.db.close();

View File

@@ -3,23 +3,22 @@ package me.cortex.voxy.commonImpl.importers;
import com.mojang.serialization.Codec; import com.mojang.serialization.Codec;
import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.Logger;
import me.cortex.voxy.common.thread.ServiceSlice; import me.cortex.voxy.common.thread.ServiceSlice;
import me.cortex.voxy.common.thread.ServiceThreadPool; import me.cortex.voxy.common.thread3.Service;
import me.cortex.voxy.common.thread3.ServiceManager;
import me.cortex.voxy.common.util.MemoryBuffer; import me.cortex.voxy.common.util.MemoryBuffer;
import me.cortex.voxy.common.util.Pair;
import me.cortex.voxy.common.util.UnsafeUtil; import me.cortex.voxy.common.util.UnsafeUtil;
import me.cortex.voxy.common.voxelization.VoxelizedSection; import me.cortex.voxy.common.voxelization.VoxelizedSection;
import me.cortex.voxy.common.voxelization.WorldConversionFactory; import me.cortex.voxy.common.voxelization.WorldConversionFactory;
import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.WorldEngine;
import me.cortex.voxy.common.world.WorldUpdater; import me.cortex.voxy.common.world.WorldUpdater;
import net.minecraft.block.Block;
import net.minecraft.block.BlockState; import net.minecraft.block.BlockState;
import net.minecraft.block.Blocks;
import net.minecraft.nbt.NbtCompound; import net.minecraft.nbt.NbtCompound;
import net.minecraft.nbt.NbtIo; import net.minecraft.nbt.NbtIo;
import net.minecraft.nbt.NbtOps; import net.minecraft.nbt.NbtOps;
import net.minecraft.network.PacketByteBuf; import net.minecraft.network.PacketByteBuf;
import net.minecraft.registry.RegistryKeys; import net.minecraft.registry.RegistryKeys;
import net.minecraft.registry.entry.RegistryEntry; import net.minecraft.registry.entry.RegistryEntry;
import net.minecraft.util.collection.IndexedIterable;
import net.minecraft.world.World; import net.minecraft.world.World;
import net.minecraft.world.biome.Biome; import net.minecraft.world.biome.Biome;
import net.minecraft.world.biome.BiomeKeys; import net.minecraft.world.biome.BiomeKeys;
@@ -39,6 +38,7 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier; import java.util.function.BooleanSupplier;
import java.util.function.Consumer; import java.util.function.Consumer;
@@ -54,13 +54,13 @@ public class WorldImporter implements IDataImporter {
private final AtomicInteger chunksProcessed = new AtomicInteger(); private final AtomicInteger chunksProcessed = new AtomicInteger();
private final ConcurrentLinkedDeque<Runnable> jobQueue = new ConcurrentLinkedDeque<>(); private final ConcurrentLinkedDeque<Runnable> jobQueue = new ConcurrentLinkedDeque<>();
private final ServiceSlice threadPool; private final Service service;
private volatile boolean isRunning; private volatile boolean isRunning;
public WorldImporter(WorldEngine worldEngine, World mcWorld, ServiceThreadPool servicePool, BooleanSupplier runChecker) { public WorldImporter(WorldEngine worldEngine, World mcWorld, ServiceManager sm, BooleanSupplier runChecker) {
this.world = worldEngine; this.world = worldEngine;
this.threadPool = servicePool.createServiceNoCleanup("World importer", 3, ()->()->this.jobQueue.poll().run(), runChecker); this.service = sm.createService(()->new Pair<>(()->this.jobQueue.poll().run(), ()->{}), 3, "World importer", runChecker);
var biomeRegistry = mcWorld.getRegistryManager().getOrThrow(RegistryKeys.BIOME); var biomeRegistry = mcWorld.getRegistryManager().getOrThrow(RegistryKeys.BIOME);
var defaultBiome = biomeRegistry.getOrThrow(BiomeKeys.PLAINS); var defaultBiome = biomeRegistry.getOrThrow(BiomeKeys.PLAINS);
@@ -143,7 +143,11 @@ public class WorldImporter implements IDataImporter {
return this.world; return this.world;
} }
private final AtomicBoolean isShutdown = new AtomicBoolean();
public void shutdown() { public void shutdown() {
if (this.isShutdown.getAndSet(true)) {
return;
}
this.isRunning = false; this.isRunning = false;
if (this.worker != null) { if (this.worker != null) {
try { try {
@@ -152,9 +156,9 @@ public class WorldImporter implements IDataImporter {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
if (!this.threadPool.isFreed()) { if (this.service.isLive()) {
this.world.releaseRef(); this.world.releaseRef();
this.threadPool.shutdown(); this.service.shutdown();
} }
//Free all the remaining entries by running the lambda //Free all the remaining entries by running the lambda
while (!this.jobQueue.isEmpty()) { while (!this.jobQueue.isEmpty()) {
@@ -254,13 +258,13 @@ public class WorldImporter implements IDataImporter {
} }
} }
if (!this.isRunning) { if (!this.isRunning) {
this.threadPool.blockTillEmpty(); this.service.blockTillEmpty();
this.completionCallback.onCompletion(this.totalChunks.get()); this.completionCallback.onCompletion(this.totalChunks.get());
this.worker = null; this.worker = null;
return; return;
} }
} }
this.threadPool.blockTillEmpty(); this.service.blockTillEmpty();
while (this.chunksProcessed.get() != this.totalChunks.get() && this.isRunning) { while (this.chunksProcessed.get() != this.totalChunks.get() && this.isRunning) {
Thread.yield(); Thread.yield();
try { try {
@@ -269,9 +273,11 @@ public class WorldImporter implements IDataImporter {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
if (!this.isShutdown.getAndSet(true)) {
this.worker = null; this.worker = null;
this.service.shutdown();
this.world.releaseRef(); this.world.releaseRef();
this.threadPool.shutdown(); }
this.completionCallback.onCompletion(this.totalChunks.get()); this.completionCallback.onCompletion(this.totalChunks.get());
}); });
this.worker.setName("World importer"); this.worker.setName("World importer");
@@ -386,7 +392,7 @@ public class WorldImporter implements IDataImporter {
}); });
this.totalChunks.incrementAndGet(); this.totalChunks.incrementAndGet();
this.estimatedTotalChunks.incrementAndGet(); this.estimatedTotalChunks.incrementAndGet();
this.threadPool.execute(); this.service.execute();
} }
} }
} }