Dubbo服务暴露注册原理
微信公众号:PersistentCoder
一、概述
dubbo是java领域应用最广泛的rpc框架之一,服务端启动时把服务注册到注册中心,客户端启动时向注册中心订阅服务,然后通过客户端的负载和路由机器选择对应的服务进行连接调用。
通过上图我们可以看出,对于服务端,如果想要被调用,需要把自己注册到注册中心,以便于客户端能发现,对于客户端,需要把自己需要调用的服务订阅到注册中心。就好比,工厂生产的东西,放到商店才能够被别人购买,同样客户去商店买东西,要告诉商店想要买什么。
本篇文章将会详细的介绍dubbo服务的暴露注册流程和原理。
在分析之前还要搬一张dubbo的图,来对服务注册有一个大致的认识和概念:
注意看右半边浅绿色部分,涉及到了服务注册和服务调用,官方也给出了服务暴露的时序图:
但是只能说比较抽象和宽泛,具体的细节和实现思路还需要我们自己去理解和领会。
接下来我们将详细地分析dubbo服务暴露注册的原理。
注意
dubbo版本2.7.8
二、原理&源码分析
为了理解方便,我们将原理拆分成两部分来介绍,分别是服务实例化和服务暴露注册。
1.服务实例化
从EnableDubbo开始看:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@EnableDubboConfig
@DubboComponentScan
public @interface EnableDubbo {
@AliasFor(annotation = DubboComponentScan.class, attribute = "basePackages")
String[] scanBasePackages() default {};
@AliasFor(annotation = DubboComponentScan.class, attribute = "basePackageClasses")
Class<?>[] scanBasePackageClasses() default {};
@AliasFor(annotation = EnableDubboConfig.class, attribute = "multiple")
boolean multipleConfig() default true;
}
定义扫描的包路径,并且EnableDubbo是一个组合注解,被另外两个注解@EnableDubboConfig和@DubboComponentScan标注,@EnableDubboConfig是配置相关并注册一些基础组件BeanDefinition,暂且不管,看一下@DubboComponentScan注解。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {
String[] value() default {};
String[] basePackages() default {};
Class<?>[] basePackageClasses() default {};
}
该注解导入了DubboComponentScanRegistrar,是一个ImportBeanDefinitionRegistrar,容器启动时会被ConfigurationClassPostProcessor调用,可参考《ImportBeanDefinitionRegistrar原理》,DubboComponentScanRegistrar重写registerBeanDefinitions方法:
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);
// @since 2.7.6 Register the common beans
registerCommonBeans(registry);
}
先获取要扫描的包路径,然后注册ServiceAnnotationBeanPostProcessor类的BeanDefinition:
private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
BeanDefinitionBuilder builder = rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);
builder.addConstructorArgValue(packagesToScan);
builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);
}
ServiceAnnotationBeanPostProcessor是BeanFactoryProcessor也是BeanDefinitionRegistryPostProcessor,在AbstractApplicationContext#refresh刷新上下文之后会调用其
postProcessBeanDefinitionRegistry方法。看一下方法定义:
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
// @since 2.7.5
registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, DubboBootstrapApplicationListener.class);
Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
registerServiceBeans(resolvedPackagesToScan, registry);
}
}
先注册了DubboBootstrapApplicationListener类定义(后边分析服务注册会用到),然后调用registerServiceBeans方法注册ServiceBean:
private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
DubboClassPathBeanDefinitionScanner scanner =
new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
scanner.setBeanNameGenerator(beanNameGenerator);
// refactor @since 2.7.7
serviceAnnotationTypes.forEach(annotationType -> {
scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));
});
for (String packageToScan : packagesToScan) {
// Registers @Service Bean first
scanner.scan(packageToScan);
// Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
Set<BeanDefinitionHolder> beanDefinitionHolders =
findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
registerServiceBean(beanDefinitionHolder, registry, scanner);
}
}
}
}
在指定包路径下扫描所支持服务注解的类,然后调用registerServiceBean逐个注册成ServiceBean:
private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
DubboClassPathBeanDefinitionScanner scanner) {
Class<?> beanClass = resolveClass(beanDefinitionHolder);
Annotation service = findServiceAnnotation(beanClass);
AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);
Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);
String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();
AbstractBeanDefinition serviceBeanDefinition =
buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);
String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);
if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
registry.registerBeanDefinition(beanName, serviceBeanDefinition);
}
}
该方法将扫描得到的BeanDefinition包装转换成ServiceBean然后注册到容器中:
private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,
AnnotationAttributes serviceAnnotationAttributes,
Class<?> interfaceClass,
String annotatedServiceBeanName) {
BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
//省略···
return builder.getBeanDefinition();
}
到这里我们可以看到,改BeanFactoryProcessor回把我们应用中使用Service和DubboService标注的接口注册成ServiceBean类型的BeanDefinition;目前服务暴露支持三个注解:
private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList(
// @since 2.7.7 Add the @DubboService , the issue : https://github.com/apache/dubbo/issues/6007
DubboService.class,
// @since 2.7.0 the substitute @com.alibaba.dubbo.config.annotation.Service
Service.class,
// @since 2.7.3 Add the compatibility for legacy Dubbo's @Service , the issue : https://github.com/apache/dubbo/issues/4330
com.alibaba.dubbo.config.annotation.Service.class
);
在AbstractApplicationContext#refresh最后会调用finishBeanFactoryInitialization方法完成普通BeanDefinition的实例化。
然后我们看一下ServiceBean是个什么东东.
从继承关系中分析出,服务暴露在ServiceConfig中实现,并且在父类AbstractConfig中定义了初始化方法addInfoConfigManager.
@PostConstruct
public void addIntoConfigManager() {
ApplicationModel.getConfigManager().addConfig(this);
}
在之前文章《spring bean生命周期管理》中有分析到@PostConstruct会在bean实例化之后,InitializingBean和自定义init方法之前执行。
该方法执行后,会把ServiceBean实例添加到ConfigManager中的configsCache缓存中等后续备用。
到这里,ServiceBean实例化就完成了,整个流程大致如下:
2.服务暴露注册
服务的暴露与注册是两个不同的概念;在Dubbo中,微服务之间的交互默认是通过Netty进行的,而服务之间的通信是基于TCP方式进行的。也就是说,每个服务都会存在一个ip和port。所谓的服务暴露就是指根据配置将当前服务使用Netty绑定一个本地的端口号(对于消费者而言,则是尝试连接目标服务的ip和端口)。至于注册,由于微服务架构中对于新添加的服务,需要一定的机制来通知消费者,有新的服务可用,或者对于某些下线的服务,也需要通知消费者,将这个已经下线的服务给移除。
前边有说过ServiceClassPostProcessor会注册DubboBootstrapApplicationListener类定义,应用启动会初始化。
它是一个ApplicationListener,会接受Spring容器的事件,重写onApplicationContextEvent接收上下文刷新和上下文关闭事件:
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
处理接收到上下文刷新成功事件调用了DubboBootstrap的start方法:
public DubboBootstrap start() {
if (started.compareAndSet(false, true)) {
ready.set(false);
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// 1. export Dubbo Services
exportServices();
//省略
}
return this;
}
做一些初始化操作之后调用exportServices导出服务:
private void exportServices() {
configManager.getServices().forEach(sc -> {
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
exportedServices.add(sc);
});
asyncExportingFutures.add(future);
} else {
sc.export();
exportedServices.add(sc);
}
});
}
这里会用到前边我们分析的ServiceBean实例化之后会调用@PostConstruct将其添加到ConfigManager中,这里会拿到ServiceBean实例列表遍历,然后调用export方法,会调用ServiceBean的父类ServiceConfig的export方法:
public synchronized void export() {
if (!shouldExport()) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.initialize();
}
checkAndUpdateSubConfigs();
//init serviceMetadata
serviceMetadata.setVersion(getVersion());
serviceMetadata.setGroup(getGroup());
serviceMetadata.setDefaultGroup(getGroup());
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport();
}
exported();
}
然后调用内部方法doExportUrlsFor1Protocol导出服务:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//省略
// export service
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
重点是PROTOCOL.export调用,Protocol是自定义扩展实现,常见的有InjvmProtocol、DubboProtocol和RegistryProtocol,RegistryProtocol不是真正的协议,是其他类型实现的包装,因为我们需要向注册中心注册服务,所以会调用RegistryProtocol实现:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
//省略
return new DestroyableExporter<>(exporter);
}
该方法有两个核心功能,调用doLocalExport暴露服务和调用register注册服务。先看一下暴露服务实现:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
这里protocol.export会调用DubboProtocol实现:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
openServer(url);
optimizeSerialization(url);
return exporter;
}
openServer其实是进行服务的本地暴露,本质上就是根据配置使用Netty绑定本地的某个端口,从而完成服务暴露工作。然后调用到
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
最后会调用到Dubbo封装的NettyServer的构造方法。
NettyServer会调用父类的构造方法:
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
executor = executorRepository.createExecutorIfAbsent(url);
}
然后会调用NettyServer的doOpen实现开启服务:
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
这段代码看起来应该比较熟悉,就是开启netty服务,并且会添加一些编解码组件,服务调用处理组件等。
然后接着看服务注册register方法实现:
private void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
注册中心我们选择Nacos进行分析,RegistryFactory会生成NacosRegistry并返回。
register会调用父类FailbackRegistry实现:
@Override
public void register(URL url) {
if (!acceptable(url)) {
return;
}
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
//省略
}
}
然后就调用到子类NacosRegistry的doRegister实现:
@Override
public void doRegister(URL url) {
final String serviceName = getServiceName(url);
final Instance instance = createInstance(url);
execute(namingService -> namingService.registerInstance(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP), instance));
}
接着调用NamingProxy#registerService注册服务:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
最后会调用到callSever方法,通过http请求将服务信息写到Nacos对应节点:
public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
String method) throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params);
Header header = builderHeader();
String url;
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
}
url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
}
try {
HttpRestResult<String> restResult = nacosRestTemplate
.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
end = System.currentTimeMillis();
if (restResult.ok()) {
return restResult.getData();
}
if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
return StringUtils.EMPTY;
}
throw new NacosException(restResult.getCode(), restResult.getMessage());
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
到这里服务暴露和服务注册都完成了,看一下整个流程:
三、总结
本篇文章分析了dubbo服务的暴露和注册,里边涉及了spring的扩展能力、dubbo自定义SPI实现、分层和模块化设计等众多内容,希望对大家平时dubbo的使用和理解有所帮助,同时作为文末彩蛋,感兴趣的读者可以关注下一下几个知识点或问题:
- dubbo的injvm是怎么一回事,并且是如何实现的?
- 对于使用dubbo注解@Service或者@DubboService标注的类,应用启动后能否像spring管理的组件一样注入和使用?
- 被@Service或者@DubboService标注的类,会被解析定义成ServiceBean,而ServiceBean不是FactoryBean,如何保证上下文管理差异化?