个人技术博客:www.zhenganwen.top
流程概览
项目结构
依赖
io.netty netty-all 5.0.0.Alpha1 org.reflections reflections 0.9.10 复制代码 org.projectlombok lombok 1.16.18 provided
通用模块
ClassInfo
实体类,封装了服务调用信息:
package top.zhenganwen.rpc.common;import lombok.Data;import java.io.Serializable;/** * ClassInfo class * 使用JDK的序列化技术必须实现接口Serializable * * @author : zaw * @date : 2019/3/30 */@Datapublic class ClassInfo implements Serializable { /** * 调用服务的接口名 */ private String className; /** * 调用服务的方法名 */ private String methodName; /** * 调用方法的参数列表类型 */ private Class[] paramTypes; /** * 调用服务传参 */ private Object[] params;}复制代码
需要注意的是客户端在发送调用信息时会将该类对象序列化并发送给服务端,而服务的则需要反序列化回来,如果使用的是JDK的序列化技术则需要将此类实现
Serializable
接口
服务接口
为了便于维护,服务接口通常会被独立出来到通用模块中,以jar
包的形式被服务调用方和服务提供方依赖。这里简单的写了两个接口,一个包含无參服务,一个包含有参服务。
public interface HasArgsHelloService { String hello(String msg);}public interface NoArgsHelloService { String hello();}复制代码
服务调用方
client
这个包中是依赖Service接口的一些类,RPC服务的调用对于他们来说是透明的,他们仅通过client_stub
中的ServiceProxy
来获取服务实现类并调用服务。
public class RPCClient { public static void main(String[] args){ NoArgsHelloService noArgsHelloService = (NoArgsHelloService) ServiceProxy.create(NoArgsHelloService.class); System.out.println(noArgsHelloService.hello()); HasArgsHelloService hasArgsHelloService = (HasArgsHelloService) ServiceProxy.create(HasArgsHelloService.class); System.out.println(hasArgsHelloService.hello("hello netty rpc")); }}复制代码
client_stub
真正处理RPC调用逻辑的包,ServiceProxy
通过JDK代理Proxy.newProxyInstance
来代理所有的服务,所有client
中调用服务的动作都将被该代理逻辑中设置的InvocationHandler
拦截,拦截后获取调用信息(接口名、方法名、方法参列类型、实参列表)并通过Netty与服务端建立连接发送调用信息,然后阻塞等待连接关闭事件(RPCClientHandler
在收到服务端返回的调用结果时会保存该结果并关闭连接),若此事件被触发说明RPCClientHandler
已拿到调用结果,于是此次InvocationHandler
的拦截可以返回了。
- ServiceProxy
package top.zhenganwen.rpc.client_stub;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;import top.zhenganwen.rpc.common.ClassInfo;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;/** * ServiceProxy class * * @author : zaw * @date : 2019/3/30 */public class ServiceProxy { public static Object create(Class clazz) { return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //构造调用信息 ClassInfo classInfo = new ClassInfo(); classInfo.setClassName(clazz.getName()); classInfo.setMethodName(method.getName()); classInfo.setParamTypes(method.getParameterTypes()); classInfo.setParams(args); //使用netty发送调用信息给服务提供方 NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); RPCClientHandler rpcClientHandler = new RPCClientHandler(); try { bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); //反序列化对象时指定类解析器,null表示使用默认的类加载器 ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(rpcClientHandler); } }); //connect是异步的,但调用其future的sync则是同步等待连接成功 ChannelFuture future = bootstrap.connect("127.0.0.1", 80).sync(); //同步等待调用信息发送成功 future.channel().writeAndFlush(classInfo).sync(); //同步等待RPCClientHandler的channelRead被触发后(意味着收到了调用结果) future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } //返回调用结果 return rpcClientHandler.getRpcResult(); } }); }}复制代码
- PRCClientHandler
package top.zhenganwen.rpc.client_stub;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;/** * RPCClientHandler class * * @author : zaw * @date : 2019/3/30 */public class RPCClientHandler extends ChannelHandlerAdapter { /** * RPC调用返回的结果 */ private Object rpcResult; public Object getRpcResult() { return rpcResult; } public void setRpcResult(Object rpcResult) { this.rpcResult = rpcResult; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { setRpcResult(msg); ctx.close(); }}复制代码
服务提供方
server
首先服务提供方有具体的服务实现类,然后它通过RPCServer
建立Netty服务端24小时监听客户端的服务调用请求。请求将被RPCServerHandler
处理,它根据请求中的调用信息通过反射找到实现类和服务方法并反射调用获取结果,并立即将结果发送给客户端。
- 服务实现类
public class NoArgsHelloServiceImpl implements NoArgsHelloService { @Override public String hello() { return "hello"; }}public class HasArgsHelloServiceImpl implements HasArgsHelloService { @Override public String hello(String msg) { return msg; }}复制代码
- PRCServer
package top.zhenganwen.rpc.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;import top.zhenganwen.rpc.server_stub.RPCServerHandler;/** * RPCServer class * * @author : zaw * @date : 2019/3/30 */public class RPCServer { public static void main(String[] args){ NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); try { bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(new RPCServerHandler()); } }); //bind初始化端口是异步的,但调用sync则会同步阻塞等待端口绑定成功 ChannelFuture future = bootstrap.bind(80).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } }}复制代码
server_stub
真正根据调用请求反射调用的业务处理类
- RPCServerHandler
package top.zhenganwen.rpc.server_stub;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import org.reflections.Reflections;import top.zhenganwen.rpc.common.ClassInfo;import java.lang.reflect.Method;import java.util.Set;/** * RPCServerHandler class * * @author : zaw * @date : 2019/3/30 */public class RPCServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //获取调用信息,寻找服务实现类 ClassInfo classInfo = (ClassInfo) msg; String implName = getImplClassName(classInfo.getClassName()); Class clazz = Class.forName(implName); Method method = clazz.getMethod(classInfo.getMethodName(), classInfo.getParamTypes()); Object result = method.invoke(clazz.newInstance(), classInfo.getParams()); ctx.writeAndFlush(result); } private String getImplClassName(String interfaceName) throws ClassNotFoundException { Class interClass = Class.forName(interfaceName); String servicePath = "top.zhenganwen.rpc.server"; Reflections reflections = new Reflections(servicePath); SetimplClasses = reflections.getSubTypesOf(interClass); if (implClasses.isEmpty()) { System.err.println("impl class is not found!"); } else if (implClasses.size() > 1) { System.err.println("there are many impl classes, not sure invoke which"); } else { Class[] classes = implClasses.toArray(new Class[1]); return classes[0].getName(); } return null; }}复制代码