apollo客户端通知原理
微信公众号:PersistentCoder
一、使用场景
Apollo是携程开源的一个分布式配置中心,提供了丰富的能力,其中就包括配置发布动态通知。
动态通知有很多应用场景,其目的就是将配置的更新实时同步到应用内存粒度,比如:
- 动态规则维护
- 黑白名单
- 半自动化刷新缓存
二、使用
本篇文章主要围绕半自动化刷新缓存展开。在电商环境,分为商家B端和客户C端,商家在平台或者ERP更新或者发布一些配置变更需要同步到C端让用户感知到最新的内容。
首先考虑到B端的配置变更频率不会太频繁,所以C端会做缓存,那么如果B端发生变更如何通知到C端刷新缓存拉取最新配置内容,有两种实现方式:
- B端配置变更后发布消息,C端监听变更消息,然后自动失效缓存
- B端配置变更后,手动通知C端,然后失效缓存,也就是半自动化刷新
自动失效缓存不展开分析,半自动化刷新实现也很简单,基于Apollo的客户端通知机制就可以实现,在配置中心发布变更主体,然后在应用层监听变更内容并做出响应操作即可。
1.Apollo新增配置
在配置平台新增业务相关的key-value:
apollo.xxx.config_refresh = {"buzzId":"xxx","platform":1,"version":3}
包含业务主体信息,以及版本字段(用于处理配置无变更问题)。
2.编写事件监听
使用ApolloConfigChangeListener注解标注处理对应key内容变更的方法。
@ApolloConfigChangeListener(interestedKeys = {APOLLO_XXX_CONFIG_REFRESH})
public void onChange(ConfigChangeEvent changeEvent) {
Set<String> changedKeys = changeEvent.changedKeys();
if(CollectionUtils.isEmpty(changedKeys)) {
log.warn("onChange nothing change;changeKeys={}",changedKeys);
return;
}
if(!changedKeys.contains(APOLLO_XXX_CONFIG_REFRESH)) {
log.warn(".onChange change event not contains config;changeKeys={}",changedKeys);
return;
}
log.info("onChange config change;start reinitialize ...........");
ConfigChange change = changeEvent.getChange(APOLLO_XXX_CONFIG_REFRESH);
String oldVal = change.getOldValue();
String newVal = change.getNewValue();
log.info("onChange;change '{}' from oldVal:{} to newValue:{}",APOLLO_XXX_CONFIG_REFRESH,oldVal,newVal);
if(!this.isJson(newVal)) {
log.info("onChange not valid json;newVal={}",newVal);
return;
}
JSONObject json = JSON.parseObject(newVal);
String buzzId = null;
if(null == (buzzId = json.getString(BUZZ_ID_KEY))) {
return;
}
Integer platform = json.getInteger(PLATFORM_KEY);
Integer version = json.getInteger(VERSION_KEY);
//手动让缓存失效
try {
this.doExpireCache(buzzId,platform,version)
} catch (Exception e) {
log.error("onChange refresh config cache occur error;buzzId={},platform={},version={}",buzzId,platform,version,e);
}
}
这样在发生B端配置变更的时候,在配置平台发布对应key-value,然后C端应用接收到变更内容,就会做出相应处理,将缓存清掉。
三、原理&源码分析
从前边的案例可以看出,核心能力支撑就是Apollo的客户端通知,那么我们就来分析一下Apollo客户端通知能力的实现原理。
Apollo客户端通知的实现,分为三个维度分析,分别是配置变更监听器准备、变更通知准备、变更通知执行。
1.配置变更监听器准备
在不接入其他中间件封装的情况下,使用的入口是EnableApolloConfig注解,我们从该注解着手分析。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(ApolloConfigRegistrar.class)
public @interface EnableApolloConfig {
String[] value() default {ConfigConsts.NAMESPACE_APPLICATION};
int order() default Ordered.LOWEST_PRECEDENCE;
}
该注解导入并激活ApolloConfigRegistrar类。
public class ApolloConfigRegistrar implements ImportBeanDefinitionRegistrar {
private ApolloConfigRegistrarHelper helper = ServiceBootstrap.loadPrimary(ApolloConfigRegistrarHelper.class);
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
helper.registerBeanDefinitions(importingClassMetadata, registry);
}
}
ApolloConfigRegistrar是一个ImportBeanDefinitionRegistrar,其原理和调用时机可参考《ImportBeanDefinitionRegistrar原理》,通过java spi机制加载ApolloConfigRegistrarHelper实现类DefaultApolloConfigRegistrarHelper的实例。
com.ctrip.framework.apollo.spring.spi.DefaultApolloConfigRegistrarHelper
然后调用registerBeanDefinitions方法注册BeanDefinition:
public class DefaultApolloConfigRegistrarHelper implements ApolloConfigRegistrarHelper {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
AnnotationAttributes attributes = AnnotationAttributes
.fromMap(importingClassMetadata.getAnnotationAttributes(EnableApolloConfig.class.getName()));
//省略
BeanRegistrationUtil.registerBeanDefinitionIfNotExists(registry, ApolloAnnotationProcessor.class.getName(),
ApolloAnnotationProcessor.class);
//省略
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}
其中有一行注册ApolloAnnotationProcessor类定义,我们看一下ApolloAnnotationProcessor是何方神圣。
它是一个BeanPostProcessor,父类ApolloProcessor重写了postProcessBeforeInitialization方法:
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
Class clazz = bean.getClass();
for (Field field : findAllField(clazz)) {
processField(bean, beanName, field);
}
for (Method method : findAllMethod(clazz)) {
processMethod(bean, beanName, method);
}
return bean;
}
该方法在Bean实例化之后初始化之前执行,扫描目标类的所有属性和方法然后执行逻辑,我们重点看processMethod方法,看一下ApolloAnnotationProcessor实现:
@Override
protected void processMethod(final Object bean, String beanName, final Method method) {
ApolloConfigChangeListener annotation = AnnotationUtils
.findAnnotation(method, ApolloConfigChangeListener.class);
if (annotation == null) {
return;
}
Class<?>[] parameterTypes = method.getParameterTypes();
ReflectionUtils.makeAccessible(method);
String[] namespaces = annotation.value();
String[] annotatedInterestedKeys = annotation.interestedKeys();
String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes();
ConfigChangeListener configChangeListener = new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
ReflectionUtils.invokeMethod(method, bean, changeEvent);
}
};
Set<String> interestedKeys = annotatedInterestedKeys.length > 0 ? Sets.newHashSet(annotatedInterestedKeys) : null;
Set<String> interestedKeyPrefixes = annotatedInterestedKeyPrefixes.length > 0 ? Sets.newHashSet(annotatedInterestedKeyPrefixes) : null;
for (String namespace : namespaces) {
Config config = ConfigService.getConfig(namespace);
if (interestedKeys == null && interestedKeyPrefixes == null) {
config.addChangeListener(configChangeListener);
} else {
config.addChangeListener(configChangeListener, interestedKeys, interestedKeyPrefixes);
}
}
}
将ApolloConfigChangeListener标注的方法包装成ConfigChangeListener然后注册到对应namespace的Config中。
注册流程如下:
2.变更通知准备
前边分析了将客户端的通知变更逻辑封装成了监听器注册备用,那么谁去触发监听器的逻辑呢?
接下来我们分析下如何将变更和通知逻辑关联起来。
apollo-client包中spring.factories定义了ApolloApplicationContextInitializer类型ApplicationContextInitializer,而ApplicationContextInitializer会在应用启动时加载:
public SpringApplication(ResourceLoader resourceLoader, Class<?>... primarySources) {
//...
setInitializers((Collection) getSpringFactoriesInstances(ApplicationContextInitializer.class));
//***
}
并且会在容器创建之后刷新之前执行ApplicationContextInitializer的initialize方法。
private void prepareContext(DefaultBootstrapContext bootstrapContext, ConfigurableApplicationContext context,
ConfigurableEnvironment environment, SpringApplicationRunListeners listeners,
ApplicationArguments applicationArguments, Banner printedBanner) {
//***
applyInitializers(context);
//***
}
protected void applyInitializers(ConfigurableApplicationContext context) {
for (ApplicationContextInitializer initializer : getInitializers()) {
Class<?> requiredType = GenericTypeResolver.resolveTypeArgument(initializer.getClass(),
ApplicationContextInitializer.class);
Assert.isInstanceOf(requiredType, context, "Unable to call initializer.");
initializer.initialize(context);
}
}
所以在应用启动的时候,ApolloApplicationContextInitializer的initialize会被调用到。
@Override
public void initialize(ConfigurableApplicationContext context) {
ConfigurableEnvironment environment = context.getEnvironment();
if (!environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED, Boolean.class, false)) {
logger.debug("Apollo bootstrap config is not enabled for context {}, see property: ${{}}", context, PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED);
return;
}
logger.debug("Apollo bootstrap config is enabled for context {}", context);
initialize(environment);
}
调用内部initialize方法进行初始化操作:
protected void initialize(ConfigurableEnvironment environment) {
if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
return;
}
String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION);
logger.debug("Apollo bootstrap namespaces: {}", namespaces);
List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);
CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
for (String namespace : namespaceList) {
Config config = ConfigService.getConfig(namespace);
composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
}
environment.getPropertySources().addFirst(composite);
}
调用ConfigService#getConfig获取每个namespace的配置信息,具体会委托给DefaultConfigManager的实现:
public Config getConfig(String namespace) {
Config config = m_configs.get(namespace);
if (config == null) {
synchronized (this) {
config = m_configs.get(namespace);
if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);
config = factory.create(namespace);
m_configs.put(namespace, config);
}
}
}
return config;
}
由于系统刚启动,Config还没被缓存,所以会通过调用ConfigFactory的create方法创建Config.
public Config create(String namespace) {
ConfigFileFormat format = determineFileFormat(namespace);
if (ConfigFileFormat.isPropertiesCompatible(format)) {
return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
}
return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
}
然后会调用到RemoteConfigRepository的构造方法:
public RemoteConfigRepository(String namespace) {
//省略
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
}
里边调用了三个方法,首次同步、定时刷新和长轮询刷新。
先看一下首次同步:
@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
ApolloConfig previous = m_configCache.get();
ApolloConfig current = loadApolloConfig();
//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig());
}
if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
将本地缓存和远程加载的数据进行对比,如果不一致,用远程覆盖本地,然后触发变更事件fireRepositoryChange:
protected void fireRepositoryChange(String namespace, Properties newProperties) {
for (RepositoryChangeListener listener : m_listeners) {
try {
listener.onRepositoryChange(namespace, newProperties);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
}
}
}
然后会调用触发配置变更,调用ConfigChangeListener的逻辑:
protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
for (final ConfigChangeListener listener : m_listeners) {
// check whether the listener is interested in this change event
if (!isConfigChangeListenerInterested(listener, changeEvent)) {
continue;
}
m_executorService.submit(new Runnable() {
@Override
public void run() {
String listenerName = listener.getClass().getName();
Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
try {
listener.onChange(changeEvent);
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
Tracer.logError(ex);
logger.error("Failed to invoke config change listener {}", listenerName, ex);
} finally {
transaction.complete();
}
}
});
}
}
对于定时刷新和长轮询刷新这两个功能在 apollo 的 github 文档中有介绍:
1.客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送
2.客户端还会定时从Apollo配置中心拉取应用的最新配置
这是一个fallback机制,为了防止推送机制失效导致配置不更新
客户端定时拉取会上报本地版本,通常对于定时拉取操作,服务端都会返回304
定时频率默认每5分钟拉一次,客户端也可以通过在运行时指定来覆盖,单位分钟。3.客户端从Apollo配置中心获取应用的最新配置后,会保存在内存中
4.客户端会把从服务端获取到的配置在本地缓存一份
遇到服务不可用,或网络不通时,依然能从本地恢复配置
5应用程序可以从Apollo客户端获取最新的配置、订阅配置更新通知
长连接是更新配置的主要手段,定时刷新是辅助手段,避免长轮训失败造成数据更新丢失。
看一下定时刷新实现:
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
logger.debug("refresh config for namespace: {}", m_namespace);
trySync();
Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshIntervalTimeUnit());
}
定时调用trySync方法实现数据同步,然后触发ConfigChangeListener逻辑。
然后看一下长轮询实现:
private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}
调用startLongPolling方法开启长轮询:
private void startLongPolling() {
try {
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final String dataCenter = m_configUtil.getDataCenter();
final String secret = m_configUtil.getAccessKeySecret();
final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
m_longPollingService.submit(new Runnable() {
@Override
public void run() {
if (longPollingInitialDelayInMills > 0) {
try {
TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
} catch (InterruptedException e) {
}
}
doLongPollingRefresh(appId, cluster, dataCenter, secret);
}
});
} catch (Throwable ex) {
m_longPollStarted.set(false);
ApolloConfigException exception =
new ApolloConfigException("Schedule long polling refresh failed", ex);
}
}
调用doLongPollingRefresh方法执行长轮询刷新逻辑:
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
String url = null;
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}
url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
m_notifications);
HttpRequest request = new HttpRequest(url);
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
transaction.addData("Url", url);
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpUtil.doGet(request, m_responseType);
if (response.getStatusCode() == 200 && response.getBody() != null) {
updateNotifications(response.getBody());
updateRemoteNotifications(response.getBody());
transaction.addData("Result", response.getBody().toString());
notify(lastServiceDto, response.getBody());
}
if (response.getStatusCode() == 304 && random.nextBoolean()) {
lastServiceDto = null;
}
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
try {
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
}
} finally {
transaction.complete();
}
}
}
每5s主动从Apollo Server拉取数据,如果请求成功,通知RemoteConfigRepository:
public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
m_longPollServiceDto.set(longPollNotifiedServiceDto);
m_remoteMessages.set(remoteMessages);
m_executorService.submit(new Runnable() {
@Override
public void run() {
m_configNeedForceRefresh.set(true);
trySync();
}
});
}
和定时刷新一样,也调用到了trySync逻辑,最后触发注册到对应namespace的Config上的ConfigChangeListener逻辑。
3.变更通知执行
用户更新配置时,客户端如何监听到变更事件并做出响应处理呢?
基于前一小结,如果用户发布了属性变更,RemoteConfigRepository的定时刷新或长轮询逻辑会从Apollo Server拉取最新数据到本地,然后和本地缓存(上一个版本数据)做对比,如果发现不一样则触发配置变更,调用ConfigChangeListener逻辑。
四、相关实现
1.redis事件通知
比如我们要监听redis中的key失效事件,本地做一些定制化逻辑,那么就需要开启redis事件通知能力,然后本地做实现KeyExpirationEventMessageListener接口:
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 针对redis数据失效事件,进行逻辑处理
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
// 失效逻辑
}
}
}
redis中的key失效时会触发KeyExpirationEventMessageListener的onMessage,这样就实现了redis客户端的事件通知。
2.zookeeper watcher机制
在使用zk做做注册中心或者分布式锁场景,我们需要监听zk的节点变更事件,比如节点被删除,那么客户端就需要监听该事件,然后本地做一些逻辑处理。
public class WatcherDemo implements Watcher{
public void process(WatchedEvent event) {
//do something
}
}
节点变更事件类型有NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged和None,对于注册中心场景,服务消费者监听到服务节点被删除,那么可以在本地剔除远程服务节点。
五、为什么使用长轮询
关于为什么使用 HTTP 长轮询,估计接触 Apollo 的人看到客户端通知实现方式时都会疑惑,为什么使用这种方式,而不是其他方式?
在网上找到了Apollo作者对该问题的解答
- 为什么不使用消息系统?太复杂,杀鸡用牛刀。
- 为什么不用 TCP 长连接?对网络环境要求高,容易推送失败。且有双写问题。
- 为什么使用 HTTP 长轮询?性能足够,结合 Servlet3 的异步特性,能够维持万级连接(一个客户端只有一个长连接)。直接使用 Servlet 的 HTTP 协议,比单独用 TCP 连接方便。HTTP 请求/响应模式,保证了不会出现双写的情况。最主要还是简单,性能暂时不是瓶颈。