[Experimental] Add connection multiplexer to allow many listen blocks. Secondary listen blocks are configured in bukkit.yml

By: md_5 <md_5@live.com.au>
This commit is contained in:
Spigot
2013-04-19 17:45:39 +10:00
parent 70056940b6
commit a4b017945b
7 changed files with 247 additions and 194 deletions

View File

@@ -1,35 +1,8 @@
From 751bd6f6c7d40e46a599cd8254cac0ef59ad5132 Mon Sep 17 00:00:00 2001
From d2c3009e1ee527e5d6b990264190370ff50444b2 Mon Sep 17 00:00:00 2001
From: md_5 <md_5@live.com.au>
Date: Thu, 14 Feb 2013 17:32:20 +1100
Date: Fri, 19 Apr 2013 17:44:39 +1000
Subject: [PATCH] Netty
Implement an uber efficient network engine based on the
Java NIO framework Netty. This is basically a complete rewrite of the
Minecraft network engine with many distinct advantages. First and foremost,
there will no longer be the horrid, and redundant case of 2, or even at
times, 3 threads per a connection. Instead low level select/epoll based NIO
is used. The number of threads used for network reading and writing will
scale automatically to the number of cores for use on your server. In most
cases this will be around 8 threads for a 4 core server, much better than the
up to 1000 threads that could be in use at one time with the old engine. To
facilitate asynchronous packet sending or receiving (currently only chat), a
thread pool of 16 threads is kept handy. == Plugin incompatibilities As a
side effect of this change, plugins which rely on very specific
implementation level details within Minecraft are broken. At this point in
time, TagAPI and ProtocolLib are affected. If you are a user of ProtocolLib
you are advised to update to the latest build, where full support is enabled.
If you are a user of TagAPI, support has not yet been added, so you will need
to install the updated ProtocolLib so that TagAPI may use its functions. ==
Stability The code within this commit has been very lightly tested in
production (300 players for approximately 24 hours), however it is not
guaranteed to be free from all bugs. If you experence weird connection
behaviour, reporting the bug and steps to reproduce are advised. You are also
free to downgrade to the latest recommend build, which is guaranteed to be
stable. == Summary This commit provides a reduction in threads, which gives
the CPU / operating system more time to allocate to the main server threads,
as well as various other side benefits such as chat thread pooling and a
slight reduction in latency. This commit is licensed under the Creative
Commons Attribution-ShareAlike 3.0 Unported license.
diff --git a/pom.xml b/pom.xml
index da1a0eb..b8c24af 100644
@@ -48,7 +21,7 @@ index da1a0eb..b8c24af 100644
<!-- This builds a completely 'ready to start' jar with all dependencies inside -->
diff --git a/src/main/java/net/minecraft/server/DedicatedServer.java b/src/main/java/net/minecraft/server/DedicatedServer.java
index bd7e41c..c189b1b 100644
index bd7e41c..93cd0a9 100644
--- a/src/main/java/net/minecraft/server/DedicatedServer.java
+++ b/src/main/java/net/minecraft/server/DedicatedServer.java
@@ -34,7 +34,7 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer
@@ -60,19 +33,28 @@ index bd7e41c..c189b1b 100644
}
protected boolean init() throws java.net.UnknownHostException { // CraftBukkit - throws UnknownHostException
@@ -94,7 +94,11 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer
this.getLogger().info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.G());
@@ -91,10 +91,10 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer
this.getLogger().info("Generating keypair");
this.a(MinecraftEncryption.b());
- this.getLogger().info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.G());
+ this.a((PlayerList) (new DedicatedPlayerList(this))); // CraftBukkit
try {
- this.r = new DedicatedServerConnection(this, inetaddress, this.G());
+ // Spigot start
+ this.r = (!Boolean.getBoolean("org.spigotmc.netty.disabled"))
+ ? new org.spigotmc.netty.NettyServerConnection(this, inetaddress, this.G())
+ : new DedicatedServerConnection(this, inetaddress, this.G());
+ // Spigot end
+ this.r = new org.spigotmc.MultiplexingServerConnection(this); // Spigot
} catch (Throwable ioexception) { // CraftBukkit - IOException -> Throwable
this.getLogger().warning("**** FAILED TO BIND TO PORT!");
this.getLogger().warning("The exception was: {0}", new Object[] { ioexception.toString()});
@@ -102,8 +102,6 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer
return false;
}
- this.a((PlayerList) (new DedicatedPlayerList(this))); // CraftBukkit
-
if (!this.getOnlineMode()) {
this.getLogger().warning("**** SERVER IS RUNNING IN OFFLINE/INSECURE MODE!");
this.getLogger().warning("The server will make no attempt to authenticate usernames. Beware.");
diff --git a/src/main/java/net/minecraft/server/INetworkManager.java b/src/main/java/net/minecraft/server/INetworkManager.java
new file mode 100644
index 0000000..6fcc5d7
@@ -152,7 +134,7 @@ index 9f8afe3..b1d3a17 100644
};
// CraftBukkit end
diff --git a/src/main/java/net/minecraft/server/PendingConnection.java b/src/main/java/net/minecraft/server/PendingConnection.java
index eb474f5..bbf1abd 100644
index eb474f5..71e4739 100644
--- a/src/main/java/net/minecraft/server/PendingConnection.java
+++ b/src/main/java/net/minecraft/server/PendingConnection.java
@@ -17,7 +17,7 @@ public class PendingConnection extends Connection {
@@ -190,80 +172,20 @@ index eb474f5..bbf1abd 100644
// CraftBukkit start - Fix decompile issues, don't create a list from an array
Object[] list = new Object[] { 1, 60, this.server.getVersion(), pingEvent.getMotd(), playerlist.getPlayerCount(), pingEvent.getMaxPlayers() };
@@ -173,8 +178,8 @@ public class PendingConnection extends Connection {
@@ -173,9 +178,11 @@ public class PendingConnection extends Connection {
this.networkManager.queue(new Packet255KickDisconnect(s));
this.networkManager.d();
- if (inetaddress != null && this.server.ae() instanceof DedicatedServerConnection) {
- ((DedicatedServerConnection) this.server.ae()).a(inetaddress);
+ if (inetaddress != null) { // Spigot - removed DedicatedServerConnection instance check
+ this.server.ae().a(inetaddress);
+ // Spigot start
+ if (inetaddress != null) {
+ ((org.spigotmc.MultiplexingServerConnection) this.server.ae()).throttle(inetaddress);
}
+ // Spigot end
this.b = true;
diff --git a/src/main/java/net/minecraft/server/ServerConnection.java b/src/main/java/net/minecraft/server/ServerConnection.java
new file mode 100644
index 0000000..0113ed3
--- /dev/null
+++ b/src/main/java/net/minecraft/server/ServerConnection.java
@@ -0,0 +1,57 @@
+package net.minecraft.server;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class ServerConnection {
+
+ private final MinecraftServer b;
+ private final List c = Collections.synchronizedList(new ArrayList());
+ public volatile boolean a = false;
+
+ public ServerConnection(MinecraftServer minecraftserver) {
+ this.b = minecraftserver;
+ this.a = true;
+ }
+
+ public void a(PlayerConnection playerconnection) {
+ this.c.add(playerconnection);
+ }
+
+ public abstract void a(InetAddress address); // Spigot - make a(InetAddress) a abstract void
+
+ public void a() {
+ this.a = false;
+ }
+
+ public void b() {
+ for (int i = 0; i < this.c.size(); ++i) {
+ PlayerConnection playerconnection = (PlayerConnection) this.c.get(i);
+
+ try {
+ playerconnection.d();
+ } catch (Exception exception) {
+ if (playerconnection.networkManager instanceof MemoryNetworkManager) {
+ CrashReport crashreport = CrashReport.a((Throwable) exception, "Ticking memory connection");
+
+ throw new ReportedException(crashreport);
+ }
+
+ this.b.getLogger().warning("Failed to handle packet for " + playerconnection.player.getLocalizedName() + "/" + playerconnection.player.p() + ": " + exception, (Throwable) exception);
+ playerconnection.disconnect("Internal server error");
+ }
+
+ if (playerconnection.disconnected) {
+ this.c.remove(i--);
+ }
+
+ playerconnection.networkManager.a();
+ }
+ }
+
+ public MinecraftServer d() {
+ return this.b;
+ }
+}
} catch (Exception exception) {
diff --git a/src/main/java/net/minecraft/server/ThreadCommandReader.java b/src/main/java/net/minecraft/server/ThreadCommandReader.java
index 489e184..9533b6f 100644
--- a/src/main/java/net/minecraft/server/ThreadCommandReader.java
@@ -288,6 +210,39 @@ index c185f64..abe0b81 100644
this.server = server;
// CraftBukkit end
this.pendingConnection = pendingconnection;
diff --git a/src/main/java/org/bukkit/craftbukkit/CraftServer.java b/src/main/java/org/bukkit/craftbukkit/CraftServer.java
index 9f2be37..c7a804b 100644
--- a/src/main/java/org/bukkit/craftbukkit/CraftServer.java
+++ b/src/main/java/org/bukkit/craftbukkit/CraftServer.java
@@ -129,6 +129,7 @@ import com.avaje.ebean.config.dbplatform.SQLitePlatform;
import com.avaje.ebeaninternal.server.lib.sql.TransactionIsolation;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapMaker;
+import java.net.InetSocketAddress;
import jline.console.ConsoleReader;
@@ -1373,4 +1374,20 @@ public final class CraftServer implements Server {
public CraftScoreboardManager getScoreboardManager() {
return scoreboardManager;
}
+
+ // Spigot start
+ @SuppressWarnings("unchecked")
+ public java.util.Collection<java.net.InetSocketAddress> getSecondaryHosts() {
+ java.util.Collection<java.net.InetSocketAddress> ret = new java.util.HashSet<java.net.InetSocketAddress>();
+ List<?> listeners = configuration.getList("listeners");
+ if (listeners != null) {
+ for (Object o : listeners) {
+
+ Map<String, Object> sect = (Map<String, Object>) o;
+ ret.add(new InetSocketAddress((String) sect.get("address"), (Integer) sect.get("port")));
+ }
+ }
+ return ret;
+ }
+ // Spigot end
}
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java
index 84dcfcc..a30f217 100644
--- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java
@@ -301,6 +256,138 @@ index 84dcfcc..a30f217 100644
private CraftAsyncDebugger debugHead = new CraftAsyncDebugger(-1, null, null) {@Override StringBuilder debugTo(StringBuilder string) {return string;}};
private CraftAsyncDebugger debugTail = debugHead;
private static final int RECENT_TICKS;
diff --git a/src/main/java/org/spigotmc/MultiplexingServerConnection.java b/src/main/java/org/spigotmc/MultiplexingServerConnection.java
new file mode 100644
index 0000000..7dc2533
--- /dev/null
+++ b/src/main/java/org/spigotmc/MultiplexingServerConnection.java
@@ -0,0 +1,126 @@
+package org.spigotmc;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.logging.Level;
+import net.minecraft.server.DedicatedServerConnection;
+import net.minecraft.server.MinecraftServer;
+import net.minecraft.server.PendingConnection;
+import net.minecraft.server.ServerConnection;
+import org.bukkit.Bukkit;
+
+public class MultiplexingServerConnection extends ServerConnection {
+
+ private static final boolean NETTY_DISABLED = Boolean.getBoolean("org.spigotmc.netty.disabled");
+ private final Collection<ServerConnection> children = new HashSet<ServerConnection>();
+ private final List<PendingConnection> pending = Collections.synchronizedList(new ArrayList<PendingConnection>());
+ private final HashMap<InetAddress, Long> throttle = new HashMap<InetAddress, Long>();
+
+ public MultiplexingServerConnection(MinecraftServer ms) {
+ super(ms);
+
+ // Add primary connection
+ start(ms.server.getIp(), ms.server.getPort());
+ // Add all other connections
+ for (InetSocketAddress address : ms.server.getSecondaryHosts()) {
+ start(address.getHostString(), address.getPort());
+ }
+ }
+
+ private void start(String ipAddress, int port) {
+ try {
+ // Calculate address, can't use isEmpty due to Java 5
+ InetAddress socketAddress = (ipAddress.length() == 0) ? null : InetAddress.getByName(ipAddress);
+ // Say hello to the log
+ d().getLogger().info("Starting listener #" + children.size() + " on " + (socketAddress == null ? "*" : ipAddress) + ":" + port);
+ // Start connection: Netty / non Netty
+ ServerConnection listener = (NETTY_DISABLED) ? new DedicatedServerConnection(d(), socketAddress, port) : new org.spigotmc.netty.NettyServerConnection(d(), socketAddress, port);
+ // Register with other connections
+ children.add(listener);
+ // Gotta catch em all
+ } catch (Throwable t) {
+ // Just print some info to the log
+ t.printStackTrace();
+ d().getLogger().warning("**** FAILED TO BIND TO PORT!");
+ d().getLogger().warning("The exception was: {0}", t);
+ d().getLogger().warning("Perhaps a server is already running on that port?");
+ }
+ }
+
+ /**
+ * close.
+ */
+ @Override
+ public void a() {
+ for (ServerConnection child : children) {
+ child.a();
+ }
+ }
+
+ /**
+ * Pulse. This method pulses all connections causing them to update. It is
+ * called from the main server thread a few times a tick.
+ */
+ @Override
+ public void b() {
+ super.b(); // pulse PlayerConnections
+ for (int i = 0; i < pending.size(); ++i) {
+ PendingConnection connection = pending.get(i);
+
+ try {
+ connection.c();
+ } catch (Exception ex) {
+ connection.disconnect("Internal server error");
+ Bukkit.getServer().getLogger().log(Level.WARNING, "Failed to handle packet: " + ex, ex);
+ }
+
+ if (connection.b) {
+ pending.remove(i--);
+ }
+ }
+ }
+
+ /**
+ * Remove the user from connection throttle. This should fix the server ping
+ * bugs.
+ *
+ * @param address the address to remove
+ */
+ public void a(InetAddress address) {
+ if (address != null) {
+ synchronized (throttle) {
+ throttle.remove(address);
+ }
+ }
+ }
+
+ /**
+ * Add a connection to the throttle list.
+ *
+ * @param address
+ * @return Whether they must be disconnected
+ */
+ public boolean throttle(InetAddress address) {
+ long currentTime = System.currentTimeMillis();
+ synchronized (throttle) {
+ Long value = throttle.get(address);
+ if (value != null && !address.isLoopbackAddress() && currentTime - value < d().server.getConnectionThrottle()) {
+ throttle.put(address, currentTime);
+ return true;
+ }
+
+ throttle.put(address, currentTime);
+ }
+ return false;
+ }
+
+ public void register(PendingConnection conn) {
+ pending.add(conn);
+ }
+}
diff --git a/src/main/java/org/spigotmc/netty/CipherCodec.java b/src/main/java/org/spigotmc/netty/CipherCodec.java
new file mode 100644
index 0000000..2dbbf6c
@@ -376,10 +463,10 @@ index 0000000..2dbbf6c
+}
diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
new file mode 100644
index 0000000..85a6c05
index 0000000..0e1b1fd
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
@@ -0,0 +1,271 @@
@@ -0,0 +1,253 @@
+package org.spigotmc.netty;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -387,13 +474,11 @@ index 0000000..85a6c05
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.security.PrivateKey;
+import java.util.AbstractList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
@@ -408,6 +493,7 @@ index 0000000..85a6c05
+import net.minecraft.server.Packet252KeyResponse;
+import net.minecraft.server.PendingConnection;
+import net.minecraft.server.PlayerConnection;
+import org.spigotmc.MultiplexingServerConnection;
+
+/**
+ * This class forms the basis of the Netty integration. It implements
@@ -419,7 +505,7 @@ index 0000000..85a6c05
+ private static final ExecutorService threadPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("Async Packet Handler - %1$d").build());
+ private static final MinecraftServer server = MinecraftServer.getServer();
+ private static final PrivateKey key = server.F().getPrivate();
+ private static final NettyServerConnection serverConnection = (NettyServerConnection) server.ae();
+ private static final MultiplexingServerConnection serverConnection = (MultiplexingServerConnection) server.ae();
+ /*========================================================================*/
+ private final Queue<Packet> syncPackets = new ConcurrentLinkedQueue<Packet>();
+ private final List<Packet> highPriorityQueue = new AbstractList<Packet>() {
@@ -447,32 +533,15 @@ index 0000000..85a6c05
+ private Object[] dcArgs;
+ private Socket socketAdaptor;
+ private long writtenBytes;
+ private long connectionThrottle;
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ // Channel and address groundwork first
+ channel = ctx.channel();
+ address = channel.remoteAddress();
+ // Connection throttler (ported from CraftBukkit)
+ long currentTime = System.currentTimeMillis();
+ InetAddress iaddress = ((InetSocketAddress) channel.remoteAddress()).getAddress();
+
+ if (server == null || server.server == null) {
+ // Check the throttle
+ if (serverConnection.throttle(((InetSocketAddress) channel.remoteAddress()).getAddress())) {
+ channel.close();
+ return;
+ }
+
+ connectionThrottle = server.server.getConnectionThrottle();
+
+ synchronized (serverConnection.throttledConnections) {
+ if (serverConnection.throttledConnections.containsKey(iaddress) && !"127.0.0.1".equals(iaddress.getHostAddress()) && currentTime - (serverConnection.throttledConnections.get(iaddress)).longValue() < connectionThrottle) {
+ serverConnection.throttledConnections.put(iaddress, Long.valueOf(currentTime));
+ channel.close();
+ return;
+ }
+
+ serverConnection.throttledConnections.put(iaddress, Long.valueOf(currentTime));
+ }
+ // Then the socket adaptor
+ socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel);
@@ -480,7 +549,7 @@ index 0000000..85a6c05
+ connection = new PendingConnection(server, this);
+ // Finally register the connection
+ connected = true;
+ serverConnection.pendingConnections.add((PendingConnection) connection);
+ serverConnection.register((PendingConnection) connection);
+ }
+
+ @Override
@@ -653,10 +722,10 @@ index 0000000..85a6c05
+}
diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
new file mode 100644
index 0000000..7809aa9
index 0000000..e5d24f7
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java
@@ -0,0 +1,126 @@
@@ -0,0 +1,90 @@
+package org.spigotmc.netty;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -693,8 +762,8 @@ index 0000000..7809aa9
+public class NettyServerConnection extends ServerConnection {
+
+ private final ChannelFuture socket;
+ final HashMap<InetAddress, Long> throttledConnections = new HashMap<InetAddress, Long>();
+ final List<PendingConnection> pendingConnections = Collections.synchronizedList(new ArrayList<PendingConnection>());
+
+
+
+ public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) {
+ super(ms);
@@ -719,42 +788,6 @@ index 0000000..7809aa9
+ MinecraftServer.getServer().getLogger().info("Using Netty NIO with " + threads + " threads for network connections.");
+ }
+
+ /**
+ * Pulse. This method pulses all connections causing them to update. It is
+ * called from the main server thread a few times a tick.
+ */
+ @Override
+ public void b() {
+ super.b(); // pulse PlayerConnections
+ for (int i = 0; i < pendingConnections.size(); ++i) {
+ PendingConnection connection = pendingConnections.get(i);
+
+ try {
+ connection.c();
+ } catch (Exception ex) {
+ connection.disconnect("Internal server error");
+ Bukkit.getServer().getLogger().log(Level.WARNING, "Failed to handle packet: " + ex, ex);
+ }
+
+ if (connection.b) {
+ pendingConnections.remove(i--);
+ }
+ }
+ }
+
+ /**
+ * Remove the user from connection throttle. This should fix the server ping bugs.
+ *
+ * @param address the address to remove
+ */
+ @Override
+ public void a(InetAddress address) {
+ if (address != null) {
+ synchronized (throttledConnections) {
+ throttledConnections.remove(address);
+ }
+ }
+ }
+
+ /**
+ * Shutdown. This method is called when the server is shutting down and the
@@ -1291,6 +1324,21 @@ index 0000000..5dc3754
+ */
+ DATA;
+}
diff --git a/src/main/resources/configurations/bukkit.yml b/src/main/resources/configurations/bukkit.yml
index a62ba24..aaf9454 100644
--- a/src/main/resources/configurations/bukkit.yml
+++ b/src/main/resources/configurations/bukkit.yml
@@ -12,7 +12,9 @@
# Twitter: http://twitter.com/Craftbukkit
# Bug tracker: http://leaky.bukkit.org/
-
+#listeners:
+# - address: 127.0.0.1
+# port: 25577
settings:
allow-end: true
warn-on-overload: true
--
1.8.2.1