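I have used Kafka for a long time and am familiar with several of its core mechanisms: sequential reads, sequential writes, zero-copy, partitioning (divide and conquer), high watermarks, and so on. What I had never examined in detail is how the Kafka consumer side works. I used the holiday to dig into it; the environment is as follows: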
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
<relativePath/>
</parent>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.1</version>
</dependency>
Most of us know how to use it: add some configuration to the yml file and put an annotation on a method.
spring:
  kafka:
    bootstrap-servers: 10.255.200.214:9092
    consumer:
      group-id: ${spring.application.name}_consumer
      enable-auto-commit: true
      auto-commit-interval: 5000
      auto-offset-reset: latest #earliest
      max-poll-records: 1
    producer:
      client-id: ${spring.application.name}_producer
      retries: 3
      batch-size: 1048576
      buffer-memory: 6291456
      acks: all
      compression-type: gzip
    listener:
      type: batch
      concurrency: 5
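@KafkaListener(topics = {"topic"}, containerFactory = "kafkaListenerContainerFactory")
public void single(ConsumerRecord<String, String> consumerRecord) {
// Note: this single-record signature matches listener.type: single. With listener.type: batch
// (as configured above) the method would be expected to take a List<ConsumerRecord<String, String>>.
}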
Let's first look at how Kafka gets wired into Spring.
The entry point lives in Spring Boot's autoconfigure package.
KafkaAutoConfiguration imports KafkaAnnotationDrivenConfiguration:
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
}
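KafkaAnnotationDrivenConfiguration declares the nested configuration class EnableKafkaConfiguration, which is annotated with @EnableKafka. The same class also defines the
concrete implementation of AbstractKafkaListenerContainerFactory: ConcurrentKafkaListenerContainerFactory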
class KafkaAnnotationDrivenConfiguration {
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
// The configurer copies the settings it holds into the ConcurrentKafkaListenerContainerFactory, so the factory ends up with the listener properties plus the record interceptor, message converter, error handler, and so on
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
return factory;
}
@Configuration(proxyBeanMethods = false)
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
static class EnableKafkaConfiguration {
}
}
@EnableKafka imports KafkaListenerConfigurationSelector:
@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}
KafkaListenerConfigurationSelector in turn imports KafkaBootstrapConfiguration:
@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
return new String[] { KafkaBootstrapConfiguration.class.getName() };
}
}
KafkaBootstrapConfiguration uses registerBeanDefinitions to register two bean definitions, KafkaListenerAnnotationBeanPostProcessor and KafkaListenerEndpointRegistry:
public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
if (!registry.containsBeanDefinition(
KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
}
if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
}
}
}
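KafkaListenerAnnotationBeanPostProcessor implements BeanPostProcessor, so the method to look at is postProcessAfterInitialization, which is triggered after each bean has been initialized. This is where the methods annotated with @KafkaListener are discovered, assembled into endpoint instances, and eventually started: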
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
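Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<>();
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);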
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
processKafkaListener(listener, method, bean, beanName);
}
}
this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}
}
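At this point the analysis splits into two branches: first, how postProcessAfterInitialization gets invoked; second, how @KafkaListener methods are loaded.
#### How postProcessAfterInitialization gets invoked
BeanPostProcessor is an extension interface provided by the Spring IoC container: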
public interface BeanPostProcessor {
// Called before a bean's initialization callbacks run (applies to every bean in the container)
@Nullable
default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
// Called after a bean's initialization callbacks have run (applies to every bean in the container)
@Nullable
default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
}
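To make the contract concrete, here is a minimal, framework-free sketch of how a container might run a chain of post-processors after initialization. `MiniPostProcessor` and `PostProcessorChain` are hypothetical names invented for this illustration; they are not Spring types. The sketch assumes the same rule Spring applies in applyBeanPostProcessorsAfterInitialization: a processor may return the bean unchanged, return a replacement, or return null to end the chain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spring's BeanPostProcessor (illustration only).
interface MiniPostProcessor {
    Object afterInitialization(Object bean, String beanName);
}

final class PostProcessorChain {
    private final List<MiniPostProcessor> processors = new ArrayList<>();

    void add(MiniPostProcessor processor) {
        processors.add(processor);
    }

    // Runs every processor in registration order. A null return stops the
    // chain and keeps the last non-null result.
    Object applyAfterInitialization(Object bean, String beanName) {
        Object result = bean;
        for (MiniPostProcessor processor : processors) {
            Object current = processor.afterInitialization(result, beanName);
            if (current == null) {
                return result;
            }
            result = current;
        }
        return result;
    }

    public static void main(String[] args) {
        PostProcessorChain chain = new PostProcessorChain();
        List<String> seen = new ArrayList<>();
        // A processor that only observes beans, the way a listener scanner would.
        chain.add((bean, name) -> { seen.add(name); return bean; });
        // A processor that replaces the bean with a decorated value.
        chain.add((bean, name) -> "[" + bean + "]");
        System.out.println(chain.applyAfterInitialization("orderService", "orderService"));
        System.out.println(seen);
    }
}
```

The first processor in `main` behaves like KafkaListenerAnnotationBeanPostProcessor in spirit: it inspects each bean and passes it through untouched.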
public abstract class AbstractAutowireCapableBeanFactory extends AbstractBeanFactory implements AutowireCapableBeanFactory {
// applyBeanPostProcessorsAfterInitialization is where postProcessAfterInitialization gets called
@Override
public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName) throws BeansException {
Object result = existingBean;
// Iterate over every registered BeanPostProcessor (this is the key point)
for (BeanPostProcessor processor : getBeanPostProcessors()) {
// Invoke postProcessAfterInitialization on each one in turn
Object current = processor.postProcessAfterInitialization(result, beanName);
if (current == null) {
return result;
}
result = current;
}
return result;
}
// applyBeanPostProcessorsAfterInitialization is called from three methods: resolveBeforeInstantiation, initializeBean, and postProcessObjectFromFactoryBean
protected Object resolveBeforeInstantiation(String beanName, RootBeanDefinition mbd) {
bean = applyBeanPostProcessorsAfterInitialization(bean, beanName);
}
// Initializes the bean (every bean the factory creates passes through here)
protected Object initializeBean(String beanName, Object bean, @Nullable RootBeanDefinition mbd) {
// Aware callbacks
invokeAwareMethods(beanName, bean);
Object wrappedBean = bean;
// Pre-initialization processing
wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
// Invoke the init method
invokeInitMethods(beanName, wrappedBean, mbd);
// Post-initialization processing
wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
return wrappedBean;
}
protected Object postProcessObjectFromFactoryBean(Object object, String beanName) {
return applyBeanPostProcessorsAfterInitialization(object, beanName);
}
// Let's focus on initializeBean, which is called from three methods: configureBean, the public initializeBean overload, and doCreateBean
public Object configureBean(Object existingBean, String beanName) throws BeansException {
BeanWrapper bw = new BeanWrapperImpl(existingBean);
initBeanWrapper(bw);
populateBean(beanName, bd, bw);
return initializeBean(beanName, existingBean, bd);
}
public Object initializeBean(Object existingBean, String beanName) {
return initializeBean(beanName, existingBean, null);
}
// Creates the bean
protected Object doCreateBean(String beanName, RootBeanDefinition mbd, @Nullable Object[] args)
throws BeanCreationException {
BeanWrapper instanceWrapper = null;
if (mbd.isSingleton()) {
instanceWrapper = this.factoryBeanInstanceCache.remove(beanName);
}
if (instanceWrapper == null) {
instanceWrapper = createBeanInstance(beanName, mbd, args);
}
Object bean = instanceWrapper.getWrappedInstance();
// Initialize the bean instance.
Object exposedObject = bean;
try {
// Populate the bean: apply property values and inject the beans it depends on
populateBean(beanName, mbd, instanceWrapper);
// Initialize the bean: Aware callbacks, post-processors, and the init method
exposedObject = initializeBean(beanName, exposedObject, mbd);
}
// Register bean as disposable.
try {
registerDisposableBeanIfNecessary(beanName, bean, mbd);
}
}
// Populates the bean's properties
protected void populateBean(String beanName, RootBeanDefinition mbd, @Nullable BeanWrapper bw) {
PropertyValues pvs = (mbd.hasPropertyValues() ? mbd.getPropertyValues() : null);
// Autowire according to the resolved autowire mode; the beans this bean depends on are resolved here as well
int resolvedAutowireMode = mbd.getResolvedAutowireMode();
if (resolvedAutowireMode == AUTOWIRE_BY_NAME || resolvedAutowireMode == AUTOWIRE_BY_TYPE) {
MutablePropertyValues newPvs = new MutablePropertyValues(pvs);
// Add property values based on autowire by name if applicable.
if (resolvedAutowireMode == AUTOWIRE_BY_NAME) {
autowireByName(beanName, mbd, bw, newPvs);
}
// Add property values based on autowire by type if applicable.
if (resolvedAutowireMode == AUTOWIRE_BY_TYPE) {
autowireByType(beanName, mbd, bw, newPvs);
}
pvs = newPvs;
}
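// Check whether instantiation-aware post-processors or dependency checking apply to this bean
boolean hasInstAwareBpps = hasInstantiationAwareBeanPostProcessors();
boolean needsDepCheck = (mbd.getDependencyCheck() != AbstractBeanDefinition.DEPENDENCY_CHECK_NONE);
PropertyDescriptor[] filteredPds = null;
if (hasInstAwareBpps) {
// Let each InstantiationAwareBeanPostProcessor post-process the property values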
for (BeanPostProcessor bp : getBeanPostProcessors()) {
if (bp instanceof InstantiationAwareBeanPostProcessor) {
InstantiationAwareBeanPostProcessor ibp = (InstantiationAwareBeanPostProcessor) bp;
PropertyValues pvsToUse = ibp.postProcessProperties(pvs, bw.getWrappedInstance(), beanName);
if (pvsToUse == null) {
if (filteredPds == null) {
filteredPds = filterPropertyDescriptorsForDependencyCheck(bw, mbd.allowCaching);
}
pvsToUse = ibp.postProcessPropertyValues(pvs, filteredPds, bw.getWrappedInstance(), beanName);
if (pvsToUse == null) {
return;
}
}
pvs = pvsToUse;
}
}
}
if (needsDepCheck) {
// Dependency checking is enabled: verify that the required properties have been set
if (filteredPds == null) {
filteredPds = filterPropertyDescriptorsForDependencyCheck(bw, mbd.allowCaching);
}
checkDependencies(beanName, mbd, filteredPds, pvs);
}
if (pvs != null) {
applyPropertyValues(beanName, mbd, bw, pvs);
}
}
}
configureBean and the public initializeBean overload trace back to BeanConfigurerSupport.configureBean (among other callers):
public class BeanConfigurerSupport implements BeanFactoryAware, InitializingBean, DisposableBean {
public void configureBean(Object beanInstance) {
// Resolve the wiring info for this instance (null checks elided in this excerpt)
BeanWiringInfo bwi = this.beanWiringInfoResolver.resolveWiringInfo(beanInstance);
try {
String beanName = bwi.getBeanName();
if (bwi.indicatesAutowiring() || (bwi.isDefaultBeanName() && beanName != null &&
!beanFactory.containsBean(beanName))) {
// Perform autowiring (also applying standard factory / post-processor callbacks).
beanFactory.autowireBeanProperties(beanInstance, bwi.getAutowireMode(), bwi.getDependencyCheck());
beanFactory.initializeBean(beanInstance, (beanName != null ? beanName : ""));
}
else {
// Perform explicit wiring based on the specified bean definition.
beanFactory.configureBean(beanInstance, (beanName != null ? beanName : ""));
}
}
}
}
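doCreateBean has more callers to trace: AbstractAutowireCapableBeanFactory.createBean, AbstractBeanFactory.createBean, and BeanDefinitionValueResolver.resolveInnerBean. To summarize the flow: createBean calls doCreateBean, which calls initializeBean, which calls applyBeanPostProcessorsAfterInitialization, which finally calls postProcessAfterInitialization on every registered processor.
By now it should be clear how KafkaListenerAnnotationBeanPostProcessor gets run.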
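The ordering of those phases is easy to lose track of, so here is a small sketch that records them for a single bean. `MiniLifecycle` and `Initializable` are hypothetical names made up for this example; the comments map each step to the Spring method discussed above, but the sketch only imitates the order and performs no real injection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class MiniLifecycle {
    // Hypothetical callback that plays the role of an init method.
    interface Initializable {
        void init();
    }

    // Records each phase so the order can be inspected afterwards.
    final List<String> steps = new ArrayList<>();

    Object createBean(String name, Supplier<Object> constructor) {
        steps.add("instantiate " + name);    // createBeanInstance
        Object bean = constructor.get();
        steps.add("populate " + name);       // populateBean: inject dependencies
        steps.add("before-init " + name);    // postProcessBeforeInitialization
        if (bean instanceof Initializable) {
            ((Initializable) bean).init();   // invokeInitMethods
        }
        steps.add("after-init " + name);     // postProcessAfterInitialization
        return bean;
    }

    public static void main(String[] args) {
        MiniLifecycle factory = new MiniLifecycle();
        Initializable bean = () -> factory.steps.add("init orderService");
        factory.createBean("orderService", () -> bean);
        factory.steps.forEach(System.out::println);
    }
}
```

The after-init step is the hook that matters for this article: it runs once the bean is fully initialized, which is why listener methods can be scanned safely at that point.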
Next, let's look at how @KafkaListener methods are loaded.
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
// Collect the @KafkaListener annotations declared at class level
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<>();
// Find the methods annotated with @KafkaListener
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
// Callback invoked for each candidate method
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
// The callback returns the @KafkaListener annotations found on the method, or null when there are none
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
// A class-level listener works together with the methods annotated with @KafkaHandler
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
// Process every annotated method (the loop body is skipped when annotatedMethods is empty)
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
// Process this @KafkaListener
processKafkaListener(listener, method, bean, beanName);
}
}
// Class-level listeners are handled separately
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}
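// Collects the @KafkaListener annotations present on a method (this is what the callback above calls)
private Set<KafkaListener> findListenerAnnotations(Method method) {
Set<KafkaListener> listeners = new HashSet<>();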
KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class);
if (ann != null) {
listeners.add(ann);
}
KafkaListeners anns = AnnotationUtils.findAnnotation(method, KafkaListeners.class);
if (anns != null) {
listeners.addAll(Arrays.asList(anns.value()));
}
return listeners;
}
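The scanning step boils down to reflection over a class's methods. The sketch below shows the idea with plain JDK reflection and a made-up `@Listen` annotation; `Listen`, `ListenerScanner`, and `OrderHandlers` are hypothetical names for illustration. Unlike Spring's MethodIntrospector and AnnotatedElementUtils, this simplified version does not handle proxies, inherited methods, meta-annotations, or repeated annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical annotation standing in for @KafkaListener (illustration only).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Listen {
    String topic();
}

final class ListenerScanner {
    // Returns method name -> topic for every declared method carrying @Listen.
    static Map<String, String> scan(Class<?> type) {
        Map<String, String> found = new TreeMap<>();
        for (Method method : type.getDeclaredMethods()) {
            Listen ann = method.getAnnotation(Listen.class);
            if (ann != null) {
                found.put(method.getName(), ann.topic());
            }
        }
        return found;
    }
}

final class OrderHandlers {
    @Listen(topic = "orders")
    void onOrder(String payload) { }

    @Listen(topic = "refunds")
    void onRefund(String payload) { }

    // Not annotated, so the scanner ignores it.
    void helper() { }
}
```

Calling `ListenerScanner.scan(OrderHandlers.class)` yields one entry per annotated method, which is the same shape of result that the post-processor iterates over before building endpoints.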
Now let's see what processKafkaListener does:
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
// If the bean is a JDK dynamic proxy, resolve the method on the proxied interface
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
Object bean, Object adminTarget, String beanName) {
String beanRef = kafkaListener.beanRef();
if (StringUtils.hasText(beanRef)) {
this.listenerScope.addListener(beanRef, bean);
}
// Populate the MethodKafkaListenerEndpoint instance
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
// Read the id and group id configured on the @KafkaListener annotation
endpoint.setId(getEndpointId(kafkaListener));
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
// Explicit topic-partition assignments
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
// There may be several topics
endpoint.setTopics(resolveTopics(kafkaListener));
// Topics can also be selected with a pattern (a regular expression)
endpoint.setTopicPattern(resolvePattern(kafkaListener));
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
String group = kafkaListener.containerGroup();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
String concurrency = kafkaListener.concurrency();
if (StringUtils.hasText(concurrency)) {
// Set the number of concurrent consumer threads
endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
}
String autoStartup = kafkaListener.autoStartup();
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
}
// Resolve any additional consumer properties
resolveKafkaProperties(endpoint, kafkaListener.properties());
endpoint.setSplitIterables(kafkaListener.splitIterables());
KafkaListenerContainerFactory<?> factory = null;
// Look up the container factory in the IoC container by its configured bean name; here it is the ConcurrentKafkaListenerContainerFactory defined in KafkaAnnotationDrivenConfiguration
String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
}
}
endpoint.setBeanFactory(this.beanFactory);
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
}
// Finally, the fully built endpoint is registered with the KafkaListenerEndpointRegistrar instance (registrar)
this.registrar.registerEndpoint(endpoint, factory);
if (StringUtils.hasText(beanRef)) {
this.listenerScope.removeListener(beanRef);
}
}
Next, let's see what happens once the endpoint has been handed to the KafkaListenerEndpointRegistrar:
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
// Wrap the endpoint and its factory in a KafkaListenerEndpointDescriptor
KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
// If startImmediately is true, register the container right away; otherwise queue the descriptor
if (this.startImmediately) { // Register and start immediately
// Register and start directly
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}
}
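The startImmediately flag gives the registrar two modes: queue endpoints while the context is still being built, then register directly once the queue has been flushed. Below is a minimal sketch of that pattern. `MiniRegistrar` and its `flush` method are hypothetical names, and the assumption that the flag flips when the queue is flushed is my reading of the behavior, not something the excerpt above shows.

```java
import java.util.ArrayList;
import java.util.List;

final class MiniRegistrar {
    private final List<String> pending = new ArrayList<>();
    private final List<String> registered = new ArrayList<>();
    private boolean startImmediately;

    // Before flush(): queue the endpoint. After flush(): register it at once.
    synchronized void registerEndpoint(String endpointId) {
        if (startImmediately) {
            registered.add(endpointId);
        } else {
            pending.add(endpointId);
        }
    }

    // Registers everything queued so far and switches to immediate mode.
    synchronized void flush() {
        registered.addAll(pending);
        pending.clear();
        startImmediately = true;
    }

    synchronized List<String> registered() {
        return new ArrayList<>(registered);
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        MiniRegistrar registrar = new MiniRegistrar();
        registrar.registerEndpoint("orders");   // queued
        registrar.flush();                      // "orders" is registered now
        registrar.registerEndpoint("refunds");  // registered immediately
        System.out.println(registrar.registered());
    }
}
```

Deferring registration this way means no listener container is created until every bean has been scanned, while endpoints added later at runtime still take effect without a second flush.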
public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
ApplicationListener<ContextRefreshedEvent> {
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
boolean startImmediately) {
String id = endpoint.getId();
synchronized (this.listenerContainers) {
// The endpoint's settings end up wrapped in a MessageListenerContainer, which the factory creates and initializes. The factory here is ConcurrentKafkaListenerContainerFactory, so the object created is a ConcurrentMessageListenerContainer
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
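List<MessageListenerContainer> containerGroup;
// Containers that share a containerGroup name are collected into one List bean: reuse the bean if it already exists, otherwise register a new singleton list. Note that this is the @KafkaListener containerGroup attribute, not the Kafka consumer group id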
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (startImmediately) {
// Start the container right away if appropriate
startIfNecessary(container);
}
}
}
// Creates a MessageListenerContainer
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {
// ConcurrentKafkaListenerContainerFactory inherits createListenerContainer from AbstractKafkaListenerContainerFactory; the result is a ConcurrentMessageListenerContainer
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
int containerPhase = listenerContainer.getPhase();
if (listenerContainer.isAutoStartup() &&
containerPhase != AbstractMessageListenerContainer.DEFAULT_PHASE) { // a custom phase value
if (this.phase != AbstractMessageListenerContainer.DEFAULT_PHASE && this.phase != containerPhase) {
throw new IllegalStateException("Encountered phase mismatch between container "
+ "factory definitions: " + this.phase + " vs " + containerPhase);
}
this.phase = listenerContainer.getPhase();
}
return listenerContainer;
}
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
// So this ends up calling ConcurrentMessageListenerContainer.start()
listenerContainer.start();
}
}
}
public class ConcurrentKafkaListenerContainerFactory<K, V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {
@Override
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
if (topicPartitions != null && topicPartitions.length > 0) {
ContainerProperties properties = new ContainerProperties(topicPartitions);
// The constructor ultimately delegates to the AbstractMessageListenerContainer constructor
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
else {
Collection<String> topics = endpoint.getTopics();
if (!topics.isEmpty()) {
ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
else {
ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
}
}
}
@Override
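protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance,
KafkaListenerEndpoint endpoint) {
// Delegate to initializeContainer in the abstract base class first
super.initializeContainer(instance, endpoint);
// Set the concurrency: the value from @KafkaListener takes precedence, otherwise the factory's configured value is used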
if (endpoint.getConcurrency() != null) {
instance.setConcurrency(endpoint.getConcurrency());
}
else if (this.concurrency != null) {
instance.setConcurrency(this.concurrency);
}
}
}
// Key methods of the abstract base class
public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K, V>, K, V>
implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean,
ApplicationContextAware {
@Override
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
// createContainerInstance is implemented by ConcurrentKafkaListenerContainerFactory and returns a ConcurrentMessageListenerContainer
C instance = createContainerInstance(endpoint);
JavaUtils.INSTANCE
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
configureEndpoint((AbstractKafkaListenerEndpoint) endpoint);
}
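// Set up the listener with the message converter (there is more to this than it seems: the listener is wrapped in an adapter)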
endpoint.setupListenerContainer(instance, this.messageConverter);
initializeContainer(instance, endpoint);
customizeContainer(instance);
return instance;
}
// Initializes the given container by copying the factory's configuration into it
protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
ContainerProperties properties = instance.getContainerProperties();
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
"messageListener", "ackCount", "ackTime", "subBatchPerPartition", "kafkaConsumerProperties");
JavaUtils.INSTANCE
.acceptIfNotNull(this.afterRollbackProcessor, instance::setAfterRollbackProcessor)
.acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(),
properties::setAckCount)
.acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(),
properties::setAckTime)
.acceptIfNotNull(this.containerProperties.getSubBatchPerPartition(),
properties::setSubBatchPerPartition)
.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler)
.acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal);
if (endpoint.getAutoStartup() != null) {
instance.setAutoStartup(endpoint.getAutoStartup());
}
else if (this.autoStartup != null) {
instance.setAutoStartup(this.autoStartup);
}
instance.setRecordInterceptor(this.recordInterceptor);
JavaUtils.INSTANCE
.acceptIfNotNull(this.phase, instance::setPhase)
.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
.acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
.acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
.acceptIfNotNull(endpoint.getConsumerProperties(),
instance.getContainerProperties()::setKafkaConsumerProperties);
}
private void customizeContainer(C instance) {
if (this.containerCustomizer != null) {
this.containerCustomizer.configure(instance);
}
}
}
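The factory code above follows the template method pattern: the base class fixes the order (create, initialize, customize) and subclasses fill in or extend individual steps. Here is a stripped-down sketch of that structure. `MiniContainer`, `MiniContainerFactory`, and `ConcurrentMiniFactory` are hypothetical names; the real createListenerContainer also configures the endpoint and sets up the listener, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical container type; it only records what happened to it.
final class MiniContainer {
    final String id;
    final List<String> log = new ArrayList<>();

    MiniContainer(String id) {
        this.id = id;
    }
}

abstract class MiniContainerFactory {
    private Consumer<MiniContainer> customizer;

    void setCustomizer(Consumer<MiniContainer> customizer) {
        this.customizer = customizer;
    }

    // Template method: the order of steps is fixed here.
    final MiniContainer create(String endpointId) {
        MiniContainer instance = createInstance(endpointId); // subclass hook
        initialize(instance);                                // shared setup, extensible
        if (customizer != null) {
            customizer.accept(instance);                     // optional user hook
        }
        return instance;
    }

    protected abstract MiniContainer createInstance(String endpointId);

    protected void initialize(MiniContainer instance) {
        instance.log.add("base-init");
    }
}

final class ConcurrentMiniFactory extends MiniContainerFactory {
    @Override
    protected MiniContainer createInstance(String endpointId) {
        MiniContainer container = new MiniContainer(endpointId);
        container.log.add("created");
        return container;
    }

    @Override
    protected void initialize(MiniContainer instance) {
        super.initialize(instance);           // shared settings first
        instance.log.add("concurrent-init");  // then subclass-specific ones
    }
}
```

With a customizer that appends "customized", the container's log reads created, base-init, concurrent-init, customized, which mirrors the sequence createContainerInstance, initializeContainer (base, then subclass), customizeContainer.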
When start() is called, the method that actually runs is AbstractMessageListenerContainer.start(), which implements Lifecycle.start() and calls the abstract method doStart(). doStart() is in turn implemented by ConcurrentMessageListenerContainer:
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
@Override
protected void doStart() {
if (!isRunning()) {
checkTopics();
ContainerProperties containerProperties = getContainerProperties();
TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
// If concurrency exceeds the number of explicitly assigned partitions, lower it to the partition count
if (topicPartitions != null && this.concurrency > topicPartitions.length) {
this.concurrency = topicPartitions.length;
}
setRunning(true);
// Create one KafkaMessageListenerContainer per unit of concurrency
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container =
constructContainer(containerProperties, topicPartitions, i);
String beanName = getBeanName();
container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
container.setApplicationContext(getApplicationContext());
if (getApplicationEventPublisher() != null) {
container.setApplicationEventPublisher(getApplicationEventPublisher());
}
container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + i : "");
container.setGenericErrorHandler(getGenericErrorHandler());
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
container.setRecordInterceptor(getRecordInterceptor());
container.setInterceptBeforeTx(isInterceptBeforeTx());
container.setEmergencyStop(() -> {
stop(() -> {
// NOSONAR
});
publishContainerStoppedEvent();
});
if (isPaused()) {
container.pause();
}
// start() ends up in KafkaMessageListenerContainer.doStart()
container.start();
this.containers.add(container);
}
}
}
}
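Putting the two concurrency rules together (the endpoint overrides the factory, and explicit partition assignments cap the result) gives a small pure function. `ConcurrencyRules` and `effectiveConcurrency` are hypothetical names for this sketch, and the default of 1 when neither value is set is an assumption about the container's default rather than something shown in the excerpts.

```java
final class ConcurrencyRules {
    // endpoint: value from the listener annotation, or null if not set
    // factory: value configured on the factory, or null if not set
    // explicitPartitions: number of explicitly assigned partitions, or null
    //                     when the listener subscribes by topic or pattern
    static int effectiveConcurrency(Integer endpoint, Integer factory, Integer explicitPartitions) {
        // The annotation wins over the factory; fall back to 1 (assumed default).
        int concurrency = endpoint != null ? endpoint : (factory != null ? factory : 1);
        // The cap only applies when partitions were assigned explicitly.
        if (explicitPartitions != null && concurrency > explicitPartitions) {
            concurrency = explicitPartitions;
        }
        return concurrency;
    }

    public static void main(String[] args) {
        System.out.println(effectiveConcurrency(null, 5, null)); // factory value
        System.out.println(effectiveConcurrency(3, 5, null));    // annotation wins
        System.out.println(effectiveConcurrency(null, 5, 2));    // capped by partitions
    }
}
```

Note that the cap in doStart only sees explicitly assigned partitions. When a listener subscribes by topic name, as in the configuration at the top of this article, nothing in this code reduces a concurrency of 5.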
public abstract class AbstractMessageListenerContainer<K, V>
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
ApplicationContextAware {
@Override
public final void start() {
checkGroupId();
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
doStart();
}
}
}
}
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> {
@Override
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
ContainerProperties containerProperties = getContainerProperties();
// With auto-commit disabled and a time-based ack mode, ackTime defaults to 5 seconds when it has not been set
checkAckMode(containerProperties);
Object messageListener = containerProperties.getMessageListener();
// Default the consumer task executor (an AsyncListenableTaskExecutor) to a SimpleAsyncTaskExecutor when none is configured
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
// Determine the listener type
ListenerType listenerType = determineListenerType(listener);
// Build the ListenerConsumer for that listener type
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
this.startLatch = new CountDownLatch(1);
// Submit the consumer as an asynchronous task
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
try {
if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {
this.logger.error("Consumer thread failed to start - does the configured task executor "
+ "have enough threads to support all containers and concurrency?");
// If the consumer thread does not start within the timeout, publish a Spring application event
publishConsumerFailedToStart();
}
}
catch (@SuppressWarnings(UNUSED) InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
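The startLatch handshake in doStart is a standard JDK pattern: submit the worker, then wait a bounded time for it to signal that it is running. The sketch below reproduces it with an ExecutorService. `StartLatchDemo` and `startAndAwait` are hypothetical names, and having the worker count down as its very first action is an assumption made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class StartLatchDemo {
    // Submits the worker and waits until it signals that it is running.
    // Returns false if the worker did not start within the timeout.
    static boolean startAndAwait(ExecutorService executor, long timeoutMillis) {
        CountDownLatch startLatch = new CountDownLatch(1);
        executor.submit(() -> {
            startLatch.countDown(); // signal "started" before doing any work
            // ... a poll loop would run here ...
        });
        try {
            return startLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println("started = " + startAndAwait(executor, 1000));
        } finally {
            executor.shutdown();
        }
    }
}
```

If the executor has no free thread, the worker never runs and the wait times out, which is the situation the error message in doStart hints at ("does the configured task executor have enough threads...").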
Now let's look at ListenerConsumer:
private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
@Override
public void run() {
ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
// Publish the consumer "starting" event
publishConsumerStartingEvent();
this.consumerThread = Thread.currentThread();
setupSeeks();
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
this.count = 0;
this.last = System.currentTimeMillis();
initAssignedPartitions();
// Publish the consumer "started" event
publishConsumerStartedEvent();
Throwable exitThrowable = null;
while (isRunning()) {
try {
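// The key call: poll for records and invoke the listener
pollAndInvoke();
}
catch (Exception e) {
// Exception handling elided (the real code has several specific catch clauses)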
}
}
wrapUp(exitThrowable);
}
// This is the core of the consume loop
protected void pollAndInvoke() {
if (!this.autoCommit && !this.isRecordAck) {
// Auto-commit is off and the ack mode is not per-record: commit any pending offsets
processCommits();
}
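// Fix up transactional offsets if needed
fixTxOffsetsIfNeeded();
// Idle between polls if configured to do so
idleBetweenPollIfNecessary();
if (this.seeks.size() > 0) {
// Apply pending seek requests (offset repositioning)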
processSeeks();
}
// Pause the consumer if necessary
pauseConsumerIfNecessary();
this.lastPoll = System.currentTimeMillis();
if (!isRunning()) {
return;
}
this.polling.set(true);
// doPoll also runs checkRebalanceCommits
ConsumerRecords<K, V> records = doPoll();
if (!this.polling.compareAndSet(true, false) && records != null) {
/*
* There is a small race condition where wakeIfNecessary was called between
* exiting the poll and before we reset the boolean.
*/
if (records.count() > 0) {
this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
}
return;
}
// Resume the consumer if necessary
resumeConsumerIfNeccessary();
debugRecords(records);
if (records != null && records.count() > 0) {
savePositionsIfNeeded(records);
if (this.containerProperties.getIdleEventInterval() != null) {
this.lastReceive = System.currentTimeMillis();
}
// Finally, invoke the listener
invokeListener(records);
}
else {
checkIdle();
}
}
private void invokeListener(final ConsumerRecords<K, V> records) {
if (this.isBatchListener) {
invokeBatchListener(records);
}
else {
invokeRecordListener(records);
}
}
// Batch consumption
private void invokeBatchListener(final ConsumerRecords<K, V> records) {
List<ConsumerRecord<K, V>> recordList = null;
if (!this.wantsFullRecords) {
recordList = createRecordList(records);
}
if (this.wantsFullRecords || recordList.size() > 0) {
if (this.transactionTemplate != null) {
invokeBatchListenerInTx(records, recordList);
}
else {
doInvokeBatchListener(records, recordList);
}
}
}
// Record-by-record consumption
private void invokeRecordListener(final ConsumerRecords<K, V> records) {
if (this.transactionTemplate != null) {
invokeRecordListenerInTx(records);
}
else {
doInvokeWithRecords(records);
}
}
private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
// Record-by-record consumption walks the polled batch with an iterator
while (iterator.hasNext()) {
if (this.stopImmediate && !isRunning()) {
break;
}
final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
if (record == null) {
continue;
}
this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
doInvokeRecordListener(record, iterator);
if (this.nackSleep >= 0) {
handleNack(records, record);
break;
}
}
}
}
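Stripped of commits, seeks, pauses, and transactions, the tail of pollAndInvoke comes down to one decision: hand the polled batch to the listener as a whole, or iterate and hand over one record at a time. The sketch below isolates that decision using plain lists in place of ConsumerRecords. `MiniPollLoop` and `dispatch` are hypothetical names, and the simulated poll results are made-up data for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class MiniPollLoop {
    // Hands one polled batch to the listener, either whole or record by record.
    static <T> void dispatch(List<T> polled, boolean batchListener,
            Consumer<List<T>> batchHandler, Consumer<T> recordHandler) {
        if (polled.isEmpty()) {
            return; // nothing fetched; the real loop checks for idleness here
        }
        if (batchListener) {
            batchHandler.accept(polled);
        } else {
            for (T record : polled) {
                recordHandler.accept(record);
            }
        }
    }

    public static void main(String[] args) {
        // Three simulated poll results; the middle poll returned nothing.
        List<List<String>> polls = Arrays.asList(
                Arrays.asList("a", "b"),
                new ArrayList<String>(),
                Arrays.asList("c"));
        for (List<String> polled : polls) {
            dispatch(polled, false,
                    batch -> System.out.println("batch " + batch),
                    record -> System.out.println("record " + record));
        }
    }
}
```

Even in record mode the consumer still fetches a batch per poll; the per-record behavior comes entirely from the iteration on the container side.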