Fix service pools

This commit is contained in:
mcrcortex
2024-08-06 22:09:37 +10:00
parent 10ff5dda7d
commit cd3b40399c
2 changed files with 20 additions and 3 deletions

View File

@@ -74,6 +74,7 @@ public class ServiceSlice extends TrackedObject {
if (!this.alive) { if (!this.alive) {
throw new IllegalStateException("Tried to do work on a dead service"); throw new IllegalStateException("Tried to do work on a dead service");
} }
this.jobCount.release();
this.threadPool.execute(this); this.threadPool.execute(this);
} }

View File

@@ -42,6 +42,10 @@ public class ServiceThreadPool {
synchronized void removeService(ServiceSlice service) { synchronized void removeService(ServiceSlice service) {
this.removeServiceFromArray(service); this.removeServiceFromArray(service);
this.totalJobWeight.addAndGet(-((long) service.weightPerJob) * service.jobCount.availablePermits()); this.totalJobWeight.addAndGet(-((long) service.weightPerJob) * service.jobCount.availablePermits());
//Need to acquire all the shut-down jobs
if (!this.jobCounter.tryAcquire(service.jobCount.availablePermits())) {
throw new IllegalStateException("Failed to acquire all the permits for the shut down jobs");
}
} }
private synchronized void removeServiceFromArray(ServiceSlice service) { private synchronized void removeServiceFromArray(ServiceSlice service) {
@@ -80,16 +84,24 @@ public class ServiceThreadPool {
private void worker(int threadId) { private void worker(int threadId) {
long seed = 1234342; long seed = 1234342;
while (true) { while (true) {
seed = (seed ^ seed >>> 30) * -4658895280553007687L;
seed = (seed ^ seed >>> 27) * -7723592293110705685L;
long clamped = seed&((1L<<63)-1);
this.jobCounter.acquireUninterruptibly(); this.jobCounter.acquireUninterruptibly();
if (!this.running) { if (!this.running) {
break; break;
} }
int attempts = 50;
while (true) { while (true) {
if (attempts-- == 0) {
throw new IllegalStateException("All attempts at executing a job failed! something critically wrong has occurred");
}
seed = (seed ^ seed >>> 30) * -4658895280553007687L;
seed = (seed ^ seed >>> 27) * -7723592293110705685L;
long clamped = seed&((1L<<63)-1);
var ref = this.serviceSlices; var ref = this.serviceSlices;
if (ref.length == 0) {
System.err.println("Service worker tried to run but had 0 slices");
break;
}
long chosenNumber = clamped % this.totalJobWeight.get(); long chosenNumber = clamped % this.totalJobWeight.get();
ServiceSlice service = ref[(int) (clamped % ref.length)]; ServiceSlice service = ref[(int) (clamped % ref.length)];
for (var slice : ref) { for (var slice : ref) {
@@ -138,6 +150,10 @@ public class ServiceThreadPool {
worker.join(); worker.join();
} }
} catch (InterruptedException e) {throw new RuntimeException(e);} } catch (InterruptedException e) {throw new RuntimeException(e);}
if (this.totalJobWeight.get() != 0) {
throw new IllegalStateException("Service pool job weight not 0 after shutdown");
}
} }
public int getThreadCount() { public int getThreadCount() {