光有core也可以进行rpc调用,但目前的话还是需要整合到主流的框架中,目前我只会spring相关的,所以接下来就是整合spring和springboot的操作;其实就是注解和非注解的形式;
其实这里主要偏重于spring一些功能讲解,因为用了xml处理器,我觉得挺有用的,所以这里也放上来讲下;但是spring后面我没有更新了,很多代码应该是没有很完善的,有很多优化的地方;项目名称是:
spring-simple-rpc
;
其实不是很复杂:
. └── spring ├── RpcNamespaceXmlHandler.java ├── RpcXmlBeanDefinitionParser.java ├── annotation │ └── Enable SimpleRpc.java ├── beans │ ├── ConsumerBean.java │ ├── ProviderBean.java │ ├── ServerBean.java │ └── parser │ └── ParseServerBean.java ├── exception │ └── BeanNotFoundException.java └── transfer ├── BaseData.java └── DataMap.java resources └── META-INF ├── rpc.xsd ├── spring.handlers └── spring.schemas
下面就稍微看看;
这里的话主要就是几个bean重要:
ServerBean
、ProviderBean
、ConsumerBean
;分别对应服务端启动,生产者提供服务,消费者消费服务;
ServerBean
:
public class ServerBean extends ServerConfig implements ApplicationContextAware { ExecutorService executorService = Executors.newFixedThreadPool(10); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SimpleRpcUrl simpleRpcUrl = ParseServerBean.parse(this); DataMap.DATA_TRANSFER_MAP.put(DataMap.DATA_TRANSFER, simpleRpcUrl); //启动注册中心 RegisterCenterFactory.create(simpleRpcUrl.getType()).init(simpleRpcUrl); SimpleRpcLog.info("注册中心初始化:{}", address); //初始化服务端 RpcServerSocket serverSocket = new RpcServerSocket(new Request()); executorService.submit(serverSocket); while (!serverSocket.isActiveSocketServer()) { try { Thread.sleep(500); } catch (InterruptedException ignore) { } } SimpleRpcLog.info("初始化生产端服务完成 {} {}", LocalAddressInfo.LOCAL_HOST, LocalAddressInfo.PORT); } }
这里就是实现ApplicationContextAware
,然后在setApplicationContext
做处理,然后ServerConfig
这里类似于springboot的propreties的操作,这里使用xml进行处理,这里就可以先理解为我们可以拿到ServerConfig
里面的值了;
setApplicationContext
里面就是解析配置参数address
,然后初始化注册中心,然后就是启动服务端,这里的new Request()
理论上是需要构建的,但是这里spring已经很久没改了,就不构建了;
ProviderBean
public class ProviderBean extends ProviderConfig implements ApplicationContextAware { @Resource private ServerBean serverBean; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SimpleRpcUrl simpleRpcUrl = ParseServerBean.parse(serverBean); RegisterCenter registerCenter = RegisterCenterFactory.create(simpleRpcUrl.getType()); if (Objects.isNull(registerCenter)) { throw new BeanNotFoundException("注册中心未初始化"); } Request providerRequest = new Request(); providerRequest.setInterfaceName(interfaceName); providerRequest.setBeanName(beanName); providerRequest.setAlias(alias); providerRequest.setHost(LocalAddressInfo.LOCAL_HOST); providerRequest.setPort(LocalAddressInfo.PORT); //注册生产者 boolean flag = registerCenter.register(Request.request2Register(providerRequest)); SimpleRpcServiceCache.addService(alias, applicationContext.getBean(beanName)); SimpleRpcLog.info("注册生产者:{}, 是否成功:{} ", JSON.toJSONString(providerRequest), flag); } }
这里跟上面切入点是一样的,但是这里有点就是private ServerBean serverBean;
先把这个bean注入,这里是为了保证bean的注入先后顺序;
然后再合理就是把接口信息注册到注册中心,然后将服务存到缓存里面,服务提供者做这些就够了;
ConsumerBean
public class ConsumerBean<T> extends ConsumerConfig implements FactoryBean<T> { @Resource private ServerBean serverBean; @Override public T getObject() throws BeanNotFoundException, NettyInitException, ClassNotFoundException { RegistryConfig registryConfig = ParseServerBean.serverToRegister(serverBean); BaseConfig baseConfig = new BaseConfig(); baseConfig.setLoadBalanceRule("round"); CommonConfig config = new CommonConfig(); config.setConsumerConfig(this); config.setRegistryConfig(registryConfig); config.setBaseConfig(baseConfig); return (T) RpcProxy.invoke(ClassLoaderUtils.forName(interfaceName), config); } @Override public Class<?> getObjectType() { try { if (!Objects.isNull(interfaceName)) { return ClassLoaderUtils.forName(interfaceName); } return null; } catch (ClassNotFoundException e) { return null; } } @Override public boolean isSingleton() { return true; } }
这里的ConsumerBean
是一个FactoryBean
;在创建bean实例的时候,会从这里来加载;这是spring相关的东西,不深究,然后这里同样是不完善的,很多配置都没有,这里的话就是取调用之前分析的代理方法RpcProxy.invoke
;所以消费者在通过@Autowired
去注入对象的时候,会放在三级缓存里面,然后往上传递,这里就生成了动态代理类,下次调用就会就走之前分析的逻辑,走handler里面处理;这里也比较简单;
这里涉及到了xsd的东西,也就是xml命名空间定义的问题,比较有意思,因为之前我没接触过;首先我们前面说了,ServerConfig
这些类,其实就是一些配置定义,然后各个业务bean继承,然后注入到spring中,这里是以xml形式注册的,所以需要有对应的xsd文件;
先看看spring里面的注入形式:
public class RpcNamespaceXmlHandler extends NamespaceHandlerSupport { /** * 命名空间初始化 */ @Override public void init() { registerBeanDefinitionParser("server", new RpcXmlBeanDefinitionParser(ServerBean.class)); registerBeanDefinitionParser("provider", new RpcXmlBeanDefinitionParser(ProviderBean.class)); registerBeanDefinitionParser("consumer", new RpcXmlBeanDefinitionParser(ConsumerBean.class)); } }
这里就是继承NamespaceHandlerSupport
类,但是这里有个问题,好想不能保证这几个bean的加载顺序,目前也不知道怎么处理,所以前面就是会在ProviderBean、ConsumerBean里面注入ServerBean,通过这样来保证顺序;
这里写好了处理器,然后就是讲处理器加载到spring中,我们需要在resources
资源文件下面新建文件srping.handlers
:
http\://rpc.simple.com/schema/rpc=com.simple.rpc.spring.RpcNamespaceXmlHandler
然后就是加载定义的xsd文件内容,也就是配置schema
spring.schemas
http\://rpc.simple.com/schema/rpc/rpc.xsd=META-INF/rpc.xsd
然后这里就会去加载rpc.xsd这个文件里面的内容(粘贴部分代码):
<xsd:element name="server"> <xsd:element name="provider"> <xsd:element name="consumer">
现在就是配置都加载到了,就是解析配置;我们只需要定义一个paser,RpcXmlBeanDefinitionParser
:
public class RpcXmlBeanDefinitionParser implements BeanDefinitionParser { private final Class<?> beanClass; RpcXmlBeanDefinitionParser(Class<?> beanClass) { this.beanClass = beanClass; } @Override public BeanDefinition parse(Element element, ParserContext parserContext) { RootBeanDefinition beanDefinition = new RootBeanDefinition(); beanDefinition.setBeanClass(beanClass); beanDefinition.setLazyInit(false); String beanName = element.getAttribute("id"); parserContext.getRegistry().registerBeanDefinition(beanName, beanDefinition); for (Method method : beanClass.getMethods()) { if (!isProperty(method, beanClass)) {continue;} String name = method.getName(); String methodName = name.substring(3, 4).toLowerCase() + name.substring(4); String value = element.getAttribute(methodName); beanDefinition.getPropertyValues().addPropertyValue(methodName, value); } return beanDefinition; } private boolean isProperty(Method method, Class beanClass) { // 判断是否是getter、setter方法 } }
这里就是获取对应配置类的getter、setter方法,然后对其属性赋值;然后配置信息就被加载到了;
整合spring大概就是这个原理,然后使用这里就不说了;有专门介绍大大小小功能的文章;
相较于spring的整合,Springboot整合相对来说也比较简单,只是加了很多注解信息,使用注解的方法来注入服务和引用;先看看注解信息;
SimpleRpcService:
这个表明一个类是RPC提供的类,然后有个alias
方法用来和接口的权限定名+别名来做RPC服务中的唯一值的,一般就是和beanName是同一个值;SimpleRpcReference
:这个注解就相当于我们用的@Autowired
,标记这是一个RPC的调用,里面是设计了当前这个rpc接口的负载均衡策略、版本号等,但是后面没去实现;SimpleRpcScan
:这个是用来扫描对应包下面的所有的RPC提供的接口,会配合Scanner
类来进行包扫描和类加载;. └── springboot ├── ApplicationClosedEventListener.java ├── ServerInitBeanPostProcessor.java ├── ServiceBeanPostProcessor.java ├── annotaton │ └── SimpleRpcScan.java ├── config │ ├── BootBaseConfig.java │ └── BootRegisterConfig.java └── scanner ├── SimpleRpcScanner.java └── SimpleRpcScannerRegistrar.java
SimpleRpcScanner
:
public class SimpleRpcScanner extends ClassPathBeanDefinitionScanner { @SafeVarargs public SimpleRpcScanner(BeanDefinitionRegistry registry, Class<? extends Annotation>... annotationTypes) { // 注册bean信息 super(registry); // 获取包含对应注解的类 for (Class<? extends Annotation> annotationType : annotationTypes) { super.addIncludeFilter(new AnnotationTypeFilter(annotationType)); } } }
这里就是将使用了指定注解的类加载到过滤器中;
然后配合注册处理器使用,将扫描到的bean信息注入到spring容器中:
public class SimpleRpcScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware { /** * 服务扫描的基础包,是 @RpcScan 的哪个属性 */ private static final String SERVER_SCANNER_BASE_PACKAGE_FIELD = "basePackages"; /** * 默认基础包 */ private static final String[] DEFAULT_SCANNER_BASE_PACKAGES = {"com.simple.rpc"}; private ResourceLoader resourceLoader; @Override public void setResourceLoader(ResourceLoader resourceLoader) { this.resourceLoader = resourceLoader; } /** * 注册bean信息,丢到spring容器中 * * @param importingClassMetadata * @param registry */ @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { //扫描注解 Map<String, Object> annotationAttributes = importingClassMetadata.getAnnotationAttributes(SimpleRpcScan.class.getName()); SimpleRpcScanner serviceScanner = new SimpleRpcScanner(registry, SimpleRpcService.class); if (resourceLoader != null) { serviceScanner.setResourceLoader(resourceLoader); } String[] serviceBasePackages = (String[]) annotationAttributes.get(SERVER_SCANNER_BASE_PACKAGE_FIELD); if (serviceBasePackages.length < 1) { serviceBasePackages = DEFAULT_SCANNER_BASE_PACKAGES; } int serviceCount = serviceScanner.scan(serviceBasePackages); SimpleRpcLog.info(StrUtil.format("serviceScanner. packages={}, count={}", serviceBasePackages, serviceCount)); } }
这里的逻辑就是,先是拿到所有的@SimpleRpcScan
注解信息,然后在构建注解扫描器:
new SimpleRpcScanner(registry, SimpleRpcService.class);
然后获取所有的basePackages
路径,配合serviceScanner.scan(serviceBasePackages)
将所有符合的类都注入到spring中;当然这里只是元数据信息注册,并没有实例化对象;
先看看初始化,没想到放在那个节点合适;所以就继承了BeanPostProcessor
,之前的想法是想放在SpringBoot的启动时机,但是需要使用到配置类的;我这里没去验证启动时间和配置类加载的顺序,所以就使用现在这套方案;看看代码:
@Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if (initFlag) { init(bootRegisterConfig); initFlag = false; } return bean; } public void init(BootRegisterConfig bootRegisterConfig) throws BeansException { SimpleRpcUrl simpleRpcUrl = SimpleRpcUrl.toSimpleRpcUrl(bootRegisterConfig); // 保存注册中心信息 RegisterInfoCache.save(bootRegisterConfig); //启动注册中心 RegisterCenterFactory.create(simpleRpcUrl.getType()).init(simpleRpcUrl); SimpleRpcLog.info("注册中心初始化:{}", bootRegisterConfig.getAddress()); //初始化服务端 RpcServerSocket serverSocket = new RpcServerSocket(buildRequest(bootBaseConfig)); executorService.submit(serverSocket); while (!serverSocket.isActiveSocketServer()) { try { Thread.sleep(500); } catch (InterruptedException ignore) { } } SimpleRpcLog.info("初始化生产端服务完成 {} {}", LocalAddressInfo.LOCAL_HOST, LocalAddressInfo.PORT); }
这里的话会去判断是否已经记初始化过了;
初始化跟spring的差不多,也是初始化注册中心和初始化netty服务端;
服务启动就这些东西,然后就是服务注册操作了,同样是ServiceBeanPostProcessor
切入,这里跟上面初始化操作是根据Ordered
来实现加载顺序的;先看服务注册:
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { // 判断是否是被@SimpleRpcService注解的类 SimpleRpcService rpcService = bean.getClass().getAnnotation(SimpleRpcService.class); SimpleRpcUrl simpleRpcUrl = SimpleRpcUrl.toSimpleRpcUrl(bootRegisterConfig); // rpc 服务需要发布到注册中心 if (rpcService != null) { RegisterCenter registerCenter = RegisterCenterFactory.create(simpleRpcUrl.getType()); Request request = new Request(); String applicationName = ApplicationCache.APPLICATION_NAME; request.setApplicationName(applicationName); request.setLoadBalanceRule(bootBaseConfig.getLoadBalanceRule()); request.setSerializer(bootBaseConfig.getSerializer()); request.setCompressor(bootBaseConfig.getCompressor()); request.setRegister(bootBaseConfig.getRegister()); request.setWeights(bootBaseConfig.getWeights()); request.setHost(LocalAddressInfo.LOCAL_HOST); request.setPort(LocalAddressInfo.PORT); request.setHealth(HealthStatus.IS_HEALTH.getCode()); Class<?>[] interfaces = bean.getClass().getInterfaces(); if (!CollectionUtils.isEmpty(Arrays.asList(interfaces))) { for (Class<?> anInterface : interfaces) { String alias = getBeanName(anInterface.getCanonicalName()); request.setAlias(StrUtil.isBlank(rpcService.alias()) ? alias : rpcService.alias()); request.setBeanName(alias); request.setInterfaceName(anInterface.getCanonicalName()); String registerKey = registerCenter.register(Request.request2Register(request)); // 将对应的bean存入到缓存之中 SimpleRpcServiceCache.addService(registerKey, bean); } } } return bean; }
拿到注解信息,然后拿到这个bean 可能的所有实现的接口,然后拿到对应的接口的权限定名+ alias作为key存入到注册中心,然后将该baan存到缓存之中,在netty-server的handler中会使用到;这里已经把服务元信息注册到注册中心了,然后bean也注入到缓存中了;spring中也有了;这里就是注册;
然后就是服务的使用了,同样在这个类里面,放在了另外一个方法里面:
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { SimpleRpcReference rpcReference = field.getAnnotation(SimpleRpcReference.class); if (rpcReference != null) { // 生成代理对象 Object proxy = null; try { ConsumerConfig consumerConfig = buildConsumerConfig(field, rpcReference); proxy = RpcProxy.invoke(ClassLoaderUtils.forName(consumerConfig.getInterfaceName()), buildCommonConfig(consumerConfig)); } catch (ClassNotFoundException e) { e.printStackTrace(); } field.setAccessible(true); try { // 设置字段 field.set(bean, proxy); } catch (IllegalAccessException e) { SimpleRpcLog.error("field.set error. bean={}, field={}", bean.getClass(), field.getName(), e); } } } return bean; }
这里就是去获取对象里面的字段,一般我们引入类的方式就是通过AService aService
这种方式注入,然后这里的话就是先为该类生成对应的代理类field.set(bean, proxy);
然后实际调用延迟到handler中去处理,这里就是服务发现,也比较简单;
整合就是这些了!!
谢谢大家阅读!!!
公众号: 搜索关注,爱搞技术的吴同学 ,公众号上会经常写实用性的文章,谢谢关注!!回复:“加好友”,可获取我的微信二维码,欢迎加好友,一起学习!!!
大量源码: 欢迎star,可能会分享微服务实战,分页插件等;gitee