World importer uses service system

This commit is contained in:
mcrcortex
2024-08-07 01:47:22 +10:00
parent 6eed71656d
commit 8797ee56a8
6 changed files with 121 additions and 58 deletions

View File

@@ -193,15 +193,18 @@ public class VoxelCore {
public boolean createWorldImporter(World mcWorld, File worldPath) {
if (this.importer != null) {
this.importer = new WorldImporter(this.world, mcWorld, this.serviceThreadPool);
}
if (this.importer.isBusy()) {
return false;
}
var importer = new WorldImporter(this.world, mcWorld);
var bossBar = new ClientBossBar(MathHelper.randomUuid(), Text.of("Voxy world importer"), 0.0f, BossBar.Color.GREEN, BossBar.Style.PROGRESS, false, false, false);
MinecraftClient.getInstance().inGameHud.getBossBarHud().bossBars.put(bossBar.getUuid(), bossBar);
importer.importWorldAsyncStart(worldPath, 4, (a,b)->
this.importer.importWorldAsyncStart(worldPath, (a,b)->
MinecraftClient.getInstance().executeSync(()-> {
bossBar.setPercent(((float) a)/((float) b));
bossBar.setName(Text.of("Voxy import: "+ a+"/"+b + " region files"));
bossBar.setName(Text.of("Voxy import: "+ a+"/"+b + " chunks"));
}),
()-> {
MinecraftClient.getInstance().executeSync(()-> {
@@ -210,9 +213,7 @@ public class VoxelCore {
MinecraftClient.getInstance().inGameHud.getChatHud().addMessage(Text.literal(msg));
System.err.println(msg);
});
this.importer = null;
});
this.importer = importer;
return true;
}

View File

@@ -6,6 +6,8 @@ import me.cortex.voxy.common.voxelization.VoxelizedSection;
import me.cortex.voxy.common.voxelization.WorldConversionFactory;
import me.cortex.voxy.common.world.WorldEngine;
import me.cortex.voxy.common.world.other.Mipper;
import me.cortex.voxy.common.world.thread.ServiceSlice;
import me.cortex.voxy.common.world.thread.ServiceThreadPool;
import net.minecraft.block.Block;
import net.minecraft.block.BlockState;
import net.minecraft.block.Blocks;
@@ -28,6 +30,8 @@ import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,6 +40,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
public class WorldImporter {
public interface UpdateCallback {
void update(int finished, int outof);
}
@@ -43,12 +48,16 @@ public class WorldImporter {
private final WorldEngine world;
private final ReadableContainer<RegistryEntry<Biome>> defaultBiomeProvider;
private final Codec<ReadableContainer<RegistryEntry<Biome>>> biomeCodec;
private final AtomicInteger totalRegions = new AtomicInteger();
private final AtomicInteger regionsProcessed = new AtomicInteger();
private final AtomicInteger totalChunks = new AtomicInteger();
private final AtomicInteger chunksProcessed = new AtomicInteger();
private final ConcurrentLinkedDeque<Runnable> jobQueue = new ConcurrentLinkedDeque<>();
private final ServiceSlice threadPool;
private volatile boolean isRunning;
public WorldImporter(WorldEngine worldEngine, World mcWorld) {
public WorldImporter(WorldEngine worldEngine, World mcWorld, ServiceThreadPool servicePool) {
this.world = worldEngine;
this.threadPool = servicePool.createService("World importer", 1, ()-> ()->jobQueue.poll().run(), ()->this.world.savingService.getTaskCount() < 4000);
var biomeRegistry = mcWorld.getRegistryManager().get(RegistryKeys.BIOME);
var defaultBiome = biomeRegistry.entryOf(BiomeKeys.PLAINS);
@@ -103,13 +112,17 @@ public class WorldImporter {
public void shutdown() {
this.isRunning = false;
try {this.worker.join();} catch (InterruptedException e) {throw new RuntimeException(e);}
this.threadPool.shutdown();
}
private Thread worker;
public void importWorldAsyncStart(File directory, int threads, UpdateCallback updateCallback, Runnable onCompletion) {
private volatile Thread worker;
private UpdateCallback updateCallback;
public void importWorldAsyncStart(File directory, UpdateCallback updateCallback, Runnable onCompletion) {
this.totalChunks.set(0);
this.chunksProcessed.set(0);
this.updateCallback = updateCallback;
this.worker = new Thread(() -> {
this.isRunning = true;
var workers = new ForkJoinPool(threads);
var files = directory.listFiles();
for (var file : files) {
if (!file.isFile()) {
@@ -123,29 +136,32 @@ public class WorldImporter {
}
int rx = Integer.parseInt(sections[1]);
int rz = Integer.parseInt(sections[2]);
this.totalRegions.addAndGet(1);
workers.submit(() -> {
try {
this.importRegionFile(file.toPath(), rx, rz);
} catch (IOException e) {
throw new RuntimeException(e);
}
while ((this.totalChunks.get()-this.chunksProcessed.get() > 10_000) && this.isRunning) {
Thread.onSpinWait();
}
if (!this.isRunning) {
return;
}
this.importRegionFile(file.toPath(), rx, rz);
int regionsProcessedCount = this.regionsProcessed.addAndGet(1);
updateCallback.update(regionsProcessedCount, this.totalRegions.get());
} catch (
Exception e) {
e.printStackTrace();
}
});
this.threadPool.blockTillEmpty();
while (this.chunksProcessed.get() != this.totalChunks.get() && this.isRunning) {
Thread.onSpinWait();
}
workers.shutdown();
try {
workers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {}
onCompletion.run();
this.worker = null;
});
this.worker.setName("World importer");
this.worker.start();
}
public boolean isBusy() {
return this.worker != null;
}
private void importRegionFile(Path file, int x, int z) throws IOException {
@@ -171,6 +187,7 @@ public class WorldImporter {
var data = MemoryUtil.memAlloc(sectorCount*4096).order(ByteOrder.BIG_ENDIAN);
fileStream.read(data, sectorStart*4096L);
data.flip();
boolean addedToQueue = false;
{
int m = data.getInt();
byte b = data.get();
@@ -188,6 +205,12 @@ public class WorldImporter {
} else if (n < 0) {
System.err.println("Declared size of chunk is negative");
} else {
addedToQueue = true;
this.jobQueue.add(()-> {
if (!this.isRunning) {
return;
}
try {
try (var decompressedData = this.decompress(b, new ByteBufferBackedInputStream(data))) {
if (decompressedData == null) {
System.err.println("Error decompressing chunk data");
@@ -196,12 +219,21 @@ public class WorldImporter {
this.importChunkNBT(nbt);
}
}
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
MemoryUtil.memFree(data);
}
});
this.totalChunks.incrementAndGet();
this.threadPool.execute();
}
}
}
if (!addedToQueue) {
MemoryUtil.memFree(data);
}
}
MemoryUtil.memFree(sectorsSavesBB);
}
@@ -230,6 +262,8 @@ public class WorldImporter {
System.err.println("Exception importing world chunk:");
e.printStackTrace();
}
this.updateCallback.update(this.chunksProcessed.incrementAndGet(), this.totalChunks.get());
}
private static int getIndex(int x, int y, int z) {
@@ -291,13 +325,6 @@ public class WorldImporter {
WorldConversionFactory.mipSection(csec, this.world.getMapper());
this.world.insertUpdate(csec);
while (this.world.savingService.getTaskCount() > 4000) {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@@ -25,7 +25,7 @@ public class UnsafeUtil {
//Copy the entire length of src to the dst memory where dst is a byte array (source length from dst)
public static void memcpy(long src, byte[] dst) {
UNSAFE.copyMemory(0, src, dst, BYTE_ARRAY_BASE_OFFSET, dst.length);
UNSAFE.copyMemory(null, src, dst, BYTE_ARRAY_BASE_OFFSET, dst.length);
}
//Copy the entire length of src to the dst memory where src is a byte array (source length from src)

View File

@@ -1,5 +1,6 @@
package me.cortex.voxy.common.world;
import it.unimi.dsi.fastutil.longs.Long2ShortFunction;
import it.unimi.dsi.fastutil.longs.Long2ShortOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import me.cortex.voxy.common.util.MemoryBuffer;
@@ -33,28 +34,32 @@ public class SaveLoadSystem {
var data = section.copyData();
var compressed = new short[data.length];
Long2ShortOpenHashMap LUT = new Long2ShortOpenHashMap(data.length);
LongArrayList LUTVAL = new LongArrayList();
LUT.defaultReturnValue((short) -1);
long[] lutValues = new long[32*16*16];//If there are more than this many states in a section... im concerned
short lutIndex = 0;
long pHash = 99;
for (int i = 0; i < data.length; i++) {
long block = data[i];
short mapping = LUT.computeIfAbsent(block, id->{
LUTVAL.add(id);
return (short)(LUTVAL.size()-1);
});
short mapping = LUT.putIfAbsent(block, lutIndex);
if (mapping == -1) {
mapping = lutIndex++;
lutValues[mapping] = block;
}
compressed[lin2z(i)] = mapping;
pHash *= 127817112311121L;
pHash ^= pHash>>31;
pHash += 9918322711L;
pHash ^= block;
}
long[] lut = LUTVAL.toLongArray();
MemoryBuffer raw = new MemoryBuffer(compressed.length*2L+lut.length*8L+512);
MemoryBuffer raw = new MemoryBuffer(compressed.length*2L+lutIndex*8L+512);
long ptr = raw.address;
long hash = section.key^(lut.length*1293481298141L);
long hash = section.key^(lutIndex*1293481298141L);
MemoryUtil.memPutLong(ptr, section.key); ptr += 8;
MemoryUtil.memPutInt(ptr, lut.length); ptr += 8;
for (long id : lut) {
MemoryUtil.memPutInt(ptr, lutIndex); ptr += 4;
for (int i = 0; i < lutIndex; i++) {
long id = lutValues[i];
MemoryUtil.memPutLong(ptr, id); ptr += 8;
hash *= 1230987149811L;
hash += 12831;

View File

@@ -7,6 +7,7 @@ import net.minecraft.text.Text;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
public class ServiceSlice extends TrackedObject {
@@ -18,9 +19,12 @@ public class ServiceSlice extends TrackedObject {
final Semaphore jobCount = new Semaphore(0);
private final Runnable[] runningCtxs;
private final AtomicInteger activeCount = new AtomicInteger();
private final AtomicInteger jobCount2 = new AtomicInteger();
private final BooleanSupplier condition;
ServiceSlice(ServiceThreadPool threadPool, Supplier<Runnable> workerGenerator, String name, int weightPerJob) {
ServiceSlice(ServiceThreadPool threadPool, Supplier<Runnable> workerGenerator, String name, int weightPerJob, BooleanSupplier condition) {
this.threadPool = threadPool;
this.condition = condition;
this.runningCtxs = new Runnable[threadPool.getThreadCount()];
this.workerGenerator = workerGenerator;
this.name = name;
@@ -28,6 +32,11 @@ public class ServiceSlice extends TrackedObject {
}
boolean doRun(int threadIndex) {
//If executable
if (!this.condition.getAsBoolean()) {
return false;
}
//Run this thread once if possible
if (!this.jobCount.tryAcquire()) {
return false;
@@ -65,6 +74,7 @@ public class ServiceSlice extends TrackedObject {
if (this.activeCount.decrementAndGet() < 0) {
throw new IllegalStateException("Alive count negative!");
}
this.jobCount2.decrementAndGet();
}
return true;
}
@@ -75,6 +85,7 @@ public class ServiceSlice extends TrackedObject {
throw new IllegalStateException("Tried to do work on a dead service");
}
this.jobCount.release();
this.jobCount2.incrementAndGet();
this.threadPool.execute(this);
}
@@ -104,4 +115,18 @@ public class ServiceSlice extends TrackedObject {
public boolean hasJobs() {
return this.jobCount.availablePermits() != 0;
}
public void blockTillEmpty() {
while (this.activeCount.get() != 0 && this.alive) {
while (this.jobCount2.get() != 0 && this.alive) {
Thread.onSpinWait();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Thread.yield();
}
}
}

View File

@@ -3,6 +3,7 @@ package me.cortex.voxy.common.world.thread;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
@@ -30,10 +31,14 @@ public class ServiceThreadPool {
}
public synchronized ServiceSlice createService(String name, int weight, Supplier<Runnable> workGenerator) {
return this.createService(name, weight, workGenerator, ()->true);
}
public synchronized ServiceSlice createService(String name, int weight, Supplier<Runnable> workGenerator, BooleanSupplier executionCondition) {
var current = this.serviceSlices;
var newList = new ServiceSlice[current.length + 1];
System.arraycopy(current, 0, newList, 0, current.length);
var service = new ServiceSlice(this, workGenerator, name, weight);
var service = new ServiceSlice(this, workGenerator, name, weight, executionCondition);
newList[current.length] = service;
this.serviceSlices = newList;
return service;