diff --git a/src/main/java/me/cortex/voxy/common/world/thread/ServiceSlice.java b/src/main/java/me/cortex/voxy/common/world/thread/ServiceSlice.java index 912dea25..ddaaf7ac 100644 --- a/src/main/java/me/cortex/voxy/common/world/thread/ServiceSlice.java +++ b/src/main/java/me/cortex/voxy/common/world/thread/ServiceSlice.java @@ -13,7 +13,7 @@ import java.util.function.Supplier; public class ServiceSlice extends TrackedObject { private final String name; final int weightPerJob; - private volatile boolean alive = true; + volatile boolean alive = true; private final ServiceThreadPool threadPool; private final Supplier workerGenerator; final Semaphore jobCount = new Semaphore(0); @@ -82,7 +82,8 @@ public class ServiceSlice extends TrackedObject { //Tells the system that a single instance of this service needs executing public void execute() { if (!this.alive) { - throw new IllegalStateException("Tried to do work on a dead service"); + System.err.println("Tried to do work on a dead service"); + return; } this.jobCount.release(); this.jobCount2.incrementAndGet(); diff --git a/src/main/java/me/cortex/voxy/common/world/thread/ServiceThreadPool.java b/src/main/java/me/cortex/voxy/common/world/thread/ServiceThreadPool.java index 788a36be..e4330389 100644 --- a/src/main/java/me/cortex/voxy/common/world/thread/ServiceThreadPool.java +++ b/src/main/java/me/cortex/voxy/common/world/thread/ServiceThreadPool.java @@ -1,6 +1,7 @@ package me.cortex.voxy.common.world.thread; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; @@ -16,12 +17,14 @@ public class ServiceThreadPool { private volatile ServiceSlice[] serviceSlices = new ServiceSlice[0]; private final AtomicLong totalJobWeight = new AtomicLong(); + private final ThreadGroup threadGroup; public ServiceThreadPool(int workers) { + this.threadGroup = new ThreadGroup("Service job workers"); this.workers = new Thread[workers]; for (int i = 0; i < workers; i++) { int threadId = i; - var worker = new Thread(()->this.worker(threadId)); + var worker = new Thread(this.threadGroup, ()->this.worker(threadId)); worker.setDaemon(false); worker.setName("Service worker #" + i); worker.start(); @@ -46,10 +49,19 @@ public class ServiceThreadPool { synchronized void removeService(ServiceSlice service) { this.removeServiceFromArray(service); - this.totalJobWeight.addAndGet(-((long) service.weightPerJob) * service.jobCount.availablePermits()); + int permits = service.jobCount.drainPermits(); + if (this.totalJobWeight.addAndGet(-((long) service.weightPerJob) * permits) < 0) { + throw new IllegalStateException("Total job weight negative!"); + } + //Need to acquire all the shut-down jobs - if (!this.jobCounter.tryAcquire(service.jobCount.availablePermits())) { - throw new IllegalStateException("Failed to acquire all the permits for the shut down jobs"); + try { + //Wait for 1000 millis, to let shinanigans filter down + if (!this.jobCounter.tryAcquire(permits, 1000, TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Failed to acquire all the permits for the shut down jobs"); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); } } @@ -98,30 +110,30 @@ public class ServiceThreadPool { int attempts = 50; outer: while (true) { - if (attempts-- == 0) { - for (var service : this.serviceSlices) { - //Run the job - if (!service.doRun(threadId)) { - //Didnt consume the job, find a new job - continue; - } - //Consumed a job from the service, decrease weight by the amount - if (this.totalJobWeight.addAndGet(-service.weightPerJob)<0) { - throw new IllegalStateException("Total job weight is negative"); - } - break outer; - } - throw new IllegalStateException("All attempts at executing a job failed! something critically wrong has occurred"); - } - seed = (seed ^ seed >>> 30) * -4658895280553007687L; - seed = (seed ^ seed >>> 27) * -7723592293110705685L; - long clamped = seed&((1L<<63)-1); var ref = this.serviceSlices; if (ref.length == 0) { System.err.println("Service worker tried to run but had 0 slices"); break; } + if (attempts-- == 0) { + System.err.println("Unable to execute service after many attempts, releasing"); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + this.jobCounter.release();//Release the job we acquired + break; + } + seed = (seed ^ seed >>> 30) * -4658895280553007687L; + seed = (seed ^ seed >>> 27) * -7723592293110705685L; + long clamped = seed&((1L<<63)-1); + long weight = this.totalJobWeight.get(); + if (weight == 0) { + this.jobCounter.release(); + break; + } ServiceSlice service = ref[(int) (clamped % ref.length)]; //1 in 64 chance just to pick a service that has a task, in a cycling manor, this is to keep at least one service from overloading all services constantly @@ -137,7 +149,7 @@ public class ServiceThreadPool { } } else { - long chosenNumber = clamped % this.totalJobWeight.get(); + long chosenNumber = clamped % weight; for (var slice : ref) { chosenNumber -= ((long) slice.weightPerJob) * slice.jobCount.availablePermits(); if (chosenNumber <= 0) { @@ -176,6 +188,7 @@ public class ServiceThreadPool { Thread.onSpinWait(); } + int remainingJobs = this.jobCounter.drainPermits(); //Shutdown this.running = false; this.jobCounter.release(1000); @@ -190,6 +203,10 @@ public class ServiceThreadPool { if (this.totalJobWeight.get() != 0) { throw new IllegalStateException("Service pool job weight not 0 after shutdown"); } + + if (remainingJobs != 0) { + throw new IllegalStateException("Service thread pool had jobs remaining!"); + } } public int getThreadCount() {