首页 慕课教程 Netty 教程 Netty 教程 如何自定义编解码器

如何自定义编解码器

1. 前言

上一节我们一节了解了什么是编码解码、序列化和反序列化了,并且留有一道思考题,本节内容主要是深入解析该思考题。

思考题:能否把我们的编码和解码封装成独立的 Handler 呢?那么应该如何去封装呢?

2. 为什么要封装独立 Handler?

即使我们把编码和解码封装成了方法,但是还是需要在 Handler 业务逻辑里面进行手工调用,虽然看似不怎么影响,但是业务 Handler 不够纯粹,应该让 Handler 只是专心的负责处理业务逻辑就好。

实例:

ch.pipeline().addLast(new MyEncoderHandler());//解码Handler
ch.pipeline().addLast(new MyDecoderHandler());//编码Handler
ch.pipeline().addLast(new MyBusiHandler());//业务Handler

public class MyBusiHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //1.接受参数,可以直接强转
        UserReq userReq=(UserReq)msg;

        //2.相应数据,直接写对象
        UserRes res=new UserRes();
        res.setCode(0);
        res.setMsg("接受成功");
        ctx.writeAndFlush(res);
    }
}

通过以上的代码,我们把编码和解码封装成两个独立的 Handler,并且加入到 ChannelPipeline 里面进行管理。在我们的业务 Handler 里面就可以直接操作实体数据,无需手工转换成字节数组了。

思考:那么如何进行封装 Handler 呢?

3. StringDecoder 和 StringEncoder

3.1 简单使用

StringDecoder 和 StringEncoder 是 Netty 为我们提供的专门针对普通字符串的解码和编码器,使用起来非常的简单。

客户端直接发送字符串。

实例:

ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ClientTestHandler());

public class ClientTestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
		//客户端直接写字符串,没有任何的数据加工
        ctx.channel().writeAndFlush("hello world");
    }
}

服务端直接强转字符串。

实例:

ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ServerTestHandler());

public class ServerTestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//直接把msg转换成String类型
        String str=msg.toString();
        System.out.println("str="+str);
    }
}

总结,这种模式开发起来实在太方便了,无需做数据的加工,我们还是按照我们熟悉的方式去写代码,非常的方便。

但是,它只是支持普通的字符串类型进行编码和解码而已,对于复杂的引用类型则无效。

3.2 大体流程

其实原理是非常的简单的,请看下图。

图片描述
执行流程说明:

  1. StringDecoder 必须放在业务 Handler 之前,因为都是 InboundHandler,需要按顺序执行;
  2. StringEncoder 放在业务 Handler 之前,则可以使用 ctx.writeAndFlush () 输出数据,也可以使用 ctx.channel ().writeAndFlus () 输出数据(ChannelHandler 已经讲过原理了);
  3. StringEncoder 放在业务 Handler 之后,则只能使用 ctx.channel().writeAndFlush() 输出数据。

3.3 源码阅读

思考:StringDecoder 和 StringEncoder 到底怎么实现的呢?

StringDecoder 源码:

@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        //直接msg.toString()
        out.add(msg.toString(this.charset));
    }
}

发现 StringDecoder 的源码非常的简单,直接.toString() 转换即可。

StringEncoder 源码:

@Sharable
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
    protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
        if (msg.length() != 0) {
            //继续跟进源码
            out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), this.charset));
        }
    }
}

public static ByteBuf encodeString(ByteBufAllocator alloc, CharBuffer src, Charset charset) {
    //继续跟进源码
    return encodeString0(alloc, false, src, charset, 0);
}

保留核心源码

static ByteBuf encodeString0(ByteBufAllocator alloc, boolean enforceHeap, CharBuffer src, Charset charset, int extraCapacity) {
    CharsetEncoder encoder = CharsetUtil.encoder(charset);
    int length = (int)((double)src.remaining() * (double)encoder.maxBytesPerChar()) + extraCapacity;
    boolean release = true;
    
    //1.创建ByteBuf分配器
    ByteBuf dst;
    if (enforceHeap) {
        dst = alloc.heapBuffer(length);
    } else {
        dst = alloc.buffer(length);
    }

    ByteBuf var12;
    try {
        //2.得到NIO的ByteBuffer【跟进Netty的ByteBuf基本上一样】
        ByteBuffer dstBuf = dst.internalNioBuffer(0, length);
        int pos = dstBuf.position();
        
        //3.把内容写得NIO的ByteBuffer
        CoderResult cr = encoder.encode(src, dstBuf, true);
        cr = encoder.flush(dstBuf);
		
        //4.更新ByteBuf的写指针writeIndex
        dst.writerIndex(dst.writerIndex() + dstBuf.position() - pos);
        
        //5.给var12赋值
        var12 = dst;
    } catch (CharacterCodingException var16) {
        throw new IllegalStateException(var16);
    }
    return var12;
}

