博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Netty三分钟手写一个RPC
阅读量:6241 次
发布时间:2019-06-22

本文共 9787 字,大约阅读时间需要 32 分钟。

个人技术博客:www.zhenganwen.top

流程概览

项目结构

依赖

io.netty
netty-all
5.0.0.Alpha1
org.reflections
reflections
0.9.10
org.projectlombok
lombok
1.16.18
provided
复制代码

通用模块

ClassInfo

实体类,封装了服务调用信息:

package top.zhenganwen.rpc.common;import lombok.Data;import java.io.Serializable;/** * ClassInfo class *      使用JDK的序列化技术必须实现接口Serializable * * @author : zaw * @date : 2019/3/30 */@Datapublic class ClassInfo implements Serializable {    /**     * 调用服务的接口名     */    private String className;    /**     * 调用服务的方法名     */    private String methodName;    /**     * 调用方法的参数列表类型     */    private Class[] paramTypes;    /**     * 调用服务传参     */    private Object[] params;}复制代码

需要注意的是客户端在发送调用信息时会将该类对象序列化并发送给服务端,而服务的则需要反序列化回来,如果使用的是JDK的序列化技术则需要将此类实现Serializable接口

服务接口

为了便于维护,服务接口通常会被独立出来到通用模块中,以jar包的形式被服务调用方和服务提供方依赖。这里简单的写了两个接口,一个包含无參服务,一个包含有参服务。

public interface HasArgsHelloService {    String hello(String msg);}public interface NoArgsHelloService {    String hello();}复制代码

服务调用方

client

这个包中是依赖Service接口的一些类,RPC服务的调用对于他们来说是透明的,他们仅通过client_stub中的ServiceProxy来获取服务实现类并调用服务。

public class RPCClient {    public static void main(String[] args){        NoArgsHelloService noArgsHelloService = (NoArgsHelloService) ServiceProxy.create(NoArgsHelloService.class);        System.out.println(noArgsHelloService.hello());        HasArgsHelloService hasArgsHelloService = (HasArgsHelloService) ServiceProxy.create(HasArgsHelloService.class);        System.out.println(hasArgsHelloService.hello("hello netty rpc"));    }}复制代码

client_stub

真正处理RPC调用逻辑的包,ServiceProxy通过JDK代理Proxy.newProxyInstance来代理所有的服务,所有client中调用服务的动作都将被该代理逻辑中设置的InvocationHandler拦截,拦截后获取调用信息(接口名、方法名、方法参列类型、实参列表)并通过Netty与服务端建立连接发送调用信息,然后阻塞等待连接关闭事件(RPCClientHandler在收到服务端返回的调用结果时会保存该结果并关闭连接),若此事件被触发说明RPCClientHandler已拿到调用结果,于是此次InvocationHandler的拦截可以返回了。

  • ServiceProxy
