locks and replacing O(N) size queries with counters
This commit is contained in:
@@ -15,6 +15,7 @@ import org.lwjgl.opengl.GL11;
|
||||
import java.lang.invoke.VarHandle;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.lwjgl.opengl.ARBFramebufferObject.GL_COLOR_ATTACHMENT0;
|
||||
import static org.lwjgl.opengl.GL11.GL_COLOR_BUFFER_BIT;
|
||||
@@ -28,6 +29,7 @@ public class ModelBakerySubsystem {
|
||||
|
||||
private final ModelStore storage = new ModelStore();
|
||||
public final ModelFactory factory;
|
||||
private final AtomicInteger blockIdCount = new AtomicInteger();
|
||||
private final ConcurrentLinkedDeque<Integer> blockIdQueue = new ConcurrentLinkedDeque<>();//TODO: replace with custom DS
|
||||
private final ConcurrentLinkedDeque<Mapper.BiomeEntry> biomeQueue = new ConcurrentLinkedDeque<>();
|
||||
|
||||
@@ -68,19 +70,22 @@ public class ModelBakerySubsystem {
|
||||
//TimingStatistics.modelProcess.start();
|
||||
long start = System.nanoTime();
|
||||
VarHandle.fullFence();
|
||||
{
|
||||
if (this.blockIdCount.get() != 0) {
|
||||
long budget = Math.min(totalBudget-150_000, totalBudget-(this.factory.resultJobs.size()*10_000L))-150_000;
|
||||
|
||||
//Always do 1 iteration minimum
|
||||
Integer i = this.blockIdQueue.poll();
|
||||
int j = 0;
|
||||
if (i != null) {
|
||||
do {
|
||||
this.factory.addEntry(i);
|
||||
j++;
|
||||
if (budget<(System.nanoTime() - start)+1000)
|
||||
break;
|
||||
i = this.blockIdQueue.poll();
|
||||
} while (i != null);
|
||||
}
|
||||
this.blockIdCount.addAndGet(-j);
|
||||
}
|
||||
|
||||
this.factory.tick();
|
||||
@@ -103,11 +108,12 @@ public class ModelBakerySubsystem {
|
||||
}
|
||||
|
||||
public void addBiome(Mapper.BiomeEntry biomeEntry) {
|
||||
this.blockIdCount.incrementAndGet();
|
||||
this.biomeQueue.add(biomeEntry);
|
||||
}
|
||||
|
||||
public void addDebugData(List<String> debug) {
|
||||
debug.add(String.format("MQ/IF/MC: %04d, %03d, %04d", this.blockIdQueue.size(), this.factory.getInflightCount(), this.factory.getBakedCount()));//Model bake queue/in flight/model baked count
|
||||
debug.add(String.format("MQ/IF/MC: %04d, %03d, %04d", this.blockIdCount.get(), this.factory.getInflightCount(), this.factory.getBakedCount()));//Model bake queue/in flight/model baked count
|
||||
}
|
||||
|
||||
public ModelStore getStore() {
|
||||
@@ -115,10 +121,10 @@ public class ModelBakerySubsystem {
|
||||
}
|
||||
|
||||
public boolean areQueuesEmpty() {
|
||||
return this.blockIdQueue.isEmpty() && this.factory.getInflightCount() == 0 && this.biomeQueue.isEmpty();
|
||||
return this.blockIdCount.get()==0 && this.factory.getInflightCount() == 0 && this.biomeQueue.isEmpty();
|
||||
}
|
||||
|
||||
public int getProcessingCount() {
|
||||
return this.blockIdQueue.size() + this.factory.getInflightCount();
|
||||
return this.blockIdCount.get() + this.factory.getInflightCount();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import java.util.List;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.StampedLock;
|
||||
import java.util.function.BooleanSupplier;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
@@ -50,7 +51,9 @@ public class RenderGenerationService {
|
||||
}
|
||||
}
|
||||
|
||||
private final AtomicInteger taskQueueCount = new AtomicInteger();
|
||||
private final PriorityBlockingQueue<BuildTask> taskQueue = new PriorityBlockingQueue<>(320000, (a,b)-> Long.compareUnsigned(a.priority, b.priority));
|
||||
private final StampedLock taskMapLock = new StampedLock();
|
||||
private final Long2ObjectOpenHashMap<BuildTask> taskMap = new Long2ObjectOpenHashMap<>(320000);
|
||||
|
||||
private final WorldEngine world;
|
||||
@@ -124,6 +127,7 @@ public class RenderGenerationService {
|
||||
//TODO: add a generated render data cache
|
||||
private void processJob(RenderDataFactory45 factory, IntOpenHashSet seenMissedIds) {
|
||||
BuildTask task = this.taskQueue.poll();
|
||||
this.taskQueueCount.decrementAndGet();
|
||||
|
||||
//long time = BuiltSection.getTime();
|
||||
boolean shouldFreeSection = true;
|
||||
@@ -142,21 +146,24 @@ public class RenderGenerationService {
|
||||
section.assertNotFree();
|
||||
BuiltSection mesh = null;
|
||||
|
||||
synchronized (this.taskMap) {
|
||||
{
|
||||
long stamp = this.taskMapLock.writeLock();
|
||||
var rtask = this.taskMap.remove(task.position);
|
||||
if (rtask != task) {
|
||||
this.taskMapLock.unlockWrite(stamp);
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
this.taskMapLock.unlockWrite(stamp);
|
||||
}
|
||||
|
||||
try {
|
||||
mesh = factory.generateMesh(section);
|
||||
} catch (IdNotYetComputedException e) {
|
||||
{
|
||||
BuildTask other;
|
||||
synchronized (this.taskMap) {
|
||||
other = this.taskMap.putIfAbsent(task.position, task);
|
||||
}
|
||||
long stamp = this.taskMapLock.writeLock();
|
||||
BuildTask other = this.taskMap.putIfAbsent(task.position, task);
|
||||
this.taskMapLock.unlockWrite(stamp);
|
||||
|
||||
if (other != null) {//Weve been replaced
|
||||
//Request the block
|
||||
if (e.isIdBlockId) {
|
||||
@@ -225,6 +232,7 @@ public class RenderGenerationService {
|
||||
|
||||
task.updatePriority();
|
||||
this.taskQueue.add(task);
|
||||
this.taskQueueCount.incrementAndGet();
|
||||
|
||||
if (this.threads.isAlive()) {//Only execute if were not dead
|
||||
this.threads.execute();//Since we put in queue, release permit
|
||||
@@ -244,17 +252,18 @@ public class RenderGenerationService {
|
||||
|
||||
public void enqueueTask(long pos) {
|
||||
boolean[] isOurs = new boolean[1];
|
||||
BuildTask task;
|
||||
synchronized (this.taskMap) {
|
||||
task = this.taskMap.computeIfAbsent(pos, p->{
|
||||
long stamp = this.taskMapLock.writeLock();
|
||||
BuildTask task = this.taskMap.computeIfAbsent(pos, p->{
|
||||
isOurs[0] = true;
|
||||
return new BuildTask(p);
|
||||
});
|
||||
}
|
||||
this.taskMapLock.unlockWrite(stamp);
|
||||
|
||||
if (isOurs[0]) {//If its not ours we dont care about it
|
||||
//Set priority and insert into queue and execute
|
||||
task.updatePriority();
|
||||
this.taskQueue.add(task);
|
||||
this.taskQueueCount.incrementAndGet();
|
||||
this.threads.execute();
|
||||
}
|
||||
}
|
||||
@@ -270,8 +279,8 @@ public class RenderGenerationService {
|
||||
while (this.threads.hasJobs()) {
|
||||
int i = this.threads.drain();
|
||||
if (i == 0) break;
|
||||
|
||||
synchronized (this.taskMap) {
|
||||
{
|
||||
long stamp = this.taskMapLock.writeLock();
|
||||
for (int j = 0; j < i; j++) {
|
||||
var task = this.taskQueue.remove();
|
||||
if (task.section != null) {
|
||||
@@ -281,6 +290,8 @@ public class RenderGenerationService {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
this.taskMapLock.unlockWrite(stamp);
|
||||
this.taskQueueCount.addAndGet(-i);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -290,22 +301,24 @@ public class RenderGenerationService {
|
||||
//Cleanup any remaining data
|
||||
while (!this.taskQueue.isEmpty()) {
|
||||
var task = this.taskQueue.remove();
|
||||
this.taskQueueCount.decrementAndGet();
|
||||
if (task.section != null) {
|
||||
task.section.release();
|
||||
}
|
||||
synchronized (this.taskMap) {
|
||||
if (this.taskMap.remove(task.position) != task) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
long stamp = this.taskMapLock.writeLock();
|
||||
if (this.taskMap.remove(task.position) != task) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
this.taskMapLock.unlockWrite(stamp);
|
||||
}
|
||||
}
|
||||
|
||||
public void addDebugData(List<String> debug) {
|
||||
debug.add("RSSQ: " + this.taskQueue.size());//render section service queue
|
||||
debug.add("RSSQ: " + this.taskQueueCount.get());//render section service queue
|
||||
}
|
||||
|
||||
public int getTaskCount() {
|
||||
return this.taskQueue.size();
|
||||
return this.taskQueueCount.get();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ public class MessageQueue <T> {
|
||||
}
|
||||
|
||||
public int consumeNano(long budget) {
|
||||
if (budget < 25_000) return 0;
|
||||
//if (budget < 25_000) return 0;
|
||||
if (this.count.get() == 0) {
|
||||
return 0;
|
||||
}
|
||||
@@ -61,9 +61,13 @@ public class MessageQueue <T> {
|
||||
}
|
||||
|
||||
public final void clear(Consumer<T> cleaner) {
|
||||
while (!this.queue.isEmpty()) {
|
||||
cleaner.accept(this.queue.pop());
|
||||
}
|
||||
do {
|
||||
var v = this.queue.poll();
|
||||
if (v == null) {
|
||||
break;
|
||||
}
|
||||
cleaner.accept(v);
|
||||
} while (true);
|
||||
}
|
||||
|
||||
public int count() {
|
||||
|
||||
@@ -23,7 +23,9 @@ public class ActiveSectionTracker {
|
||||
private final SectionLoader loader;
|
||||
|
||||
private final int lruSize;
|
||||
private final StampedLock lruLock = new StampedLock();
|
||||
private final Long2ObjectLinkedOpenHashMap<WorldSection> lruSecondaryCache;//TODO: THIS NEEDS TO BECOME A GLOBAL STATIC CACHE
|
||||
|
||||
@Nullable
|
||||
public final WorldEngine engine;
|
||||
|
||||
@@ -89,9 +91,9 @@ public class ActiveSectionTracker {
|
||||
}
|
||||
|
||||
if (isLoader) {
|
||||
synchronized (this.lruSecondaryCache) {
|
||||
section = this.lruSecondaryCache.remove(key);
|
||||
}
|
||||
long stamp = this.lruLock.writeLock();
|
||||
section = this.lruSecondaryCache.remove(key);
|
||||
this.lruLock.unlockWrite(stamp);
|
||||
}
|
||||
|
||||
//If this thread was the one to create the reference then its the thread to load the section
|
||||
@@ -170,14 +172,15 @@ public class ActiveSectionTracker {
|
||||
lock.unlockWrite(stamp);
|
||||
|
||||
if (sec != null) {
|
||||
WorldSection a;
|
||||
synchronized (this.lruSecondaryCache) {
|
||||
a = this.lruSecondaryCache.put(section.key, section);
|
||||
//If cache is bigger than its ment to be, remove the least recently used and free it
|
||||
if (a == null && this.lruSize < this.lruSecondaryCache.size()) {
|
||||
a = this.lruSecondaryCache.removeFirst();
|
||||
}
|
||||
stamp = this.lruLock.writeLock();
|
||||
|
||||
WorldSection a = this.lruSecondaryCache.put(section.key, section);
|
||||
//If cache is bigger than its ment to be, remove the least recently used and free it
|
||||
if (a == null && this.lruSize < this.lruSecondaryCache.size()) {
|
||||
a = this.lruSecondaryCache.removeFirst();
|
||||
}
|
||||
this.lruLock.unlockWrite(stamp);
|
||||
|
||||
if (a != null) {
|
||||
a._releaseArray();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user