大致流程就是把字符串内容转换成 NIO 的 ByteBuffer,这里大致知道整个流程即可,不用深究每行代码的意思,其实 Netty 的 ByteBuf 底层就是基于 ByteBuffer 进行封装的。

3. 自定义编解码器

通过上面 Demo 的学习,以及 StringDecoder 和 StringEncoder 两个类的学习,相信大家更加能理解编解码器了,毕竟 StringDecoder 和 StringEncoder 从字面意思也能理解它们是针对字符串格式的,如果我们想要传递一个实体那么怎么办呢?

主要解决方案有两种:

方案一: 把实体转换成 json 格式字符串,然后依然使用 StringDecoder 和 StringEncoder 编解码器,但是每次手工转换和解析,非常的麻烦;
方案二: 自定义针对实体的编解码器,并且加入到双向链表里面,这样就可以传递自定义实体了。

下面主要讲解如何实现针对实体的编解码器:

3.1 实体

实例:

@Data
public class User {
    private String name;
    private Integer age;
}

3.2 编码器

核心步骤:

  1. 继承 MessageToByteEncoder,重写 encode 方法;
  2. 把 User 对象转换成 byte [];
  3. 把 byte [] 写到 ByteBuf。

实例:

public class MyEncoder extends MessageToByteEncoder<User> {
    protected void encode(ChannelHandlerContext channelHandlerContext, 
                          User user, 
                          ByteBuf byteBuf) throws Exception {
        
        //1.对象流
        ByteArrayOutputStream os = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(os);
        oos.writeObject(user);
        byte[] bytes=os.toByteArray();
        
		//2.关闭流
        oos.close();
        os.close();
		
        //3.写到ByteBuf容器
        byteBuf.writeBytes(bytes);
    }
}

3.3 解码器

核心步骤:

  1. 继承 ByteToMessageDecoder,重写 decode 方法;
  2. 自定义一个 byte [] 数组,长度是 ByteBuf 的可读长度;
  3. 把 ByteBuf 转换成 User 实体。

实例:

public class MyDecoder extends ByteToMessageDecoder {
    protected void decode(ChannelHandlerContext channelHandlerContext, 
                          ByteBuf byteBuf, 
                          List<Object> list) throws Exception {
        //1.定义byte[],长度为ByteBuf可读长度
        byte[] bytes=new byte[byteBuf.readableBytes()];
        //2.往byte[]读取数据
        byteBuf.readBytes(bytes);
		//3.对象流
        ByteArrayInputStream is=new ByteArrayInputStream(bytes);
        ObjectInputStream iss=new ObjectInputStream(is);
        User user=(User)iss.readObject();
        //4.关闭流
        is.close();
        iss.close();
		//5.添加到集合
        list.add(user);
    }
}

3.4 添加到 Pipeline

实例:

.childHandler(new ChannelInitializer<NioSocketChannel>() {
    protected void initChannel(NioSocketChannel ch) {
        //1.解码器
        ch.pipeline().addLast(new MyDecoder());
        //2.编码器
        ch.pipeline().addLast(new MyEncoder());
        //3.业务Handler
        ch.pipeline().addLast(new ServerTestHandler());
    }
});

4. 小结

通常情况下,需要把编解码器分别独立封装成 Handler,并且加入到 ChannelPipeline 进行管理,主要目的是简化繁琐的编码和解码的步骤,让业务 Handler 更加专注去处理业务逻辑,更加的符合开发人员的习惯。

本节主要掌握以下两点内容

  1. 如果针对字符串,那么可以使用 Netty 内置的编解码器,分别是 StringEncoder 和 StringDecoder;
  2. 如果是其它引用类型,主要有两种方式,①转换成字符串格式;②自定义编解码器。