Attempted more fixes for ServiceThreadPool race conditions when removing services

This commit is contained in:
mcrcortex
2024-08-07 12:42:36 +10:00
parent 0135c63b88
commit a5122d31a4
2 changed files with 43 additions and 25 deletions

View File

@@ -13,7 +13,7 @@ import java.util.function.Supplier;
public class ServiceSlice extends TrackedObject { public class ServiceSlice extends TrackedObject {
private final String name; private final String name;
final int weightPerJob; final int weightPerJob;
private volatile boolean alive = true; volatile boolean alive = true;
private final ServiceThreadPool threadPool; private final ServiceThreadPool threadPool;
private final Supplier<Runnable> workerGenerator; private final Supplier<Runnable> workerGenerator;
final Semaphore jobCount = new Semaphore(0); final Semaphore jobCount = new Semaphore(0);
@@ -82,7 +82,8 @@ public class ServiceSlice extends TrackedObject {
//Tells the system that a single instance of this service needs executing //Tells the system that a single instance of this service needs executing
public void execute() { public void execute() {
if (!this.alive) { if (!this.alive) {
throw new IllegalStateException("Tried to do work on a dead service"); System.err.println("Tried to do work on a dead service");
return;
} }
this.jobCount.release(); this.jobCount.release();
this.jobCount2.incrementAndGet(); this.jobCount2.incrementAndGet();

View File

@@ -1,6 +1,7 @@
package me.cortex.voxy.common.world.thread; package me.cortex.voxy.common.world.thread;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier; import java.util.function.BooleanSupplier;
@@ -16,12 +17,14 @@ public class ServiceThreadPool {
private volatile ServiceSlice[] serviceSlices = new ServiceSlice[0]; private volatile ServiceSlice[] serviceSlices = new ServiceSlice[0];
private final AtomicLong totalJobWeight = new AtomicLong(); private final AtomicLong totalJobWeight = new AtomicLong();
private final ThreadGroup threadGroup;
public ServiceThreadPool(int workers) { public ServiceThreadPool(int workers) {
this.threadGroup = new ThreadGroup("Service job workers");
this.workers = new Thread[workers]; this.workers = new Thread[workers];
for (int i = 0; i < workers; i++) { for (int i = 0; i < workers; i++) {
int threadId = i; int threadId = i;
var worker = new Thread(()->this.worker(threadId)); var worker = new Thread(this.threadGroup, ()->this.worker(threadId));
worker.setDaemon(false); worker.setDaemon(false);
worker.setName("Service worker #" + i); worker.setName("Service worker #" + i);
worker.start(); worker.start();
@@ -46,10 +49,19 @@ public class ServiceThreadPool {
synchronized void removeService(ServiceSlice service) { synchronized void removeService(ServiceSlice service) {
this.removeServiceFromArray(service); this.removeServiceFromArray(service);
this.totalJobWeight.addAndGet(-((long) service.weightPerJob) * service.jobCount.availablePermits()); int permits = service.jobCount.drainPermits();
if (this.totalJobWeight.addAndGet(-((long) service.weightPerJob) * permits) < 0) {
throw new IllegalStateException("Total job weight negative!");
}
//Need to acquire all the shut-down jobs //Need to acquire all the shut-down jobs
if (!this.jobCounter.tryAcquire(service.jobCount.availablePermits())) { try {
throw new IllegalStateException("Failed to acquire all the permits for the shut down jobs"); //Wait for 1000 millis, to let shinanigans filter down
if (!this.jobCounter.tryAcquire(permits, 1000, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Failed to acquire all the permits for the shut down jobs");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
} }
@@ -98,30 +110,30 @@ public class ServiceThreadPool {
int attempts = 50; int attempts = 50;
outer: outer:
while (true) { while (true) {
if (attempts-- == 0) {
for (var service : this.serviceSlices) {
//Run the job
if (!service.doRun(threadId)) {
//Didnt consume the job, find a new job
continue;
}
//Consumed a job from the service, decrease weight by the amount
if (this.totalJobWeight.addAndGet(-service.weightPerJob)<0) {
throw new IllegalStateException("Total job weight is negative");
}
break outer;
}
throw new IllegalStateException("All attempts at executing a job failed! something critically wrong has occurred");
}
seed = (seed ^ seed >>> 30) * -4658895280553007687L;
seed = (seed ^ seed >>> 27) * -7723592293110705685L;
long clamped = seed&((1L<<63)-1);
var ref = this.serviceSlices; var ref = this.serviceSlices;
if (ref.length == 0) { if (ref.length == 0) {
System.err.println("Service worker tried to run but had 0 slices"); System.err.println("Service worker tried to run but had 0 slices");
break; break;
} }
if (attempts-- == 0) {
System.err.println("Unable to execute service after many attempts, releasing");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.jobCounter.release();//Release the job we acquired
break;
}
seed = (seed ^ seed >>> 30) * -4658895280553007687L;
seed = (seed ^ seed >>> 27) * -7723592293110705685L;
long clamped = seed&((1L<<63)-1);
long weight = this.totalJobWeight.get();
if (weight == 0) {
this.jobCounter.release();
break;
}
ServiceSlice service = ref[(int) (clamped % ref.length)]; ServiceSlice service = ref[(int) (clamped % ref.length)];
//1 in 64 chance just to pick a service that has a task, in a cycling manor, this is to keep at least one service from overloading all services constantly //1 in 64 chance just to pick a service that has a task, in a cycling manor, this is to keep at least one service from overloading all services constantly
@@ -137,7 +149,7 @@ public class ServiceThreadPool {
} }
} else { } else {
long chosenNumber = clamped % this.totalJobWeight.get(); long chosenNumber = clamped % weight;
for (var slice : ref) { for (var slice : ref) {
chosenNumber -= ((long) slice.weightPerJob) * slice.jobCount.availablePermits(); chosenNumber -= ((long) slice.weightPerJob) * slice.jobCount.availablePermits();
if (chosenNumber <= 0) { if (chosenNumber <= 0) {
@@ -176,6 +188,7 @@ public class ServiceThreadPool {
Thread.onSpinWait(); Thread.onSpinWait();
} }
int remainingJobs = this.jobCounter.drainPermits();
//Shutdown //Shutdown
this.running = false; this.running = false;
this.jobCounter.release(1000); this.jobCounter.release(1000);
@@ -190,6 +203,10 @@ public class ServiceThreadPool {
if (this.totalJobWeight.get() != 0) { if (this.totalJobWeight.get() != 0) {
throw new IllegalStateException("Service pool job weight not 0 after shutdown"); throw new IllegalStateException("Service pool job weight not 0 after shutdown");
} }
if (remainingJobs != 0) {
throw new IllegalStateException("Service thread pool had jobs remaining!");
}
} }
public int getThreadCount() { public int getThreadCount() {