This commit is contained in:
@@ -18,7 +18,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||||||
public class DirtyUpdateService {
|
public class DirtyUpdateService {
|
||||||
private final VoxyServerInstance instance;
|
private final VoxyServerInstance instance;
|
||||||
private final ConcurrentLinkedQueue<DirtySection> dirtyQueue = new ConcurrentLinkedQueue<>();
|
private final ConcurrentLinkedQueue<DirtySection> dirtyQueue = new ConcurrentLinkedQueue<>();
|
||||||
private final ConcurrentHashMap<WorldIdentifier, List<DirtySection>> batchedUpdates = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<Long, PendingDirty> pendingSections = new ConcurrentHashMap<>();
|
||||||
|
private static final int MAX_PROCESS_PER_TICK = 1000;
|
||||||
|
private static final long MERGE_WINDOW_MILLIS = 200;
|
||||||
|
|
||||||
public DirtyUpdateService(VoxyServerInstance instance) {
|
public DirtyUpdateService(VoxyServerInstance instance) {
|
||||||
this.instance = instance;
|
this.instance = instance;
|
||||||
@@ -26,13 +28,46 @@ public class DirtyUpdateService {
|
|||||||
|
|
||||||
private record DirtySection(WorldEngine world, WorldSection section, int updateFlags, int neighborMsk) {}
|
private record DirtySection(WorldEngine world, WorldSection section, int updateFlags, int neighborMsk) {}
|
||||||
|
|
||||||
|
private static final class PendingDirty {
|
||||||
|
final WorldEngine world;
|
||||||
|
final WorldSection section;
|
||||||
|
volatile int updateFlags;
|
||||||
|
volatile int neighborMsk;
|
||||||
|
volatile long lastDirtyTime;
|
||||||
|
|
||||||
|
PendingDirty(WorldEngine world, WorldSection section, int updateFlags, int neighborMsk, long time) {
|
||||||
|
this.world = world;
|
||||||
|
this.section = section;
|
||||||
|
this.updateFlags = updateFlags;
|
||||||
|
this.neighborMsk = neighborMsk;
|
||||||
|
this.lastDirtyTime = time;
|
||||||
|
}
|
||||||
|
|
||||||
|
void merge(int flags, int neighbor, long time) {
|
||||||
|
this.updateFlags |= flags;
|
||||||
|
this.neighborMsk |= neighbor;
|
||||||
|
this.lastDirtyTime = time;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void onSectionDirty(WorldEngine world, WorldSection section, int updateFlags, int neighborMsk) {
|
public void onSectionDirty(WorldEngine world, WorldSection section, int updateFlags, int neighborMsk) {
|
||||||
// Filter out sections that are not important for clients or debounce
|
|
||||||
if ((updateFlags & WorldEngine.UPDATE_TYPE_BLOCK_BIT) != 0) {
|
if ((updateFlags & WorldEngine.UPDATE_TYPE_BLOCK_BIT) != 0) {
|
||||||
// We need to keep the section alive until we process it
|
long key = section.key;
|
||||||
section.acquire();
|
long now = System.currentTimeMillis();
|
||||||
this.dirtyQueue.add(new DirtySection(world, section, updateFlags, neighborMsk));
|
PendingDirty existing = this.pendingSections.get(key);
|
||||||
// Logger.info("Section dirty: " + section.lvl + " " + section.x + " " + section.y + " " + section.z);
|
if (existing == null) {
|
||||||
|
section.acquire();
|
||||||
|
PendingDirty created = new PendingDirty(world, section, updateFlags, neighborMsk, now);
|
||||||
|
existing = this.pendingSections.putIfAbsent(key, created);
|
||||||
|
if (existing == null) {
|
||||||
|
this.dirtyQueue.add(new DirtySection(world, section, updateFlags, neighborMsk));
|
||||||
|
} else {
|
||||||
|
existing.merge(updateFlags, neighborMsk, now);
|
||||||
|
section.release();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
existing.merge(updateFlags, neighborMsk, now);
|
||||||
|
}
|
||||||
int lvl = section.lvl;
|
int lvl = section.lvl;
|
||||||
int sx = section.x;
|
int sx = section.x;
|
||||||
int sz = section.z;
|
int sz = section.z;
|
||||||
@@ -48,13 +83,27 @@ public class DirtyUpdateService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void tick() {
|
public void tick() {
|
||||||
// Process queue
|
|
||||||
DirtySection dirty;
|
DirtySection dirty;
|
||||||
int processed = 0;
|
int processed = 0;
|
||||||
// Logger.info("Dirty Queue Size: " + dirtyQueue.size());
|
while ((dirty = this.dirtyQueue.poll()) != null && processed < MAX_PROCESS_PER_TICK) {
|
||||||
while ((dirty = this.dirtyQueue.poll()) != null && processed++ < 1000) {
|
long key = dirty.section.key;
|
||||||
processDirty(dirty);
|
PendingDirty pending = this.pendingSections.get(key);
|
||||||
dirty.section.release();
|
if (pending != null) {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (now - pending.lastDirtyTime < MERGE_WINDOW_MILLIS) {
|
||||||
|
this.dirtyQueue.add(dirty);
|
||||||
|
try {
|
||||||
|
Thread.sleep(1);
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
dirty = new DirtySection(pending.world, pending.section, pending.updateFlags, pending.neighborMsk);
|
||||||
|
this.pendingSections.remove(key);
|
||||||
|
}
|
||||||
|
processDirty(dirty);
|
||||||
|
dirty.section.release();
|
||||||
|
processed++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -121,6 +170,9 @@ public class DirtyUpdateService {
|
|||||||
if (distSq > standardViewDist * standardViewDist && distSq < voxyDistance * voxyDistance) {
|
if (distSq > standardViewDist * standardViewDist && distSq < voxyDistance * voxyDistance) {
|
||||||
int absX = (sx << (lvl + 1)) | dx;
|
int absX = (sx << (lvl + 1)) | dx;
|
||||||
int absZ = (sz << (lvl + 1)) | dz;
|
int absZ = (sz << (lvl + 1)) | dz;
|
||||||
|
if (!matchLevel.getChunkSource().hasChunk(absX, absZ)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
cols.add(new long[]{absX, absZ, Double.doubleToRawLongBits(distSq)});
|
cols.add(new long[]{absX, absZ, Double.doubleToRawLongBits(distSq)});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -140,7 +192,9 @@ public class DirtyUpdateService {
|
|||||||
var voxelized = WorldSectionToVoxelizedConverter.convert(dirty.section, lvl, absX, absY, absZ);
|
var voxelized = WorldSectionToVoxelizedConverter.convert(dirty.section, lvl, absX, absY, absZ);
|
||||||
if (voxelized.lvl0NonAirCount > 0) {
|
if (voxelized.lvl0NonAirCount > 0) {
|
||||||
var payload = VoxyNetwork.LodUpdatePayload.create(voxelized, dirty.world.getMapper());
|
var payload = VoxyNetwork.LodUpdatePayload.create(voxelized, dirty.world.getMapper());
|
||||||
ServerPlayNetworking.send(player, payload);
|
server.execute(() -> {
|
||||||
|
ServerPlayNetworking.send(player, payload);
|
||||||
|
});
|
||||||
perPlayerBudget--;
|
perPlayerBudget--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -77,7 +77,7 @@ public class PlayerLodTracker {
|
|||||||
WorldEngine engine = this.instance.getNullable(wi);
|
WorldEngine engine = this.instance.getNullable(wi);
|
||||||
if (engine == null) return;
|
if (engine == null) return;
|
||||||
|
|
||||||
int standardViewDist = level.getServer().getPlayerList().getViewDistance(); // Chunks
|
int standardViewDist = level.getServer().getPlayerList().getViewDistance();
|
||||||
|
|
||||||
// Use client-negotiated view distance if available (via ConfigSync or just assume Voxy Config)
|
// Use client-negotiated view distance if available (via ConfigSync or just assume Voxy Config)
|
||||||
// Ideally the client sends its requested Voxy distance.
|
// Ideally the client sends its requested Voxy distance.
|
||||||
@@ -90,7 +90,6 @@ public class PlayerLodTracker {
|
|||||||
|
|
||||||
java.util.concurrent.atomic.AtomicInteger budget = new java.util.concurrent.atomic.AtomicInteger(50);
|
java.util.concurrent.atomic.AtomicInteger budget = new java.util.concurrent.atomic.AtomicInteger(50);
|
||||||
state.tracker.process(20, (x, z) -> {
|
state.tracker.process(20, (x, z) -> {
|
||||||
// On Add (Load)
|
|
||||||
double dx = x - state.lastX;
|
double dx = x - state.lastX;
|
||||||
double dz = z - state.lastZ;
|
double dz = z - state.lastZ;
|
||||||
double distSq = dx*dx + dz*dz;
|
double distSq = dx*dx + dz*dz;
|
||||||
@@ -118,7 +117,6 @@ public class PlayerLodTracker {
|
|||||||
// defer processing to global prioritized loop
|
// defer processing to global prioritized loop
|
||||||
}
|
}
|
||||||
}, (x, z) -> {
|
}, (x, z) -> {
|
||||||
// On Remove (Unload)
|
|
||||||
long key = (((long)x) << 32) ^ (z & 0xffffffffL);
|
long key = (((long)x) << 32) ^ (z & 0xffffffffL);
|
||||||
state.sentColumns.remove(key);
|
state.sentColumns.remove(key);
|
||||||
});
|
});
|
||||||
@@ -156,41 +154,7 @@ public class PlayerLodTracker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void sendLodSection(ServerPlayer player, WorldEngine engine, int x, int z, int y) {
|
private void sendLodSection(ServerPlayer player, WorldEngine engine, int x, int z, int y) {
|
||||||
// Decide LOD level based on distance?
|
|
||||||
// For now, let's just send LOD 0 if it exists.
|
|
||||||
// Optimization: Distant things can be higher LOD.
|
|
||||||
|
|
||||||
// Simple heuristic:
|
|
||||||
// 0-64 chunks (1024 blocks): L0
|
|
||||||
// >64 chunks: L1?
|
|
||||||
|
|
||||||
// Actually, Voxy client requests specific LODs usually.
|
|
||||||
// But since we are pushing, we should push what makes sense.
|
|
||||||
// Sending L0 for everything is safe but bandwidth heavy.
|
|
||||||
// Let's stick to L0 for now as a proof of concept to fix the "blank world" issue.
|
|
||||||
|
|
||||||
int lvl = 0;
|
int lvl = 0;
|
||||||
|
|
||||||
// We need to acquire the section from storage.
|
|
||||||
// Warning: acquiring on main thread might lag if not in cache.
|
|
||||||
// We should probably offload this IO?
|
|
||||||
// WorldEngine.acquireIfExists loads from disk if not in memory?
|
|
||||||
// acquireIfExists(..., true) means "only return if loaded in memory"?
|
|
||||||
// No, check WorldEngine code.
|
|
||||||
// acquireIfExists(..., true) -> sectionTracker.acquire(..., true) -> "if (allowLoad) ..."
|
|
||||||
// It seems `acquireIfExists` might still load?
|
|
||||||
// Let's look at WorldEngine.java again.
|
|
||||||
// acquireIfExists calls acquire(..., true).
|
|
||||||
// ActiveSectionTracker.acquire(..., boolean load)
|
|
||||||
// If load is true, it loads.
|
|
||||||
|
|
||||||
// We want to load if it exists in DB, but not generate if missing?
|
|
||||||
// Voxy doesn't really "generate" on demand in the same way, it ingests.
|
|
||||||
// If it's not in DB, it's empty/air.
|
|
||||||
|
|
||||||
// We should use `engine.acquire` but we need to be careful about blocking main thread.
|
|
||||||
// Ideally we schedule a task on the Voxy thread pool to fetch and send.
|
|
||||||
|
|
||||||
this.instance.getThreadPool().execute(() -> {
|
this.instance.getThreadPool().execute(() -> {
|
||||||
try {
|
try {
|
||||||
var sec = engine.acquire(lvl, x >> (lvl + 1), y >> (lvl + 1), z >> (lvl + 1));
|
var sec = engine.acquire(lvl, x >> (lvl + 1), y >> (lvl + 1), z >> (lvl + 1));
|
||||||
@@ -199,7 +163,9 @@ public class PlayerLodTracker {
|
|||||||
var voxelized = WorldSectionToVoxelizedConverter.convert(sec, lvl, x, y, z);
|
var voxelized = WorldSectionToVoxelizedConverter.convert(sec, lvl, x, y, z);
|
||||||
if (voxelized.lvl0NonAirCount > 0) {
|
if (voxelized.lvl0NonAirCount > 0) {
|
||||||
var payload = VoxyNetwork.LodUpdatePayload.create(voxelized, engine.getMapper());
|
var payload = VoxyNetwork.LodUpdatePayload.create(voxelized, engine.getMapper());
|
||||||
ServerPlayNetworking.send(player, payload);
|
this.instance.getServer().execute(() -> {
|
||||||
|
ServerPlayNetworking.send(player, payload);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
sec.release();
|
sec.release();
|
||||||
|
|||||||
@@ -40,12 +40,6 @@ public class VoxyServerInstance extends VoxyInstance {
|
|||||||
me.cortex.voxy.common.world.WorldSection.setArrayReuseCacheTarget(VoxyServerConfig.CONFIG.reuseCacheMax);
|
me.cortex.voxy.common.world.WorldSection.setArrayReuseCacheTarget(VoxyServerConfig.CONFIG.reuseCacheMax);
|
||||||
this.threadPool.serviceManager.createServiceNoCleanup(() -> me.cortex.voxy.common.world.WorldSection::trimReuseCacheToTarget, 60000, "ReuseCacheTrim");
|
this.threadPool.serviceManager.createServiceNoCleanup(() -> me.cortex.voxy.common.world.WorldSection::trimReuseCacheToTarget, 60000, "ReuseCacheTrim");
|
||||||
|
|
||||||
// Start a service to tick the dirty service
|
|
||||||
this.threadPool.serviceManager.createServiceNoCleanup(() -> this.dirtyUpdateService::tick, VoxyServerConfig.CONFIG.dirtyUpdateDelay * 50, "DirtyUpdateService");
|
|
||||||
|
|
||||||
// Start service for player tracker (run every tick or so)
|
|
||||||
this.threadPool.serviceManager.createServiceNoCleanup(() -> this.playerLodTracker::tick, 50, "PlayerLodTracker");
|
|
||||||
|
|
||||||
// Register join handler to sync config and init tracker
|
// Register join handler to sync config and init tracker
|
||||||
net.fabricmc.fabric.api.networking.v1.ServerPlayConnectionEvents.JOIN.register((handler, sender, s) -> {
|
net.fabricmc.fabric.api.networking.v1.ServerPlayConnectionEvents.JOIN.register((handler, sender, s) -> {
|
||||||
var payload = new me.cortex.voxy.common.network.VoxyNetwork.ConfigSyncPayload(VoxyServerConfig.CONFIG.viewDistance);
|
var payload = new me.cortex.voxy.common.network.VoxyNetwork.ConfigSyncPayload(VoxyServerConfig.CONFIG.viewDistance);
|
||||||
|
|||||||
Reference in New Issue
Block a user