
Analyzing Kafka consumption in Spring and Spring Boot

Common tools · yxkong · 101 views

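I have used Kafka for a long time and know some of its internal mechanisms, including sequential reads, sequential writes, zero-copy, divide and conquer, and watermarks. But I had never looked closely at how the Kafka consumer side works. I used the holiday to analyze it, with the following environment: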

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/>
    </parent>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.1</version>
    </dependency>

We all know how to use it: add some configuration to the yml file and put an annotation on a method.

spring:
  kafka:
    bootstrap-servers: 10.255.200.214:9092
    consumer:
      group-id: ${spring.application.name}_consumer
      enable-auto-commit: true
      auto-commit-interval: 5000
      auto-offset-reset: latest  #earliest
      max-poll-records: 1
    producer:
      client-id: ${spring.application.name}_producer
      retries: 3
      batch-size: 1048576
      buffer-memory: 6291456
      acks: all
      compression-type: gzip
    listener:
      type: batch
      concurrency: 5

@KafkaListener(topics = {"topic"}, containerFactory = "kafkaListenerContainerFactory")
public void single(ConsumerRecord<String, String> consumerRecord) {

}


Let's first look at how Kafka is brought into Spring.

The entry point is in Spring Boot's autoconfigure package.

KafkaAutoConfiguration imports KafkaAnnotationDrivenConfiguration:

@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

}


KafkaAnnotationDrivenConfiguration declares the nested configuration class EnableKafkaConfiguration, which carries @EnableKafka. The same class also defines a bean of type ConcurrentKafkaListenerContainerFactory, the implementation of AbstractKafkaListenerContainerFactory:

class KafkaAnnotationDrivenConfiguration {

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // The configurer copies its settings onto the factory here, so the factory ends up with the
        // configured properties plus the record interceptor, message converter, error handler, and so on
        configurer.configure(factory, kafkaConsumerFactory
                .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
        return factory;
    }

    @Configuration(proxyBeanMethods = false)
    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    static class EnableKafkaConfiguration {

    }

}


@EnableKafka imports KafkaListenerConfigurationSelector:

@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}


KafkaListenerConfigurationSelector ultimately imports KafkaBootstrapConfiguration:

@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {

@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
    return new String[] { KafkaBootstrapConfiguration.class.getName() };
}

}


KafkaBootstrapConfiguration registers KafkaListenerAnnotationBeanPostProcessor and KafkaListenerEndpointRegistry through registerBeanDefinitions:

public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    if (!registry.containsBeanDefinition(
            KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

        registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
                new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
    }

    if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
        registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
    }
}

}
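
The guard in registerBeanDefinitions is a register-if-absent check: if a definition already exists under the well-known name, the default is skipped. A minimal plain-Java sketch of that idea follows. DefinitionRegistry and registerIfAbsent are hypothetical names used only for illustration and are not part of Spring.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Spring's BeanDefinitionRegistry; not Spring API.
final class DefinitionRegistry {
    private final Map<String, Class<?>> definitions = new LinkedHashMap<>();

    boolean containsBeanDefinition(String name) {
        return definitions.containsKey(name);
    }

    void registerBeanDefinition(String name, Class<?> type) {
        definitions.put(name, type);
    }

    Class<?> getType(String name) {
        return definitions.get(name);
    }

    // Same shape as the guard in KafkaBootstrapConfiguration:
    // the default is registered only when the name is still free.
    boolean registerIfAbsent(String name, Class<?> defaultType) {
        if (containsBeanDefinition(name)) {
            return false; // an existing definition is kept untouched
        }
        registerBeanDefinition(name, defaultType);
        return true;
    }
}
```

Calling registerIfAbsent twice with the same name keeps the first definition, which is why a definition registered earlier under that name is not overwritten.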


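KafkaListenerAnnotationBeanPostProcessor implements BeanPostProcessor, so the method to look at is postProcessAfterInitialization, which is triggered after each bean has been initialized. It finds the methods annotated with @KafkaListener, assembles an endpoint for each of them, and hands them over to be started: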

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(bean.getClass());
                this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
            else {
                // Non-empty set of methods
                for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (KafkaListener listener : entry.getValue()) {
                        processKafkaListener(listener, method, bean, beanName);
                    }
                }
                this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }
}

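From here the analysis splits into two branches: one follows how postProcessAfterInitialization gets executed, the other follows how the @KafkaListener methods are loaded.

#### How postProcessAfterInitialization gets executed

BeanPostProcessor is an extension interface provided by the Spring IoC container: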

public interface BeanPostProcessor {
    // Called before a bean's initialization callbacks (applies to every bean)
    @Nullable
    default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }
    // Called after a bean's initialization callbacks (applies to every bean)
    @Nullable
    default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }
}

public abstract class AbstractAutowireCapableBeanFactory extends AbstractBeanFactory implements AutowireCapableBeanFactory {
    // applyBeanPostProcessorsAfterInitialization is where postProcessAfterInitialization gets called
    @Override
    public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName) throws BeansException {
        Object result = existingBean;
        // Iterate over every registered BeanPostProcessor (this is the key point)
        for (BeanPostProcessor processor : getBeanPostProcessors()) {
            // Invoke postProcessAfterInitialization on each one in turn
            Object current = processor.postProcessAfterInitialization(result, beanName);
            if (current == null) {
                // A null result stops the chain and keeps the previous object
                return result;
            }
            result = current;
        }
        return result;
    }

    // applyBeanPostProcessorsAfterInitialization is called from three methods:
    // resolveBeforeInstantiation, initializeBean, and postProcessObjectFromFactoryBean

    protected Object resolveBeforeInstantiation(String beanName, RootBeanDefinition mbd) {
        // ... (abridged)
        bean = applyBeanPostProcessorsAfterInitialization(bean, beanName);
        // ... (abridged)
    }
    // Initializes the bean (every bean created by the container passes through here)
    protected Object initializeBean(String beanName, Object bean, @Nullable RootBeanDefinition mbd) {
        // Invoke the Aware callbacks
        invokeAwareMethods(beanName, bean);

        // Pre-initialization post-processing
        Object wrappedBean = bean;
        wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);

        // Run afterPropertiesSet() and any custom init-method
        invokeInitMethods(beanName, wrappedBean, mbd);

        // Post-initialization post-processing
        wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);

        return wrappedBean;
    }
    protected Object postProcessObjectFromFactoryBean(Object object, String beanName) {
        return applyBeanPostProcessorsAfterInitialization(object, beanName);
    }

    // We follow initializeBean, which is called from three methods:
    // configureBean, initializeBean (the public overload), and doCreateBean

    public Object configureBean(Object existingBean, String beanName) throws BeansException {
        BeanWrapper bw = new BeanWrapperImpl(existingBean);
        initBeanWrapper(bw);
        populateBean(beanName, bd, bw);
        return initializeBean(beanName, existingBean, bd);
    }

    public Object initializeBean(Object existingBean, String beanName) {
        return initializeBean(beanName, existingBean, null);
    }
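    // Creates the bean: instantiate, populate properties, then initialize
    protected Object doCreateBean(String beanName, RootBeanDefinition mbd, @Nullable Object[] args)
            throws BeanCreationException {
        BeanWrapper instanceWrapper = null;
        if (mbd.isSingleton()) {
            instanceWrapper = this.factoryBeanInstanceCache.remove(beanName);
        }
        if (instanceWrapper == null) {
            instanceWrapper = createBeanInstance(beanName, mbd, args);
        }
        Object bean = instanceWrapper.getWrappedInstance();
        // Initialize the bean instance.
        Object exposedObject = bean;
        try {
            // Populate the bean: property values and dependency injection
            populateBean(beanName, mbd, instanceWrapper);
            // Initialize the bean: Aware callbacks, BeanPostProcessors, init methods
            exposedObject = initializeBean(beanName, exposedObject, mbd);
        }
        catch (Throwable ex) {
            // (abridged) failures are reported as a BeanCreationException
            throw new BeanCreationException(
                    mbd.getResourceDescription(), beanName, "Initialization of bean failed", ex);
        }
        // Register bean as disposable.
        try {
            registerDisposableBeanIfNecessary(beanName, bean, mbd);
        }
        catch (BeanDefinitionValidationException ex) {
            throw new BeanCreationException(
                    mbd.getResourceDescription(), beanName, "Invalid destruction signature", ex);
        }
        return exposedObject;
    }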
    // Populates the bean's properties
    protected void populateBean(String beanName, RootBeanDefinition mbd, @Nullable BeanWrapper bw) {

        PropertyValues pvs = (mbd.hasPropertyValues() ? mbd.getPropertyValues() : null);
        // Autowire by name or by type depending on the resolved autowire mode; the beans this one depends on are resolved here
        int resolvedAutowireMode = mbd.getResolvedAutowireMode();
        if (resolvedAutowireMode == AUTOWIRE_BY_NAME || resolvedAutowireMode == AUTOWIRE_BY_TYPE) {
            MutablePropertyValues newPvs = new MutablePropertyValues(pvs);
            // Add property values based on autowire by name if applicable.
            if (resolvedAutowireMode == AUTOWIRE_BY_NAME) {
                autowireByName(beanName, mbd, bw, newPvs);
            }
            // Add property values based on autowire by type if applicable.
            if (resolvedAutowireMode == AUTOWIRE_BY_TYPE) {
                autowireByType(beanName, mbd, bw, newPvs);
            }
            pvs = newPvs;
        }
        // Are any InstantiationAwareBeanPostProcessors registered?
        boolean hasInstAwareBpps = hasInstantiationAwareBeanPostProcessors();

        boolean needsDepCheck = (mbd.getDependencyCheck() != AbstractBeanDefinition.DEPENDENCY_CHECK_NONE);
        PropertyDescriptor[] filteredPds = null;
        if (hasInstAwareBpps) {

            // Let each InstantiationAwareBeanPostProcessor post-process the property values
            for (BeanPostProcessor bp : getBeanPostProcessors()) {
                if (bp instanceof InstantiationAwareBeanPostProcessor) {
                    InstantiationAwareBeanPostProcessor ibp = (InstantiationAwareBeanPostProcessor) bp;
                    PropertyValues pvsToUse = ibp.postProcessProperties(pvs, bw.getWrappedInstance(), beanName);
                    if (pvsToUse == null) {
                        if (filteredPds == null) {
                            filteredPds = filterPropertyDescriptorsForDependencyCheck(bw, mbd.allowCaching);
                        }
                        pvsToUse = ibp.postProcessPropertyValues(pvs, filteredPds, bw.getWrappedInstance(), beanName);
                        if (pvsToUse == null) {
                            return;
                        }
                    }
                    pvs = pvsToUse;
                }
            }
        }
        if (needsDepCheck) {
            // Dependency checking is enabled: verify that the required properties have been set
            if (filteredPds == null) {
                filteredPds = filterPropertyDescriptorsForDependencyCheck(bw, mbd.allowCaching);
            }
            checkDependencies(beanName, mbd, filteredPds, pvs);
        }

        if (pvs != null) {
            applyPropertyValues(beanName, mbd, bw, pvs);
        }
    }

}

Tracing configureBean and the public initializeBean upward leads to BeanConfigurerSupport.configureBean:

public class BeanConfigurerSupport implements BeanFactoryAware, InitializingBean, DisposableBean {
    public void configureBean(Object beanInstance) {
        // (abridged) the null checks and the lookup of the wiring info 'bwi' and 'beanFactory' are omitted
        try {
            String beanName = bwi.getBeanName();
            if (bwi.indicatesAutowiring() || (bwi.isDefaultBeanName() && beanName != null &&
                    !beanFactory.containsBean(beanName))) {
                // Perform autowiring (also applying standard factory / post-processor callbacks).
                beanFactory.autowireBeanProperties(beanInstance, bwi.getAutowireMode(), bwi.getDependencyCheck());
                beanFactory.initializeBean(beanInstance, (beanName != null ? beanName : ""));
            }
            else {
                // Perform explicit wiring based on the specified bean definition.
                beanFactory.configureBean(beanInstance, (beanName != null ? beanName : ""));
            }
        }
        catch (BeanCreationException ex) {
            // (abridged) error handling omitted
        }
    }

}

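doCreateBean has more call sites to trace: AbstractAutowireCapableBeanFactory.createBean, AbstractBeanFactory.createBean, and BeanDefinitionValueResolver.resolveInnerBean. Putting the flow together: createBean calls doCreateBean, which calls populateBean and then initializeBean; initializeBean calls applyBeanPostProcessorsAfterInitialization, which invokes postProcessAfterInitialization on every registered BeanPostProcessor.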

At this point we know how KafkaListenerAnnotationBeanPostProcessor gets run.
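
The ordering traced above (before-initialization callbacks, then the init method, then after-initialization callbacks) can be reproduced in a few lines of plain Java. This is a simplified model and not Spring code: PostProcessor, Initializable, and MiniBeanFactory are hypothetical types introduced only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BeanPostProcessor.
interface PostProcessor {
    default Object beforeInit(Object bean, String name) { return bean; }
    default Object afterInit(Object bean, String name) { return bean; }
}

// Hypothetical stand-in for an init callback such as afterPropertiesSet().
interface Initializable {
    void init();
}

final class MiniBeanFactory {
    private final List<PostProcessor> postProcessors = new ArrayList<>();
    final List<String> trace = new ArrayList<>();

    void addPostProcessor(PostProcessor p) {
        postProcessors.add(p);
    }

    // Same ordering as initializeBean: before-init callbacks, init method, after-init callbacks.
    Object initializeBean(String name, Object bean) {
        Object wrapped = bean;
        for (PostProcessor p : postProcessors) {
            Object current = p.beforeInit(wrapped, name);
            if (current == null) {
                break; // a null result stops the chain and keeps the previous object
            }
            wrapped = current;
        }
        if (wrapped instanceof Initializable) {
            trace.add("init:" + name);
            ((Initializable) wrapped).init();
        }
        for (PostProcessor p : postProcessors) {
            Object current = p.afterInit(wrapped, name);
            if (current == null) {
                break;
            }
            wrapped = current;
        }
        return wrapped;
    }
}
```

A post-processor registered with addPostProcessor sees every bean only after its init method has run, which is the point at which KafkaListenerAnnotationBeanPostProcessor inspects a bean for listener methods.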

Now let's look at how the @KafkaListener methods are loaded:

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            // Collect the @KafkaListener annotations declared at class level
            Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
            final List<Method> multiMethods = new ArrayList<>();
            // Find the methods annotated with @KafkaListener
            Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    // Callback invoked for each candidate method
                    (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                        // Return the listener annotations on this method, or null if there are none
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    });
            if (hasClassLevelListeners) {
                // Class-level listeners work together with @KafkaHandler methods
                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                        (ReflectionUtils.MethodFilter) method ->
                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                multiMethods.addAll(methodsWithHandler);
            }
            // When annotatedMethods is not empty (the empty case is omitted here)
            for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                Method method = entry.getKey();
                for (KafkaListener listener : entry.getValue()) {
                    // Process each @KafkaListener
                    processKafkaListener(listener, method, bean, beanName);
                }
            }
            // Class-level listeners are handled separately
            if (hasClassLevelListeners) {
                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
            }
        }
        return bean;
    }
    // Collects the @KafkaListener annotations on a method, including those inside @KafkaListeners
    private Set<KafkaListener> findListenerAnnotations(Method method) {
        Set<KafkaListener> listeners = new HashSet<>();
        KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class);
        if (ann != null) {
            listeners.add(ann);
        }
        KafkaListeners anns = AnnotationUtils.findAnnotation(method, KafkaListeners.class);
        if (anns != null) {
            listeners.addAll(Arrays.asList(anns.value()));
        }
        return listeners;
    }
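
The scan itself is ordinary reflection over annotated methods. The sketch below shows the idea with plain java.lang.reflect. It is deliberately narrower than Spring's MethodIntrospector (no superclass, interface, proxy, or meta-annotation handling), and @Listen, ListenerScanner, and OrderHandlers are hypothetical names standing in for @KafkaListener and a user bean.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical annotation standing in for @KafkaListener; only the topic list is modeled.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Listen {
    String[] topics();
}

final class ListenerScanner {
    // Returns method name -> topics for every method carrying @Listen, sorted by method name.
    static Map<String, String[]> scan(Class<?> targetClass) {
        Map<String, String[]> found = new TreeMap<>();
        for (Method method : targetClass.getDeclaredMethods()) {
            Listen listen = method.getAnnotation(Listen.class);
            if (listen != null) {
                found.put(method.getName(), listen.topics());
            }
        }
        return found;
    }
}

// Hypothetical bean with two listener methods and one ordinary method.
final class OrderHandlers {
    @Listen(topics = {"orders"})
    public void onOrder(String payload) { }

    @Listen(topics = {"refunds", "chargebacks"})
    public void onRefund(String payload) { }

    public void notAListener() { }
}
```

ListenerScanner.scan(OrderHandlers.class) returns entries for onOrder and onRefund only; in the real post-processor each such hit is passed to processKafkaListener.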

What does processKafkaListener do?

    protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
        // Check whether the bean is a JDK dynamic proxy; if so, resolve the method on a proxied interface
        Method methodToUse = checkProxy(method, bean);
        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setMethod(methodToUse);
        processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
    }
    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {

        String beanRef = kafkaListener.beanRef();
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.addListener(beanRef, bean);
        }
        // Populate the MethodKafkaListenerEndpoint instance
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        // Read the settings configured on @KafkaListener: id, group id, then topics and partitions
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
        // Explicit topic/partition assignments
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
        // There may be several topics
        endpoint.setTopics(resolveTopics(kafkaListener));
        // Topics can also be selected by a pattern expression
        endpoint.setTopicPattern(resolvePattern(kafkaListener));
        endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
        String group = kafkaListener.containerGroup();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }

        String concurrency = kafkaListener.concurrency();
        if (StringUtils.hasText(concurrency)) {
            // Set the number of concurrent consumer threads
            endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
        }
        String autoStartup = kafkaListener.autoStartup();
        if (StringUtils.hasText(autoStartup)) {
            endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
        }
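        // Resolve any additional consumer properties
        resolveKafkaProperties(endpoint, kafkaListener.properties());
        endpoint.setSplitIterables(kafkaListener.splitIterables());

        KafkaListenerContainerFactory<?> factory = null;
        // Look up the container factory in the IoC container by its configured bean name. In this example
        // it is the ConcurrentKafkaListenerContainerFactory defined in KafkaAnnotationDrivenConfiguration.
        String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                // (abridged) rethrown as a BeanInitializationException naming the missing factory
                throw new BeanInitializationException("No KafkaListenerContainerFactory with id '"
                        + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }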

        endpoint.setBeanFactory(this.beanFactory);
        String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
        }
        // Once the endpoint is fully assembled, register it with the KafkaListenerEndpointRegistrar instance (registrar)
        this.registrar.registerEndpoint(endpoint, factory);
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.removeListener(beanRef);
        }
    }

Next, let's see what happens after the endpoint has been handed to the KafkaListenerEndpointRegistrar:

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
    public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {

        // Wrap the endpoint and its factory in a KafkaListenerEndpointDescriptor
        KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
        synchronized (this.endpointDescriptors) {
            // If startImmediately is true
            if (this.startImmediately) { // Register and start immediately
                // Create the container and start it right away
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);
            }
            else {
                // Otherwise only queue the descriptor for later registration
                this.endpointDescriptors.add(descriptor);
            }
        }
    }
}
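
registerEndpoint therefore has two modes: while startImmediately is false, descriptors are only queued; once it is true, each new endpoint is registered on the spot. The following is a simplified plain-Java model of that switch. MiniRegistrar is a hypothetical class, and the assumption that the flag flips when the queue is flushed in afterPropertiesSet() comes from the spring-kafka registrar rather than from the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the registrar's two modes; not spring-kafka code.
final class MiniRegistrar {
    private final List<String> pending = new ArrayList<>();
    private final List<String> registered = new ArrayList<>();
    private boolean startImmediately;

    synchronized void registerEndpoint(String endpointId) {
        if (startImmediately) {
            registered.add(endpointId); // late registration: handled right away
        }
        else {
            pending.add(endpointId);    // early registration: just remember it
        }
    }

    // Assumed to play the role of afterPropertiesSet(): flush the queue, then switch modes.
    synchronized void afterPropertiesSet() {
        registered.addAll(pending);
        pending.clear();
        startImmediately = true;
    }

    synchronized List<String> pending() {
        return new ArrayList<>(pending);
    }

    synchronized List<String> registered() {
        return new ArrayList<>(registered);
    }
}
```

Endpoints discovered while beans are still being post-processed land in the queue; anything registered after the flush bypasses it.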

public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
        ApplicationListener<ContextRefreshedEvent> {

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
            boolean startImmediately) {
        String id = endpoint.getId();
        synchronized (this.listenerContainers) {
            // Everything collected so far ends up in a MessageListenerContainer, which the factory creates
            // and initializes. With ConcurrentKafkaListenerContainerFactory the resulting object is a
            // ConcurrentMessageListenerContainer.
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                // The group here is the value set through endpoint.setGroup() (the containerGroup attribute of
                // @KafkaListener), which is separate from the group id. Each group maps to one List bean:
                // reuse it if it exists, otherwise register a new singleton.
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            if (startImmediately) {
                // Start right away
                startIfNecessary(container);
            }
        }
    }

    // Creates a MessageListenerContainer
    protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
            KafkaListenerContainerFactory<?> factory) {
        // For ConcurrentKafkaListenerContainerFactory this call is handled by AbstractKafkaListenerContainerFactory
        // and ends up creating a ConcurrentMessageListenerContainer
        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

        int containerPhase = listenerContainer.getPhase();
        if (listenerContainer.isAutoStartup() &&
                containerPhase != AbstractMessageListenerContainer.DEFAULT_PHASE) {  // a custom phase value
            if (this.phase != AbstractMessageListenerContainer.DEFAULT_PHASE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container "
                        + "factory definitions: " + this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }

        return listenerContainer;
    }

    private void startIfNecessary(MessageListenerContainer listenerContainer) {
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            // So in the end this is ConcurrentMessageListenerContainer.start()
            listenerContainer.start();
        }
    }

}

public class ConcurrentKafkaListenerContainerFactory<K, V>
        extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {
    @Override
    protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
        TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
        if (topicPartitions != null && topicPartitions.length > 0) {
            ContainerProperties properties = new ContainerProperties(topicPartitions);
            // The constructor chains up to AbstractMessageListenerContainer
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
        else {
            Collection<String> topics = endpoint.getTopics();
            if (!topics.isEmpty()) {
                ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
                return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
            }
            else {
                ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
                return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
            }
        }
    }
    @Override
    protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance,
            KafkaListenerEndpoint endpoint) {
        // Delegate to initializeContainer in the abstract base class first
        super.initializeContainer(instance, endpoint);
        // Set the concurrency: the value on @KafkaListener takes precedence over the configured one
        if (endpoint.getConcurrency() != null) {
            instance.setConcurrency(endpoint.getConcurrency());
        }
        else if (this.concurrency != null) {
            instance.setConcurrency(this.concurrency);
        }
    }
}
// Key methods of the abstract base class
public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K, V>, K, V>
        implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean,
            ApplicationContextAware {
    @Override
    public C createListenerContainer(KafkaListenerEndpoint endpoint) {
        // ConcurrentKafkaListenerContainerFactory creates a ConcurrentMessageListenerContainer here
        C instance = createContainerInstance(endpoint);
        JavaUtils.INSTANCE
                .acceptIfNotNull(endpoint.getId(), instance::setBeanName);
        if (endpoint instanceof AbstractKafkaListenerEndpoint) {
            configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
        }
        // Set up the listener with the message converter (there is more to this than it seems:
        // the annotated method is wrapped in an adapter)
        endpoint.setupListenerContainer(instance, this.messageConverter);
        initializeContainer(instance, endpoint);
        customizeContainer(instance);
        return instance;
    }
    // Initializes the given container by copying the factory's configuration into it
    protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
        ContainerProperties properties = instance.getContainerProperties();
        BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
                "messageListener", "ackCount", "ackTime", "subBatchPerPartition", "kafkaConsumerProperties");
        JavaUtils.INSTANCE
                .acceptIfNotNull(this.afterRollbackProcessor, instance::setAfterRollbackProcessor)
                .acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(),
                        properties::setAckCount)
                .acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(),
                        properties::setAckTime)
                .acceptIfNotNull(this.containerProperties.getSubBatchPerPartition(),
                        properties::setSubBatchPerPartition)
                .acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler)
                .acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal);
        if (endpoint.getAutoStartup() != null) {
            instance.setAutoStartup(endpoint.getAutoStartup());
        }
        else if (this.autoStartup != null) {
            instance.setAutoStartup(this.autoStartup);
        }
        instance.setRecordInterceptor(this.recordInterceptor);
        JavaUtils.INSTANCE
                .acceptIfNotNull(this.phase, instance::setPhase)
                .acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
                .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
                .acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
                .acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
                .acceptIfNotNull(endpoint.getConsumerProperties(),
                        instance.getContainerProperties()::setKafkaConsumerProperties);
    }

    private void customizeContainer(C instance) {
        if (this.containerCustomizer != null) {
            this.containerCustomizer.configure(instance);
        }
    }
}

Now start() runs. AbstractMessageListenerContainer.start() implements Lifecycle.start() and calls the abstract method doStart(), which is implemented by ConcurrentMessageListenerContainer:

public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    @Override
    protected void doStart() {
        if (!isRunning()) {
            checkTopics();
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
            // If partitions are assigned explicitly and the concurrency exceeds their number, cap it at that number
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                this.concurrency = topicPartitions.length;
            }
            setRunning(true);
            // Build one KafkaMessageListenerContainer per unit of concurrency
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container =
                        constructContainer(containerProperties, topicPartitions, i);
                String beanName = getBeanName();
                container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
                container.setApplicationContext(getApplicationContext());
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + i : "");
                container.setGenericErrorHandler(getGenericErrorHandler());
                container.setAfterRollbackProcessor(getAfterRollbackProcessor());
                container.setRecordInterceptor(getRecordInterceptor());
                container.setInterceptBeforeTx(isInterceptBeforeTx());
                container.setEmergencyStop(() -> {
                    stop(() -> {
                        // NOSONAR
                    });
                    publishContainerStoppedEvent();
                });
                if (isPaused()) {
                    container.pause();
                }
                // This ends up in KafkaMessageListenerContainer.doStart()
                container.start();
                this.containers.add(container);
            }
        }
    }
}
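
Two details of doStart() are easy to miss: the concurrency is only capped when topicPartitions is non-null, and each child container is named "<beanName>-<index>". A small sketch of just that arithmetic, where ChildContainerPlan and its parameters are hypothetical and a null partition count stands for the case where topicPartitions is null:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors only the capping and naming logic of doStart().
final class ChildContainerPlan {
    static List<String> childNames(String beanName, int concurrency, Integer assignedPartitions) {
        int effective = concurrency;
        // The cap applies only when a partition array is present.
        if (assignedPartitions != null && effective > assignedPartitions) {
            effective = assignedPartitions;
        }
        List<String> names = new ArrayList<>();
        for (int i = 0; i < effective; i++) {
            // Same fallback as the source: "consumer" when the container has no bean name.
            names.add((beanName != null ? beanName : "consumer") + "-" + i);
        }
        return names;
    }
}
```

With concurrency 5 and 3 assigned partitions this yields three children, while the same concurrency without a partition array yields five, since the cap is skipped in that case.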

public abstract class AbstractMessageListenerContainer<K, V>
        implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
            ApplicationContextAware {
    @Override
    public final void start() {
        checkGroupId();
        synchronized (this.lifecycleMonitor) {
            if (!isRunning()) {
                Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
                        () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
                doStart();
            }
        }
    }           
}

public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
        extends AbstractMessageListenerContainer<K, V> {
    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        if (this.clientIdSuffix == null) { // stand-alone container
            checkTopics();
        }
        ContainerProperties containerProperties = getContainerProperties();
        //非自动提交的情况下,默认acktime是5秒
        checkAckMode(containerProperties);

        Object messageListener = containerProperties.getMessageListener();
        //设置消费者的异步线程池AsyncListenableTaskExecutor
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        GenericMessageListener listener = (GenericMessageListener) messageListener;
        //推断监听类型
        ListenerType listenerType = determineListenerType(listener);
        //根据ListenerType构建ListenerConsumer
        this.listenerConsumer = new ListenerConsumer(listener, listenerType);
        setRunning(true);
        this.startLatch = new CountDownLatch(1);
        // Submit the ListenerConsumer as an async task
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
        try {
            if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.error("Consumer thread failed to start - does the configured task executor "
                        + "have enough threads to support all containers and concurrency?");
                // If the wait times out, publish a Spring application event
                publishConsumerFailedToStart();
            }
        }
        catch (@SuppressWarnings(UNUSED) InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }           
}
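
The startup handshake in doStart() (submit the consumer task, then wait on a latch with a timeout) can be tried in isolation. Below is a minimal sketch using only java.util.concurrent; it is not Spring Kafka's actual code. StartLatchSketch, startAndAwait and the simulated delay are hypothetical names, and the sketch assumes the worker counts the latch down once it begins running, which the excerpt above does not show.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class StartLatchSketch {

    /**
     * Submits a worker and waits up to timeoutMillis for it to signal that it has started.
     * Returns true if the signal arrived in time, false on timeout or interruption.
     */
    static boolean startAndAwait(ExecutorService executor, long workerDelayMillis, long timeoutMillis) {
        CountDownLatch startLatch = new CountDownLatch(1);
        executor.submit(() -> {
            try {
                // Simulates setup work done before the worker reports itself as started (assumption).
                Thread.sleep(workerDelayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            startLatch.countDown();
        });
        try {
            return startLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Same handling as doStart(): restore the interrupt flag and give up waiting.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println("fast worker started in time: " + startAndAwait(executor, 0, 5000));
            System.out.println("slow worker started in time: " + startAndAwait(executor, 3000, 50));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

When the wait times out, the real container logs an error and publishes an event rather than throwing, so a caller of start() is not blocked indefinitely by an executor that has run out of threads.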

Let's look at ListenerConsumer, the task that was just submitted:

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
    @Override
    public void run() {
        ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
        // Publish the consumer "starting" event
        publishConsumerStartingEvent();
        this.consumerThread = Thread.currentThread();
        setupSeeks();
        KafkaUtils.setConsumerGroupId(this.consumerGroupId);
        this.count = 0;
        this.last = System.currentTimeMillis();
        initAssignedPartitions();
        // Publish the consumer "started" event
        publishConsumerStartedEvent();
        Throwable exitThrowable = null;
        while (isRunning()) {
            try {
                // The key call
                pollAndInvoke();
            }
            catch (Exception e) {
                // Handling for the various exception types (elided)
            }

        }
        wrapUp(exitThrowable);
    }
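    // This is the core of the consumption logic
    protected void pollAndInvoke() {
        if (!this.autoCommit && !this.isRecordAck) {
            // Process commits when auto-commit is off
            processCommits();
        }
        // Fix transactional offsets if needed
        fixTxOffsetsIfNeeded();
        // Idle between polls if configured
        idleBetweenPollIfNecessary();
        if (this.seeks.size() > 0) {
            // Process pending offset seeks
            processSeeks();
        }
        // Pause the consumer if necessary
        pauseConsumerIfNecessary();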
        this.lastPoll = System.currentTimeMillis();
        if (!isRunning()) {
            return;
        }
        this.polling.set(true);
        // doPoll() also calls checkRebalanceCommits()
        ConsumerRecords<K, V> records = doPoll();
        if (!this.polling.compareAndSet(true, false) && records != null) {
            /*
             * There is a small race condition where wakeIfNecessary was called between
             * exiting the poll and before we reset the boolean.
             */
            if (records.count() > 0) {
                this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
            }
            return;
        }
        // Resume the consumer if necessary
        resumeConsumerIfNeccessary();
        debugRecords(records);
        if (records != null && records.count() > 0) {
            savePositionsIfNeeded(records);
            if (this.containerProperties.getIdleEventInterval() != null) {
                this.lastReceive = System.currentTimeMillis();
            }
            // Finally, invoke the listener
            invokeListener(records);
        }
        else {
            checkIdle();
        }
    }
    private void invokeListener(final ConsumerRecords<K, V> records) {
        if (this.isBatchListener) {
            invokeBatchListener(records);
        }
        else {
            invokeRecordListener(records);
        }
    }
    // Batch consumption
    private void invokeBatchListener(final ConsumerRecords<K, V> records) {
        List<ConsumerRecord<K, V>> recordList = null;
        if (!this.wantsFullRecords) {
            recordList = createRecordList(records);
        }
        if (this.wantsFullRecords || recordList.size() > 0) {
            if (this.transactionTemplate != null) {
                invokeBatchListenerInTx(records, recordList);
            }
            else {
                doInvokeBatchListener(records, recordList);
            }
        }
    }
    // Record-by-record consumption
    private void invokeRecordListener(final ConsumerRecords<K, V> records) {
        if (this.transactionTemplate != null) {
            invokeRecordListenerInTx(records);
        }
        else {
            doInvokeWithRecords(records);
        }
    }
    private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
        Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
        // Record-by-record consumption walks the polled records with an iterator
        while (iterator.hasNext()) {
            if (this.stopImmediate && !isRunning()) {
                break;
            }
            final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
            if (record == null) {
                continue;
            }
            this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
            doInvokeRecordListener(record, iterator);
            if (this.nackSleep >= 0) {
                handleNack(records, record);
                break;
            }
        }
    }
}
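
To make the dispatch at the end of pollAndInvoke() concrete, here is a minimal sketch in plain Java with no Kafka dependency. It only mirrors the shape of the logic above: a non-empty poll result goes to the listener, once per poll for a batch listener or once per record otherwise, and an empty poll takes the idle branch. PollLoopSketch, consume and the in-memory queue standing in for the broker are all hypothetical; commits, seeks, pause/resume and transactions are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

final class PollLoopSketch {

    /** Drains the queued poll results and returns a log of the listener invocations. */
    static List<String> consume(Deque<List<String>> polls, boolean batchListener) {
        List<String> invocations = new ArrayList<>();
        // The real loop runs while the container is running; this sketch stops when the fake source is empty.
        while (!polls.isEmpty()) {
            List<String> records = polls.poll();
            if (records == null || records.isEmpty()) {
                // Corresponds to the checkIdle() branch.
                invocations.add("idle");
            } else if (batchListener) {
                // Batch listener: one invocation for the whole poll result.
                invocations.add("batch" + records);
            } else {
                // Record listener: iterate and invoke once per record.
                for (String record : records) {
                    invocations.add("record[" + record + "]");
                }
            }
        }
        return invocations;
    }

    /** Builds three fake poll results: two records, nothing, one record. */
    static Deque<List<String>> samplePolls() {
        Deque<List<String>> polls = new ArrayDeque<>();
        polls.add(Arrays.asList("a", "b"));
        polls.add(Collections.<String>emptyList());
        polls.add(Arrays.asList("c"));
        return polls;
    }

    public static void main(String[] args) {
        System.out.println(consume(samplePolls(), true));
        System.out.println(consume(samplePolls(), false));
    }
}
```

With the same three polls, the batch variant makes two listener calls and the record variant makes three, which is the practical difference between the two listener types.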

Please credit the source when reposting: 我要编程 » Analysis of Kafka consumption in Spring and Spring Boot