From 8797ee56a8a058d909eeec9da4fc77473680aeb0 Mon Sep 17 00:00:00 2001 From: mcrcortex <18544518+MCRcortex@users.noreply.github.com> Date: Wed, 7 Aug 2024 01:47:22 +1000 Subject: [PATCH] World importer uses service system --- .../me/cortex/voxy/client/core/VoxelCore.java | 11 +- .../voxy/client/importers/WorldImporter.java | 107 +++++++++++------- .../cortex/voxy/common/util/UnsafeUtil.java | 2 +- .../voxy/common/world/SaveLoadSystem.java | 25 ++-- .../common/world/thread/ServiceSlice.java | 27 ++++- .../world/thread/ServiceThreadPool.java | 7 +- 6 files changed, 121 insertions(+), 58 deletions(-) diff --git a/src/main/java/me/cortex/voxy/client/core/VoxelCore.java b/src/main/java/me/cortex/voxy/client/core/VoxelCore.java index 7e1b9f8e..bfc95e75 100644 --- a/src/main/java/me/cortex/voxy/client/core/VoxelCore.java +++ b/src/main/java/me/cortex/voxy/client/core/VoxelCore.java @@ -193,15 +193,18 @@ public class VoxelCore { public boolean createWorldImporter(World mcWorld, File worldPath) { if (this.importer != null) { + this.importer = new WorldImporter(this.world, mcWorld, this.serviceThreadPool); + } + if (this.importer.isBusy()) { return false; } - var importer = new WorldImporter(this.world, mcWorld); + var bossBar = new ClientBossBar(MathHelper.randomUuid(), Text.of("Voxy world importer"), 0.0f, BossBar.Color.GREEN, BossBar.Style.PROGRESS, false, false, false); MinecraftClient.getInstance().inGameHud.getBossBarHud().bossBars.put(bossBar.getUuid(), bossBar); - importer.importWorldAsyncStart(worldPath, 4, (a,b)-> + this.importer.importWorldAsyncStart(worldPath, (a,b)-> MinecraftClient.getInstance().executeSync(()-> { bossBar.setPercent(((float) a)/((float) b)); - bossBar.setName(Text.of("Voxy import: "+ a+"/"+b + " region files")); + bossBar.setName(Text.of("Voxy import: "+ a+"/"+b + " chunks")); }), ()-> { MinecraftClient.getInstance().executeSync(()-> { @@ -210,9 +213,7 @@ public class VoxelCore { MinecraftClient.getInstance().inGameHud.getChatHud().addMessage(Text.literal(msg)); System.err.println(msg); }); - this.importer = null; }); - this.importer = importer; return true; } diff --git a/src/main/java/me/cortex/voxy/client/importers/WorldImporter.java b/src/main/java/me/cortex/voxy/client/importers/WorldImporter.java index f5c239f9..b6a76a2f 100644 --- a/src/main/java/me/cortex/voxy/client/importers/WorldImporter.java +++ b/src/main/java/me/cortex/voxy/client/importers/WorldImporter.java @@ -6,6 +6,8 @@ import me.cortex.voxy.common.voxelization.VoxelizedSection; import me.cortex.voxy.common.voxelization.WorldConversionFactory; import me.cortex.voxy.common.world.WorldEngine; import me.cortex.voxy.common.world.other.Mipper; +import me.cortex.voxy.common.world.thread.ServiceSlice; +import me.cortex.voxy.common.world.thread.ServiceThreadPool; import net.minecraft.block.Block; import net.minecraft.block.BlockState; import net.minecraft.block.Blocks; @@ -28,6 +30,8 @@ import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Objects; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -36,6 +40,7 @@ import java.util.function.Function; import java.util.function.Predicate; public class WorldImporter { + public interface UpdateCallback { void update(int finished, int outof); } @@ -43,12 +48,16 @@ public class WorldImporter { private final WorldEngine world; private final ReadableContainer> defaultBiomeProvider; private final Codec>> biomeCodec; - private final AtomicInteger totalRegions = new AtomicInteger(); - private final AtomicInteger regionsProcessed = new AtomicInteger(); + private final AtomicInteger totalChunks = new AtomicInteger(); + private final AtomicInteger chunksProcessed = new AtomicInteger(); + + private final ConcurrentLinkedDeque jobQueue = new ConcurrentLinkedDeque<>(); + private final ServiceSlice threadPool; private volatile boolean isRunning; - public WorldImporter(WorldEngine worldEngine, World mcWorld) { + public WorldImporter(WorldEngine worldEngine, World mcWorld, ServiceThreadPool servicePool) { this.world = worldEngine; + this.threadPool = servicePool.createService("World importer", 1, ()-> ()->jobQueue.poll().run(), ()->this.world.savingService.getTaskCount() < 4000); var biomeRegistry = mcWorld.getRegistryManager().get(RegistryKeys.BIOME); var defaultBiome = biomeRegistry.entryOf(BiomeKeys.PLAINS); @@ -103,13 +112,17 @@ public class WorldImporter { public void shutdown() { this.isRunning = false; try {this.worker.join();} catch (InterruptedException e) {throw new RuntimeException(e);} + this.threadPool.shutdown(); } - private Thread worker; - public void importWorldAsyncStart(File directory, int threads, UpdateCallback updateCallback, Runnable onCompletion) { + private volatile Thread worker; + private UpdateCallback updateCallback; + public void importWorldAsyncStart(File directory, UpdateCallback updateCallback, Runnable onCompletion) { + this.totalChunks.set(0); + this.chunksProcessed.set(0); + this.updateCallback = updateCallback; this.worker = new Thread(() -> { this.isRunning = true; - var workers = new ForkJoinPool(threads); var files = directory.listFiles(); for (var file : files) { if (!file.isFile()) { @@ -123,29 +136,32 @@ public class WorldImporter { } int rx = Integer.parseInt(sections[1]); int rz = Integer.parseInt(sections[2]); - this.totalRegions.addAndGet(1); - workers.submit(() -> { - try { - if (!this.isRunning) { - return; - } - this.importRegionFile(file.toPath(), rx, rz); - int regionsProcessedCount = this.regionsProcessed.addAndGet(1); - updateCallback.update(regionsProcessedCount, this.totalRegions.get()); - } catch ( - Exception e) { - e.printStackTrace(); - } - }); + try { + this.importRegionFile(file.toPath(), rx, rz); + } catch (IOException e) { + throw new RuntimeException(e); + } + while ((this.totalChunks.get()-this.chunksProcessed.get() > 10_000) && this.isRunning) { + Thread.onSpinWait(); + } + if (!this.isRunning) { + return; + } + } + this.threadPool.blockTillEmpty(); + while (this.chunksProcessed.get() != this.totalChunks.get() && this.isRunning) { + Thread.onSpinWait(); } - workers.shutdown(); - try { - workers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); - } catch (InterruptedException e) {} onCompletion.run(); + this.worker = null; }); this.worker.setName("World importer"); this.worker.start(); + + } + + public boolean isBusy() { + return this.worker != null; } private void importRegionFile(Path file, int x, int z) throws IOException { @@ -171,6 +187,7 @@ public class WorldImporter { var data = MemoryUtil.memAlloc(sectorCount*4096).order(ByteOrder.BIG_ENDIAN); fileStream.read(data, sectorStart*4096L); data.flip(); + boolean addedToQueue = false; { int m = data.getInt(); byte b = data.get(); @@ -188,19 +205,34 @@ public class WorldImporter { } else if (n < 0) { System.err.println("Declared size of chunk is negative"); } else { - try (var decompressedData = this.decompress(b, new ByteBufferBackedInputStream(data))) { - if (decompressedData == null) { - System.err.println("Error decompressing chunk data"); - } else { - var nbt = NbtIo.readCompound(decompressedData); - this.importChunkNBT(nbt); + addedToQueue = true; + this.jobQueue.add(()-> { + if (!this.isRunning) { + return; } - } + try { + try (var decompressedData = this.decompress(b, new ByteBufferBackedInputStream(data))) { + if (decompressedData == null) { + System.err.println("Error decompressing chunk data"); + } else { + var nbt = NbtIo.readCompound(decompressedData); + this.importChunkNBT(nbt); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + MemoryUtil.memFree(data); + } + }); + this.totalChunks.incrementAndGet(); + this.threadPool.execute(); } } } - - MemoryUtil.memFree(data); + if (!addedToQueue) { + MemoryUtil.memFree(data); + } } MemoryUtil.memFree(sectorsSavesBB); @@ -230,6 +262,8 @@ public class WorldImporter { System.err.println("Exception importing world chunk:"); e.printStackTrace(); } + + this.updateCallback.update(this.chunksProcessed.incrementAndGet(), this.totalChunks.get()); } private static int getIndex(int x, int y, int z) { @@ -291,13 +325,6 @@ public class WorldImporter { WorldConversionFactory.mipSection(csec, this.world.getMapper()); this.world.insertUpdate(csec); - while (this.world.savingService.getTaskCount() > 4000) { - try { - Thread.sleep(250); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } } } diff --git a/src/main/java/me/cortex/voxy/common/util/UnsafeUtil.java b/src/main/java/me/cortex/voxy/common/util/UnsafeUtil.java index a0c21875..d71392be 100644 --- a/src/main/java/me/cortex/voxy/common/util/UnsafeUtil.java +++ b/src/main/java/me/cortex/voxy/common/util/UnsafeUtil.java @@ -25,7 +25,7 @@ public class UnsafeUtil { //Copy the entire length of src to the dst memory where dst is a byte array (source length from dst) public static void memcpy(long src, byte[] dst) { - UNSAFE.copyMemory(0, src, dst, BYTE_ARRAY_BASE_OFFSET, dst.length); + UNSAFE.copyMemory(null, src, dst, BYTE_ARRAY_BASE_OFFSET, dst.length); } //Copy the entire length of src to the dst memory where src is a byte array (source length from src) diff --git a/src/main/java/me/cortex/voxy/common/world/SaveLoadSystem.java b/src/main/java/me/cortex/voxy/common/world/SaveLoadSystem.java index b83d6f25..641f4a15 100644 --- a/src/main/java/me/cortex/voxy/common/world/SaveLoadSystem.java +++ b/src/main/java/me/cortex/voxy/common/world/SaveLoadSystem.java @@ -1,5 +1,6 @@ package me.cortex.voxy.common.world; +import it.unimi.dsi.fastutil.longs.Long2ShortFunction; import it.unimi.dsi.fastutil.longs.Long2ShortOpenHashMap; import it.unimi.dsi.fastutil.longs.LongArrayList; import me.cortex.voxy.common.util.MemoryBuffer; @@ -33,28 +34,32 @@ public class SaveLoadSystem { var data = section.copyData(); var compressed = new short[data.length]; Long2ShortOpenHashMap LUT = new Long2ShortOpenHashMap(data.length); - LongArrayList LUTVAL = new LongArrayList(); + LUT.defaultReturnValue((short) -1); + long[] lutValues = new long[32*16*16];//If there are more than this many states in a section... im concerned + short lutIndex = 0; long pHash = 99; for (int i = 0; i < data.length; i++) { long block = data[i]; - short mapping = LUT.computeIfAbsent(block, id->{ - LUTVAL.add(id); - return (short)(LUTVAL.size()-1); - }); + short mapping = LUT.putIfAbsent(block, lutIndex); + if (mapping == -1) { + mapping = lutIndex++; + lutValues[mapping] = block; + } compressed[lin2z(i)] = mapping; pHash *= 127817112311121L; pHash ^= pHash>>31; pHash += 9918322711L; pHash ^= block; } - long[] lut = LUTVAL.toLongArray(); - MemoryBuffer raw = new MemoryBuffer(compressed.length*2L+lut.length*8L+512); + + MemoryBuffer raw = new MemoryBuffer(compressed.length*2L+lutIndex*8L+512); long ptr = raw.address; - long hash = section.key^(lut.length*1293481298141L); + long hash = section.key^(lutIndex*1293481298141L); MemoryUtil.memPutLong(ptr, section.key); ptr += 8; - MemoryUtil.memPutInt(ptr, lut.length); ptr += 8; - for (long id : lut) { + MemoryUtil.memPutInt(ptr, lutIndex); ptr += 4; + for (int i = 0; i < lutIndex; i++) { + long id = lutValues[i]; MemoryUtil.memPutLong(ptr, id); ptr += 8; hash *= 1230987149811L; hash += 12831; diff --git a/src/main/java/me/cortex/voxy/common/world/thread/ServiceSlice.java b/src/main/java/me/cortex/voxy/common/world/thread/ServiceSlice.java index 3facff56..912dea25 100644 --- a/src/main/java/me/cortex/voxy/common/world/thread/ServiceSlice.java +++ b/src/main/java/me/cortex/voxy/common/world/thread/ServiceSlice.java @@ -7,6 +7,7 @@ import net.minecraft.text.Text; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; import java.util.function.Supplier; public class ServiceSlice extends TrackedObject { @@ -18,9 +19,12 @@ public class ServiceSlice extends TrackedObject { final Semaphore jobCount = new Semaphore(0); private final Runnable[] runningCtxs; private final AtomicInteger activeCount = new AtomicInteger(); + private final AtomicInteger jobCount2 = new AtomicInteger(); + private final BooleanSupplier condition; - ServiceSlice(ServiceThreadPool threadPool, Supplier workerGenerator, String name, int weightPerJob) { + ServiceSlice(ServiceThreadPool threadPool, Supplier workerGenerator, String name, int weightPerJob, BooleanSupplier condition) { this.threadPool = threadPool; + this.condition = condition; this.runningCtxs = new Runnable[threadPool.getThreadCount()]; this.workerGenerator = workerGenerator; this.name = name; @@ -28,6 +32,11 @@ public class ServiceSlice extends TrackedObject { } boolean doRun(int threadIndex) { + //If executable + if (!this.condition.getAsBoolean()) { + return false; + } + //Run this thread once if possible if (!this.jobCount.tryAcquire()) { return false; @@ -65,6 +74,7 @@ public class ServiceSlice extends TrackedObject { if (this.activeCount.decrementAndGet() < 0) { throw new IllegalStateException("Alive count negative!"); } + this.jobCount2.decrementAndGet(); } return true; } @@ -75,6 +85,7 @@ public class ServiceSlice extends TrackedObject { throw new IllegalStateException("Tried to do work on a dead service"); } this.jobCount.release(); + this.jobCount2.incrementAndGet(); this.threadPool.execute(this); } @@ -104,4 +115,18 @@ public class ServiceSlice extends TrackedObject { public boolean hasJobs() { return this.jobCount.availablePermits() != 0; } + + public void blockTillEmpty() { + while (this.activeCount.get() != 0 && this.alive) { + while (this.jobCount2.get() != 0 && this.alive) { + Thread.onSpinWait(); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + Thread.yield(); + } + } } diff --git a/src/main/java/me/cortex/voxy/common/world/thread/ServiceThreadPool.java b/src/main/java/me/cortex/voxy/common/world/thread/ServiceThreadPool.java index f8927428..c2e1eac6 100644 --- a/src/main/java/me/cortex/voxy/common/world/thread/ServiceThreadPool.java +++ b/src/main/java/me/cortex/voxy/common/world/thread/ServiceThreadPool.java @@ -3,6 +3,7 @@ package me.cortex.voxy.common.world.thread; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BooleanSupplier; import java.util.function.Supplier; @@ -30,10 +31,14 @@ public class ServiceThreadPool { } public synchronized ServiceSlice createService(String name, int weight, Supplier workGenerator) { + return this.createService(name, weight, workGenerator, ()->true); + } + + public synchronized ServiceSlice createService(String name, int weight, Supplier workGenerator, BooleanSupplier executionCondition) { var current = this.serviceSlices; var newList = new ServiceSlice[current.length + 1]; System.arraycopy(current, 0, newList, 0, current.length); - var service = new ServiceSlice(this, workGenerator, name, weight); + var service = new ServiceSlice(this, workGenerator, name, weight, executionCondition); newList[current.length] = service; this.serviceSlices = newList; return service;