package top.zhenganwen.rpc.client_stub;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;import top.zhenganwen.rpc.common.ClassInfo;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;/** * ServiceProxy class * * @author : zaw * @date : 2019/3/30 */public class ServiceProxy {    public static Object create(Class clazz) {        return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {            @Override            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {                //构造调用信息                ClassInfo classInfo = new ClassInfo();                classInfo.setClassName(clazz.getName());                classInfo.setMethodName(method.getName());                classInfo.setParamTypes(method.getParameterTypes());                classInfo.setParams(args);                //使用netty发送调用信息给服务提供方                NioEventLoopGroup group = new NioEventLoopGroup();                Bootstrap bootstrap = new Bootstrap();                RPCClientHandler rpcClientHandler = new RPCClientHandler();                try {                    bootstrap.group(group)                        .channel(NioSocketChannel.class)                        .option(ChannelOption.SO_KEEPALIVE, true)                        .option(ChannelOption.TCP_NODELAY, true)                        .handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); //反序列化对象时指定类解析器,null表示使用默认的类加载器 ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(rpcClientHandler); } }); //connect是异步的,但调用其future的sync则是同步等待连接成功 ChannelFuture future = bootstrap.connect("127.0.0.1", 80).sync(); //同步等待调用信息发送成功 future.channel().writeAndFlush(classInfo).sync(); //同步等待RPCClientHandler的channelRead被触发后(意味着收到了调用结果) future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } //返回调用结果 return rpcClientHandler.getRpcResult(); } }); }}复制代码
  • PRCClientHandler
package top.zhenganwen.rpc.client_stub;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;/** * RPCClientHandler class * * @author : zaw * @date : 2019/3/30 */public class RPCClientHandler extends ChannelHandlerAdapter {    /**     * RPC调用返回的结果     */    private Object rpcResult;    public Object getRpcResult() {        return rpcResult;    }    public void setRpcResult(Object rpcResult) {        this.rpcResult = rpcResult;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        setRpcResult(msg);        ctx.close();    }}复制代码

服务提供方

server

首先服务提供方有具体的服务实现类,然后它通过RPCServer建立Netty服务端24小时监听客户端的服务调用请求。请求将被RPCServerHandler处理,它根据请求中的调用信息通过反射找到实现类和服务方法并反射调用获取结果,并立即将结果发送给客户端。

  • 服务实现类
public class NoArgsHelloServiceImpl implements NoArgsHelloService {    @Override    public String hello() {        return "hello";    }}public class HasArgsHelloServiceImpl implements HasArgsHelloService {    @Override    public String hello(String msg) {        return msg;    }}复制代码
  • PRCServer
package top.zhenganwen.rpc.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;import top.zhenganwen.rpc.server_stub.RPCServerHandler;/** * RPCServer class * * @author : zaw * @date : 2019/3/30 */public class RPCServer {    public static void main(String[] args){        NioEventLoopGroup boss = new NioEventLoopGroup();        NioEventLoopGroup worker = new NioEventLoopGroup();        ServerBootstrap bootstrap = new ServerBootstrap();        try {            bootstrap.group(boss, worker)                .channel(NioServerSocketChannel.class)                .option(ChannelOption.SO_BACKLOG, 128)                .childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(new RPCServerHandler()); } }); //bind初始化端口是异步的,但调用sync则会同步阻塞等待端口绑定成功 ChannelFuture future = bootstrap.bind(80).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } }}复制代码

server_stub

真正根据调用请求反射调用的业务处理类

  • RPCServerHandler
package top.zhenganwen.rpc.server_stub;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import org.reflections.Reflections;import top.zhenganwen.rpc.common.ClassInfo;import java.lang.reflect.Method;import java.util.Set;/** * RPCServerHandler class * * @author : zaw * @date : 2019/3/30 */public class RPCServerHandler extends ChannelHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //获取调用信息,寻找服务实现类        ClassInfo classInfo = (ClassInfo) msg;        String implName = getImplClassName(classInfo.getClassName());        Class
clazz = Class.forName(implName); Method method = clazz.getMethod(classInfo.getMethodName(), classInfo.getParamTypes()); Object result = method.invoke(clazz.newInstance(), classInfo.getParams()); ctx.writeAndFlush(result); } private String getImplClassName(String interfaceName) throws ClassNotFoundException { Class interClass = Class.forName(interfaceName); String servicePath = "top.zhenganwen.rpc.server"; Reflections reflections = new Reflections(servicePath); Set
implClasses = reflections.getSubTypesOf(interClass); if (implClasses.isEmpty()) { System.err.println("impl class is not found!"); } else if (implClasses.size() > 1) { System.err.println("there are many impl classes, not sure invoke which"); } else { Class[] classes = implClasses.toArray(new Class[1]); return classes[0].getName(); } return null; }}复制代码

转载地址:http://zqcia.baihongyu.com/

你可能感兴趣的文章
sqlmap使用笔记
查看>>
U盾技术学习笔记
查看>>
云计算面临的安全挑战 访北大计算机学院院长陈钟
查看>>
一起谈.NET技术,C#中标准Dispose模式的实现
查看>>
艾伟:C#对游戏手柄的编程开发-API篇(2)
查看>>
关于defineProperty的一点理解
查看>>
如何创建只读域控制器RODC(Read-Only Domain Controller)
查看>>
python-字符串
查看>>
LabVIEW串口通信
查看>>
2017UGUI之slider
查看>>
python下载酷狗音乐源码
查看>>
MySQL学习----explain查看一条sql 的性能
查看>>
第零次作业
查看>>
Android + eclipse +ADT安装完全教程
查看>>
【批处理学习笔记】第七课:简单的批处理命令(6)
查看>>
leetcode 【 Subsets 】python 实现
查看>>
leetcode 【 Intersection of Two Linked Lists 】python 实现
查看>>
codeforces 767A Snacktower(模拟)
查看>>
用 Quartz 画聊天对话框背景实例
查看>>
Quartz2D简单绘制之饼状图
查看>>