diff --git a/src/main/java/me/cortex/voxy/client/VoxyClientInstance.java b/src/main/java/me/cortex/voxy/client/VoxyClientInstance.java index 52d74226..2cfe7f5a 100644 --- a/src/main/java/me/cortex/voxy/client/VoxyClientInstance.java +++ b/src/main/java/me/cortex/voxy/client/VoxyClientInstance.java @@ -27,7 +27,10 @@ public class VoxyClientInstance extends VoxyInstance { private final Path basePath; private final boolean noIngestOverride; public VoxyClientInstance() { - super(VoxyConfig.CONFIG.serviceThreads); + super(); + + this.threadPool.setNumThreads(VoxyConfig.CONFIG.serviceThreads); + var path = FlashbackCompat.getReplayStoragePath(); this.noIngestOverride = path != null; if (path == null) { diff --git a/src/main/java/me/cortex/voxy/client/VoxyCommands.java b/src/main/java/me/cortex/voxy/client/VoxyCommands.java index 5f033c23..9bd7da88 100644 --- a/src/main/java/me/cortex/voxy/client/VoxyCommands.java +++ b/src/main/java/me/cortex/voxy/client/VoxyCommands.java @@ -103,7 +103,7 @@ public class VoxyCommands { var engine = WorldIdentifier.ofEngine(MinecraftClient.getInstance().world); if (engine==null)return 1; return instance.getImportManager().makeAndRunIfNone(engine, ()-> - new DHImporter(dbFile_, engine, MinecraftClient.getInstance().world, instance.getThreadPool(), instance.savingServiceRateLimiter))?0:1; + new DHImporter(dbFile_, engine, MinecraftClient.getInstance().world, instance.getServiceManager(), instance.savingServiceRateLimiter))?0:1; } private static boolean fileBasedImporter(File directory) { @@ -115,7 +115,7 @@ public class VoxyCommands { var engine = WorldIdentifier.ofEngine(MinecraftClient.getInstance().world); if (engine==null) return false; return instance.getImportManager().makeAndRunIfNone(engine, ()->{ - var importer = new WorldImporter(engine, MinecraftClient.getInstance().world, instance.getThreadPool(), instance.savingServiceRateLimiter); + var importer = new WorldImporter(engine, MinecraftClient.getInstance().world, instance.getServiceManager(), instance.savingServiceRateLimiter); importer.importRegionDirectoryAsync(directory); return importer; }); @@ -224,7 +224,7 @@ public class VoxyCommands { var engine = WorldIdentifier.ofEngine(MinecraftClient.getInstance().world); if (engine != null) { return instance.getImportManager().makeAndRunIfNone(engine, () -> { - var importer = new WorldImporter(engine, MinecraftClient.getInstance().world, instance.getThreadPool(), instance.savingServiceRateLimiter); + var importer = new WorldImporter(engine, MinecraftClient.getInstance().world, instance.getServiceManager(), instance.savingServiceRateLimiter); importer.importZippedRegionDirectoryAsync(zip, finalInnerDir); return importer; }) ? 0 : 1; diff --git a/src/main/java/me/cortex/voxy/client/core/VoxyRenderSystem.java b/src/main/java/me/cortex/voxy/client/core/VoxyRenderSystem.java index 98b81fc8..831b9391 100644 --- a/src/main/java/me/cortex/voxy/client/core/VoxyRenderSystem.java +++ b/src/main/java/me/cortex/voxy/client/core/VoxyRenderSystem.java @@ -29,6 +29,7 @@ import me.cortex.voxy.client.core.util.GPUTiming; import me.cortex.voxy.client.core.util.IrisUtil; import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.thread.ServiceThreadPool; +import me.cortex.voxy.common.thread3.ServiceManager; import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.commonImpl.VoxyCommon; import net.caffeinemc.mods.sodium.client.render.chunk.ChunkRenderMatrices; @@ -73,7 +74,7 @@ public class VoxyRenderSystem { return new MDICSectionRenderer(pipeline, modelStore, (BasicSectionGeometryData) geometryData);//We only have MDIC backend... for now } - public VoxyRenderSystem(WorldEngine world, ServiceThreadPool threadPool) { + public VoxyRenderSystem(WorldEngine world, ServiceManager sm) { //Keep the world loaded, NOTE: this is done FIRST, to keep and ensure that even if the rest of loading takes more // than timeout, we keep the world acquired world.acquireRef(); @@ -96,7 +97,7 @@ public class VoxyRenderSystem { this.modelService = new ModelBakerySubsystem(world.getMapper()); - this.renderGen = new RenderGenerationService(world, this.modelService, threadPool, false, () -> true); + this.renderGen = new RenderGenerationService(world, this.modelService, sm, false); this.geometryData = new BasicSectionGeometryData(1 << 20, geometryCapacity); diff --git a/src/main/java/me/cortex/voxy/client/core/rendering/building/RenderGenerationService.java b/src/main/java/me/cortex/voxy/client/core/rendering/building/RenderGenerationService.java index be55a658..8fc03125 100644 --- a/src/main/java/me/cortex/voxy/client/core/rendering/building/RenderGenerationService.java +++ b/src/main/java/me/cortex/voxy/client/core/rendering/building/RenderGenerationService.java @@ -5,7 +5,8 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import me.cortex.voxy.client.core.model.IdNotYetComputedException; import me.cortex.voxy.client.core.model.ModelBakerySubsystem; import me.cortex.voxy.common.thread.ServiceSlice; -import me.cortex.voxy.common.thread.ServiceThreadPool; +import me.cortex.voxy.common.thread3.Service; +import me.cortex.voxy.common.thread3.ServiceManager; import me.cortex.voxy.common.util.Pair; import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.WorldSection; @@ -15,7 +16,6 @@ import java.util.List; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.StampedLock; -import java.util.function.BooleanSupplier; import java.util.function.Consumer; //TODO: Add a render cache @@ -60,26 +60,27 @@ public class RenderGenerationService { private Consumer resultConsumer; private final boolean emitMeshlets; - private final ServiceSlice threads; + private final Service service; - public RenderGenerationService(WorldEngine world, ModelBakerySubsystem modelBakery, ServiceThreadPool serviceThreadPool, boolean emitMeshlets) { - this(world, modelBakery, serviceThreadPool, emitMeshlets, ()->true); - } + /* + public RenderGenerationService(WorldEngine world, ModelBakerySubsystem modelBakery, ServiceManager sm, boolean emitMeshlets) { + this(world, modelBakery, sm, emitMeshlets, ()->true); + }*/ - public RenderGenerationService(WorldEngine world, ModelBakerySubsystem modelBakery, ServiceThreadPool serviceThreadPool, boolean emitMeshlets, BooleanSupplier taskLimiter) { + public RenderGenerationService(WorldEngine world, ModelBakerySubsystem modelBakery, ServiceManager sm, boolean emitMeshlets) { this.emitMeshlets = emitMeshlets; this.world = world; this.modelBakery = modelBakery; - this.threads = serviceThreadPool.createService("Section mesh generation service", 100, ()->{ + this.service = sm.createService(()->{ //Thread local instance of the factory var factory = new RenderDataFactory(this.world, this.modelBakery.factory, this.emitMeshlets); IntOpenHashSet seenMissed = new IntOpenHashSet(128); return new Pair<>(() -> { this.processJob(factory, seenMissed); }, factory::free); - }, taskLimiter); + }, 100, "Section mesh generation service"); } public void setResultConsumer(Consumer consumer) { @@ -258,8 +259,8 @@ public class RenderGenerationService { this.taskQueue.add(task); this.taskQueueCount.incrementAndGet(); - if (this.threads.isAlive()) {//Only execute if were not dead - this.threads.execute();//Since we put in queue, release permit + if (this.service.isLive()) {//Only execute if were not dead + this.service.execute();//Since we put in queue, release permit } } } @@ -282,7 +283,7 @@ public class RenderGenerationService { public void enqueueTask(long pos) { - if (!this.threads.isAlive()) { + if (!this.service.isLive()) { return; } boolean[] isOurs = new boolean[1]; @@ -298,7 +299,7 @@ public class RenderGenerationService { task.updatePriority(); this.taskQueue.add(task); this.taskQueueCount.incrementAndGet(); - this.threads.execute(); + this.service.execute(); } } @@ -310,8 +311,8 @@ public class RenderGenerationService { public void shutdown() { //Steal and free as much work as possible - while (this.threads.hasJobs()) { - int i = this.threads.drain(); + while (this.service.numJobs() != 0) { + int i = this.service.drain(); if (i == 0) break; { long stamp = this.taskMapLock.writeLock(); @@ -331,7 +332,7 @@ public class RenderGenerationService { } //Shutdown the threads - this.threads.shutdown(); + this.service.shutdown(); //Cleanup any remaining data while (!this.taskQueue.isEmpty()) { diff --git a/src/main/java/me/cortex/voxy/client/mixin/minecraft/MixinWorldRenderer.java b/src/main/java/me/cortex/voxy/client/mixin/minecraft/MixinWorldRenderer.java index e70b5fb1..194f38e7 100644 --- a/src/main/java/me/cortex/voxy/client/mixin/minecraft/MixinWorldRenderer.java +++ b/src/main/java/me/cortex/voxy/client/mixin/minecraft/MixinWorldRenderer.java @@ -80,7 +80,7 @@ public abstract class MixinWorldRenderer implements IGetVoxyRenderSystem { return; } try { - this.renderer = new VoxyRenderSystem(world, instance.getThreadPool()); + this.renderer = new VoxyRenderSystem(world, instance.getServiceManager()); } catch (RuntimeException e) { if (IrisUtil.irisShaderPackEnabled()) { IrisUtil.disableIrisShaders(); diff --git a/src/main/java/me/cortex/voxy/common/thread3/MultiThreadPrioritySemaphore.java b/src/main/java/me/cortex/voxy/common/thread3/MultiThreadPrioritySemaphore.java new file mode 100644 index 00000000..30623247 --- /dev/null +++ b/src/main/java/me/cortex/voxy/common/thread3/MultiThreadPrioritySemaphore.java @@ -0,0 +1,100 @@ +package me.cortex.voxy.common.thread3; + +import me.cortex.voxy.common.util.TrackedObject; + +import java.lang.ref.WeakReference; +import java.util.*; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +//Basiclly acts as a priority based mutlti semaphore +// allows the pooling of multiple threadpools together while prioritizing the work the original was ment for +public class MultiThreadPrioritySemaphore { + public static final class Block extends TrackedObject { + private final Semaphore blockSemaphore = new Semaphore(0);//The work pool semaphore + private final Semaphore localSemaphore = new Semaphore(0);//The local semaphore + //private final AtomicInteger debt = new AtomicInteger();//the debt of the work pool semphore with respect to the usage + private final MultiThreadPrioritySemaphore man; + + Block(MultiThreadPrioritySemaphore man) { + this.man = man; + } + + public void release(int permits) { + //release local then block to prevent race conditions + this.localSemaphore.release(permits); + this.blockSemaphore.release(permits); + } + + public void acquire() {//Block until a permit for this block is availbe, other jobs maybe executed while we wait + while (true) { + this.blockSemaphore.acquireUninterruptibly();//Block on all + if (this.localSemaphore.tryAcquire()) {//We prioritize locals first + return; + } + //It wasnt a local job so run + this.man.tryRun(this); + } + } + + + public void free() { + this.man.freeBlock(this); + this.free0(); + } + } + + private final Semaphore pooledSemaphore = new Semaphore(0); + private final Runnable executor; + + private volatile Block[] blocks = new Block[0]; + + public MultiThreadPrioritySemaphore(Runnable executor) { + this.executor = executor; + } + + public synchronized Block createBlock() { + var block = new Block(this); + var blocks = Arrays.copyOf(this.blocks, this.blocks.length+1); + blocks[blocks.length-1] = block; + this.blocks = blocks; + return block; + } + + private synchronized void freeBlock(Block block) { + var ob = this.blocks; + var blocks = new Block[ob.length-1]; + int j = 0; + for (int i = 0; i <= blocks.length; i++) { + if (ob[i] != block) { + blocks[j++] = ob[i]; + } + } + if (j != blocks.length) { + throw new IllegalStateException("Could not find the service in the services array"); + } + this.blocks = blocks; + } + + public void pooledRelease(int permits) { + this.pooledSemaphore.release(permits); + for (var block : this.blocks) { + block.blockSemaphore.release(permits); + } + } + + private boolean tryRun(Block block) { + if (!this.pooledSemaphore.tryAcquire()) {//No jobs for the unified pool + return false; + } + /* + for (var otherBlock : this.blocks) { + if (otherBlock != block) { + block.debt.incrementAndGet(); + } + }*/ + //Run the pooled job + this.executor.run(); + return true; + } +} diff --git a/src/main/java/me/cortex/voxy/common/thread3/PerThreadContextExecutor.java b/src/main/java/me/cortex/voxy/common/thread3/PerThreadContextExecutor.java new file mode 100644 index 00000000..f115cf5b --- /dev/null +++ b/src/main/java/me/cortex/voxy/common/thread3/PerThreadContextExecutor.java @@ -0,0 +1,161 @@ +package me.cortex.voxy.common.thread3; + +import me.cortex.voxy.common.Logger; +import me.cortex.voxy.common.util.Pair; +import me.cortex.voxy.common.util.TrackedObject; + +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +public class PerThreadContextExecutor extends TrackedObject { + private static final class ThreadContext { + private final Runnable execute; + private final Runnable cleanup; + + private ThreadContext(Pair wrap) { + this(wrap.left(), wrap.right()); + } + + private ThreadContext(Runnable execute, Runnable cleanup) { + this.execute = execute; + this.cleanup = cleanup; + } + } + + private static record ThreadObj(long id) implements LongSupplier { + private static final AtomicLong IDENTIFIER = new AtomicLong(); + public ThreadObj() { + this(IDENTIFIER.getAndIncrement()); + } + + @Override + public long getAsLong() { + return this.id; + } + } + + private static final ThreadLocal THREAD_CTX = ThreadLocal.withInitial(ThreadObj::new); + private final WeakConcurrentCleanableHashMap contexts = new WeakConcurrentCleanableHashMap<>(this::ctxCleaner); //TODO: a custom weak concurrent hashmap that can enqueue values when the value is purged + private final Supplier contextFactory; + private final Consumer exceptionHandler; + + private final AtomicInteger currentRunning = new AtomicInteger(); + private volatile boolean isLive = true; + + PerThreadContextExecutor(Supplier> ctxFactory) { + this(ctxFactory, (e)->{ + Logger.error("Executor had the following exception",e); + }); + } + PerThreadContextExecutor(Supplier> ctxFactory, Consumer exceptionHandler) { + this.contextFactory = ()->new ThreadContext(ctxFactory.get()); + this.exceptionHandler = exceptionHandler; + } + + private void ctxCleaner(ThreadContext ctx) { + try { + ctx.cleanup.run(); + } catch (Exception e) { + this.exceptionHandler.accept(e); + } + } + + boolean run() { + this.currentRunning.incrementAndGet(); + if (!this.isLive) { + this.currentRunning.decrementAndGet(); + this.exceptionHandler.accept(new IllegalStateException("Executor is in shutdown")); + return false; + } + var ctx = this.contexts.computeIfAbsent(THREAD_CTX.get(), this.contextFactory); + try { + ctx.execute.run(); + } catch (Exception e) { + this.exceptionHandler.accept(e); + } + this.currentRunning.decrementAndGet(); + return true; + } + + public void shutdown() { + if (!this.isLive) { + throw new IllegalStateException("Tried shutting down a executor twice"); + } + this.isLive = false; + while (this.currentRunning.get() != 0) { + Thread.onSpinWait();//TODO: maybe add a sleep or something + } + for (var ctx : this.contexts.clear()) { + ctx.cleanup.run(); + } + + this.free0(); + } + + @Override + public void free() { + this.shutdown(); + } + + public boolean isLive() { + return this.isLive; + } + + + private static void inner(PerThreadContextExecutor s) throws InterruptedException { + Thread[] t = new Thread[1<<8]; + Random r = new Random(19874396); + for (int i = 0; i{ + s.run(); + Random lr = new Random(rs); + while (lr.nextFloat()<0.9) { + s.run(); + try { + Thread.sleep((long) (100*lr.nextFloat())); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + t[i].start(); + } + + for (var tt : t) { + tt.join(); + } + } + + public static void main(String[] args) throws InterruptedException { + AtomicInteger cc = new AtomicInteger(); + var s = new PerThreadContextExecutor(()->{ + AtomicBoolean cleaned = new AtomicBoolean(); + int[] a = new int[1]; + return new Pair<>(()->{ + if (cleaned.get()) { + System.err.println("TRIED EXECUTING CLEANED CTX"); + } else { + a[0]++; + } + }, ()->{ + if (cleaned.getAndSet(true)) { + System.err.println("TRIED DOUBLE CLEANING A VALUE"); + } else { + System.out.println("Cleaned ref, exec: " + a[0]); + cc.incrementAndGet(); + } + }); + }); + inner(s); + System.gc(); + s.shutdown(); + System.err.println(cc.get()); + } +} diff --git a/src/main/java/me/cortex/voxy/common/thread3/Service.java b/src/main/java/me/cortex/voxy/common/thread3/Service.java new file mode 100644 index 00000000..a9edffc7 --- /dev/null +++ b/src/main/java/me/cortex/voxy/common/thread3/Service.java @@ -0,0 +1,101 @@ +package me.cortex.voxy.common.thread3; + +import me.cortex.voxy.common.Logger; +import me.cortex.voxy.common.util.Pair; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import java.util.function.Supplier; + +public class Service { + private final PerThreadContextExecutor executor; + private final ServiceManager sm; + final long weight; + final String name; + final BooleanSupplier limiter; + + private final Semaphore tasks = new Semaphore(0); + private volatile boolean isLive = true; + private volatile boolean isStopping = false; + + Service(Supplier> ctxSupplier, ServiceManager sm, long weight, String name, BooleanSupplier limiter) { + this.sm = sm; + this.weight = weight; + this.name = name; + this.limiter = limiter; + + this.executor = new PerThreadContextExecutor(ctxSupplier, e->sm.handleException(this, e)); + } + + public void execute() { + if (this.isStopping) { + Logger.error("Tried executing on a dead service"); + return; + } + this.tasks.release(); + this.sm.execute(this); + } + + boolean runJob() { + if (this.isStopping||!this.isLive) { + return false; + } + if (!this.tasks.tryAcquire()) { + //Failed to get the job, probably due to a race condition + return false; + } + if (!this.executor.run()) {//Run the job + throw new IllegalStateException("Executor failed to run"); + } + return true; + } + + public boolean isLive() { + return this.isLive&&!this.isStopping; + } + + public int numJobs() { + return this.tasks.availablePermits(); + } + + public void blockTillEmpty() { + while (this.isLive() && this.numJobs() != 0) { + Thread.yield(); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + public int shutdown() { + if (this.isStopping) { + throw new IllegalStateException("Service not live"); + } + this.isStopping = true;//First mark the service as stopping + this.sm.removeService(this);//Remove the service this is so that new jobs are never executed + this.executor.shutdown();//Await shutdown of all running jobs + int remaining = this.tasks.drainPermits();//Drain the remaining tasks to 0 + this.isLive = false;//Mark the service as dead + this.sm.remJobs(remaining); + return remaining; + } + + public boolean steal() { + if (!this.tasks.tryAcquire()) { + return false; + } + this.sm.remJobs(1); + return true; + } + + public int drain() { + int tasks = this.tasks.drainPermits(); + if (tasks != 0) { + this.sm.remJobs(tasks); + } + return tasks; + } +} diff --git a/src/main/java/me/cortex/voxy/common/thread3/ServiceManager.java b/src/main/java/me/cortex/voxy/common/thread3/ServiceManager.java new file mode 100644 index 00000000..dac462c6 --- /dev/null +++ b/src/main/java/me/cortex/voxy/common/thread3/ServiceManager.java @@ -0,0 +1,180 @@ +package me.cortex.voxy.common.thread3; + +import it.unimi.dsi.fastutil.HashCommon; +import me.cortex.voxy.common.Logger; +import me.cortex.voxy.common.util.Pair; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import java.util.function.IntConsumer; +import java.util.function.Supplier; + +public class ServiceManager { + private static final class ThreadCtx { + int shiftFactor = 0; + long seed;//Random seed used for selecting service + + ThreadCtx() { + this.seed = HashCommon.murmurHash3(System.nanoTime()^System.identityHashCode(this)); + } + + long rand(long size) { + return (this.seed = HashCommon.mix(this.seed))%size; + } + } + + private final IntConsumer jobRelease; + private final ThreadLocal accelerationContext = ThreadLocal.withInitial(ThreadCtx::new); + private final AtomicInteger totalJobs = new AtomicInteger(); + private volatile Service[] services = new Service[0]; + private volatile boolean isShutdown = false; + + public ServiceManager(IntConsumer jobRelease) { + this.jobRelease = jobRelease; + } + + + public Service createServiceNoCleanup(Supplier ctxFactory, long weight) { + return this.createService(()->new Pair<>(ctxFactory.get(), ()->{}), weight, ""); + } + + public Service createServiceNoCleanup(Supplier ctxFactory, long weight, String name) { + return this.createService(()->new Pair<>(ctxFactory.get(), ()->{}), weight, name); + } + + public Service createService(Supplier> ctxFactory, long weight) { + return this.createService(ctxFactory, weight, ""); + } + + public Service createService(Supplier> ctxFactory, long weight, String name) { + return this.createService(ctxFactory, weight, name, null); + } + public synchronized Service createService(Supplier> ctxFactory, long weight, String name, BooleanSupplier limiter) { + Service newService = new Service(ctxFactory, this, weight, name, limiter); + var newServices = Arrays.copyOf(this.services, this.services.length+1); + newServices[newServices.length-1] = newService; + this.services = newServices; + return newService; + } + + public boolean runAJob() {//Executes a single job on the current thread + while (true) { + if (this.services.length == 0) return false; + if (this.runAJob0()) return true; + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private boolean runAJob0() {//Executes a single job on the current thread + if (this.services.length == 0) return false; + var ctx = this.accelerationContext.get(); + outer: + while (true) { + var services = this.services;//Capture the current services array + if (services.length == 0) return false; + if (this.totalJobs.get()==0) return false; + long totalWeight = 0; + int shiftFactor = (ctx.shiftFactor++)&Integer.MAX_VALUE;//We cycle and shift the starting service when choosing to prevent bias + int c = shiftFactor; + Service selectedService = null; + for (var service:services) { + if (!service.isLive()) { + Thread.yield(); + continue outer;//We need to refetch the array and start over + } + boolean sc = c--<=0; + if (service.limiter!=null && !service.limiter.getAsBoolean()) continue; + long jc = service.numJobs(); + if (sc&&jc!=0&&selectedService==null) selectedService=service; + totalWeight += jc * service.weight; + } + if (totalWeight == 0) return false; + + long sample = ctx.rand(totalWeight);//Random number + + for (int i = 0; i < services.length; i++) { + var service = services[(i+shiftFactor)%services.length]; + if (service.limiter!=null && !service.limiter.getAsBoolean()) continue; + sample -= service.numJobs() * service.weight; + if (sample<=0) { + selectedService = service; + break; + } + } + + if (selectedService == null) { + return false; + } + + if (!selectedService.isLive()) { + continue;//Failed to select a live service, try again + } + + if (!selectedService.runJob()) { + //We failed to run the service, try again + continue; + } + if (this.totalJobs.decrementAndGet() < 0) { + throw new IllegalStateException("Job count <0"); + } + break; + } + return true; + } + + public void shutdown() { + if (this.isShutdown) { + throw new IllegalStateException("Service manager already shutdown"); + } + this.isShutdown = true; + while (this.services.length != 0) { + Thread.yield(); + synchronized (this) { + for (var s : this.services) { + if (s.isLive()) { + throw new IllegalStateException("Service '" + s.name + "' was not in shutdown when manager shutdown"); + } + } + } + } + while (this.totalJobs.get()!=0) { + Thread.yield(); + } + } + + synchronized void removeService(Service service) { + var services = this.services; + var newServices = new Service[services.length-1]; + int j = 0; + for (int i = 0; i < services.length; i++) { + if (services[i] != service) { + newServices[j++] = services[i]; + } + } + if (j != newServices.length) { + throw new IllegalStateException("Could not find the service in the services array"); + } + + this.services = newServices; + } + + void execute(Service service) { + this.totalJobs.incrementAndGet(); + this.jobRelease.accept(1); + } + + void remJobs(int remaining) { + if (this.totalJobs.addAndGet(-remaining)<0) { + throw new IllegalStateException("total jobs <0"); + } + } + + void handleException(Service service, Exception exception) { + Logger.error("Service '"+service.name+"' on thread '"+Thread.currentThread().getName()+"' had an exception", exception); + } +} diff --git a/src/main/java/me/cortex/voxy/common/thread3/UnifiedServiceThreadPool.java b/src/main/java/me/cortex/voxy/common/thread3/UnifiedServiceThreadPool.java new file mode 100644 index 00000000..0ff91d3e --- /dev/null +++ b/src/main/java/me/cortex/voxy/common/thread3/UnifiedServiceThreadPool.java @@ -0,0 +1,121 @@ +package me.cortex.voxy.common.thread3; + +import me.cortex.voxy.common.util.Pair; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class UnifiedServiceThreadPool { + public final ServiceManager serviceManager; + private final MultiThreadPrioritySemaphore groupSemaphore; + + private final MultiThreadPrioritySemaphore.Block selfBlock; + private final ThreadGroup dedicatedPool; + private final List threads = new ArrayList<>(); + + public UnifiedServiceThreadPool() { + this.dedicatedPool = new ThreadGroup("Voxy Dedicated Service"); + this.serviceManager = new ServiceManager(this::release); + this.groupSemaphore = new MultiThreadPrioritySemaphore(this.serviceManager::runAJob); + + this.selfBlock = this.groupSemaphore.createBlock(); + } + + private final void release(int i) {this.groupSemaphore.pooledRelease(i);} + + public void setNumThreads(int threads) { + synchronized (this.threads) { + int diff = threads - this.threads.size(); + if (diff==0) return;//Already correct + if (diff<0) {//Remove threads + this.selfBlock.release(-diff); + } else {//Add threads + for (int i = 0; i < diff; i++) { + var t = new Thread(this.dedicatedPool, this::workerThread); + t.setDaemon(true); + this.threads.add(t); + t.start(); + } + } + } + while (true) { + synchronized (this.threads) { + if (this.threads.size() == threads) return; + } + try { + Thread.sleep(50); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private void workerThread() { + this.selfBlock.acquire();//This is stupid but it works + + //We are exiting, remove self from list of threads + synchronized (this.threads) { + this.threads.remove(Thread.currentThread()); + } + } + + public void shutdown() { + this.serviceManager.shutdown(); + this.selfBlock.release(10000); + while (true) { + synchronized (this.threads) { + if (this.threads.isEmpty()) { + break; + } + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + this.selfBlock.free(); + } + + + public static void main(String[] args) { + var ustp = new UnifiedServiceThreadPool(); + + AtomicInteger cc = new AtomicInteger(); + AtomicInteger cnt = new AtomicInteger(); + + var s1 = ustp.serviceManager.createService(()->{ + AtomicBoolean cleaned = new AtomicBoolean(); + AtomicInteger a = new AtomicInteger(); + return new Pair<>(()->{ + if (cleaned.get()) { + System.err.println("TRIED EXECUTING CLEANED CTX"); + } else { + a.incrementAndGet(); + cnt.incrementAndGet(); + } + }, ()->{ + if (cleaned.getAndSet(true)) { + System.err.println("TRIED DOUBLE CLEANING A VALUE"); + } else { + System.out.println("Cleaned ref, exec: " + a.get()); + cc.incrementAndGet(); + } + }); + }, 1); + + for (int i = 0; i < 1000; i++) { + s1.execute(); + } + ustp.setNumThreads(1); + ustp.setNumThreads(10); + ustp.setNumThreads(0); + ustp.setNumThreads(1); + s1.blockTillEmpty(); + s1.shutdown(); + ustp.shutdown(); + System.out.println(cnt); + } +} diff --git a/src/main/java/me/cortex/voxy/common/thread3/WeakConcurrentCleanableHashMap.java b/src/main/java/me/cortex/voxy/common/thread3/WeakConcurrentCleanableHashMap.java new file mode 100644 index 00000000..dc3b7732 --- /dev/null +++ b/src/main/java/me/cortex/voxy/common/thread3/WeakConcurrentCleanableHashMap.java @@ -0,0 +1,131 @@ +package me.cortex.voxy.common.thread3; + +import it.unimi.dsi.fastutil.HashCommon; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; + +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +public class WeakConcurrentCleanableHashMap { + //TODO could move to a Cleanable style system possibly? + + private final Consumer valueCleaner; + private final ReferenceQueue cleanupQueue = new ReferenceQueue<>(); + + private final ReentrantLock k2iLock = new ReentrantLock(); + private final Object2LongOpenHashMap> k2i = new Object2LongOpenHashMap<>(); + { + this.k2i.defaultReturnValue(-1); + } + private final Long2ObjectOpenHashMap[] i2v = new Long2ObjectOpenHashMap[1<<4]; + private final ReentrantLock[] i2vLocks = new ReentrantLock[this.i2v.length]; + { + for (int i = 0; i < this.i2v.length; i++) { + this.i2v[i] = new Long2ObjectOpenHashMap<>(); + this.i2vLocks[i] = new ReentrantLock(); + } + } + + private final AtomicInteger count = new AtomicInteger(); + + public WeakConcurrentCleanableHashMap(Consumer cleanupConsumer) { + this.valueCleaner = cleanupConsumer; + } + + private static int Id2Seg(long id, int MSK) { + return HashCommon.mix((int)id)&MSK; + } + + public V computeIfAbsent(K key, Supplier valueOnAbsent) { + this.cleanup(); + + long id = key.getAsLong(); + int bucket = Id2Seg(id, this.i2v.length-1); + var i2v = this.i2v[bucket]; + var lock = this.i2vLocks[bucket]; + lock.lock(); + if (i2v.containsKey(id)) { + lock.unlock(); + return i2v.get(id); + } else { + var v = valueOnAbsent.get(); + i2v.put(id, v); + this.k2iLock.lock(); + lock.unlock(); + this.k2i.put(new WeakReference<>(key, this.cleanupQueue), id); + this.k2iLock.unlock(); + this.count.incrementAndGet(); + return v; + } + } + + public void cleanup() { + WeakReference ref = (WeakReference) this.cleanupQueue.poll(); + if (ref != null) { + LongArrayFIFOQueue ids = new LongArrayFIFOQueue(); + this.k2iLock.lock(); + do { + long id = this.k2i.removeLong(ref); + if (id < 0) continue; + ids.enqueue(id); + } while ((ref = (WeakReference) this.cleanupQueue.poll()) != null); + this.k2iLock.unlock(); + if (ids.isEmpty()) return; + int count = ids.size(); + while (!ids.isEmpty()) { + long id = ids.dequeueLong(); + int bucket = Id2Seg(id, this.i2v.length - 1); + var lock = this.i2vLocks[bucket]; + lock.lock(); + var val = this.i2v[bucket].remove(id); + lock.unlock(); + if (val != null) { + this.valueCleaner.accept(val); + } else { + count--; + } + } + if (this.count.addAndGet(-count)<0) { + throw new IllegalStateException(); + } + } + } + + public List clear() { + this.cleanup(); + + List values = new ArrayList<>(this.size()); + //lock everything + for (var lock : this.i2vLocks) { + lock.lock(); + } + this.k2iLock.lock(); + this.k2i.clear();//Clear here while its safe to do so + for (var i2v : this.i2v) { + values.addAll(i2v.values()); + i2v.clear(); + } + this.count.set(0); + this.k2iLock.unlock(); + for (var lock : this.i2vLocks) { + lock.unlock(); + } + return values; + } + + public int size() { + return this.count.get(); + } +} diff --git a/src/main/java/me/cortex/voxy/common/world/service/SectionSavingService.java b/src/main/java/me/cortex/voxy/common/world/service/SectionSavingService.java index a3a880c8..89ff9fe8 100644 --- a/src/main/java/me/cortex/voxy/common/world/service/SectionSavingService.java +++ b/src/main/java/me/cortex/voxy/common/world/service/SectionSavingService.java @@ -3,6 +3,8 @@ package me.cortex.voxy.common.world.service; import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.thread.ServiceSlice; import me.cortex.voxy.common.thread.ServiceThreadPool; +import me.cortex.voxy.common.thread3.Service; +import me.cortex.voxy.common.thread3.ServiceManager; import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.WorldSection; @@ -12,12 +14,12 @@ import java.util.concurrent.ConcurrentLinkedDeque; // save to the db, this can be useful for just reducing the amount of thread pools in total // might have some issues with threading if the same section is saved from multiple threads? public class SectionSavingService { - private final ServiceSlice threads; + private final Service service; private record SaveEntry(WorldEngine engine, WorldSection section) {} private final ConcurrentLinkedDeque saveQueue = new ConcurrentLinkedDeque<>(); - public SectionSavingService(ServiceThreadPool threadPool) { - this.threads = threadPool.createServiceNoCleanup("Section saving service", 100, () -> this::processJob); + public SectionSavingService(ServiceManager sm) { + this.service = sm.createServiceNoCleanup(() -> this::processJob, 100, "Section saving service"); } private void processJob() { @@ -58,8 +60,8 @@ public class SectionSavingService { throw new RuntimeException(e); } //If we are still full, process entries in the queue ourselves instead of waiting for the service - while (this.getTaskCount() > 5_000 && this.threads.isAlive()) { - if (!this.threads.steal()) { + while (this.getTaskCount() > 5_000 && this.service.isLive()) { + if (!this.service.steal()) { break; } this.processJob(); @@ -67,18 +69,16 @@ public class SectionSavingService { } this.saveQueue.add(new SaveEntry(in, section)); - this.threads.execute(); + this.service.execute(); } } public void shutdown() { - if (this.threads.getJobCount() != 0) { - Logger.error("Voxy section saving still in progress, estimated " + this.threads.getJobCount() + " sections remaining."); - while (this.threads.getJobCount() != 0) { - Thread.onSpinWait(); - } + if (this.service.numJobs() != 0) { + Logger.error("Voxy section saving still in progress, estimated " + this.service.numJobs() + " sections remaining."); + this.service.blockTillEmpty(); } - this.threads.shutdown(); + this.service.shutdown(); //Manually save any remaining entries while (!this.saveQueue.isEmpty()) { this.processJob(); @@ -86,6 +86,6 @@ public class SectionSavingService { } public int getTaskCount() { - return this.threads.getJobCount(); + return this.service.numJobs(); } } diff --git a/src/main/java/me/cortex/voxy/common/world/service/VoxelIngestService.java b/src/main/java/me/cortex/voxy/common/world/service/VoxelIngestService.java index 7adf1633..8173695d 100644 --- a/src/main/java/me/cortex/voxy/common/world/service/VoxelIngestService.java +++ b/src/main/java/me/cortex/voxy/common/world/service/VoxelIngestService.java @@ -1,8 +1,8 @@ package me.cortex.voxy.common.world.service; import me.cortex.voxy.common.Logger; -import me.cortex.voxy.common.thread.ServiceSlice; -import me.cortex.voxy.common.thread.ServiceThreadPool; +import me.cortex.voxy.common.thread3.Service; +import me.cortex.voxy.common.thread3.ServiceManager; import me.cortex.voxy.common.voxelization.ILightingSupplier; import me.cortex.voxy.common.voxelization.VoxelizedSection; import me.cortex.voxy.common.voxelization.WorldConversionFactory; @@ -22,12 +22,12 @@ import java.util.concurrent.ConcurrentLinkedDeque; public class VoxelIngestService { private static final ThreadLocal SECTION_CACHE = ThreadLocal.withInitial(VoxelizedSection::createEmpty); - private final ServiceSlice threads; + private final Service service; private record IngestSection(int cx, int cy, int cz, WorldEngine world, ChunkSection section, ChunkNibbleArray blockLight, ChunkNibbleArray skyLight){} private final ConcurrentLinkedDeque ingestQueue = new ConcurrentLinkedDeque<>(); - public VoxelIngestService(ServiceThreadPool pool) { - this.threads = pool.createServiceNoCleanup("Ingest service", 5000, ()-> this::processJob); + public VoxelIngestService(ServiceManager pool) { + this.service = pool.createServiceNoCleanup(()->this::processJob, 5000, "Ingest service"); } private void processJob() { @@ -86,7 +86,7 @@ public class VoxelIngestService { } public boolean enqueueIngest(WorldEngine engine, WorldChunk chunk) { - if (!this.threads.isAlive()) { + if (!this.service.isLive()) { return false; } if (!engine.isLive()) { @@ -116,7 +116,7 @@ public class VoxelIngestService { if (section == null || !shouldIngestSection(section, chunk.getPos().x, i, chunk.getPos().z)) continue; this.ingestQueue.add(new IngestSection(chunk.getPos().x, i, chunk.getPos().z, engine, section, null, null)); try { - this.threads.execute(); + this.service.execute(); } catch (Exception e) { Logger.error("Executing had an error: assume shutting down, aborting",e); break; @@ -156,7 +156,7 @@ public class VoxelIngestService { this.ingestQueue.add(new IngestSection(chunk.getPos().x, i, chunk.getPos().z, engine, section, bl, sl));//TODO: fixme, this is technically not safe todo on the chunk load ingest, we need to copy the section data so it cant be modified while being read try { - this.threads.execute(); + this.service.execute(); } catch (Exception e) { Logger.error("Executing had an error: assume shutting down, aborting",e); break; @@ -166,11 +166,11 @@ public class VoxelIngestService { } public int getTaskCount() { - return this.threads.getJobCount(); + return this.service.numJobs(); } public void shutdown() { - this.threads.shutdown(); + this.service.shutdown(); } //Utility method to ingest a chunk into the given WorldIdentifier or world @@ -192,7 +192,7 @@ public class VoxelIngestService { private boolean rawIngest0(WorldEngine engine, ChunkSection section, int x, int y, int z, ChunkNibbleArray bl, ChunkNibbleArray sl) { this.ingestQueue.add(new IngestSection(x, y, z, engine, section, bl, sl)); try { - this.threads.execute(); + this.service.execute(); return true; } catch (Exception e) { Logger.error("Executing had an error: assume shutting down, aborting",e); diff --git a/src/main/java/me/cortex/voxy/commonImpl/VoxyInstance.java b/src/main/java/me/cortex/voxy/commonImpl/VoxyInstance.java index 679b16e9..bfc140d4 100644 --- a/src/main/java/me/cortex/voxy/commonImpl/VoxyInstance.java +++ b/src/main/java/me/cortex/voxy/commonImpl/VoxyInstance.java @@ -3,6 +3,8 @@ package me.cortex.voxy.commonImpl; import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.config.section.SectionStorage; import me.cortex.voxy.common.thread.ServiceThreadPool; +import me.cortex.voxy.common.thread3.ServiceManager; +import me.cortex.voxy.common.thread3.UnifiedServiceThreadPool; import me.cortex.voxy.common.util.MemoryBuffer; import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.service.SectionSavingService; @@ -21,7 +23,7 @@ public abstract class VoxyInstance { private volatile boolean isRunning = true; private final Thread worldCleaner; public final BooleanSupplier savingServiceRateLimiter;//Can run if this returns true - protected final ServiceThreadPool threadPool; + protected final UnifiedServiceThreadPool threadPool; protected final SectionSavingService savingService; protected final VoxelIngestService ingestService; @@ -30,11 +32,11 @@ public abstract class VoxyInstance { protected final ImportManager importManager; - public VoxyInstance(int threadCount) { + public VoxyInstance() { Logger.info("Initializing voxy instance"); - this.threadPool = new ServiceThreadPool(threadCount); - this.savingService = new SectionSavingService(this.threadPool); - this.ingestService = new VoxelIngestService(this.threadPool); + this.threadPool = new UnifiedServiceThreadPool(); + this.savingService = new SectionSavingService(this.getServiceManager()); + this.ingestService = new VoxelIngestService(this.getServiceManager()); this.importManager = this.createImportManager(); this.savingServiceRateLimiter = ()->this.savingService.getTaskCount()<1200; this.worldCleaner = new Thread(()->{ @@ -60,8 +62,8 @@ public abstract class VoxyInstance { return new ImportManager(); } - public ServiceThreadPool getThreadPool() { - return this.threadPool; + public ServiceManager getServiceManager() { + return this.threadPool.serviceManager; } public VoxelIngestService getIngestService() { return this.ingestService; diff --git a/src/main/java/me/cortex/voxy/commonImpl/importers/DHImporter.java b/src/main/java/me/cortex/voxy/commonImpl/importers/DHImporter.java index a445d52a..8c9e2b26 100644 --- a/src/main/java/me/cortex/voxy/commonImpl/importers/DHImporter.java +++ b/src/main/java/me/cortex/voxy/commonImpl/importers/DHImporter.java @@ -2,7 +2,8 @@ package me.cortex.voxy.commonImpl.importers; import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.thread.ServiceSlice; -import me.cortex.voxy.common.thread.ServiceThreadPool; +import me.cortex.voxy.common.thread3.Service; +import me.cortex.voxy.common.thread3.ServiceManager; import me.cortex.voxy.common.util.Pair; import me.cortex.voxy.common.voxelization.VoxelizedSection; import me.cortex.voxy.common.voxelization.WorldConversionFactory; @@ -43,7 +44,7 @@ import java.util.function.BooleanSupplier; public class DHImporter implements IDataImporter { private final Connection db; private final WorldEngine engine; - private final ServiceSlice threadPool; + private final Service service; private final World world; private final int bottomOfWorld; private final int worldHeightSections; @@ -68,7 +69,7 @@ public class DHImporter implements IDataImporter { } } - public DHImporter(File file, WorldEngine worldEngine, World mcWorld, ServiceThreadPool servicePool, BooleanSupplier rateLimiter) { + public DHImporter(File file, WorldEngine worldEngine, World mcWorld, ServiceManager servicePool, BooleanSupplier rateLimiter) { this.engine = worldEngine; this.world = mcWorld; this.biomeRegistry = mcWorld.getRegistryManager().getOrThrow(RegistryKeys.BIOME); @@ -85,7 +86,7 @@ public class DHImporter implements IDataImporter { } catch (SQLException e) { throw new RuntimeException(e); } - this.threadPool = servicePool.createService("DH Importer", 1, ()->{ + this.service = servicePool.createService(()->{ try { var dataFetchStmt = this.db.prepareStatement("SELECT Data,ColumnGenerationStep,Mapping FROM FullData WHERE DetailLevel = 0 AND PosX = ? AND PosZ = ?;"); var ctx = new WorkCTX(dataFetchStmt, this.worldHeightSections*16); @@ -101,7 +102,7 @@ public class DHImporter implements IDataImporter { } catch (SQLException e) { throw new RuntimeException(e); } - }, rateLimiter); + }, 10, "DH Importer", rateLimiter); } public void runImport(IUpdateCallback updateCallback, ICompletionCallback completionCallback) { @@ -139,7 +140,7 @@ public class DHImporter implements IDataImporter { while (this.isRunning&&!taskQ.isEmpty()) { this.tasks.add(taskQ.poll()); - this.threadPool.execute(); + this.service.execute(); while (this.tasks.size() > 100 && this.isRunning) { try { @@ -369,7 +370,7 @@ public class DHImporter implements IDataImporter { } catch (InterruptedException e) { throw new RuntimeException(e); } - this.threadPool.shutdown(); + this.service.shutdown(); this.engine.releaseRef(); try { this.db.close(); diff --git a/src/main/java/me/cortex/voxy/commonImpl/importers/WorldImporter.java b/src/main/java/me/cortex/voxy/commonImpl/importers/WorldImporter.java index c972ff31..13c2515b 100644 --- a/src/main/java/me/cortex/voxy/commonImpl/importers/WorldImporter.java +++ b/src/main/java/me/cortex/voxy/commonImpl/importers/WorldImporter.java @@ -3,23 +3,22 @@ package me.cortex.voxy.commonImpl.importers; import com.mojang.serialization.Codec; import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.thread.ServiceSlice; -import me.cortex.voxy.common.thread.ServiceThreadPool; +import me.cortex.voxy.common.thread3.Service; +import me.cortex.voxy.common.thread3.ServiceManager; import me.cortex.voxy.common.util.MemoryBuffer; +import me.cortex.voxy.common.util.Pair; import me.cortex.voxy.common.util.UnsafeUtil; import me.cortex.voxy.common.voxelization.VoxelizedSection; import me.cortex.voxy.common.voxelization.WorldConversionFactory; import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.WorldUpdater; -import net.minecraft.block.Block; import net.minecraft.block.BlockState; -import net.minecraft.block.Blocks; import net.minecraft.nbt.NbtCompound; import net.minecraft.nbt.NbtIo; import net.minecraft.nbt.NbtOps; import net.minecraft.network.PacketByteBuf; import net.minecraft.registry.RegistryKeys; import net.minecraft.registry.entry.RegistryEntry; -import net.minecraft.util.collection.IndexedIterable; import net.minecraft.world.World; import net.minecraft.world.biome.Biome; import net.minecraft.world.biome.BiomeKeys; @@ -39,6 +38,7 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BooleanSupplier; import java.util.function.Consumer; @@ -54,13 +54,13 @@ public class WorldImporter implements IDataImporter { private final AtomicInteger chunksProcessed = new AtomicInteger(); private final ConcurrentLinkedDeque jobQueue = new ConcurrentLinkedDeque<>(); - private final ServiceSlice threadPool; + private final Service service; private volatile boolean isRunning; - public WorldImporter(WorldEngine worldEngine, World mcWorld, ServiceThreadPool servicePool, BooleanSupplier runChecker) { + public WorldImporter(WorldEngine worldEngine, World mcWorld, ServiceManager sm, BooleanSupplier runChecker) { this.world = worldEngine; - this.threadPool = servicePool.createServiceNoCleanup("World importer", 3, ()->()->this.jobQueue.poll().run(), runChecker); + this.service = sm.createService(()->new Pair<>(()->this.jobQueue.poll().run(), ()->{}), 3, "World importer", runChecker); var biomeRegistry = mcWorld.getRegistryManager().getOrThrow(RegistryKeys.BIOME); var defaultBiome = biomeRegistry.getOrThrow(BiomeKeys.PLAINS); @@ -143,7 +143,11 @@ public class WorldImporter implements IDataImporter { return this.world; } + private final AtomicBoolean isShutdown = new AtomicBoolean(); public void shutdown() { + if (this.isShutdown.getAndSet(true)) { + return; + } this.isRunning = false; if (this.worker != null) { try { @@ -152,9 +156,9 @@ public class WorldImporter implements IDataImporter { throw new RuntimeException(e); } } - if (!this.threadPool.isFreed()) { + if (this.service.isLive()) { this.world.releaseRef(); - this.threadPool.shutdown(); + this.service.shutdown(); } //Free all the remaining entries by running the lambda while (!this.jobQueue.isEmpty()) { @@ -254,13 +258,13 @@ public class WorldImporter implements IDataImporter { } } if (!this.isRunning) { - this.threadPool.blockTillEmpty(); + this.service.blockTillEmpty(); this.completionCallback.onCompletion(this.totalChunks.get()); this.worker = null; return; } } - this.threadPool.blockTillEmpty(); + this.service.blockTillEmpty(); while (this.chunksProcessed.get() != this.totalChunks.get() && this.isRunning) { Thread.yield(); try { @@ -269,9 +273,11 @@ public class WorldImporter implements IDataImporter { throw new RuntimeException(e); } } - this.worker = null; - this.world.releaseRef(); - this.threadPool.shutdown(); + if (!this.isShutdown.getAndSet(true)) { + this.worker = null; + this.service.shutdown(); + this.world.releaseRef(); + } this.completionCallback.onCompletion(this.totalChunks.get()); }); this.worker.setName("World importer"); @@ -386,7 +392,7 @@ public class WorldImporter implements IDataImporter { }); this.totalChunks.incrementAndGet(); this.estimatedTotalChunks.incrementAndGet(); - this.threadPool.execute(); + this.service.execute(); } } }