Swap to a single ServiceThreadPool workload
This commit is contained in:
@@ -23,7 +23,6 @@ public class Voxy implements ClientModInitializer {
|
||||
|
||||
@Override
|
||||
public void onInitializeClient() {
|
||||
|
||||
ClientCommandRegistrationCallback.EVENT.register((dispatcher, registryAccess) -> {
|
||||
dispatcher.register(WorldImportCommand.register());
|
||||
});
|
||||
|
||||
@@ -24,15 +24,10 @@ public class VoxyConfig {
|
||||
|
||||
public boolean enabled = true;
|
||||
public boolean ingestEnabled = true;
|
||||
public int qualityScale = 12;
|
||||
public int maxSections = 200_000;
|
||||
public int renderDistance = 128;
|
||||
public int geometryBufferSize = (1<<30)/8;
|
||||
public int ingestThreads = 2;
|
||||
public int savingThreads = 4;
|
||||
public int renderThreads = 5;
|
||||
public boolean useMeshShaderIfPossible = true;
|
||||
public int renderDistance = 128;//Unused at the present
|
||||
public int serviceThreads = Math.max(Runtime.getRuntime().availableProcessors()/2, 1);
|
||||
public String defaultSaveConfig;
|
||||
public int renderQuality = 256;//Smaller is higher quality
|
||||
|
||||
|
||||
public static VoxyConfig loadOrCreate() {
|
||||
@@ -69,8 +64,4 @@ public class VoxyConfig {
|
||||
.getConfigDir()
|
||||
.resolve("voxy-config.json");
|
||||
}
|
||||
|
||||
public boolean useMeshShaders() {
|
||||
return this.useMeshShaderIfPossible && Capabilities.INSTANCE.meshShaders;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,8 +46,6 @@ public class VoxyConfigScreenFactory implements ModMenuApi {
|
||||
}
|
||||
|
||||
private static void addGeneralCategory(ConfigBuilder builder, VoxyConfig config) {
|
||||
|
||||
|
||||
ConfigCategory category = builder.getOrCreateCategory(Text.translatable("voxy.config.general"));
|
||||
ConfigEntryBuilder entryBuilder = builder.entryBuilder();
|
||||
|
||||
@@ -76,12 +74,13 @@ public class VoxyConfigScreenFactory implements ModMenuApi {
|
||||
.setDefaultValue(DEFAULT.ingestEnabled)
|
||||
.build());
|
||||
|
||||
category.addEntry(entryBuilder.startIntSlider(Text.translatable("voxy.config.general.quality"), config.qualityScale, 8, 32)
|
||||
category.addEntry(entryBuilder.startIntSlider(Text.translatable("voxy.config.general.quality"), config.renderQuality, 32, 512)
|
||||
.setTooltip(Text.translatable("voxy.config.general.quality.tooltip"))
|
||||
.setSaveConsumer(val -> config.qualityScale = val)
|
||||
.setDefaultValue(DEFAULT.qualityScale)
|
||||
.setSaveConsumer(val -> config.renderQuality = val)
|
||||
.setDefaultValue(DEFAULT.renderQuality)
|
||||
.build());
|
||||
|
||||
/*
|
||||
category.addEntry(entryBuilder.startIntSlider(Text.translatable("voxy.config.general.geometryBuffer"), config.geometryBufferSize, (1<<27)/8, ((1<<31)-1)/8)
|
||||
.setTooltip(Text.translatable("voxy.config.general.geometryBuffer.tooltip"))
|
||||
.setSaveConsumer(val -> config.geometryBufferSize = val)
|
||||
@@ -93,32 +92,22 @@ public class VoxyConfigScreenFactory implements ModMenuApi {
|
||||
.setSaveConsumer(val -> config.maxSections = val)
|
||||
.setDefaultValue(DEFAULT.maxSections)
|
||||
.build());
|
||||
*/
|
||||
|
||||
category.addEntry(entryBuilder.startIntField(Text.translatable("voxy.config.general.renderDistance"), config.renderDistance)
|
||||
.setTooltip(Text.translatable("voxy.config.general.renderDistance.tooltip"))
|
||||
.setSaveConsumer(val -> config.renderDistance = val)
|
||||
.setDefaultValue(DEFAULT.renderDistance)
|
||||
.build());
|
||||
|
||||
category.addEntry(entryBuilder.startBooleanToggle(Text.translatable("voxy.config.general.nvmesh"), config.useMeshShaderIfPossible)
|
||||
.setTooltip(Text.translatable("voxy.config.general.nvmesh.tooltip"))
|
||||
.setSaveConsumer(val -> config.useMeshShaderIfPossible = val)
|
||||
.setDefaultValue(DEFAULT.useMeshShaderIfPossible)
|
||||
.build());
|
||||
|
||||
//category.addEntry(entryBuilder.startIntSlider(Text.translatable("voxy.config.general.compression"), config.savingCompressionLevel, 1, 21)
|
||||
// .setTooltip(Text.translatable("voxy.config.general.compression.tooltip"))
|
||||
// .setSaveConsumer(val -> config.savingCompressionLevel = val)
|
||||
// .setDefaultValue(DEFAULT.savingCompressionLevel)
|
||||
// .build());
|
||||
}
|
||||
|
||||
private static void addThreadsCategory(ConfigBuilder builder, VoxyConfig config) {
|
||||
ConfigCategory category = builder.getOrCreateCategory(Text.translatable("voxy.config.threads"));
|
||||
ConfigEntryBuilder entryBuilder = builder.entryBuilder();
|
||||
|
||||
category.addEntry(entryBuilder.startIntSlider(Text.translatable("voxy.config.threads.ingest"), config.ingestThreads, 1, Runtime.getRuntime().availableProcessors())
|
||||
.setTooltip(Text.translatable("voxy.config.ingest.tooltip"))
|
||||
/*
|
||||
category.addEntry(entryBuilder.startIntSlider(Text.translatable("voxy.config.threads.service"), config.serviceThreads, 1, Runtime.getRuntime().availableProcessors())
|
||||
.setTooltip(Text.translatable("voxy.config.threads.tooltip"))
|
||||
.setSaveConsumer(val -> config.ingestThreads = val)
|
||||
.setDefaultValue(DEFAULT.ingestThreads)
|
||||
.build());
|
||||
@@ -134,6 +123,7 @@ public class VoxyConfigScreenFactory implements ModMenuApi {
|
||||
.setSaveConsumer(val -> config.renderThreads = val)
|
||||
.setDefaultValue(DEFAULT.renderThreads)
|
||||
.build());
|
||||
*/
|
||||
}
|
||||
|
||||
private static void addStorageCategory(ConfigBuilder builder, VoxyConfig config) {
|
||||
|
||||
@@ -2,6 +2,7 @@ package me.cortex.voxy.client.core;
|
||||
|
||||
import com.mojang.blaze3d.systems.RenderSystem;
|
||||
import me.cortex.voxy.client.Voxy;
|
||||
import me.cortex.voxy.client.config.VoxyConfig;
|
||||
import me.cortex.voxy.client.core.rendering.*;
|
||||
import me.cortex.voxy.client.core.rendering.post.PostProcessing;
|
||||
import me.cortex.voxy.client.core.rendering.util.DownloadStream;
|
||||
@@ -9,6 +10,7 @@ import me.cortex.voxy.client.core.util.IrisUtil;
|
||||
import me.cortex.voxy.client.saver.ContextSelectionSystem;
|
||||
import me.cortex.voxy.common.world.WorldEngine;
|
||||
import me.cortex.voxy.client.importers.WorldImporter;
|
||||
import me.cortex.voxy.common.world.thread.ServiceThreadPool;
|
||||
import net.minecraft.client.MinecraftClient;
|
||||
import net.minecraft.client.gui.hud.ClientBossBar;
|
||||
import net.minecraft.client.render.Camera;
|
||||
@@ -54,20 +56,21 @@ public class VoxelCore {
|
||||
|
||||
private final RenderService renderer;
|
||||
private final PostProcessing postProcessing;
|
||||
|
||||
//private final Thread shutdownThread = new Thread(this::shutdown);
|
||||
private final ServiceThreadPool serviceThreadPool;
|
||||
|
||||
private WorldImporter importer;
|
||||
public VoxelCore(ContextSelectionSystem.Selection worldSelection) {
|
||||
this.world = worldSelection.createEngine();
|
||||
var cfg = worldSelection.getConfig();
|
||||
this.serviceThreadPool = new ServiceThreadPool(VoxyConfig.CONFIG.serviceThreads);
|
||||
|
||||
this.world = worldSelection.createEngine(this.serviceThreadPool);
|
||||
System.out.println("Initializing voxy core");
|
||||
|
||||
//Trigger the shared index buffer loading
|
||||
SharedIndexBuffer.INSTANCE.id();
|
||||
Capabilities.init();//Ensure clinit is called
|
||||
|
||||
this.renderer = new RenderService(this.world);
|
||||
this.renderer = new RenderService(this.world, this.serviceThreadPool);
|
||||
System.out.println("Using " + this.renderer.getClass().getSimpleName());
|
||||
this.postProcessing = new PostProcessing();
|
||||
|
||||
@@ -183,6 +186,8 @@ public class VoxelCore {
|
||||
if (this.postProcessing!=null){try {this.postProcessing.shutdown();} catch (Exception e) {e.printStackTrace();}}
|
||||
System.out.println("Shutting down world engine");
|
||||
try {this.world.shutdown();} catch (Exception e) {e.printStackTrace();}
|
||||
System.out.println("Shutting down service thread pool");
|
||||
this.serviceThreadPool.shutdown();
|
||||
System.out.println("Voxel core shut down");
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ import me.cortex.voxy.client.core.rendering.util.DownloadStream;
|
||||
import me.cortex.voxy.client.core.rendering.util.UploadStream;
|
||||
import me.cortex.voxy.common.world.WorldEngine;
|
||||
import me.cortex.voxy.common.world.WorldSection;
|
||||
import me.cortex.voxy.common.world.thread.ServiceThreadPool;
|
||||
import net.minecraft.client.render.Camera;
|
||||
|
||||
import java.util.Arrays;
|
||||
@@ -40,7 +41,7 @@ public class RenderService<T extends AbstractSectionRenderer<J, ?>, J extends Vi
|
||||
|
||||
private final ConcurrentLinkedDeque<BuiltSection> sectionBuildResultQueue = new ConcurrentLinkedDeque<>();
|
||||
|
||||
public RenderService(WorldEngine world) {
|
||||
public RenderService(WorldEngine world, ServiceThreadPool serviceThreadPool) {
|
||||
this.modelService = new ModelBakerySubsystem(world.getMapper());
|
||||
|
||||
//Max sections: ~500k
|
||||
@@ -53,7 +54,7 @@ public class RenderService<T extends AbstractSectionRenderer<J, ?>, J extends Vi
|
||||
this.nodeManager = new HierarchicalNodeManager(1<<21, this.sectionRenderer.getGeometryManager(), positionFilterForwarder);
|
||||
|
||||
this.viewportSelector = new ViewportSelector<>(this.sectionRenderer::createViewport);
|
||||
this.renderGen = new RenderGenerationService(world, this.modelService, VoxyConfig.CONFIG.renderThreads, this.sectionBuildResultQueue::add, this.sectionRenderer.getGeometryManager() instanceof IUsesMeshlets);
|
||||
this.renderGen = new RenderGenerationService(world, this.modelService, serviceThreadPool, this.sectionBuildResultQueue::add, this.sectionRenderer.getGeometryManager() instanceof IUsesMeshlets);
|
||||
positionFilterForwarder.setCallback(this.renderGen::enqueueTask);
|
||||
|
||||
this.traversal = new HierarchicalOcclusionTraverser(this.nodeManager, 512);
|
||||
|
||||
@@ -7,6 +7,8 @@ import me.cortex.voxy.client.core.model.ModelBakerySubsystem;
|
||||
import me.cortex.voxy.common.world.WorldEngine;
|
||||
import me.cortex.voxy.common.world.WorldSection;
|
||||
import me.cortex.voxy.common.world.other.Mapper;
|
||||
import me.cortex.voxy.common.world.thread.ServiceSlice;
|
||||
import me.cortex.voxy.common.world.thread.ServiceThreadPool;
|
||||
import net.minecraft.client.MinecraftClient;
|
||||
import net.minecraft.text.Text;
|
||||
|
||||
@@ -20,31 +22,30 @@ public class RenderGenerationService {
|
||||
public interface TaskChecker {boolean check(int lvl, int x, int y, int z);}
|
||||
private record BuildTask(long position, Supplier<WorldSection> sectionSupplier, boolean[] hasDoneModelRequest) {}
|
||||
|
||||
private volatile boolean running = true;
|
||||
private final Thread[] workers;
|
||||
|
||||
private final Long2ObjectLinkedOpenHashMap<BuildTask> taskQueue = new Long2ObjectLinkedOpenHashMap<>();
|
||||
|
||||
private final Semaphore taskCounter = new Semaphore(0);
|
||||
private final WorldEngine world;
|
||||
private final ModelBakerySubsystem modelBakery;
|
||||
private final Consumer<BuiltSection> resultConsumer;
|
||||
private final BuiltSectionMeshCache meshCache = new BuiltSectionMeshCache();
|
||||
private final boolean emitMeshlets;
|
||||
|
||||
public RenderGenerationService(WorldEngine world, ModelBakerySubsystem modelBakery, int workers, Consumer<BuiltSection> consumer, boolean emitMeshlets) {
|
||||
private final ServiceSlice threads;
|
||||
|
||||
|
||||
public RenderGenerationService(WorldEngine world, ModelBakerySubsystem modelBakery, ServiceThreadPool serviceThreadPool, Consumer<BuiltSection> consumer, boolean emitMeshlets) {
|
||||
this.emitMeshlets = emitMeshlets;
|
||||
this.world = world;
|
||||
this.modelBakery = modelBakery;
|
||||
this.resultConsumer = consumer;
|
||||
this.workers = new Thread[workers];
|
||||
for (int i = 0; i < workers; i++) {
|
||||
this.workers[i] = new Thread(this::renderWorker);
|
||||
this.workers[i].setPriority(3);
|
||||
this.workers[i].setDaemon(true);
|
||||
this.workers[i].setName("Render generation service #" + i);
|
||||
this.workers[i].start();
|
||||
}
|
||||
|
||||
this.threads = serviceThreadPool.createService("Section mesh generation service", 100, ()->{
|
||||
//Thread local instance of the factory
|
||||
var factory = new RenderDataFactory(this.world, this.modelBakery.factory, this.emitMeshlets);
|
||||
return () -> {
|
||||
this.processJob(factory);
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
//NOTE: the biomes are always fully populated/kept up to date
|
||||
@@ -65,13 +66,7 @@ public class RenderGenerationService {
|
||||
}
|
||||
|
||||
//TODO: add a generated render data cache
|
||||
private void renderWorker() {
|
||||
//Thread local instance of the factory
|
||||
var factory = new RenderDataFactory(this.world, this.modelBakery.factory, this.emitMeshlets);
|
||||
while (this.running) {
|
||||
this.taskCounter.acquireUninterruptibly();
|
||||
if (!this.running) break;
|
||||
try {
|
||||
private void processJob(RenderDataFactory factory) {
|
||||
BuildTask task;
|
||||
synchronized (this.taskQueue) {
|
||||
task = this.taskQueue.removeFirst();
|
||||
@@ -79,7 +74,7 @@ public class RenderGenerationService {
|
||||
var section = task.sectionSupplier.get();
|
||||
if (section == null) {
|
||||
this.resultConsumer.accept(new BuiltSection(task.position));
|
||||
continue;
|
||||
return;
|
||||
}
|
||||
section.assertNotFree();
|
||||
BuiltSection mesh = null;
|
||||
@@ -106,7 +101,7 @@ public class RenderGenerationService {
|
||||
queuedTask.hasDoneModelRequest[0] = true;//Mark (or remark) the section as having chunks requested
|
||||
|
||||
if (queuedTask == task) {//use the == not .equal to see if we need to release a permit
|
||||
this.taskCounter.release();//Since we put in queue, release permit
|
||||
this.threads.execute();//Since we put in queue, release permit
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -120,11 +115,6 @@ public class RenderGenerationService {
|
||||
mesh.free();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
MinecraftClient.getInstance().executeSync(()->MinecraftClient.getInstance().player.sendMessage(Text.literal("Voxy render service had an exception while executing please check logs and report error")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public int getMeshCacheCount() {
|
||||
@@ -169,7 +159,7 @@ public class RenderGenerationService {
|
||||
}
|
||||
synchronized (this.taskQueue) {
|
||||
this.taskQueue.computeIfAbsent(ikey, key->{
|
||||
this.taskCounter.release();
|
||||
this.threads.execute();
|
||||
return new BuildTask(ikey, ()->{
|
||||
if (checker.check(WorldEngine.getLevel(ikey), WorldEngine.getX(ikey), WorldEngine.getY(ikey), WorldEngine.getZ(ikey))) {
|
||||
return this.world.acquireIfExists(WorldEngine.getLevel(ikey), WorldEngine.getX(ikey), WorldEngine.getY(ikey), WorldEngine.getZ(ikey));
|
||||
@@ -196,6 +186,7 @@ public class RenderGenerationService {
|
||||
this.meshCache.clearMesh(WorldEngine.getWorldSectionId(lvl, x, y, z));
|
||||
}
|
||||
|
||||
/*
|
||||
public void removeTask(int lvl, int x, int y, int z) {
|
||||
synchronized (this.taskQueue) {
|
||||
if (this.taskQueue.remove(WorldEngine.getWorldSectionId(lvl, x, y, z)) != null) {
|
||||
@@ -203,32 +194,14 @@ public class RenderGenerationService {
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
public int getTaskCount() {
|
||||
return this.taskCounter.availablePermits();
|
||||
return this.threads.getJobCount();
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
boolean anyAlive = false;
|
||||
for (var worker : this.workers) {
|
||||
anyAlive |= worker.isAlive();
|
||||
}
|
||||
|
||||
if (!anyAlive) {
|
||||
System.err.println("Render gen workers already dead on shutdown! this is very very bad, check log for errors from this thread");
|
||||
return;
|
||||
}
|
||||
|
||||
//Since this is just render data, dont care about any tasks needing to finish
|
||||
this.running = false;
|
||||
this.taskCounter.release(1000);
|
||||
|
||||
//Wait for thread to join
|
||||
try {
|
||||
for (var worker : this.workers) {
|
||||
worker.join();
|
||||
}
|
||||
} catch (InterruptedException e) {throw new RuntimeException(e);}
|
||||
this.threads.shutdown();
|
||||
|
||||
//Cleanup any remaining data
|
||||
while (!this.taskQueue.isEmpty()) {
|
||||
|
||||
@@ -9,6 +9,7 @@ import me.cortex.voxy.common.storage.config.StorageConfig;
|
||||
import me.cortex.voxy.common.storage.other.CompressionStorageAdaptor;
|
||||
import me.cortex.voxy.common.storage.rocksdb.RocksDBStorageBackend;
|
||||
import me.cortex.voxy.common.world.WorldEngine;
|
||||
import me.cortex.voxy.common.world.thread.ServiceThreadPool;
|
||||
import net.minecraft.client.MinecraftClient;
|
||||
import net.minecraft.client.world.ClientWorld;
|
||||
import net.minecraft.util.WorldSavePath;
|
||||
@@ -96,8 +97,8 @@ public class ContextSelectionSystem {
|
||||
return this.config.storageConfig.build(ctx);
|
||||
}
|
||||
|
||||
public WorldEngine createEngine() {
|
||||
return new WorldEngine(this.createStorageBackend(), VoxyConfig.CONFIG.ingestThreads, VoxyConfig.CONFIG.savingThreads, 5);
|
||||
public WorldEngine createEngine(ServiceThreadPool serviceThreadPool) {
|
||||
return new WorldEngine(this.createStorageBackend(), serviceThreadPool, 5);
|
||||
}
|
||||
|
||||
//Saves the config for the world selection or something, need to figure out how to make it work with dimensional configs maybe?
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package me.cortex.voxy.common.world;
|
||||
|
||||
import me.cortex.voxy.common.storage.StorageCompressor;
|
||||
import me.cortex.voxy.common.voxelization.VoxelizedSection;
|
||||
import me.cortex.voxy.common.world.other.Mapper;
|
||||
import me.cortex.voxy.common.world.service.SectionSavingService;
|
||||
import me.cortex.voxy.common.world.service.VoxelIngestService;
|
||||
import me.cortex.voxy.common.storage.StorageBackend;
|
||||
import me.cortex.voxy.common.world.thread.ServiceThreadPool;
|
||||
import org.lwjgl.system.MemoryUtil;
|
||||
|
||||
import java.util.Arrays;
|
||||
@@ -22,22 +22,21 @@ public class WorldEngine {
|
||||
private Consumer<WorldSection> dirtyCallback;
|
||||
private final int maxMipLevels;
|
||||
|
||||
|
||||
public void setDirtyCallback(Consumer<WorldSection> tracker) {
|
||||
this.dirtyCallback = tracker;
|
||||
}
|
||||
|
||||
public Mapper getMapper() {return this.mapper;}
|
||||
|
||||
public WorldEngine(StorageBackend storageBackend, int ingestWorkers, int savingServiceWorkers, int maxMipLayers) {
|
||||
public WorldEngine(StorageBackend storageBackend, ServiceThreadPool serviceThreadPool, int maxMipLayers) {
|
||||
this.maxMipLevels = maxMipLayers;
|
||||
this.storage = storageBackend;
|
||||
this.mapper = new Mapper(this.storage);
|
||||
//4 cache size bits means that the section tracker has 16 separate maps that it uses
|
||||
this.sectionTracker = new ActiveSectionTracker(3, this::unsafeLoadSection);
|
||||
|
||||
this.savingService = new SectionSavingService(this, savingServiceWorkers);
|
||||
this.ingestService = new VoxelIngestService(this, ingestWorkers);
|
||||
this.savingService = new SectionSavingService(this, serviceThreadPool);
|
||||
this.ingestService = new VoxelIngestService(this, serviceThreadPool);
|
||||
}
|
||||
|
||||
private int unsafeLoadSection(WorldSection into) {
|
||||
|
||||
@@ -1,45 +1,30 @@
|
||||
package me.cortex.voxy.common.world.service;
|
||||
|
||||
import me.cortex.voxy.common.storage.StorageCompressor;
|
||||
import me.cortex.voxy.common.world.SaveLoadSystem;
|
||||
import me.cortex.voxy.common.world.WorldEngine;
|
||||
import me.cortex.voxy.common.world.WorldSection;
|
||||
import me.cortex.voxy.common.world.thread.ServiceSlice;
|
||||
import me.cortex.voxy.common.world.thread.ServiceThreadPool;
|
||||
import net.minecraft.client.MinecraftClient;
|
||||
import net.minecraft.text.Text;
|
||||
import org.lwjgl.system.MemoryUtil;
|
||||
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
//TODO: add an option for having synced saving, that is when call enqueueSave, that will instead, instantly
|
||||
// save to the db, this can be useful for just reducing the amount of thread pools in total
|
||||
// might have some issues with threading if the same section is saved from multiple threads?
|
||||
public class SectionSavingService {
|
||||
private volatile boolean running = true;
|
||||
private final Thread[] workers;
|
||||
|
||||
private final ServiceSlice threads;
|
||||
private final ConcurrentLinkedDeque<WorldSection> saveQueue = new ConcurrentLinkedDeque<>();
|
||||
private final Semaphore saveCounter = new Semaphore(0);
|
||||
|
||||
private final WorldEngine world;
|
||||
|
||||
|
||||
public SectionSavingService(WorldEngine worldEngine, int workers) {
|
||||
this.workers = new Thread[workers];
|
||||
for (int i = 0; i < workers; i++) {
|
||||
var worker = new Thread(this::saveWorker);
|
||||
worker.setDaemon(false);
|
||||
worker.setName("Saving service #" + i);
|
||||
worker.start();
|
||||
this.workers[i] = worker;
|
||||
}
|
||||
public SectionSavingService(WorldEngine worldEngine, ServiceThreadPool threadPool) {
|
||||
this.world = worldEngine;
|
||||
this.threads = threadPool.createService("Section saving service", 100, () -> this::processJob);
|
||||
}
|
||||
|
||||
private void saveWorker() {
|
||||
while (running) {
|
||||
this.saveCounter.acquireUninterruptibly();
|
||||
if (!this.running) break;
|
||||
private void processJob() {
|
||||
var section = this.saveQueue.pop();
|
||||
section.assertNotFree();
|
||||
try {
|
||||
@@ -53,7 +38,6 @@ public class SectionSavingService {
|
||||
}
|
||||
section.release();
|
||||
}
|
||||
}
|
||||
|
||||
public void enqueueSave(WorldSection section) {
|
||||
//If its not enqueued for saving then enqueue it
|
||||
@@ -61,47 +45,25 @@ public class SectionSavingService {
|
||||
//Acquire the section for use
|
||||
section.acquire();
|
||||
this.saveQueue.add(section);
|
||||
this.saveCounter.release();
|
||||
this.threads.execute();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
boolean anyAlive = false;
|
||||
boolean allAlive = true;
|
||||
for (var worker : this.workers) {
|
||||
anyAlive |= worker.isAlive();
|
||||
allAlive &= worker.isAlive();
|
||||
}
|
||||
|
||||
if (!anyAlive) {
|
||||
System.err.println("Section saving workers already dead on shutdown! this is very very bad, check log for errors from this thread");
|
||||
return;
|
||||
}
|
||||
if (!allAlive) {
|
||||
System.err.println("Some section saving works have died, please check log and report errors.");
|
||||
}
|
||||
|
||||
|
||||
int i = 0;
|
||||
//Wait for all the saving to finish
|
||||
while (this.saveCounter.availablePermits() != 0) {
|
||||
try {Thread.sleep(500);} catch (InterruptedException e) {break;}
|
||||
if (i++%10 == 0) {
|
||||
System.out.println("Section saving shutdown has " + this.saveCounter.availablePermits() + " tasks remaining");
|
||||
if (this.threads.getJobCount() != 0) {
|
||||
System.err.println("Voxy section saving still in progress, estimated " + this.threads.getJobCount() + " sections remaining.");
|
||||
while (this.threads.getJobCount() != 0) {
|
||||
Thread.onSpinWait();
|
||||
}
|
||||
}
|
||||
//Shutdown
|
||||
this.running = false;
|
||||
this.saveCounter.release(1000);
|
||||
//Wait for threads to join
|
||||
try {
|
||||
for (var worker : this.workers) {
|
||||
worker.join();
|
||||
this.threads.shutdown();
|
||||
//Manually save any remaining entries
|
||||
while (!this.saveQueue.isEmpty()) {
|
||||
this.processJob();
|
||||
}
|
||||
} catch (InterruptedException e) {throw new RuntimeException(e);}
|
||||
}
|
||||
|
||||
public int getTaskCount() {
|
||||
return this.saveCounter.availablePermits();
|
||||
return this.threads.getJobCount();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,72 +0,0 @@
|
||||
package me.cortex.voxy.common.world.service;
|
||||
|
||||
import me.cortex.voxy.common.world.WorldEngine;
|
||||
import me.cortex.voxy.common.world.WorldSection;
|
||||
import net.minecraft.client.MinecraftClient;
|
||||
import net.minecraft.text.Text;
|
||||
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
//TODO:
|
||||
//FIXME:
|
||||
// FINISHME:
|
||||
// Use this instead of seperate thread pools, use a single shared pool where tasks are submitted to and worked on
|
||||
|
||||
public class ServiceThreadPool {
|
||||
private volatile boolean running = true;
|
||||
private final Thread[] workers;
|
||||
private final Semaphore jobCounter = new Semaphore(0);
|
||||
//TODO: have a wrapper to specify extra information about the job for debugging
|
||||
private final ConcurrentLinkedDeque<Runnable> jobQueue = new ConcurrentLinkedDeque<>();
|
||||
|
||||
|
||||
public ServiceThreadPool(int workers) {
|
||||
this.workers = new Thread[workers];
|
||||
for (int i = 0; i < workers; i++) {
|
||||
var worker = new Thread(this::worker);
|
||||
worker.setDaemon(false);
|
||||
worker.setName("Service worker #" + i);
|
||||
worker.start();
|
||||
this.workers[i] = worker;
|
||||
}
|
||||
}
|
||||
|
||||
private void worker() {
|
||||
while (true) {
|
||||
this.jobCounter.acquireUninterruptibly();
|
||||
if (!this.running) {
|
||||
break;
|
||||
}
|
||||
var job = this.jobQueue.pop();
|
||||
try {
|
||||
job.run();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
MinecraftClient.getInstance().executeSync(()->
|
||||
MinecraftClient.getInstance().player.sendMessage(
|
||||
Text.literal(
|
||||
"Voxy ingester had an exception while executing service job please check logs and report error")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void shutdown() {
|
||||
//Wait for the tasks to finish
|
||||
while (this.jobCounter.availablePermits() != 0) {
|
||||
Thread.onSpinWait();
|
||||
}
|
||||
|
||||
//Shutdown
|
||||
this.running = false;
|
||||
this.jobCounter.release(1000);
|
||||
|
||||
//Wait for thread to join
|
||||
try {
|
||||
for (var worker : this.workers) {
|
||||
worker.join();
|
||||
}
|
||||
} catch (InterruptedException e) {throw new RuntimeException(e);}
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,8 @@ import it.unimi.dsi.fastutil.Pair;
|
||||
import me.cortex.voxy.common.voxelization.VoxelizedSection;
|
||||
import me.cortex.voxy.common.voxelization.WorldConversionFactory;
|
||||
import me.cortex.voxy.common.world.WorldEngine;
|
||||
import me.cortex.voxy.common.world.thread.ServiceSlice;
|
||||
import me.cortex.voxy.common.world.thread.ServiceThreadPool;
|
||||
import net.minecraft.client.MinecraftClient;
|
||||
import net.minecraft.text.Text;
|
||||
import net.minecraft.util.math.ChunkSectionPos;
|
||||
@@ -18,33 +20,17 @@ import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
public class VoxelIngestService {
|
||||
private volatile boolean running = true;
|
||||
private final Thread[] workers;
|
||||
|
||||
private final ServiceSlice threads;
|
||||
private final ConcurrentLinkedDeque<WorldChunk> ingestQueue = new ConcurrentLinkedDeque<>();
|
||||
private final Semaphore ingestCounter = new Semaphore(0);
|
||||
|
||||
private final ConcurrentHashMap<Long, Pair<ChunkNibbleArray, ChunkNibbleArray>> captureLightMap = new ConcurrentHashMap<>(1000,0.75f, 7);
|
||||
|
||||
private final WorldEngine world;
|
||||
public VoxelIngestService(WorldEngine world, int workers) {
|
||||
public VoxelIngestService(WorldEngine world, ServiceThreadPool pool) {
|
||||
this.world = world;
|
||||
|
||||
this.workers = new Thread[workers];
|
||||
for (int i = 0; i < workers; i++) {
|
||||
var worker = new Thread(this::ingestWorker);
|
||||
worker.setDaemon(false);
|
||||
worker.setName("Ingest service #" + i);
|
||||
worker.start();
|
||||
this.workers[i] = worker;
|
||||
}
|
||||
this.threads = pool.createService("Ingest service", 100, ()-> this::processJob);
|
||||
}
|
||||
|
||||
private void ingestWorker() {
|
||||
while (this.running) {
|
||||
this.ingestCounter.acquireUninterruptibly();
|
||||
if (!this.running) break;
|
||||
try {
|
||||
private void processJob() {
|
||||
var chunk = this.ingestQueue.pop();
|
||||
int i = chunk.getBottomSectionCoord() - 1;
|
||||
for (var section : chunk.getSectionArray()) {
|
||||
@@ -78,11 +64,6 @@ public class VoxelIngestService {
|
||||
this.world.insertUpdate(csec);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
MinecraftClient.getInstance().executeSync(()->MinecraftClient.getInstance().player.sendMessage(Text.literal("Voxy ingester had an exception while executing please check logs and report error")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void fetchLightingData(Map<Long, Pair<ChunkNibbleArray, ChunkNibbleArray>> out, WorldChunk chunk) {
|
||||
@@ -118,41 +99,14 @@ public class VoxelIngestService {
|
||||
public void enqueueIngest(WorldChunk chunk) {
|
||||
fetchLightingData(this.captureLightMap, chunk);
|
||||
this.ingestQueue.add(chunk);
|
||||
this.ingestCounter.release();
|
||||
this.threads.execute();
|
||||
}
|
||||
|
||||
public int getTaskCount() {
|
||||
return this.ingestCounter.availablePermits();
|
||||
return this.threads.getJobCount();
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
boolean anyAlive = false;
|
||||
boolean allAlive = true;
|
||||
for (var worker : this.workers) {
|
||||
anyAlive |= worker.isAlive();
|
||||
allAlive &= worker.isAlive();
|
||||
}
|
||||
|
||||
if (!anyAlive) {
|
||||
System.err.println("Ingest workers already dead on shutdown! this is very very bad, check log for errors from this thread");
|
||||
return;
|
||||
}
|
||||
if (!allAlive) {
|
||||
System.err.println("Some ingest workers already dead on shutdown! this is very very bad, check log for errors from this thread");
|
||||
}
|
||||
|
||||
//Wait for the ingest to finish
|
||||
while (this.ingestCounter.availablePermits() != 0) {
|
||||
Thread.onSpinWait();
|
||||
}
|
||||
//Shutdown
|
||||
this.running = false;
|
||||
this.ingestCounter.release(1000);
|
||||
//Wait for thread to join
|
||||
try {
|
||||
for (var worker : this.workers) {
|
||||
worker.join();
|
||||
}
|
||||
} catch (InterruptedException e) {throw new RuntimeException(e);}
|
||||
this.threads.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
package me.cortex.voxy.common.world.thread;
|
||||
|
||||
import me.cortex.voxy.common.util.TrackedObject;
|
||||
import net.minecraft.client.MinecraftClient;
|
||||
import net.minecraft.text.Text;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class ServiceSlice extends TrackedObject {
|
||||
private final String name;
|
||||
final int weightPerJob;
|
||||
private volatile boolean alive = true;
|
||||
private final ServiceThreadPool threadPool;
|
||||
private final Supplier<Runnable> workerGenerator;
|
||||
final Semaphore jobCount = new Semaphore(0);
|
||||
private final Runnable[] runningCtxs;
|
||||
private final AtomicInteger activeCount = new AtomicInteger();
|
||||
|
||||
ServiceSlice(ServiceThreadPool threadPool, Supplier<Runnable> workerGenerator, String name, int weightPerJob) {
|
||||
this.threadPool = threadPool;
|
||||
this.runningCtxs = new Runnable[threadPool.getThreadCount()];
|
||||
this.workerGenerator = workerGenerator;
|
||||
this.name = name;
|
||||
this.weightPerJob = weightPerJob;
|
||||
}
|
||||
|
||||
boolean doRun(int threadIndex) {
|
||||
//Run this thread once if possible
|
||||
if (!this.jobCount.tryAcquire()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!this.alive) {
|
||||
return true;//Return true because we have "consumed" the job (needed to keep weight tracking correct)
|
||||
}
|
||||
|
||||
this.activeCount.incrementAndGet();
|
||||
|
||||
//Check that we are still alive
|
||||
if (!this.alive) {
|
||||
if (this.activeCount.decrementAndGet() < 0) {
|
||||
throw new IllegalStateException("Alive count negative!");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
//If the running context is null, create and set it
|
||||
var ctx = this.runningCtxs[threadIndex];
|
||||
if (ctx == null) {
|
||||
ctx = this.workerGenerator.get();
|
||||
this.runningCtxs[threadIndex] = ctx;
|
||||
}
|
||||
|
||||
//Run the job
|
||||
try {
|
||||
ctx.run();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Unexpected error occurred while executing a service job, expect things to break badly");
|
||||
e.printStackTrace();
|
||||
MinecraftClient.getInstance().execute(()->MinecraftClient.getInstance().player.sendMessage(Text.literal("A voxy service had an exception while executing please check logs and report error")));
|
||||
} finally {
|
||||
if (this.activeCount.decrementAndGet() < 0) {
|
||||
throw new IllegalStateException("Alive count negative!");
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
//Tells the system that a single instance of this service needs executing
|
||||
public void execute() {
|
||||
if (!this.alive) {
|
||||
throw new IllegalStateException("Tried to do work on a dead service");
|
||||
}
|
||||
this.threadPool.execute(this);
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
this.alive = false;
|
||||
|
||||
//Wait till all is finished
|
||||
while (this.activeCount.get() != 0) {
|
||||
Thread.onSpinWait();
|
||||
}
|
||||
|
||||
//Tell parent to remove
|
||||
this.threadPool.removeService(this);
|
||||
|
||||
super.free0();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void free() {
|
||||
this.shutdown();
|
||||
}
|
||||
|
||||
public int getJobCount() {
|
||||
return this.jobCount.availablePermits();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,146 @@
|
||||
package me.cortex.voxy.common.world.thread;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
||||
//TODO: could also probably replace all of this with just VirtualThreads and a Executors.newThreadPerTaskExecutor with a fixed thread pool
|
||||
// it is probably better anyway
|
||||
public class ServiceThreadPool {
|
||||
private volatile boolean running = true;
|
||||
private final Thread[] workers;
|
||||
private final Semaphore jobCounter = new Semaphore(0);
|
||||
|
||||
private volatile ServiceSlice[] serviceSlices = new ServiceSlice[0];
|
||||
private final AtomicLong totalJobWeight = new AtomicLong();
|
||||
|
||||
public ServiceThreadPool(int workers) {
|
||||
this.workers = new Thread[workers];
|
||||
for (int i = 0; i < workers; i++) {
|
||||
int threadId = i;
|
||||
var worker = new Thread(()->this.worker(threadId));
|
||||
worker.setDaemon(false);
|
||||
worker.setName("Service worker #" + i);
|
||||
worker.start();
|
||||
worker.setUncaughtExceptionHandler(this::handleUncaughtException);
|
||||
this.workers[i] = worker;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized ServiceSlice createService(String name, int weight, Supplier<Runnable> workGenerator) {
|
||||
var current = this.serviceSlices;
|
||||
var newList = new ServiceSlice[current.length + 1];
|
||||
System.arraycopy(current, 0, newList, 0, current.length);
|
||||
var service = new ServiceSlice(this, workGenerator, name, weight);
|
||||
newList[current.length] = service;
|
||||
this.serviceSlices = newList;
|
||||
return service;
|
||||
}
|
||||
|
||||
synchronized void removeService(ServiceSlice service) {
|
||||
this.removeServiceFromArray(service);
|
||||
this.totalJobWeight.addAndGet(-((long) service.weightPerJob) * service.jobCount.availablePermits());
|
||||
}
|
||||
|
||||
private synchronized void removeServiceFromArray(ServiceSlice service) {
|
||||
var lst = this.serviceSlices;
|
||||
int idx;
|
||||
for (idx = 0; idx < lst.length; idx++) {
|
||||
if (lst[idx] == service) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (idx == lst.length) {
|
||||
throw new IllegalStateException("Service not in service list");
|
||||
}
|
||||
|
||||
//Remove the slice from the array and set it back
|
||||
|
||||
if (lst.length-1 == 0) {
|
||||
this.serviceSlices = new ServiceSlice[0];
|
||||
return;
|
||||
}
|
||||
|
||||
ServiceSlice[] newArr = new ServiceSlice[lst.length-1];
|
||||
System.arraycopy(lst, 0, newArr, 0, idx);
|
||||
if (lst.length-1 != idx) {
|
||||
//Need to do a second copy
|
||||
System.arraycopy(lst, idx+1, newArr, idx, newArr.length-idx);
|
||||
}
|
||||
this.serviceSlices = newArr;
|
||||
}
|
||||
|
||||
void execute(ServiceSlice service) {
|
||||
this.totalJobWeight.addAndGet(service.weightPerJob);
|
||||
this.jobCounter.release(1);
|
||||
}
|
||||
|
||||
private void worker(int threadId) {
|
||||
long seed = 1234342;
|
||||
while (true) {
|
||||
seed = (seed ^ seed >>> 30) * -4658895280553007687L;
|
||||
seed = (seed ^ seed >>> 27) * -7723592293110705685L;
|
||||
long clamped = seed&((1L<<63)-1);
|
||||
this.jobCounter.acquireUninterruptibly();
|
||||
if (!this.running) {
|
||||
break;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
var ref = this.serviceSlices;
|
||||
long chosenNumber = clamped % this.totalJobWeight.get();
|
||||
ServiceSlice service = ref[(int) (clamped % ref.length)];
|
||||
for (var slice : ref) {
|
||||
chosenNumber -= ((long) slice.weightPerJob) * slice.jobCount.availablePermits();
|
||||
if (chosenNumber <= 0) {
|
||||
service = slice;
|
||||
}
|
||||
}
|
||||
|
||||
//Run the job
|
||||
if (!service.doRun(threadId)) {
|
||||
//Didnt consume the job, find a new job
|
||||
continue;
|
||||
}
|
||||
//Consumed a job from the service, decrease weight by the amount
|
||||
if (this.totalJobWeight.addAndGet(-service.weightPerJob)<0) {
|
||||
throw new IllegalStateException("Total job weight is negative");
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleUncaughtException(Thread thread, Throwable throwable) {
|
||||
System.err.println("Service worker thread has exploded unexpectedly! this is really not good very very bad.");
|
||||
throwable.printStackTrace();
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
if (this.serviceSlices.length != 0) {
|
||||
throw new IllegalStateException("All service slices must be shutdown before thread pool can exit");
|
||||
}
|
||||
|
||||
//Wait for the tasks to finish
|
||||
while (this.jobCounter.availablePermits() != 0) {
|
||||
Thread.onSpinWait();
|
||||
}
|
||||
|
||||
//Shutdown
|
||||
this.running = false;
|
||||
this.jobCounter.release(1000);
|
||||
|
||||
//Wait for thread to join
|
||||
try {
|
||||
for (var worker : this.workers) {
|
||||
worker.join();
|
||||
}
|
||||
} catch (InterruptedException e) {throw new RuntimeException(e);}
|
||||
}
|
||||
|
||||
public int getThreadCount() {
|
||||
return this.workers.length;
|
||||
}
|
||||
}
|
||||
@@ -10,7 +10,7 @@
|
||||
"voxy.config.general.ingest": "Chunk Ingest",
|
||||
"voxy.config.general.ingest.tooltip": "Enables or disables voxies ability to convert new chunks into LoDs",
|
||||
"voxy.config.general.quality": "LoD Quality",
|
||||
"voxy.config.general.quality.tooltip": "How far each LoD ring lasts before its downgraded to a lower detail level",
|
||||
"voxy.config.general.quality.tooltip": "How big of an area a section should be on screen before it subdivides (pixels^2)",
|
||||
"voxy.config.general.geometryBuffer": "Geometry Buffer Quads",
|
||||
"voxy.config.general.geometryBuffer.tooltip": "How many quads the geometry buffer can hold",
|
||||
"voxy.config.general.maxSections": "Max Sections",
|
||||
|
||||
Reference in New Issue
Block a user