diff --git a/src/main/java/me/cortex/voxy/client/VoxyClientInstance.java b/src/main/java/me/cortex/voxy/client/VoxyClientInstance.java index c0b6f7db..fa8bd84c 100644 --- a/src/main/java/me/cortex/voxy/client/VoxyClientInstance.java +++ b/src/main/java/me/cortex/voxy/client/VoxyClientInstance.java @@ -28,7 +28,7 @@ public class VoxyClientInstance extends VoxyInstance { private final boolean noIngestOverride; public VoxyClientInstance() { super(); - + this.setNumThreads(VoxyConfig.CONFIG.serviceThreads); var path = FlashbackCompat.getReplayStoragePath(); this.noIngestOverride = path != null; if (path == null) { diff --git a/src/main/java/me/cortex/voxy/client/compat/SemaphoreBlockImpersonator.java b/src/main/java/me/cortex/voxy/client/compat/SemaphoreBlockImpersonator.java index 21dacdba..beff73fc 100644 --- a/src/main/java/me/cortex/voxy/client/compat/SemaphoreBlockImpersonator.java +++ b/src/main/java/me/cortex/voxy/client/compat/SemaphoreBlockImpersonator.java @@ -1,6 +1,6 @@ package me.cortex.voxy.client.compat; -import me.cortex.voxy.common.thread3.MultiThreadPrioritySemaphore; +import me.cortex.voxy.common.thread.MultiThreadPrioritySemaphore; import java.util.concurrent.Semaphore; diff --git a/src/main/java/me/cortex/voxy/client/config/VoxyConfigScreenPages.java b/src/main/java/me/cortex/voxy/client/config/VoxyConfigScreenPages.java index 9814ac98..5cd4edef 100644 --- a/src/main/java/me/cortex/voxy/client/config/VoxyConfigScreenPages.java +++ b/src/main/java/me/cortex/voxy/client/config/VoxyConfigScreenPages.java @@ -50,6 +50,7 @@ public abstract class VoxyConfigScreenPages { VoxyCommon.shutdownInstance(); } }, s -> s.enabled) + .setFlags(OptionFlag.REQUIRES_RENDERER_RELOAD) .build() ).add(OptionImpl.createBuilder(int.class, storage) .setName(Text.translatable("voxy.config.general.serviceThreads")) diff --git a/src/main/java/me/cortex/voxy/client/core/VoxyRenderSystem.java b/src/main/java/me/cortex/voxy/client/core/VoxyRenderSystem.java index 831b9391..8c9802bd 100644 --- a/src/main/java/me/cortex/voxy/client/core/VoxyRenderSystem.java +++ b/src/main/java/me/cortex/voxy/client/core/VoxyRenderSystem.java @@ -28,8 +28,7 @@ import me.cortex.voxy.client.core.rendering.util.UploadStream; import me.cortex.voxy.client.core.util.GPUTiming; import me.cortex.voxy.client.core.util.IrisUtil; import me.cortex.voxy.common.Logger; -import me.cortex.voxy.common.thread.ServiceThreadPool; -import me.cortex.voxy.common.thread3.ServiceManager; +import me.cortex.voxy.common.thread.ServiceManager; import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.commonImpl.VoxyCommon; import net.caffeinemc.mods.sodium.client.render.chunk.ChunkRenderMatrices; diff --git a/src/main/java/me/cortex/voxy/client/core/rendering/building/RenderGenerationService.java b/src/main/java/me/cortex/voxy/client/core/rendering/building/RenderGenerationService.java index 8fc03125..a53ee121 100644 --- a/src/main/java/me/cortex/voxy/client/core/rendering/building/RenderGenerationService.java +++ b/src/main/java/me/cortex/voxy/client/core/rendering/building/RenderGenerationService.java @@ -4,9 +4,8 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import me.cortex.voxy.client.core.model.IdNotYetComputedException; import me.cortex.voxy.client.core.model.ModelBakerySubsystem; -import me.cortex.voxy.common.thread.ServiceSlice; -import me.cortex.voxy.common.thread3.Service; -import me.cortex.voxy.common.thread3.ServiceManager; +import me.cortex.voxy.common.thread.Service; +import me.cortex.voxy.common.thread.ServiceManager; import me.cortex.voxy.common.util.Pair; import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.WorldSection; diff --git a/src/main/java/me/cortex/voxy/client/mixin/sodium/MixinChunkJobQueue.java b/src/main/java/me/cortex/voxy/client/mixin/sodium/MixinChunkJobQueue.java index ddab925a..6c4c2bab 100644 --- a/src/main/java/me/cortex/voxy/client/mixin/sodium/MixinChunkJobQueue.java +++ b/src/main/java/me/cortex/voxy/client/mixin/sodium/MixinChunkJobQueue.java @@ -1,15 +1,13 @@ package me.cortex.voxy.client.mixin.sodium; import me.cortex.voxy.client.compat.SemaphoreBlockImpersonator; -import me.cortex.voxy.common.thread3.MultiThreadPrioritySemaphore; +import me.cortex.voxy.common.thread.MultiThreadPrioritySemaphore; import me.cortex.voxy.commonImpl.VoxyCommon; -import org.spongepowered.asm.mixin.Final; import org.spongepowered.asm.mixin.Mixin; import org.spongepowered.asm.mixin.Unique; import org.spongepowered.asm.mixin.injection.At; import org.spongepowered.asm.mixin.injection.Inject; import org.spongepowered.asm.mixin.injection.Redirect; -import org.spongepowered.asm.mixin.injection.callback.CallbackInfo; import org.spongepowered.asm.mixin.injection.callback.CallbackInfoReturnable; import java.util.concurrent.Semaphore; diff --git a/src/main/java/me/cortex/voxy/common/thread3/MultiThreadPrioritySemaphore.java b/src/main/java/me/cortex/voxy/common/thread/MultiThreadPrioritySemaphore.java similarity index 96% rename from src/main/java/me/cortex/voxy/common/thread3/MultiThreadPrioritySemaphore.java rename to src/main/java/me/cortex/voxy/common/thread/MultiThreadPrioritySemaphore.java index 949c90f2..7530b3d7 100644 --- a/src/main/java/me/cortex/voxy/common/thread3/MultiThreadPrioritySemaphore.java +++ b/src/main/java/me/cortex/voxy/common/thread/MultiThreadPrioritySemaphore.java @@ -1,11 +1,9 @@ -package me.cortex.voxy.common.thread3; +package me.cortex.voxy.common.thread; import me.cortex.voxy.common.util.TrackedObject; -import java.lang.ref.WeakReference; import java.util.*; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; //Basiclly acts as a priority based mutlti semaphore // allows the pooling of multiple threadpools together while prioritizing the work the original was ment for diff --git a/src/main/java/me/cortex/voxy/common/thread3/PerThreadContextExecutor.java b/src/main/java/me/cortex/voxy/common/thread/PerThreadContextExecutor.java similarity index 98% rename from src/main/java/me/cortex/voxy/common/thread3/PerThreadContextExecutor.java rename to src/main/java/me/cortex/voxy/common/thread/PerThreadContextExecutor.java index f115cf5b..5434a8f3 100644 --- a/src/main/java/me/cortex/voxy/common/thread3/PerThreadContextExecutor.java +++ b/src/main/java/me/cortex/voxy/common/thread/PerThreadContextExecutor.java @@ -1,4 +1,4 @@ -package me.cortex.voxy.common.thread3; +package me.cortex.voxy.common.thread; import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.util.Pair; @@ -8,7 +8,6 @@ import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Supplier; diff --git a/src/main/java/me/cortex/voxy/common/thread/QueuedServiceSlice.java b/src/main/java/me/cortex/voxy/common/thread/QueuedServiceSlice.java deleted file mode 100644 index d7dabe1b..00000000 --- a/src/main/java/me/cortex/voxy/common/thread/QueuedServiceSlice.java +++ /dev/null @@ -1,32 +0,0 @@ -package me.cortex.voxy.common.thread; - -import me.cortex.voxy.common.util.Pair; - -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.function.BooleanSupplier; -import java.util.function.Consumer; -import java.util.function.Supplier; - -public class QueuedServiceSlice extends ServiceSlice { - private final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>(); - - QueuedServiceSlice(ServiceThreadPool threadPool, Supplier, Runnable>> workerGenerator, String name, int weightPerJob, BooleanSupplier condition) { - super(threadPool, null, name, weightPerJob, condition); - //Fuck off java with the this bullshit before super constructor, fucking bullshit - super.setWorkerGenerator(() -> { - var work = workerGenerator.get(); - var consumer = work.left(); - return new Pair<>(() -> consumer.accept(this.queue.pop()), work.right()); - }); - } - - @Override - public void execute() { - throw new IllegalStateException("Cannot call .execute() on a QueuedServiceSlice"); - } - - public void enqueue(T obj) { - this.queue.add(obj); - super.execute(); - } -} diff --git a/src/main/java/me/cortex/voxy/common/thread3/Service.java b/src/main/java/me/cortex/voxy/common/thread/Service.java similarity index 97% rename from src/main/java/me/cortex/voxy/common/thread3/Service.java rename to src/main/java/me/cortex/voxy/common/thread/Service.java index a9edffc7..e8cc2cb9 100644 --- a/src/main/java/me/cortex/voxy/common/thread3/Service.java +++ b/src/main/java/me/cortex/voxy/common/thread/Service.java @@ -1,10 +1,9 @@ -package me.cortex.voxy.common.thread3; +package me.cortex.voxy.common.thread; import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.util.Pair; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BooleanSupplier; import java.util.function.Supplier; diff --git a/src/main/java/me/cortex/voxy/common/thread3/ServiceManager.java b/src/main/java/me/cortex/voxy/common/thread/ServiceManager.java similarity index 99% rename from src/main/java/me/cortex/voxy/common/thread3/ServiceManager.java rename to src/main/java/me/cortex/voxy/common/thread/ServiceManager.java index 98872b52..43424661 100644 --- a/src/main/java/me/cortex/voxy/common/thread3/ServiceManager.java +++ b/src/main/java/me/cortex/voxy/common/thread/ServiceManager.java @@ -1,4 +1,4 @@ -package me.cortex.voxy.common.thread3; +package me.cortex.voxy.common.thread; import it.unimi.dsi.fastutil.HashCommon; import me.cortex.voxy.common.Logger; diff --git a/src/main/java/me/cortex/voxy/common/thread/ServiceSlice.java b/src/main/java/me/cortex/voxy/common/thread/ServiceSlice.java deleted file mode 100644 index 44809323..00000000 --- a/src/main/java/me/cortex/voxy/common/thread/ServiceSlice.java +++ /dev/null @@ -1,189 +0,0 @@ -package me.cortex.voxy.common.thread; - -import me.cortex.voxy.common.Logger; -import me.cortex.voxy.common.util.Pair; -import me.cortex.voxy.common.util.TrackedObject; -import net.minecraft.client.MinecraftClient; -import net.minecraft.text.Text; - -import java.util.Arrays; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BooleanSupplier; -import java.util.function.Supplier; - -public class ServiceSlice extends TrackedObject { - final String name; - final int weightPerJob; - volatile boolean alive = true; - private final ServiceThreadPool threadPool; - private Supplier> workerGenerator; - final Semaphore jobCount = new Semaphore(0); - private final Runnable[] runningCtxs; - private final Runnable[] cleanupCtxs; - private final AtomicInteger activeCount = new AtomicInteger(); - private final AtomicInteger jobCount2 = new AtomicInteger(); - private final BooleanSupplier condition; - - ServiceSlice(ServiceThreadPool threadPool, Supplier> workerGenerator, String name, int weightPerJob, BooleanSupplier condition) { - this.threadPool = threadPool; - this.condition = condition; - this.runningCtxs = new Runnable[threadPool.getThreadCount()]; - this.cleanupCtxs = new Runnable[threadPool.getThreadCount()]; - this.name = name; - this.weightPerJob = weightPerJob; - this.setWorkerGenerator(workerGenerator); - } - - protected void setWorkerGenerator(Supplier> workerGenerator) { - this.workerGenerator = workerGenerator; - } - - boolean doRun(int threadIndex) { - //If executable - if (!this.condition.getAsBoolean()) { - return false; - } - - //Run this thread once if possible - if (!this.jobCount.tryAcquire()) { - return false; - } - - if (!this.alive) { - return true;//Return true because we have "consumed" the job (needed to keep weight tracking correct) - } - - this.activeCount.incrementAndGet(); - - //Check that we are still alive - if (!this.alive) { - if (this.activeCount.decrementAndGet() < 0) { - throw new IllegalStateException("Alive count negative!:" + this.name); - } - return true; - } - - //If the running context is null, create and set it - var ctx = this.runningCtxs[threadIndex]; - if (ctx == null) { - var pair = this.workerGenerator.get(); - ctx = pair.left(); - this.cleanupCtxs[threadIndex] = pair.right();//Set cleanup - this.runningCtxs[threadIndex] = ctx; - } - - //Run the job - try { - ctx.run(); - } catch (Exception e) { - Logger.error("Unexpected error occurred while executing a service job, expect things to break badly: " + this.name, e); - MinecraftClient.getInstance().execute(()->MinecraftClient.getInstance().player.sendMessage(Text.literal("A voxy service had an exception while executing please check logs and report error"), true)); - } finally { - if (this.activeCount.decrementAndGet() < 0) { - throw new IllegalStateException("Alive count negative!: " + this.name); - } - if (this.jobCount2.decrementAndGet() < 0) { - throw new IllegalStateException("Job count negative!" + this.name); - } - } - return true; - } - - //Tells the system that a single instance of this service needs executing - public void execute() { - if (!this.alive) { - Logger.error("Tried to do work on a dead service: " + this.name, new Throwable()); - return; - } - this.threadPool.addWeight(this); - this.jobCount2.incrementAndGet(); - this.jobCount.release(); - this.threadPool.execute(); - } - - public void shutdown() { - this.alive = false; - - //Wait till all is finished - while (this.activeCount.get() != 0) { - Thread.onSpinWait(); - } - - //Tell parent to remove - this.threadPool.removeService(this); - - this.runCleanup(); - - super.free0(); - } - - private void runCleanup() { - for (var runnable : this.cleanupCtxs) { - if (runnable != null) { - runnable.run(); - } - } - Arrays.fill(this.cleanupCtxs, null); - } - - @Override - public void free() { - this.shutdown(); - } - - public int getJobCount() { - return this.jobCount.availablePermits(); - } - - public boolean hasJobs() { - return this.jobCount.availablePermits() != 0; - } - - boolean workConditionMet() { - return this.condition.getAsBoolean(); - } - - public void blockTillEmpty() { - while (this.activeCount.get() != 0 && this.alive) { - while ((this.jobCount2.get() != 0 || this.jobCount.availablePermits()!=0) && this.alive) { - Thread.onSpinWait(); - try { - Thread.sleep(10); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - Thread.yield(); - } - } - - //Steal a job, if there is no job available return false - public boolean steal() { - if (!this.jobCount.tryAcquire()) { - return false; - } - if (this.jobCount2.decrementAndGet() < 0) { - throw new IllegalStateException("Job count negative!!!:" + this.name); - } - this.threadPool.steal(this, 1); - return true; - } - - public int drain() { - int count = this.jobCount.drainPermits(); - if (count == 0) { - return 0; - } - - if (this.jobCount2.addAndGet(-count) < 0) { - throw new IllegalStateException("Job count negative!!!:" + this.name); - } - this.threadPool.steal(this, count); - return count; - } - - public boolean isAlive() { - return this.alive; - } -} diff --git a/src/main/java/me/cortex/voxy/common/thread/ServiceThreadPool.java b/src/main/java/me/cortex/voxy/common/thread/ServiceThreadPool.java deleted file mode 100644 index 0dc734d3..00000000 --- a/src/main/java/me/cortex/voxy/common/thread/ServiceThreadPool.java +++ /dev/null @@ -1,354 +0,0 @@ -package me.cortex.voxy.common.thread; - -import me.cortex.voxy.common.Logger; -import me.cortex.voxy.common.util.Pair; -import me.cortex.voxy.common.util.ThreadUtils; -import me.cortex.voxy.common.util.cpu.CpuLayout; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BooleanSupplier; -import java.util.function.Supplier; - - -//TODO: could also probably replace all of this with just VirtualThreads and a Executors.newThreadPerTaskExecutor with a fixed thread pool -// it is probably better anyway -public class ServiceThreadPool { - private volatile boolean running = true; - private volatile boolean releaseNow = false; - private Thread[] workers = new Thread[0]; - private final Semaphore jobCounter = new Semaphore(0); - - private volatile ServiceSlice[] serviceSlices = new ServiceSlice[0]; - private final AtomicLong totalJobWeight = new AtomicLong(); - private final ThreadGroup threadGroup; - - public ServiceThreadPool(int threadCount) { - this(threadCount, 3);//Maybe change to 3 - } - - public ServiceThreadPool(int threadCount, int priority) { - if (CpuLayout.getCoreCount()-2 < threadCount) { - Logger.warn("The thread count over core count -2, performance degradation possible"); - } - - this.threadGroup = new ThreadGroup("Service job workers"); - this.workers = new Thread[threadCount]; - for (int i = 0; i < threadCount; i++) { - int threadId = i; - var worker = new Thread(this.threadGroup, ()->{ - if (CpuLayout.CORES!=null && CpuLayout.CORES.length>3) { - //Set worker affinity if possible - CpuLayout.setThreadAffinity(CpuLayout.CORES[2 + (threadId % (CpuLayout.CORES.length - 2))]); - } - if (threadId != 0) { - ThreadUtils.SetSelfThreadPriorityWin32(-1); - //ThreadUtils.SetSelfThreadPriorityWin32(ThreadUtils.WIN32_THREAD_MODE_BACKGROUND_BEGIN); - } - this.worker(threadId); - }); - worker.setDaemon(false); - worker.setName("Service worker #" + i); - if (i == 0) {//Give the first thread normal priority, this helps if the system is under huge load for voxy to get some work done - worker.setPriority(Thread.NORM_PRIORITY); - } else { - worker.setPriority(priority); - } - worker.start(); - worker.setUncaughtExceptionHandler(this::handleUncaughtException); - this.workers[i] = worker; - } - } - - public ServiceSlice createServiceNoCleanup(String name, int weight, Supplier workGenerator) { - return this.createService(name, weight, ()->new Pair<>(workGenerator.get(), null)); - } - - public ServiceSlice createServiceNoCleanup(String name, int weight, Supplier workGenerator, BooleanSupplier executionCondition) { - return this.createService(name, weight, ()->new Pair<>(workGenerator.get(), null), executionCondition); - } - - public synchronized ServiceSlice createService(String name, int weight, Supplier> workGenerator) { - return this.createService(name, weight, workGenerator, ()->true); - } - - public synchronized ServiceSlice createService(String name, int weight, Supplier> workGenerator, BooleanSupplier executionCondition) { - var service = new ServiceSlice(this, workGenerator, name, weight, executionCondition); - this.insertService(service); - return service; - } - - private void insertService(ServiceSlice service) { - var current = this.serviceSlices; - var newList = new ServiceSlice[current.length + 1]; - System.arraycopy(current, 0, newList, 0, current.length); - newList[current.length] = service; - this.serviceSlices = newList; - } - - synchronized void removeService(ServiceSlice service) { - this.removeServiceFromArray(service); - int permits = service.jobCount.drainPermits(); - if (this.totalJobWeight.addAndGet(-((long) service.weightPerJob) * permits) < 0) { - throw new IllegalStateException("Total job weight negative!"); - } - - //Need to acquire all the shut-down jobs - try { - //Wait for 1000 millis, to let shinanigans filter down - if (!this.jobCounter.tryAcquire(permits, 1000, TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("Failed to acquire all the permits for the shut down jobs"); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - private synchronized void removeServiceFromArray(ServiceSlice service) { - var lst = this.serviceSlices; - int idx; - for (idx = 0; idx < lst.length; idx++) { - if (lst[idx] == service) { - break; - } - } - if (idx == lst.length) { - throw new IllegalStateException("Service not in service list"); - } - - //Remove the slice from the array and set it back - - if (lst.length-1 == 0) { - this.serviceSlices = new ServiceSlice[0]; - return; - } - - ServiceSlice[] newArr = new ServiceSlice[lst.length-1]; - System.arraycopy(lst, 0, newArr, 0, idx); - if (lst.length-1 != idx) { - //Need to do a second copy - System.arraycopy(lst, idx+1, newArr, idx, newArr.length-idx); - } - this.serviceSlices = newArr; - } - - long addWeight(ServiceSlice service) { - return this.totalJobWeight.addAndGet(service.weightPerJob); - } - - void execute() { - this.jobCounter.release(1); - } - - void steal(ServiceSlice service, int count) { - this.totalJobWeight.addAndGet(-(service.weightPerJob*(long)count)); - this.releaseNow = true; - for (int i = 0; i < count; i++) { - this.jobCounter.acquireUninterruptibly(); - } - this.releaseNow = false; - } - - private void worker(int threadId) { - long[] seed = new long[]{1234342^(threadId*124987198651981L+215987981111L)}; - int[] revolvingSelector = new int[1]; - long[] logIO = new long[] {0, System.currentTimeMillis()}; - while (true) { - this.jobCounter.acquireUninterruptibly(); - if (!this.running) { - break; - } - //This is because of JIT moment (it cant really replace methods while they are executing afak) - this.worker_work(threadId, seed, revolvingSelector, logIO); - } - } - - private void worker_work(int threadId, long[] seedIO, int[] revolvingSelectorIO, long[] logIO) { - final int ATTEMPT_COUNT = 50; - int attempts = ATTEMPT_COUNT; - while (true) { - if (attempts < ATTEMPT_COUNT-2) { - try { - Thread.sleep(20); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - if (this.releaseNow) { - this.jobCounter.release(); - try { - Thread.sleep(20); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - break; - } - var ref = this.serviceSlices; - if (ref.length == 0) { - Logger.error("Service worker tried to run but had 0 slices"); - break; - } - if (attempts-- == 0) { - Logger.warn("Unable to execute service after many attempts, releasing"); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - this.jobCounter.release();//Release the job we acquired - break; - } - long seed = seedIO[0]*1984691871L+1497210975L; - seed = (seed ^ (seed >>> 30)) * -4658895280553007687L; - seed = (seed ^ (seed >>> 27)) * -7723592293110705685L; - seedIO[0] = seed; - long clamped = seed&((1L<<63)-1); - long weight = this.totalJobWeight.get(); - if (weight == 0) { - this.jobCounter.release(); - break; - } - - ServiceSlice service = null; - for (int i = 0; i < ref.length; i++) { - var service2 = ref[(int) ((clamped+i) % ref.length)]; - if (service2.hasJobs() && service2.workConditionMet()) { - service = service2; - break; - } - } - if (service == null) { - logIO[0]++; - long delta = System.currentTimeMillis()-logIO[1]; - if (delta>30_000) { - logIO[1] = System.currentTimeMillis(); - Logger.warn("No available jobs, sleeping releasing returning: " + (delta/1000) + " attempts " + logIO[0]); - logIO[0] = 0; - } - try { - Thread.sleep((long) (500*Math.random()+200)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - this.jobCounter.release(); - break; - } - - //1 in 64 chance just to pick a service that has a task, in a cycling manor, this is to keep at least one service from overloading all services constantly - if (((seed>>10)&63) == 0) { - int revolvingSelector = revolvingSelectorIO[0]; - for (int i = 0; i < ref.length; i++) { - int idx = (i+revolvingSelector)%ref.length; - var slice = ref[idx]; - if (slice.hasJobs() && slice.workConditionMet()) { - service = slice; - revolvingSelector = (idx+1)%ref.length; - break; - } - } - revolvingSelectorIO[0] = revolvingSelector; - } else { - long chosenNumber = clamped % weight; - for (var slice : ref) { - chosenNumber -= ((long) slice.weightPerJob) * slice.jobCount.availablePermits(); - if (chosenNumber <= 0 && slice.workConditionMet()) { - service = slice; - break; - } - } - } - - //Run the job - if (!service.doRun(threadId)) { - //Didnt consume the job, find a new job - continue; - } - - //Consumed a job from the service, decrease weight by the amount - if (this.totalJobWeight.addAndGet(-service.weightPerJob)<0) { - Logger.error("Total job weight is negative"); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (this.totalJobWeight.get()<0) { - throw new IllegalStateException("Total job weight still negative"); - } - } - - //Sleep for a bit after running a job, yeild the thread - //Thread.yield(); - break; - } - } - - private void handleUncaughtException(Thread thread, Throwable throwable) { - Logger.error("Service worker thread has exploded unexpectedly! this is really not good very very bad.", throwable); - } - - public void shutdown() { - if (this.serviceSlices.length != 0) { - String remaining = ""; - for (var service : this.serviceSlices) { - remaining += service.name + ", "; - } - throw new IllegalStateException("All service slices must be shutdown before thread pool can exit. Remaining: " + remaining); - } - - //Wait for the tasks to finish - while (this.jobCounter.availablePermits() != 0) { - Thread.onSpinWait(); - } - - int remainingJobs = this.jobCounter.drainPermits(); - //Shutdown - this.running = false; - this.jobCounter.release(1000); - - //Wait for thread to join - try { - for (var worker : this.workers) { - worker.join(); - } - } catch (InterruptedException e) {throw new RuntimeException(e);} - - if (this.totalJobWeight.get() != 0) { - throw new IllegalStateException("Service pool job weight not 0 after shutdown"); - } - - if (remainingJobs != 0) { - throw new IllegalStateException("Service thread pool had jobs remaining!"); - } - } - - public int getThreadCount() { - return this.workers.length; - } - - /* - public void setThreadCount(int threadCount) { - if (threadCount == this.workers.length) { - return;//No change - } - - if (threadCount < this.workers.length) { - //Need to remove workers - } else { - //Need to add new workers - } - - this.workers = new Thread[threadCount]; - for (int i = 0; i < workers; i++) { - int threadId = i; - var worker = new Thread(this.threadGroup, ()->this.worker(threadId)); - worker.setDaemon(false); - worker.setName("Service worker #" + i); - worker.start(); - worker.setUncaughtExceptionHandler(this::handleUncaughtException); - this.workers[i] = worker; - } - } - */ -} diff --git a/src/main/java/me/cortex/voxy/common/thread3/UnifiedServiceThreadPool.java b/src/main/java/me/cortex/voxy/common/thread/UnifiedServiceThreadPool.java similarity index 99% rename from src/main/java/me/cortex/voxy/common/thread3/UnifiedServiceThreadPool.java rename to src/main/java/me/cortex/voxy/common/thread/UnifiedServiceThreadPool.java index 3948847e..efc953b0 100644 --- a/src/main/java/me/cortex/voxy/common/thread3/UnifiedServiceThreadPool.java +++ b/src/main/java/me/cortex/voxy/common/thread/UnifiedServiceThreadPool.java @@ -1,4 +1,4 @@ -package me.cortex.voxy.common.thread3; +package me.cortex.voxy.common.thread; import me.cortex.voxy.common.util.Pair; diff --git a/src/main/java/me/cortex/voxy/common/thread3/WeakConcurrentCleanableHashMap.java b/src/main/java/me/cortex/voxy/common/thread/WeakConcurrentCleanableHashMap.java similarity index 96% rename from src/main/java/me/cortex/voxy/common/thread3/WeakConcurrentCleanableHashMap.java rename to src/main/java/me/cortex/voxy/common/thread/WeakConcurrentCleanableHashMap.java index dc3b7732..8b112f44 100644 --- a/src/main/java/me/cortex/voxy/common/thread3/WeakConcurrentCleanableHashMap.java +++ b/src/main/java/me/cortex/voxy/common/thread/WeakConcurrentCleanableHashMap.java @@ -1,17 +1,14 @@ -package me.cortex.voxy.common.thread3; +package me.cortex.voxy.common.thread; import it.unimi.dsi.fastutil.HashCommon; import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue; -import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; diff --git a/src/main/java/me/cortex/voxy/common/world/service/SectionSavingService.java b/src/main/java/me/cortex/voxy/common/world/service/SectionSavingService.java index 89ff9fe8..7a5ca042 100644 --- a/src/main/java/me/cortex/voxy/common/world/service/SectionSavingService.java +++ b/src/main/java/me/cortex/voxy/common/world/service/SectionSavingService.java @@ -1,10 +1,8 @@ package me.cortex.voxy.common.world.service; import me.cortex.voxy.common.Logger; -import me.cortex.voxy.common.thread.ServiceSlice; -import me.cortex.voxy.common.thread.ServiceThreadPool; -import me.cortex.voxy.common.thread3.Service; -import me.cortex.voxy.common.thread3.ServiceManager; +import me.cortex.voxy.common.thread.Service; +import me.cortex.voxy.common.thread.ServiceManager; import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.WorldSection; diff --git a/src/main/java/me/cortex/voxy/common/world/service/VoxelIngestService.java b/src/main/java/me/cortex/voxy/common/world/service/VoxelIngestService.java index 36fd2436..7b940069 100644 --- a/src/main/java/me/cortex/voxy/common/world/service/VoxelIngestService.java +++ b/src/main/java/me/cortex/voxy/common/world/service/VoxelIngestService.java @@ -1,8 +1,8 @@ package me.cortex.voxy.common.world.service; import me.cortex.voxy.common.Logger; -import me.cortex.voxy.common.thread3.Service; -import me.cortex.voxy.common.thread3.ServiceManager; +import me.cortex.voxy.common.thread.Service; +import me.cortex.voxy.common.thread.ServiceManager; import me.cortex.voxy.common.voxelization.ILightingSupplier; import me.cortex.voxy.common.voxelization.VoxelizedSection; import me.cortex.voxy.common.voxelization.WorldConversionFactory; diff --git a/src/main/java/me/cortex/voxy/commonImpl/VoxyInstance.java b/src/main/java/me/cortex/voxy/commonImpl/VoxyInstance.java index 8c7ab31e..15ef74f1 100644 --- a/src/main/java/me/cortex/voxy/commonImpl/VoxyInstance.java +++ b/src/main/java/me/cortex/voxy/commonImpl/VoxyInstance.java @@ -1,11 +1,9 @@ package me.cortex.voxy.commonImpl; -import me.cortex.voxy.client.config.VoxyConfig; import me.cortex.voxy.common.Logger; import me.cortex.voxy.common.config.section.SectionStorage; -import me.cortex.voxy.common.thread.ServiceThreadPool; -import me.cortex.voxy.common.thread3.ServiceManager; -import me.cortex.voxy.common.thread3.UnifiedServiceThreadPool; +import me.cortex.voxy.common.thread.ServiceManager; +import me.cortex.voxy.common.thread.UnifiedServiceThreadPool; import me.cortex.voxy.common.util.MemoryBuffer; import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.service.SectionSavingService; diff --git a/src/main/java/me/cortex/voxy/commonImpl/importers/DHImporter.java b/src/main/java/me/cortex/voxy/commonImpl/importers/DHImporter.java index 8c9e2b26..ec135d21 100644 --- a/src/main/java/me/cortex/voxy/commonImpl/importers/DHImporter.java +++ b/src/main/java/me/cortex/voxy/commonImpl/importers/DHImporter.java @@ -1,9 +1,8 @@ package me.cortex.voxy.commonImpl.importers; import me.cortex.voxy.common.Logger; -import me.cortex.voxy.common.thread.ServiceSlice; -import me.cortex.voxy.common.thread3.Service; -import me.cortex.voxy.common.thread3.ServiceManager; +import me.cortex.voxy.common.thread.Service; +import me.cortex.voxy.common.thread.ServiceManager; import me.cortex.voxy.common.util.Pair; import me.cortex.voxy.common.voxelization.VoxelizedSection; import me.cortex.voxy.common.voxelization.WorldConversionFactory; diff --git a/src/main/java/me/cortex/voxy/commonImpl/importers/WorldImporter.java b/src/main/java/me/cortex/voxy/commonImpl/importers/WorldImporter.java index 13c2515b..847b79bc 100644 --- a/src/main/java/me/cortex/voxy/commonImpl/importers/WorldImporter.java +++ b/src/main/java/me/cortex/voxy/commonImpl/importers/WorldImporter.java @@ -2,9 +2,8 @@ package me.cortex.voxy.commonImpl.importers; import com.mojang.serialization.Codec; import me.cortex.voxy.common.Logger; -import me.cortex.voxy.common.thread.ServiceSlice; -import me.cortex.voxy.common.thread3.Service; -import me.cortex.voxy.common.thread3.ServiceManager; +import me.cortex.voxy.common.thread.Service; +import me.cortex.voxy.common.thread.ServiceManager; import me.cortex.voxy.common.util.MemoryBuffer; import me.cortex.voxy.common.util.Pair; import me.cortex.voxy.common.util.UnsafeUtil;