
I am new to Netty and use version 4. In my project, a server returns a Java object to the client, and this object can be large. I started by using ObjectEncoder/ObjectDecoder with NioSocketChannel for this purpose. Though it works, the performance is significantly worse than with old blocking I/O. Thread dumps show that ObjectEncoder reallocates direct buffers all the time. My guess is that it serializes the whole object into the direct buffer and only then sends it over the network. This is slow and may cause an OutOfMemoryError if multiple requests like this run simultaneously. What would be an efficient implementation that is fast and uses a buffer of limited size? Also, some (but not all) of the objects the server returns contain a long byte array field. Can this fact be used to further improve performance?

As @MattBakaitis asked, I am pasting a code sample, which is a slight modification of the ObjectEchoServer example. It sends a constant large object back to the client in response to each message received.

public final class MyObjectEchoServer {

    static final int PORT = Integer.parseInt(System.getProperty("port", "11000"));

    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    p.addLast(
                            new ObjectEncoder(),
                            new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)),
                            new ObjectEchoServerHandler());
                }
             });

            // Bind and start to accept incoming connections.
            b.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

public class ObjectEchoServerHandler extends ChannelInboundHandlerAdapter {
    public static class Response implements Serializable {
        public byte[] bytes;
    }

    private static Response response;

    static {
        int len = 256 * 1024 * 1024;
        response = new Response();
        response.bytes = new byte[len];
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Received: msg=" + msg);
        // Echo back the received object to the client.
        System.out.println("Sending response. length: " + response.bytes.length);
        ctx.write(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("Flushing");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

It works with no errors if the JVM has enough memory; however, it is slow and throws a direct-buffer OutOfMemoryError if multiple clients are running or the response object is too large. I took multiple thread dumps; they always look like the one pasted below and show that ObjectEncoder writes the response object into a direct buffer and constantly resizes that buffer because the response is large. Therefore, I think this kind of straightforward implementation is not efficient, and I am looking for advice on what an efficient approach would be.

Thread stack I mentioned:

    "nioEventLoopGroup-3-1" prio=10 tid=0x000000000bf88800 nid=0x205c runnable [0x000000000cb5e000]
   java.lang.Thread.State: RUNNABLE
        at sun.misc.Unsafe.copyMemory(Native Method)
        at sun.misc.Unsafe.copyMemory(Unsafe.java:560)
        at java.nio.DirectByteBuffer.put(DirectByteBuffer.java:326)
        at io.netty.buffer.UnpooledUnsafeDirectByteBuf.capacity(UnpooledUnsafeDirectByteBuf.java:160)
        at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:818)
        at io.netty.buffer.ByteBufOutputStream.write(ByteBufOutputStream.java:66)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
        at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1847)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1333)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at io.netty.handler.codec.serialization.ObjectEncoder.encode(ObjectEncoder.java:47)
        at io.netty.handler.codec.serialization.ObjectEncoder.encode(ObjectEncoder.java:36)
        at io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:111)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:657)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:715)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:650)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:636)
        at io.netty.example.objectecho.ObjectEchoServerHandler.channelRead(ObjectEchoServerHandler.java:46)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:332)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:318)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:332)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:318)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:125)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:507)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:464)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:378)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:350)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:745)
  • You will get better results and answers if you can include the code or configuration you think is involved in causing the errors. This question is pretty broad and may be hard to answer as-written. Commented Aug 13, 2014 at 20:34
  • @MattBakaitis, I updated the post with code example and thread stack. Commented Aug 14, 2014 at 15:46

2 Answers


If you are writing a large object, as you mentioned in the question, multiple memory copies can occur while the output buffer is expanded. To avoid that, you can override the allocateBuffer() method of ObjectEncoder (more precisely, of MessageToByteEncoder) and allocate a buffer with a larger initial capacity. For example:

@Override
protected ByteBuf allocateBuffer(
        ChannelHandlerContext ctx, Object msg, boolean preferDirect) {

    return ctx.alloc().heapBuffer(1048576);
}

To reduce the number of memory copies even further, I'd recommend allocating a direct buffer (i.e. ctx.alloc().directBuffer(1048576)) and using the PooledByteBufAllocator.
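Enabling the pooled allocator is a one-line change on the ServerBootstrap from the question's code; a minimal sketch, where `b` is that bootstrap (ChannelOption.ALLOCATOR and PooledByteBufAllocator.DEFAULT are part of Netty 4's public API):

```java
// Sketch: make child channels use Netty's pooled allocator so the large
// buffers allocated by the encoder are reused across writes instead of
// being allocated (and zeroed) anew for every response.
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
```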

However, this will not address your concern about getting an OutOfMemoryError under load when many peers exchange large objects. Java object serialization was never designed to work well with a non-blocking connection; it always assumes the stream has data available. Without re-implementing object serialization, there is no way to avoid keeping the whole serialized object in a buffer. The same applies to any other serialization implementation that works only through InputStream and OutputStream.

Alternatively, you could implement a separate protocol for exchanging the large stream, and shrink the serialized object by replacing the byte array with a reference to that stream.
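The core idea of such a protocol, streaming the large payload in fixed-size frames instead of one huge buffer, can be sketched in plain Java, independent of Netty. This is an illustration of the framing, not a concrete protocol from the answer: each frame is a 4-byte length prefix followed by at most CHUNK_SIZE bytes, and a zero-length frame marks the end, so peak buffer usage stays bounded by CHUNK_SIZE.

```java
import java.io.*;
import java.util.Arrays;

public class ChunkedTransfer {
    static final int CHUNK_SIZE = 64 * 1024; // assumed frame size; tune to taste

    // Write the payload as a sequence of length-prefixed frames;
    // a zero-length frame terminates the stream.
    static void writeChunked(byte[] payload, DataOutputStream out) throws IOException {
        for (int off = 0; off < payload.length; off += CHUNK_SIZE) {
            int len = Math.min(CHUNK_SIZE, payload.length - off);
            out.writeInt(len);
            out.write(payload, off, len);
        }
        out.writeInt(0); // end-of-stream marker
    }

    // Reassemble the payload from the framed stream.
    static byte[] readChunked(DataInputStream in) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] buf = new byte[CHUNK_SIZE];
        int len;
        while ((len = in.readInt()) > 0) {
            in.readFully(buf, 0, len);
            bos.write(buf, 0, len);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[300 * 1024];
        Arrays.fill(payload, (byte) 42);
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeChunked(payload, new DataOutputStream(wire));
        byte[] back = readChunked(new DataInputStream(
                new ByteArrayInputStream(wire.toByteArray())));
        System.out.println(Arrays.equals(payload, back)); // prints "true"
    }
}
```

On the Netty side, the equivalent plumbing is what io.netty.handler.stream.ChunkedWriteHandler provides for ChunkedInput messages.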


1 Comment

Thanks @trustin. allocateBuffer is a new method in 4.0.22; I was using 4.0.20, where this code was buried inside the write method. I now understand why serialization of a huge object is faster with ObjectOutputStream than with AbstractByteBuf: ObjectOutputStream doubles the size of the underlying array every time it needs to grow, while AbstractByteBuf takes a more memory-cautious approach and grows by 4 MiB once the old size exceeds a 4 MiB threshold. This leads to a larger number of memory copies in the case of a huge object.

A couple of years back I had a similar problem, so I started reviewing the objects being sent over the wire. I found that most of the instance variables were primitives and only about 5% were other objects. I used Java serialization to extract the byte[] of each object, wrapped it in a ByteBuf, and sent it over the wire. Sample code:

public static byte[] serialize(Object obj) {
    try {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        oos.close();
        return bos.toByteArray();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}

And then to de-serialize:

public static Object deserialize(byte[] b) throws Exception {
    ByteArrayInputStream ins = new ByteArrayInputStream(b);
    ObjectInputStream ois = new ObjectInputStream(ins);
    return ois.readObject();
}

The average object size was ~800 bytes and a 1 Gbps link was used. This works fine for a few thousand transactions per second. If you want to push throughput up to hundreds of thousands (lakhs) per second, you need to put in some effort and serialize the objects yourself.
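For completeness, here is a self-contained round trip of the serialize/deserialize approach above; the Response class is modeled on the one in the question, and wrapping the resulting byte[] with Unpooled.wrappedBuffer(wire) on the Netty side is the zero-copy step this answer relies on:

```java
import java.io.*;

public class SerializationRoundTrip {
    // Small serializable carrier, modeled on the question's Response class.
    static class Response implements Serializable {
        byte[] bytes;
    }

    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(obj);
        oos.close();
        return bos.toByteArray();
    }

    static Object deserialize(byte[] b) throws IOException, ClassNotFoundException {
        return new ObjectInputStream(new ByteArrayInputStream(b)).readObject();
    }

    public static void main(String[] args) throws Exception {
        Response r = new Response();
        r.bytes = "hello".getBytes("UTF-8");
        byte[] wire = serialize(r); // in Netty, wrap with Unpooled.wrappedBuffer(wire)
        Response back = (Response) deserialize(wire);
        System.out.println(new String(back.bytes, "UTF-8")); // prints "hello"
    }
}
```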

Here is a sample example to do so:

My object ->

public class MyObject {

    private int i;
    private String key;
    private AnotherObject obj;
    private boolean toUpdate;

    public MyObject(int i, String key, AnotherObject obj, boolean toUpdate) {
        this.i = i;
        this.key = key;
        this.obj = obj;
        this.toUpdate = toUpdate;
    }

    /**
     * Decode in order:
     *   int i
     *   String key
     *   AnotherObject obj
     *   boolean toUpdate
     */
    public Object decodeAll(ByteBuf b) {
        this.i = readInt(b);
        this.key = readString(b);
        // More than one readable byte left means the optional AnotherObject
        // was written before the trailing boolean.
        if (b.readableBytes() > 1) {
            AnotherObject o = new AnotherObject();
            o.decodeAll(b);
            this.obj = o;
        }
        byte[] bool = new byte[1];
        b.readBytes(bool);
        this.toUpdate = decodeBoolean(bool);
        return this;
    }

    /**
     * Encode in order:
     *   int i
     *   String key
     *   AnotherObject obj
     *   boolean toUpdate
     */
    public ByteBuf encodeAll() {
        ByteBuf buffer = allocator.buffer();
        // First the int
        writeInt(buffer, this.i);
        // String key
        writeString(buffer, this.key);
        // AnotherObject (optional)
        if (this.obj != null) {
            ByteBuf rel = this.obj.encodeAll();
            buffer.writeBytes(rel);
            rel.release();
        }
        // boolean toUpdate
        buffer.writeBytes(encodeBoolean(this.toUpdate));
        return buffer;
    }
}

Here I used the sun.misc.Unsafe APIs to serialize/deserialize the primitives (unsafe is the singleton obtained via reflection, and byteArrayOffset is Unsafe.ARRAY_BYTE_BASE_OFFSET):

protected byte[] encodeBoolean(final boolean value) {
    byte[] b = new byte[1];
    unsafe.putBoolean(b, byteArrayOffset, value);
    return b;
}

public static boolean decodeBoolean(final byte[] b) {
    return unsafe.getBoolean(b, byteArrayOffset);
}

protected byte[] encodeInt(final int value) {
    byte[] b = new byte[4];
    unsafe.putInt(b, byteArrayOffset, value);
    return b;
}

public static int decodeInt(final byte[] b) {
    return unsafe.getInt(b, byteArrayOffset);
}

public static int readInt(ByteBuf data) {
    byte[] temp = new byte[4];
    data.readBytes(temp);
    return decodeInt(temp);
}

protected void writeInt(ByteBuf buffer, final int value) {
    buffer.writeBytes(encodeInt(value));
}
protected void writeString(ByteBuf buffer, String data) {
    if (data == null) {
        data = "";
    }
    // For a String, first write the length as an int, then the actual bytes
    byte[] bytes = data.getBytes();
    buffer.writeBytes(encodeInt(bytes.length));
    buffer.writeBytes(bytes);
}
public static String readString(ByteBuf data) {
    byte[] temp = new byte[4];
    data.readBytes(temp);
    int len = decodeInt(temp);
    if (len == 0) {
        return null;
    } else {
        temp = new byte[len];
        data.readBytes(temp);
        return new String(temp);
    }
}

Likewise, you can encode/decode any Java data structure, e.g.:

protected void writeArrayList(ByteBuf buffer, ArrayList<String> data) {
    if (data == null) {
        data = new ArrayList<String>();
    }
    // Write the number of elements and then all elements one-by-one
    buffer.writeBytes(encodeInt(data.size()));
    for (String str : data) {
        writeString(buffer, str);
    }
}

protected ArrayList<String> readArrayList(ByteBuf data) {
    byte[] temp = new byte[4];
    data.readBytes(temp);
    int noOfElements = decodeInt(temp);
    ArrayList<String> arr = new ArrayList<>(noOfElements);
    for (int i = 0; i < noOfElements; i++) {
        arr.add(readString(data));
    }

    return arr;
}
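The same length-prefixed layout can be exercised without Netty or Unsafe. Here is a self-contained sketch that substitutes java.nio.ByteBuffer for ByteBuf (my substitution, purely for illustration) and round-trips an int, a String, and a list of Strings in the order they were written:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class HandRolledCodec {
    // Encode: int, then length-prefixed string, then size-prefixed string list.
    static ByteBuffer encode(int i, String key, List<String> items) {
        ByteBuffer buf = ByteBuffer.allocate(1024); // assumed capacity for the demo
        buf.putInt(i);
        putString(buf, key);
        buf.putInt(items.size());
        for (String s : items) putString(buf, s);
        buf.flip();
        return buf;
    }

    static void putString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(bytes.length); // length prefix, then the bytes
        buf.put(bytes);
    }

    static String getString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<String> items = new ArrayList<>();
        items.add("a");
        items.add("bc");
        ByteBuffer buf = encode(7, "key", items);

        // Decode in the same order the fields were written.
        System.out.println(buf.getInt());   // prints 7
        System.out.println(getString(buf)); // prints key
        int n = buf.getInt();
        for (int k = 0; k < n; k++) {
            System.out.println(getString(buf)); // prints a, then bc
        }
    }
}
```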

Now you can simply write the ByteBuf to the channel and reach hundreds of thousands (lakhs) of transactions per second.
