Rewired data imports

This commit is contained in:
mcrcortex
2025-03-23 22:59:54 +10:00
parent 5e5c63d109
commit a992a44f40
4 changed files with 111 additions and 43 deletions

View File

@@ -53,13 +53,14 @@ public class WorldImportWrapper {
} }
public interface IImporterFactory { public interface IImporterFactory {
void create(WorldImporter importer, WorldImporter.UpdateCallback updateCallback, Consumer<Integer> onCompletion); void setup(WorldImporter importer);
} }
public boolean createWorldImporter(World mcWorld, IImporterFactory factory) { public boolean createWorldImporter(World mcWorld, IImporterFactory factory) {
if (this.importer == null) { if (this.importer == null) {
this.importer = new WorldImporter(this.world, mcWorld, this.pool, VoxyCommon.getInstance().getSavingService()); this.importer = new WorldImporter(this.world, mcWorld, this.pool, VoxyCommon.getInstance().getSavingService());
} }
if (this.importer.isBusy()) { if (this.importer.isRunning()) {
return false; return false;
} }
@@ -69,9 +70,13 @@ public class WorldImportWrapper {
this.importerBossBarUUID = MathHelper.randomUuid(); this.importerBossBarUUID = MathHelper.randomUuid();
var bossBar = new ClientBossBar(this.importerBossBarUUID, Text.of("Voxy world importer"), 0.0f, BossBar.Color.GREEN, BossBar.Style.PROGRESS, false, false, false); var bossBar = new ClientBossBar(this.importerBossBarUUID, Text.of("Voxy world importer"), 0.0f, BossBar.Color.GREEN, BossBar.Style.PROGRESS, false, false, false);
MinecraftClient.getInstance().inGameHud.getBossBarHud().bossBars.put(bossBar.getUuid(), bossBar); MinecraftClient.getInstance().inGameHud.getBossBarHud().bossBars.put(bossBar.getUuid(), bossBar);
factory.setup(this.importer);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
long[] ticker = new long[1]; long[] ticker = new long[1];
factory.create(this.importer, (a, b)-> {
this.importer.runImport((a, b)-> {
if (System.currentTimeMillis() - ticker[0] > 50) { if (System.currentTimeMillis() - ticker[0] > 50) {
ticker[0] = System.currentTimeMillis(); ticker[0] = System.currentTimeMillis();
MinecraftClient.getInstance().executeSync(() -> { MinecraftClient.getInstance().executeSync(() -> {
@@ -97,6 +102,7 @@ public class WorldImportWrapper {
} }
}); });
}); });
return true; return true;
} }

View File

@@ -53,7 +53,7 @@ public class WorldImportCommand {
return false; return false;
} }
return instance.importWrapper.createWorldImporter(MinecraftClient.getInstance().player.clientWorld, return instance.importWrapper.createWorldImporter(MinecraftClient.getInstance().player.clientWorld,
(importer, up, done)->importer.importRegionDirectoryAsyncStart(directory, up, done)); (importer)->importer.importRegionDirectoryAsyncStart(directory));
} }
private static int importRaw(CommandContext<FabricClientCommandSource> ctx) { private static int importRaw(CommandContext<FabricClientCommandSource> ctx) {
@@ -140,7 +140,7 @@ public class WorldImportCommand {
} }
String finalInnerDir = innerDir; String finalInnerDir = innerDir;
return instance.importWrapper.createWorldImporter(MinecraftClient.getInstance().player.clientWorld, return instance.importWrapper.createWorldImporter(MinecraftClient.getInstance().player.clientWorld,
(importer, up, done)->importer.importZippedRegionDirectoryAsyncStart(zip, finalInnerDir, up, done))?0:1; (importer)->importer.importZippedRegionDirectoryAsyncStart(zip, finalInnerDir))?0:1;
} }
private static int cancelImport(CommandContext<FabricClientCommandSource> fabricClientCommandSourceCommandContext) { private static int cancelImport(CommandContext<FabricClientCommandSource> fabricClientCommandSourceCommandContext) {

View File

@@ -32,11 +32,11 @@ import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
public class DHImporter { public class DHImporter implements IDataImporter {
private final Connection db; private final Connection db;
private final WorldEngine engine; private final WorldEngine engine;
private final ServiceSlice threadPool; private final ServiceSlice threadPool;
@@ -47,8 +47,16 @@ public class DHImporter {
private final Registry<Biome> biomeRegistry; private final Registry<Biome> biomeRegistry;
private final Registry<Block> blockRegistry; private final Registry<Block> blockRegistry;
private Thread runner; private Thread runner;
private volatile boolean isRunning = false;
private final AtomicInteger processedChunks = new AtomicInteger();
private int totalChunks;
private IUpdateCallback updateCallback;
private record Task(int x, int z, int fmt, int compression){} private record Task(int x, int z, int fmt, int compression) {
public long distanceFromZero() {
return ((long)this.x)*this.x+((long)this.z)*this.z;
}
}
private final ConcurrentLinkedDeque<Task> tasks = new ConcurrentLinkedDeque<>(); private final ConcurrentLinkedDeque<Task> tasks = new ConcurrentLinkedDeque<>();
private record WorkCTX(PreparedStatement stmt, ResettableArrayCache cache, long[] storageCache, byte[] colScratch, VoxelizedSection section) { private record WorkCTX(PreparedStatement stmt, ResettableArrayCache cache, long[] storageCache, byte[] colScratch, VoxelizedSection section) {
public WorkCTX(PreparedStatement stmt, int worldHeight) { public WorkCTX(PreparedStatement stmt, int worldHeight) {
@@ -93,41 +101,64 @@ public class DHImporter {
} }
public void runImport() { public void runImport(IUpdateCallback updateCallback, ICompletionCallback completionCallback) {
if (this.isRunning()) {
throw new IllegalStateException();
}
this.updateCallback = updateCallback;
this.runner = new Thread(()-> { this.runner = new Thread(()-> {
Queue<Task> taskQ = new PriorityQueue<>(Comparator.comparingLong(Task::distanceFromZero));
try (var stmt = this.db.createStatement()) { try (var stmt = this.db.createStatement()) {
var resSet = stmt.executeQuery("SELECT PosX,PosZ,CompressionMode,DataFormatVersion FROM FullData WHERE DetailLevel = 0;"); var resSet = stmt.executeQuery("SELECT PosX,PosZ,CompressionMode,DataFormatVersion FROM FullData WHERE DetailLevel = 0;");
int i = 0;
while (resSet.next()) { while (resSet.next()) {
int x = resSet.getInt(1); int x = resSet.getInt(1);
int z = resSet.getInt(2); int z = resSet.getInt(2);
int compression = resSet.getInt(3); int compression = resSet.getInt(3);
int format = resSet.getInt(4); int format = resSet.getInt(4);
if (format != 1) { if (format != 1) {
Logger.warn("Unknown format mode: " + compression); Logger.warn("Unknown format mode: " + format);
continue; continue;
} }
if (compression != 3) { if (compression != 3) {
Logger.warn("Unknown compression mode: " + compression); Logger.warn("Unknown compression mode: " + compression);
continue; continue;
} }
this.tasks.add(new Task(x, z, format, compression)); taskQ.add(new Task(x, z, format, compression));
this.threadPool.execute();
i++;
while (this.tasks.size() > 100) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
} }
resSet.close(); resSet.close();
Logger.info("Importing " + i + " DH section");
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.totalChunks = taskQ.size() * (4*4);//(since there are 4*4 chunks to every dh section)
while (this.isRunning&&!taskQ.isEmpty()) {
this.tasks.add(taskQ.poll());
this.threadPool.execute();
while (this.tasks.size() > 100 && this.isRunning) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
while (!this.tasks.isEmpty()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
completionCallback.onCompletion(this.processedChunks.get());
this.shutdown();
}); });
this.isRunning = true;
this.runner.setDaemon(true);
this.runner.start(); this.runner.start();
} }
@@ -273,6 +304,9 @@ public class DHImporter {
section.setPosition(X*4+(x>>4), sy+(this.bottomOfWorld>>4), (Z*4)+sz); section.setPosition(X*4+(x>>4), sy+(this.bottomOfWorld>>4), (Z*4)+sz);
this.engine.insertUpdate(section); this.engine.insertUpdate(section);
} }
int count = this.processedChunks.incrementAndGet();
this.updateCallback.onUpdate(count, this.totalChunks);
} }
Arrays.fill(storage, 0); Arrays.fill(storage, 0);
//Process batch //Process batch
@@ -281,6 +315,9 @@ public class DHImporter {
stream.close(); stream.close();
} }
private void importSection(PreparedStatement dataFetchStmt, WorkCTX ctx, Task task) { private void importSection(PreparedStatement dataFetchStmt, WorkCTX ctx, Task task) {
if (!this.isRunning) {
return;
}
try { try {
dataFetchStmt.setInt(1, task.x); dataFetchStmt.setInt(1, task.x);
dataFetchStmt.setInt(2, task.z); dataFetchStmt.setInt(2, task.z);
@@ -296,12 +333,30 @@ public class DHImporter {
} }
public void shutdown() { public void shutdown() {
if (!this.isRunning) {
return;
}
this.isRunning = false;
try {
if (this.runner != Thread.currentThread()) {
this.runner.join();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.threadPool.shutdown(); this.threadPool.shutdown();
try { try {
this.db.close(); this.db.close();
} catch (SQLException e) { } catch (SQLException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.updateCallback = null;
this.runner = null;
}
@Override
public boolean isRunning() {
return this.isRunning;
} }
private static VarHandle create(Class<?> viewArrayClass) { private static VarHandle create(Class<?> viewArrayClass) {

View File

@@ -42,12 +42,7 @@ import java.util.function.BooleanSupplier;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate; import java.util.function.Predicate;
public class WorldImporter { public class WorldImporter implements IDataImporter {
public interface UpdateCallback {
void update(int finished, int outof);
}
private final WorldEngine world; private final WorldEngine world;
private final ReadableContainer<RegistryEntry<Biome>> defaultBiomeProvider; private final ReadableContainer<RegistryEntry<Biome>> defaultBiomeProvider;
private final Codec<ReadableContainer<RegistryEntry<Biome>>> biomeCodec; private final Codec<ReadableContainer<RegistryEntry<Biome>>> biomeCodec;
@@ -122,6 +117,17 @@ public class WorldImporter {
} }
@Override
public void runImport(IUpdateCallback updateCallback, ICompletionCallback completionCallback) {
if (this.isRunning || this.worker == null) {
throw new IllegalStateException();
}
this.isRunning = true;
this.updateCallback = updateCallback;
this.completionCallback = completionCallback;
this.worker.start();
}
public void shutdown() { public void shutdown() {
this.isRunning = false; this.isRunning = false;
if (this.worker != null) { if (this.worker != null) {
@@ -141,8 +147,9 @@ public class WorldImporter {
} }
private volatile Thread worker; private volatile Thread worker;
private UpdateCallback updateCallback; private IUpdateCallback updateCallback;
public void importRegionDirectoryAsyncStart(File directory, UpdateCallback updateCallback, Consumer<Integer> onCompletion) { private ICompletionCallback completionCallback;
public void importRegionDirectoryAsyncStart(File directory) {
var files = directory.listFiles((dir, name) -> { var files = directory.listFiles((dir, name) -> {
var sections = name.split("\\."); var sections = name.split("\\.");
if (sections.length != 4 || (!sections[0].equals("r")) || (!sections[3].equals("mca"))) { if (sections.length != 4 || (!sections[0].equals("r")) || (!sections[3].equals("mca"))) {
@@ -152,14 +159,13 @@ public class WorldImporter {
return true; return true;
}); });
if (files == null) { if (files == null) {
onCompletion.accept(0);
return; return;
} }
Arrays.sort(files, File::compareTo); Arrays.sort(files, File::compareTo);
this.importRegionsAsyncStart(files, this::importRegionFile, updateCallback, onCompletion); this.importRegionsAsyncStart(files, this::importRegionFile);
} }
public void importZippedRegionDirectoryAsyncStart(File zip, String innerDirectory, UpdateCallback updateCallback, Consumer<Integer> onCompletion) { public void importZippedRegionDirectoryAsyncStart(File zip, String innerDirectory) {
try { try {
innerDirectory = innerDirectory.replace("\\\\", "\\").replace("\\", "/"); innerDirectory = innerDirectory.replace("\\\\", "\\").replace("\\", "/");
var file = ZipFile.builder().setFile(zip).get(); var file = ZipFile.builder().setFile(zip).get();
@@ -193,20 +199,18 @@ public class WorldImporter {
this.importRegion(buf, Integer.parseInt(sections[1]), Integer.parseInt(sections[2])); this.importRegion(buf, Integer.parseInt(sections[1]), Integer.parseInt(sections[2]));
buf.free(); buf.free();
}, updateCallback, onCompletion); });
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
private <T> void importRegionsAsyncStart(T[] regionFiles, IImporterMethod<T> importer, UpdateCallback updateCallback, Consumer<Integer> onCompletion) { private <T> void importRegionsAsyncStart(T[] regionFiles, IImporterMethod<T> importer) {
this.totalChunks.set(0); this.totalChunks.set(0);
this.estimatedTotalChunks.set(0); this.estimatedTotalChunks.set(0);
this.chunksProcessed.set(0); this.chunksProcessed.set(0);
this.updateCallback = updateCallback;
this.worker = new Thread(() -> { this.worker = new Thread(() -> {
this.isRunning = true;
this.estimatedTotalChunks.addAndGet(regionFiles.length*1024); this.estimatedTotalChunks.addAndGet(regionFiles.length*1024);
for (var file : regionFiles) { for (var file : regionFiles) {
this.estimatedTotalChunks.addAndGet(-1024); this.estimatedTotalChunks.addAndGet(-1024);
@@ -224,7 +228,7 @@ public class WorldImporter {
} }
if (!this.isRunning) { if (!this.isRunning) {
this.threadPool.blockTillEmpty(); this.threadPool.blockTillEmpty();
onCompletion.accept(this.totalChunks.get()); this.completionCallback.onCompletion(this.totalChunks.get());
this.worker = null; this.worker = null;
return; return;
} }
@@ -238,17 +242,20 @@ public class WorldImporter {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
onCompletion.accept(this.totalChunks.get()); this.completionCallback.onCompletion(this.totalChunks.get());
this.threadPool.shutdown(); this.threadPool.shutdown();
this.worker = null; this.worker = null;
}); });
this.worker.setName("World importer"); this.worker.setName("World importer");
this.worker.start();
} }
public boolean isBusy() { public boolean isBusy() {
return this.worker != null; return this.isRunning || this.worker != null;
}
public boolean isRunning() {
return this.isRunning || this.worker != null;
} }
private void importRegionFile(File file) throws IOException { private void importRegionFile(File file) throws IOException {
@@ -406,7 +413,7 @@ public class WorldImporter {
Logger.error("Exception importing world chunk:",e); Logger.error("Exception importing world chunk:",e);
} }
this.updateCallback.update(this.chunksProcessed.incrementAndGet(), this.estimatedTotalChunks.get()); this.updateCallback.onUpdate(this.chunksProcessed.incrementAndGet(), this.estimatedTotalChunks.get());
} }
private static final ThreadLocal<VoxelizedSection> SECTION_CACHE = ThreadLocal.withInitial(VoxelizedSection::createEmpty); private static final ThreadLocal<VoxelizedSection> SECTION_CACHE = ThreadLocal.withInitial(VoxelizedSection::createEmpty);