diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java index edd0fc56f861abbc0603975d0cd12e9944c9d447..46d61503bc5625e6448e60ae7c266aa6e652b2fe 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java @@ -20,19 +20,24 @@ package org.apache.spark.network.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; import io.netty.channel.oio.OioEventLoopGroup; import io.netty.channel.socket.oio.OioSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + class FileClient { private Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); - private FileClientHandler handler = null; + private final FileClientHandler handler; private Channel channel = null; private Bootstrap bootstrap = null; - private int connectTimeout = 60*1000; // 1 min + private EventLoopGroup group = null; + private final int connectTimeout; + private final int sendTimeout = 60; // 1 min public FileClient(FileClientHandler handler, int connectTimeout) { this.handler = handler; @@ -40,8 +45,9 @@ class FileClient { } public void init() { + group = new OioEventLoopGroup(); bootstrap = new Bootstrap(); - bootstrap.group(new OioEventLoopGroup()) + bootstrap.group(group) .channel(OioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) @@ -56,6 +62,7 @@ class FileClient { // ChannelFuture cf = channel.closeFuture(); //cf.addListener(new ChannelCloseListener(this)); } catch (InterruptedException e) { + LOG.warn("FileClient interrupted while trying to connect", e); close(); } } @@ -71,16 +78,21 @@ class FileClient { public void sendRequest(String file) { //assert(file == null); //assert(channel == null); - channel.write(file + "\r\n"); + try { + // Should be able to send the message to network link channel. + boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS); + if (!bSent) { + throw new RuntimeException("Failed to send"); + } + } catch (InterruptedException e) { + LOG.error("Error", e); + } } public void close() { - if(channel != null) { - channel.close(); - channel = null; - } - if ( bootstrap!=null) { - bootstrap.shutdown(); + if (group != null) { + group.shutdownGracefully(); + group = null; bootstrap = null; } } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java index 65ee15d63b8548b27275419a2c1507cf0e23847b..fb61be1c12d79df4a504a640537a1396b9b54541 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java @@ -17,15 +17,13 @@ package org.apache.spark.network.netty; -import io.netty.buffer.BufType; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringEncoder; - class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> { - private FileClientHandler fhandler; + private final FileClientHandler fhandler; public FileClientChannelInitializer(FileClientHandler handler) { fhandler = handler; @@ -35,7 +33,7 @@ class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> { public void initChannel(SocketChannel channel) { // file no more than 2G channel.pipeline() - .addLast("encoder", new StringEncoder(BufType.BYTE)) + .addLast("encoder", new StringEncoder()) .addLast("handler", fhandler); } } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java index 8a09210245fff7eccfdd50fa551be2a9d53e9954..63d3d927255f9613f79a2d5894bb337ba83a1d54 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java @@ -19,11 +19,11 @@ package org.apache.spark.network.netty; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundByteHandlerAdapter; +import io.netty.channel.SimpleChannelInboundHandler; import org.apache.spark.storage.BlockId; -abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter { +abstract class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private FileHeader currentHeader = null; @@ -37,13 +37,7 @@ abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter { public abstract void handleError(BlockId blockId); @Override - public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) { - // Use direct buffer if possible. - return ctx.alloc().ioBuffer(); - } - - @Override - public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) { + public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { // get header if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) { currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE())); diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java index a99af348ce78207d156d51855038c1e63b5f3b24..aea75344594f5023bf912b49c74d4c39a181384a 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java @@ -22,13 +22,12 @@ import java.net.InetSocketAddress; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; import io.netty.channel.oio.OioEventLoopGroup; import io.netty.channel.socket.oio.OioServerSocketChannel; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Server that accept the path of a file an echo back its content. */ @@ -36,7 +35,8 @@ class FileServer { private Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); - private ServerBootstrap bootstrap = null; + private EventLoopGroup bossGroup = null; + private EventLoopGroup workerGroup = null; private ChannelFuture channelFuture = null; private int port = 0; private Thread blockingThread = null; @@ -45,8 +45,11 @@ class FileServer { InetSocketAddress addr = new InetSocketAddress(port); // Configure the server. - bootstrap = new ServerBootstrap(); - bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup()) + bossGroup = new OioEventLoopGroup(); + workerGroup = new OioEventLoopGroup(); + + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) .channel(OioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .option(ChannelOption.SO_RCVBUF, 1500) @@ -89,13 +92,19 @@ class FileServer { public void stop() { // Close the bound channel. if (channelFuture != null) { - channelFuture.channel().close(); + channelFuture.channel().close().awaitUninterruptibly(); channelFuture = null; } - // Shutdown bootstrap. - if (bootstrap != null) { - bootstrap.shutdown(); - bootstrap = null; + + // Shutdown event groups + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + bossGroup = null; + } + + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + workerGroup = null; } // TODO: Shutdown all accepted channels as well ? } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java index 833af1632de9ca2b87d0b51f5732ae359c4e2d75..3f15ff898fc1e981e169e6c5ca5bb05dd21c7c83 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java @@ -23,7 +23,6 @@ import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; - class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> { PathResolver pResolver; @@ -36,7 +35,7 @@ class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> { public void initChannel(SocketChannel channel) { channel.pipeline() .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())) - .addLast("strDecoder", new StringDecoder()) + .addLast("stringDecoder", new StringDecoder()) .addLast("handler", new FileServerHandler(pResolver)); } } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index 172c6e4b1cce198020a314b4f6fb3062b3dd5cff..e2d9391b4cc9321b8db676b8713bcddfe8c0b63f 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -21,22 +21,26 @@ import java.io.File; import java.io.FileInputStream; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.DefaultFileRegion; import org.apache.spark.storage.BlockId; import org.apache.spark.storage.FileSegment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> { +class FileServerHandler extends SimpleChannelInboundHandler<String> { - PathResolver pResolver; + private Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); + + private final PathResolver pResolver; public FileServerHandler(PathResolver pResolver){ this.pResolver = pResolver; } @Override - public void messageReceived(ChannelHandlerContext ctx, String blockIdString) { + public void channelRead0(ChannelHandlerContext ctx, String blockIdString) { BlockId blockId = BlockId.apply(blockIdString); FileSegment fileSegment = pResolver.getBlockLocation(blockId); // if getBlockLocation returns null, close the channel @@ -60,10 +64,10 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> { int len = new Long(length).intValue(); ctx.write((new FileHeader(len, blockId)).buffer()); try { - ctx.sendFile(new DefaultFileRegion(new FileInputStream(file) + ctx.write(new DefaultFileRegion(new FileInputStream(file) .getChannel(), fileSegment.offset(), fileSegment.length())); } catch (Exception e) { - e.printStackTrace(); + LOG.error("Exception: ", e); } } else { ctx.write(new FileHeader(0, blockId).buffer()); @@ -73,7 +77,7 @@ class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); + LOG.error("Exception: ", cause); ctx.close(); } } diff --git a/pom.xml b/pom.xml index 57e843596fe35450f86b471684c4be6e3d42e0f5..0936ae53b4b3ee3284d4133ae40a1488d329442a 100644 --- a/pom.xml +++ b/pom.xml @@ -282,7 +282,7 @@ <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> - <version>4.0.0.CR1</version> + <version>4.0.13.Final</version> </dependency> <dependency> <groupId>org.apache.derby</groupId> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 7bcbd90bd3f9e38f47443b1ab6985455b0365a9b..1df1abc9a399be4274a9262b0eefcabd319e4f80 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -178,7 +178,7 @@ object SparkBuild extends Build { libraryDependencies ++= Seq( - "io.netty" % "netty-all" % "4.0.0.CR1", + "io.netty" % "netty-all" % "4.0.13.Final", "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */ "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),