/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.internals.AcknowledgementCommitCallbackHandler;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
import org.apache.kafka.clients.consumer.internals.ShareConsumerDelegate;
import org.apache.kafka.clients.consumer.internals.ShareConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ShareConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.ShareFetch;
import org.apache.kafka.clients.consumer.internals.ShareFetchBuffer;
import org.apache.kafka.clients.consumer.internals.ShareFetchCollector;
import org.apache.kafka.clients.consumer.internals.ShareFetchConfig;
import org.apache.kafka.clients.consumer.internals.ShareFetchException;
import org.apache.kafka.clients.consumer.internals.ShareFetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.WakeupTrigger;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public class ShareConsumerImpl<K, V>
implements ShareConsumerDelegate<K, V> {
    // Sentinel thread id; it is the same value (-1L) that currentThread is initialised with below.
    // NOTE(review): presumably means "no thread currently owns this consumer" — confirm in acquire()/release().
    private static final long NO_CURRENT_THREAD = -1L;
    // Channel used by the public API methods to hand ApplicationEvents to the background side.
    private final ApplicationEventHandler applicationEventHandler;
    private final Time time;
    private final KafkaShareConsumerMetrics kafkaShareConsumerMetrics;
    private final AsyncConsumerMetrics asyncConsumerMetrics;
    // Not final: the factory-based constructor tests it for null in its failure path.
    private Logger log;
    private final String clientId;
    private final String groupId;
    // Acknowledgement events: the handler is constructed over acknowledgementEventQueue.
    private final ShareAcknowledgementEventHandler acknowledgementEventHandler;
    private final BlockingQueue<ShareAcknowledgementEvent> acknowledgementEventQueue;
    private final ShareAcknowledgementEventProcessor acknowledgementEventProcessor;
    // Background events: the handler is constructed over backgroundEventQueue.
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final BackgroundEventHandler backgroundEventHandler;
    private final BackgroundEventProcessor backgroundEventProcessor;
    private final CompletableEventReaper backgroundEventReaper;
    private final Deserializers<K, V> deserializers;
    // The fetch most recently handed out by poll(); the acknowledge(...) overloads record against it.
    private ShareFetch<K, V> currentFetch;
    // Null while no acknowledgement commit callback is registered (see setAcknowledgementCommitCallback).
    private AcknowledgementCommitCallbackHandler acknowledgementCommitCallbackHandler;
    // Cleared when the commit callback is removed.
    // NOTE(review): presumably filled by the acknowledgement event processor — confirm.
    private final List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements;
    private final ShareAcknowledgementMode acknowledgementMode;
    private final ShareFetchBuffer fetchBuffer;
    private final ShareFetchCollector<K, V> fetchCollector;
    private final SubscriptionState subscriptions;
    private final ShareConsumerMetadata metadata;
    private final Metrics metrics;
    private final int requestTimeoutMs;
    // Used as the timeout for unsubscribe(), commitSync() and asynchronous acknowledgement events.
    private final int defaultApiTimeoutMs;
    // Checked by close(Duration) so that the shutdown work is attempted only while still open.
    private volatile boolean closed = false;
    // Empty when telemetry is not enabled; clientInstanceId() then throws IllegalStateException.
    private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
    private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
    // NOTE(review): currentThread/refCount look like single-thread access bookkeeping — confirm in acquire()/release().
    private final AtomicLong currentThread = new AtomicLong(-1L);
    private final AtomicInteger refCount = new AtomicInteger(0);
    // Set to true at the start of each poll(); collect() resets it once a ShareFetchEvent has been queued.
    private boolean shouldSendShareFetchEvent = false;

    /**
     * Creates a share consumer with the default collaborators: the system clock, the standard
     * event-handler, event-reaper and fetch-collector constructors, and two new
     * {@link LinkedBlockingQueue} instances for acknowledgement and background events.
     *
     * @param config            consumer configuration
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     */
    ShareConsumerImpl(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(
            config,
            keyDeserializer,
            valueDeserializer,
            Time.SYSTEM,
            ApplicationEventHandler::new,
            CompletableEventReaper::new,
            ShareFetchCollector::new,
            new LinkedBlockingQueue<>(),
            new LinkedBlockingQueue<>());
    }

    /**
     * Builds a share consumer from configuration, with the clock, the factories for the application
     * event handler, background event reaper and fetch collector, and both event queues supplied by
     * the caller.
     *
     * <p>Any {@link Throwable} raised during construction is wrapped in a {@link KafkaException}.
     * If the logger had already been created by then, {@code close(Duration.ZERO, true)} is invoked
     * first.
     *
     * @param config                         consumer configuration
     * @param keyDeserializer                deserializer for record keys
     * @param valueDeserializer              deserializer for record values
     * @param time                           clock used for timers and metrics
     * @param applicationEventHandlerFactory factory for the application event handler
     * @param backgroundEventReaperFactory   factory for the background event reaper
     * @param fetchCollectorFactory          factory for the share fetch collector
     * @param acknowledgementEventQueue      queue carrying acknowledgement events
     * @param backgroundEventQueue           queue carrying background events
     * @throws KafkaException if any step of construction fails
     */
    ShareConsumerImpl(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Time time, ApplicationEventHandlerFactory applicationEventHandlerFactory, AsyncKafkaConsumer.CompletableEventReaperFactory backgroundEventReaperFactory, ShareFetchCollectorFactory<K, V> fetchCollectorFactory, LinkedBlockingQueue<ShareAcknowledgementEvent> acknowledgementEventQueue, LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue) {
        try {
            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.SHARE);
            this.clientId = config.getString("client.id");
            this.groupId = config.getString("group.id");
            // Runs before the logger exists, so a failure here skips the close() call in the catch block.
            this.maybeThrowInvalidGroupIdException();
            LogContext logContext = ShareConsumerImpl.createLogContext(this.clientId, this.groupId);
            this.acknowledgementEventQueue = acknowledgementEventQueue;
            this.backgroundEventQueue = backgroundEventQueue;
            this.log = logContext.logger(this.getClass());
            this.log.debug("Initializing the Kafka share consumer");
            this.requestTimeoutMs = config.getInt("request.timeout.ms");
            this.defaultApiTimeoutMs = config.getInt("default.api.timeout.ms");
            this.time = time;
            // Metrics: the telemetry reporter, when present, is added to the configured reporters.
            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, (AbstractConfig)config);
            this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(this.clientId, config);
            this.clientTelemetryReporter.ifPresent(reporters::add);
            this.metrics = ConsumerUtils.createMetrics(config, time, reporters);
            this.asyncConsumerMetrics = new AsyncConsumerMetrics(this.metrics, "consumer-share-metrics");
            this.acknowledgementMode = ShareConsumerImpl.initializeAcknowledgementMode(config, this.log);
            this.deserializers = new Deserializers<K, V>(config, keyDeserializer, valueDeserializer, this.metrics);
            this.currentFetch = ShareFetch.empty();
            // Subscription state and metadata, bootstrapped from the configured addresses.
            this.subscriptions = ConsumerUtils.createSubscriptionState(config, logContext);
            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(this.metrics.reporters(), Arrays.asList(this.deserializers.keyDeserializer(), this.deserializers.valueDeserializer()));
            this.metadata = new ShareConsumerMetadata(config, this.subscriptions, logContext, clusterResourceListeners);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
            this.metadata.bootstrap(addresses);
            ShareFetchMetricsManager shareFetchMetricsManager = ConsumerUtils.createShareFetchMetricsManager(this.metrics);
            ApiVersions apiVersions = new ApiVersions();
            // Event plumbing between the public API methods and the background side.
            LinkedBlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<ApplicationEvent>();
            this.acknowledgementEventHandler = new ShareAcknowledgementEventHandler(acknowledgementEventQueue);
            this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, this.asyncConsumerMetrics);
            this.fetchBuffer = new ShareFetchBuffer(logContext);
            // Suppliers for the network client, request managers and event processor; all three
            // are handed to the application event handler factory below.
            Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(time, logContext, this.metadata, config, apiVersions, this.metrics, shareFetchMetricsManager.throttleTimeSensor(), this.clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), this.backgroundEventHandler, true, this.asyncConsumerMetrics);
            this.completedAcknowledgements = new LinkedList<Map<TopicIdPartition, Acknowledgements>>();
            Supplier<RequestManagers> requestManagersSupplier = RequestManagers.supplier(time, logContext, this.acknowledgementEventHandler, this.backgroundEventHandler, this.metadata, this.subscriptions, this.fetchBuffer, config, groupRebalanceConfig, shareFetchMetricsManager, this.clientTelemetryReporter, this.metrics);
            Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, this.metadata, this.subscriptions, requestManagersSupplier);
            this.applicationEventHandler = applicationEventHandlerFactory.build(logContext, time, config.getInt("default.api.timeout.ms"), applicationEventQueue, new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, this.asyncConsumerMetrics);
            this.acknowledgementEventProcessor = new ShareAcknowledgementEventProcessor();
            this.backgroundEventProcessor = new BackgroundEventProcessor();
            this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
            this.fetchCollector = fetchCollectorFactory.build(logContext, this.metadata, this.subscriptions, new ShareFetchConfig(config), this.deserializers);
            this.kafkaShareConsumerMetrics = new KafkaShareConsumerMetrics(this.metrics);
            config.logUnused();
            AppInfoParser.registerAppInfo("kafka.consumer", this.clientId, this.metrics, time.milliseconds());
            this.log.debug("Kafka share consumer initialized");
        }
        catch (Throwable t) {
            // Only attempt cleanup when construction got at least as far as creating the logger.
            // NOTE(review): the `true` argument presumably suppresses exceptions raised while closing — confirm in close(Duration, boolean).
            if (this.log != null) {
                this.close(Duration.ZERO, true);
            }
            throw new KafkaException("Failed to construct Kafka share consumer", t);
        }
    }

    /**
     * Builds a share consumer around a caller-supplied {@link KafkaClient}, subscription state and
     * metadata. Metrics, event queues, handlers and request-manager suppliers are created here;
     * telemetry is left disabled.
     *
     * @param logContext        log context used for every logger created by this instance
     * @param clientId          client id
     * @param groupId           share group id
     * @param config            consumer configuration
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param time              clock used for timers and metrics
     * @param client            network client handed to the network client delegate
     * @param subscriptions     subscription state to use
     * @param metadata          metadata to use
     */
    ShareConsumerImpl(LogContext logContext, String clientId, String groupId, ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Time time, KafkaClient client, SubscriptionState subscriptions, ShareConsumerMetadata metadata) {
        // Identity, logging and basic state.
        this.clientId = clientId;
        this.groupId = groupId;
        this.log = logContext.logger(this.getClass());
        this.time = time;
        this.metrics = new Metrics(time);
        this.clientTelemetryReporter = Optional.empty();
        this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer, this.metrics);
        this.currentFetch = ShareFetch.empty();
        this.subscriptions = subscriptions;
        this.metadata = metadata;
        this.requestTimeoutMs = config.getInt("request.timeout.ms");
        this.defaultApiTimeoutMs = config.getInt("default.api.timeout.ms");
        this.acknowledgementMode = ShareConsumerImpl.initializeAcknowledgementMode(config, this.log);

        // Fetch path and metrics.
        this.fetchBuffer = new ShareFetchBuffer(logContext);
        this.completedAcknowledgements = new LinkedList<>();
        ShareConsumerMetrics shareMetricsRegistry = new ShareConsumerMetrics();
        ShareFetchMetricsManager fetchMetricsManager = new ShareFetchMetricsManager(this.metrics, shareMetricsRegistry.shareFetchMetrics);
        this.fetchCollector = new ShareFetchCollector<>(logContext, metadata, subscriptions, new ShareFetchConfig(config), this.deserializers);
        this.kafkaShareConsumerMetrics = new KafkaShareConsumerMetrics(this.metrics);
        this.asyncConsumerMetrics = new AsyncConsumerMetrics(this.metrics, "consumer-share-metrics");

        // Event queues and their handlers.
        LinkedBlockingQueue<ApplicationEvent> appEventQueue = new LinkedBlockingQueue<>();
        this.acknowledgementEventQueue = new LinkedBlockingQueue<>();
        this.acknowledgementEventHandler = new ShareAcknowledgementEventHandler(this.acknowledgementEventQueue);
        this.backgroundEventQueue = new LinkedBlockingQueue<>();
        this.backgroundEventHandler = new BackgroundEventHandler(this.backgroundEventQueue, time, this.asyncConsumerMetrics);

        // Suppliers handed to the application event handler.
        Supplier<NetworkClientDelegate> networkDelegateSupplier = NetworkClientDelegate.supplier(time, config, logContext, client, metadata, this.backgroundEventHandler, true, this.asyncConsumerMetrics);
        GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.SHARE);
        Supplier<RequestManagers> managersSupplier = RequestManagers.supplier(time, logContext, this.acknowledgementEventHandler, this.backgroundEventHandler, metadata, subscriptions, this.fetchBuffer, config, rebalanceConfig, fetchMetricsManager, this.clientTelemetryReporter, this.metrics);
        Supplier<ApplicationEventProcessor> processorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, subscriptions, managersSupplier);
        this.applicationEventHandler = new ApplicationEventHandler(logContext, time, config.getInt("default.api.timeout.ms"), appEventQueue, new CompletableEventReaper(logContext), processorSupplier, networkDelegateSupplier, managersSupplier, this.asyncConsumerMetrics);

        // Event processors and reaper.
        this.acknowledgementEventProcessor = new ShareAcknowledgementEventProcessor();
        this.backgroundEventProcessor = new BackgroundEventProcessor();
        this.backgroundEventReaper = new CompletableEventReaper(logContext);

        config.logUnused();
        AppInfoParser.registerAppInfo("kafka.consumer", clientId, this.metrics, time.milliseconds());
    }

    /**
     * Builds a share consumer from fully injected collaborators. Only the event processors, the
     * event handlers, the deserializer wrapper and the metrics holders are created here.
     *
     * @param logContext                log context used to create the logger
     * @param clientId                  client id
     * @param keyDeserializer           deserializer for record keys
     * @param valueDeserializer         deserializer for record values
     * @param fetchBuffer               buffer that holds fetched data
     * @param fetchCollector            collector that turns buffered data into a {@link ShareFetch}
     * @param time                      clock used for timers and metrics
     * @param applicationEventHandler   handler used to send application events
     * @param acknowledgementEventQueue queue carrying acknowledgement events
     * @param backgroundEventQueue      queue carrying background events
     * @param backgroundEventReaper     reaper for background events
     * @param metrics                   metrics registry
     * @param subscriptions             subscription state to use
     * @param metadata                  metadata to use
     * @param requestTimeoutMs          request timeout in milliseconds
     * @param defaultApiTimeoutMs       default API timeout in milliseconds
     * @param groupId                   share group id
     * @param acknowledgementModeConfig acknowledgement mode, parsed with
     *                                  {@link ShareAcknowledgementMode#fromString}
     */
    ShareConsumerImpl(LogContext logContext, String clientId, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, ShareFetchBuffer fetchBuffer, ShareFetchCollector<K, V> fetchCollector, Time time, ApplicationEventHandler applicationEventHandler, BlockingQueue<ShareAcknowledgementEvent> acknowledgementEventQueue, BlockingQueue<BackgroundEvent> backgroundEventQueue, CompletableEventReaper backgroundEventReaper, Metrics metrics, SubscriptionState subscriptions, ShareConsumerMetadata metadata, int requestTimeoutMs, int defaultApiTimeoutMs, String groupId, String acknowledgementModeConfig) {
        this.log = logContext.logger(this.getClass());
        this.subscriptions = subscriptions;
        this.clientId = clientId;
        this.groupId = groupId;
        this.fetchBuffer = fetchBuffer;
        this.fetchCollector = fetchCollector;
        this.time = time;
        this.acknowledgementEventQueue = acknowledgementEventQueue;
        this.acknowledgementEventProcessor = new ShareAcknowledgementEventProcessor();
        this.backgroundEventQueue = backgroundEventQueue;
        this.backgroundEventProcessor = new BackgroundEventProcessor();
        this.backgroundEventReaper = backgroundEventReaper;
        this.metrics = metrics;
        this.metadata = metadata;
        this.requestTimeoutMs = requestTimeoutMs;
        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
        this.acknowledgementMode = ShareAcknowledgementMode.fromString(acknowledgementModeConfig);
        this.deserializers = new Deserializers<K, V>(keyDeserializer, valueDeserializer, metrics);
        this.currentFetch = ShareFetch.empty();
        this.applicationEventHandler = applicationEventHandler;
        this.kafkaShareConsumerMetrics = new KafkaShareConsumerMetrics(metrics);
        this.clientTelemetryReporter = Optional.empty();
        // Must be mutable, as in the other constructors: the class modifies this list (for example
        // clear() in setAcknowledgementCommitCallback), and an immutable Collections.emptyList()
        // would throw UnsupportedOperationException on the first add.
        this.completedAcknowledgements = new LinkedList<>();
        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, "consumer-share-metrics");
        this.acknowledgementEventHandler = new ShareAcknowledgementEventHandler(acknowledgementEventQueue);
        this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, this.asyncConsumerMetrics);
    }

    /**
     * Returns the current topic subscription.
     *
     * @return an unmodifiable view of the subscribed topic names
     */
    @Override
    public Set<String> subscription() {
        acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(subscriptions.subscription());
        } finally {
            release();
        }
    }

    /**
     * Subscribes to the given topics. An empty collection is treated as a request to unsubscribe.
     *
     * @param topics the topic names to subscribe to
     * @throws IllegalArgumentException if {@code topics} is null or contains a blank topic name
     */
    @Override
    public void subscribe(Collection<String> topics) {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            if (topics == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            }
            if (topics.isEmpty()) {
                unsubscribe();
                return;
            }
            // Validate every name before anything is sent to the background side.
            for (String candidate : topics) {
                if (Utils.isBlank(candidate)) {
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                }
            }
            applicationEventHandler.addAndGet(new ShareSubscriptionChangeEvent(topics));
            log.info("Subscribed to topics: {}", String.join(", ", topics));
        } finally {
            release();
        }
    }

    /**
     * Drops the current subscription. The unsubscribe event is given a deadline derived from the
     * default API timeout and is submitted with {@code addAndGet}.
     */
    @Override
    public void unsubscribe() {
        acquireAndEnsureOpen();
        try {
            ShareUnsubscribeEvent unsubscribeEvent =
                new ShareUnsubscribeEvent(CompletableEvent.calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
            applicationEventHandler.addAndGet(unsubscribeEvent);
            log.info("Unsubscribed all topics");
        } finally {
            release();
        }
    }

    /**
     * Fetches records for the subscribed topics, looping until records are available or the
     * timeout expires.
     *
     * <p>Before fetching, background events and completed acknowledgements are processed, the
     * previous batch is acknowledged when implicit acknowledgement is in use, and the
     * explicit-acknowledgement precondition is checked.
     *
     * @param timeout maximum time to spend in this call
     * @return the fetched records, or an empty {@link ConsumerRecords} if the timeout expired first
     * @throws IllegalStateException if the consumer is not subscribed to any topics
     */
    @Override
    public synchronized ConsumerRecords<K, V> poll(Duration timeout) {
        Timer timer = this.time.timer(timeout);
        this.acquireAndEnsureOpen();
        try {
            this.processBackgroundEvents();
            this.handleCompletedAcknowledgements();
            this.acknowledgeBatchIfImplicitAcknowledgement();
            this.ensureInFlightAcknowledgedIfExplicitAcknowledgement();
            this.kafkaShareConsumerMetrics.recordPollStart(timer.currentTimeMs());
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics.");
            }
            // Allow collect() to queue one ShareFetchEvent for this poll.
            this.shouldSendShareFetchEvent = true;
            do {
                this.applicationEventHandler.add(new SharePollEvent(timer.currentTimeMs()));
                // Any pending wake-up is triggered before fetching, not between fetching and
                // returning the records.
                this.wakeupTrigger.maybeTriggerWakeup();
                ShareFetch<K, V> fetch = this.pollForFetches(timer);
                if (!fetch.isEmpty()) {
                    this.currentFetch = fetch;
                    this.handleCompletedAcknowledgements();
                    return new ConsumerRecords<K, V>(fetch.records(), Map.of());
                }
                this.processBackgroundEvents();
                this.metadata.maybeThrowAnyException();
            } while (timer.notExpired());
            this.handleCompletedAcknowledgements();
            return ConsumerRecords.empty();
        }
        catch (ShareFetchException e) {
            // An exception class cannot be generic, so the fetch it carries cannot be typed as
            // ShareFetch<K, V>; it is assigned to currentFetch, so it is cast back explicitly.
            @SuppressWarnings("unchecked")
            ShareFetch<K, V> failedFetch = (ShareFetch<K, V>) e.shareFetch();
            this.currentFetch = failedFetch;
            throw e.cause();
        }
        finally {
            this.kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
            this.release();
        }
    }

    /**
     * Collects a fetch, waiting for the fetch buffer to fill if nothing is immediately available.
     * The wait is bounded by the smaller of the handler's maximum wait time and the time remaining
     * on {@code timer}.
     *
     * @param timer the caller's poll timer; it is advanced by the time spent waiting
     * @return the collected fetch, which may be empty
     * @throws InterruptException if the wait on the fetch buffer is interrupted
     */
    private ShareFetch<K, V> pollForFetches(Timer timer) {
        long maxWaitMs = Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs());

        // First attempt: pass along whatever acknowledgements the current fetch has accumulated.
        Map<TopicIdPartition, NodeAcknowledgements> pendingAcks = currentFetch.takeAcknowledgedRecords();
        ShareFetch<K, V> immediate = collect(pendingAcks);
        if (!immediate.isEmpty()) {
            return immediate;
        }

        // Nothing available yet: wait on the buffer, staying interruptible through wakeupTrigger.
        Timer waitTimer = time.timer(maxWaitMs);
        wakeupTrigger.setShareFetchAction(fetchBuffer);
        try {
            fetchBuffer.awaitNotEmpty(waitTimer);
        } catch (InterruptException interrupted) {
            log.trace("Timeout during fetch", interrupted);
            throw interrupted;
        } finally {
            timer.update(waitTimer.currentTimeMs());
            wakeupTrigger.clearTask();
        }

        return collect(Collections.emptyMap());
    }

    /**
     * Produces the fetch to hand back to poll() and forwards the supplied acknowledgements.
     *
     * <p>When the current fetch is empty and has no renewals, a new fetch is collected from the
     * fetch buffer; otherwise the current fetch is returned. The acknowledgements are attached to
     * a ShareFetchEvent if one is queued by this call, and are otherwise sent as a separate
     * asynchronous acknowledge event if non-empty.
     *
     * @param acknowledgementsMap acknowledgements to forward; may be empty
     * @return the newly collected fetch, or the current fetch
     */
    private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
        // Replaced with an empty map once the acknowledgements have been attached to a ShareFetchEvent.
        Map<TopicIdPartition, NodeAcknowledgements> acksToSend = acknowledgementsMap;
        if (this.currentFetch.isEmpty() && !this.currentFetch.hasRenewals()) {
            ShareFetch<K, V> fetch = this.fetchCollector.collect(this.fetchBuffer);
            if (fetch.isEmpty()) {
                // An empty fetch can still carry acknowledgements; send those straight away.
                // NOTE(review): the variable name suggests these are for control records — confirm in ShareFetchCollector.
                Map<TopicIdPartition, NodeAcknowledgements> controlRecordAcknowledgements = fetch.takeAcknowledgedRecords();
                if (!controlRecordAcknowledgements.isEmpty()) {
                    this.sendShareAcknowledgeAsyncEvent(controlRecordAcknowledgements);
                }
                // The flag is set to true by poll() and cleared here, so a ShareFetchEvent is
                // queued only on the first pass through this branch after it was set.
                if (this.shouldSendShareFetchEvent) {
                    this.applicationEventHandler.add(new ShareFetchEvent(acksToSend));
                    this.shouldSendShareFetchEvent = false;
                    this.applicationEventHandler.wakeupNetworkThread();
                    acksToSend = Map.of();
                }
            }
            // Acknowledgements that were not attached to a ShareFetchEvent go out on their own.
            if (!acksToSend.isEmpty()) {
                this.sendShareAcknowledgeAsyncEvent(acksToSend);
            }
            return fetch;
        }
        if (this.currentFetch.hasRenewals()) {
            // The return value is discarded; hasRenewals() is re-checked immediately afterwards.
            this.currentFetch.takeRenewedRecords();
            if (this.currentFetch.hasRenewals() && this.shouldSendShareFetchEvent) {
                this.applicationEventHandler.add(new ShareFetchEvent(acksToSend));
                this.shouldSendShareFetchEvent = false;
                this.applicationEventHandler.wakeupNetworkThread();
                acksToSend = Map.of();
            }
        }
        if (!acksToSend.isEmpty()) {
            this.sendShareAcknowledgeAsyncEvent(acksToSend);
        }
        return this.currentFetch;
    }

    /**
     * Queues an asynchronous acknowledge event whose deadline is derived from the default API
     * timeout, then wakes the network thread.
     *
     * @param acknowledgementsMap acknowledgements to send, keyed by topic-partition
     */
    private void sendShareAcknowledgeAsyncEvent(Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap) {
        ShareAcknowledgeAsyncEvent ackEvent = new ShareAcknowledgeAsyncEvent(
            acknowledgementsMap,
            CompletableEvent.calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
        applicationEventHandler.add(ackEvent);
        applicationEventHandler.wakeupNetworkThread();
    }

    /**
     * Acknowledges a record with {@link AcknowledgeType#ACCEPT}.
     *
     * @param record the record to acknowledge
     */
    @Override
    public void acknowledge(ConsumerRecord<K, V> record) {
        acknowledge(record, AcknowledgeType.ACCEPT);
    }

    /**
     * Records an acknowledgement of the given type for a record of the current fetch, after
     * checking the explicit-acknowledgement precondition.
     *
     * @param record the record to acknowledge
     * @param type   the acknowledgement type to apply
     */
    @Override
    public void acknowledge(ConsumerRecord<K, V> record, AcknowledgeType type) {
        acquireAndEnsureOpen();
        try {
            ensureExplicitAcknowledgement();
            currentFetch.acknowledge(record, type);
        } finally {
            release();
        }
    }

    /**
     * Records an acknowledgement of the given type for the record identified by topic, partition
     * and offset, after checking the explicit-acknowledgement precondition.
     *
     * @param topic     topic of the record
     * @param partition partition of the record
     * @param offset    offset of the record
     * @param type      the acknowledgement type to apply
     */
    @Override
    public void acknowledge(String topic, int partition, long offset, AcknowledgeType type) {
        acquireAndEnsureOpen();
        try {
            ensureExplicitAcknowledgement();
            currentFetch.acknowledge(topic, partition, offset, type);
        } finally {
            release();
        }
    }

    /**
     * Synchronously commits pending acknowledgements, using the default API timeout.
     *
     * @return the per-partition outcome, as returned by {@link #commitSync(Duration)}
     */
    @Override
    public Map<TopicIdPartition, Optional<KafkaException>> commitSync() {
        Duration defaultTimeout = Duration.ofMillis(defaultApiTimeoutMs);
        return commitSync(defaultTimeout);
    }

    /**
     * Synchronously commits pending acknowledgements and waits for the result.
     *
     * <p>Completed acknowledgements are processed first, and the previous batch is acknowledged
     * when implicit acknowledgement is in use. If nothing is left to send, an empty map is
     * returned without contacting the background side.
     *
     * @param timeout maximum time to wait for the acknowledgements to complete
     * @return for each topic-partition, an empty {@link Optional} on success or the exception that
     *         the acknowledgement failed with
     */
    @Override
    public Map<TopicIdPartition, Optional<KafkaException>> commitSync(Duration timeout) {
        this.acquireAndEnsureOpen();
        try {
            this.handleCompletedAcknowledgements();
            this.acknowledgeBatchIfImplicitAcknowledgement();
            Timer requestTimer = this.time.timer(timeout.toMillis());
            Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = this.acknowledgementsToSend();
            if (acknowledgementsMap.isEmpty()) {
                return Collections.emptyMap();
            }
            ShareAcknowledgeSyncEvent event = new ShareAcknowledgeSyncEvent(acknowledgementsMap, CompletableEvent.calculateDeadlineMs(requestTimer));
            this.applicationEventHandler.add(event);
            CompletableFuture<Map<TopicIdPartition, Acknowledgements>> commitFuture = event.future();
            // Register the future with the wake-up trigger for the duration of the wait.
            this.wakeupTrigger.setActiveTask(commitFuture);
            try {
                Map<TopicIdPartition, Optional<KafkaException>> result = new HashMap<>();
                // Named differently from the completedAcknowledgements field to avoid shadowing it.
                Map<TopicIdPartition, Acknowledgements> acknowledged = ConsumerUtils.getResult(commitFuture);
                acknowledged.forEach((tip, acks) -> result.put(tip, Optional.ofNullable(acks.getAcknowledgeException())));
                this.handleCompletedAcknowledgements();
                return result;
            }
            finally {
                // Always clear the trigger, whether the wait succeeded or failed.
                this.wakeupTrigger.clearTask();
            }
        }
        finally {
            this.release();
        }
    }

    /**
     * Queues pending acknowledgements for asynchronous delivery without waiting for the result.
     * Does nothing further when there are no acknowledgements to send.
     */
    @Override
    public void commitAsync() {
        acquireAndEnsureOpen();
        try {
            handleCompletedAcknowledgements();
            acknowledgeBatchIfImplicitAcknowledgement();
            Map<TopicIdPartition, NodeAcknowledgements> pending = acknowledgementsToSend();
            if (pending.isEmpty()) {
                return;
            }
            ShareAcknowledgeAsyncEvent ackEvent = new ShareAcknowledgeAsyncEvent(
                pending,
                CompletableEvent.calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
            applicationEventHandler.add(ackEvent);
        } finally {
            release();
        }
    }

    /**
     * Installs, replaces or removes the acknowledgement commit callback.
     *
     * <p>A registration event is sent only when the registered/unregistered state actually
     * changes. Removing the callback also clears the completed acknowledgements collected so far.
     *
     * @param callback the callback to install, or {@code null} to remove the current one
     */
    @Override
    public void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callback) {
        acquireAndEnsureOpen();
        try {
            boolean registering = callback != null;
            boolean alreadyRegistered = acknowledgementCommitCallbackHandler != null;

            // Notify the background side only on a transition between "none" and "some" callback.
            if (registering != alreadyRegistered) {
                applicationEventHandler.add(new ShareAcknowledgementCommitCallbackRegistrationEvent(registering));
            }

            if (registering) {
                acknowledgementCommitCallbackHandler = new AcknowledgementCommitCallbackHandler(callback);
            } else {
                completedAcknowledgements.clear();
                acknowledgementCommitCallbackHandler = null;
            }
        } finally {
            release();
        }
    }

    /**
     * Returns the client instance id obtained through the telemetry reporter.
     *
     * @param timeout maximum time to wait for the id
     * @return the client instance id
     * @throws IllegalStateException if no telemetry reporter is present
     */
    @Override
    public Uuid clientInstanceId(Duration timeout) {
        ClientTelemetryReporter reporter = clientTelemetryReporter.orElseThrow(
            () -> new IllegalStateException("Telemetry is not enabled. Set config `enable.metrics.push` to `true`."));
        return ClientTelemetryUtils.fetchClientInstanceId(reporter, timeout);
    }

    /**
     * Returns the acquisition lock timeout reported by the current fetch.
     *
     * @return whatever {@code currentFetch.acquisitionLockTimeoutMs()} returns
     */
    @Override
    public Optional<Integer> acquisitionLockTimeoutMs() {
        acquireAndEnsureOpen();
        try {
            return currentFetch.acquisitionLockTimeoutMs();
        } finally {
            release();
        }
    }

    /**
     * Returns a read-only view of the metrics held by this consumer's metrics registry.
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        Map<MetricName, ? extends Metric> readOnlyView = Collections.unmodifiableMap(metrics.metrics());
        return readOnlyView;
    }

    /**
     * Forwards a metric to the telemetry reporter, if one is configured.
     *
     * <p>A metric whose name already exists among this consumer's own metrics is skipped
     * and the skip is logged at debug level.
     *
     * @param metric the metric to register
     */
    @Override
    public void registerMetricForSubscription(KafkaMetric metric) {
        if (metrics().containsKey(metric.metricName())) {
            log.debug("Skipping registration for metric {}. Existing consumer metrics cannot be overwritten.", metric.metricName());
            return;
        }
        clientTelemetryReporter.ifPresent(reporter -> reporter.metricChange(metric));
    }

    /**
     * Tells the telemetry reporter, if one is configured, that a metric was removed.
     *
     * <p>A metric whose name exists among this consumer's own metrics is skipped and the
     * skip is logged at debug level.
     *
     * @param metric the metric to unregister
     */
    @Override
    public void unregisterMetricFromSubscription(KafkaMetric metric) {
        if (metrics().containsKey(metric.metricName())) {
            log.debug("Skipping unregistration for metric {}. Existing consumer metrics cannot be removed.", metric.metricName());
            return;
        }
        clientTelemetryReporter.ifPresent(reporter -> reporter.metricRemoval(metric));
    }

    /**
     * Closes the consumer using a 30 second timeout.
     */
    @Override
    public void close() {
        close(Duration.ofSeconds(30L));
    }

    /**
     * Closes the consumer, allowing up to {@code timeout} for the shutdown work.
     *
     * <p>Calling this on an already closed consumer does nothing. The consumer is marked
     * closed even if the shutdown itself throws.
     *
     * @param timeout maximum time to spend closing; must not be negative
     * @throws IllegalArgumentException if {@code timeout} is negative
     */
    @Override
    public void close(Duration timeout) {
        if (timeout.toMillis() < 0L) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        acquire();
        try {
            if (closed) {
                return;
            }
            close(timeout, false);
        } finally {
            // Runs on every path, including the early return and a failing shutdown.
            closed = true;
            release();
        }
    }

    /**
     * Runs the full shutdown sequence for this consumer.
     *
     * <p>Steps run in a fixed order and each is wrapped by a helper
     * ({@code Utils.swallow} / {@code Utils.closeQuietly}) that is handed
     * {@code firstException}; presumably these helpers log the failure and record the first
     * one so that later steps still run (confirm against {@code Utils}). After all steps,
     * the recorded exception, if any, is rethrown unless {@code swallowException} is set.
     *
     * @param timeout          upper bound for the close; further capped by the request timeout
     *                         (see {@code createTimerForCloseRequests})
     * @param swallowException when {@code true}, a recorded failure is not rethrown
     * @throws InterruptException if the recorded failure is an {@code InterruptException}
     * @throws KafkaException     wrapping any other recorded failure
     */
    private void close(Duration timeout, boolean swallowException) {
        this.log.trace("Closing the Kafka consumer");
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        // Wakeups are disabled before any of the close steps below run.
        this.wakeupTrigger.disableWakeups();
        Timer closeTimer = this.createTimerForCloseRequests(timeout);
        this.clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
        closeTimer.update();
        // Send pending acknowledgements and leave the group, then stop coordinator discovery.
        Utils.swallow(this.log, Level.ERROR, "Failed to release assignment before closing consumer", () -> this.sendAcknowledgementsAndLeaveGroup(closeTimer, firstException), firstException);
        Utils.swallow(this.log, Level.ERROR, "Failed to stop finding coordinator", this::stopFindCoordinatorOnClose, firstException);
        // Give the user callback a last chance to see completed acknowledgements.
        Utils.swallow(this.log, Level.ERROR, "Failed invoking acknowledgement commit callback", this::handleCompletedAcknowledgements, firstException);
        Utils.swallow(this.log, Level.ERROR, "Failed processing background events", this::processBackgroundEventsOnClose, firstException);
        if (this.applicationEventHandler != null) {
            // The handler gets whatever time is left on the close timer.
            Utils.closeQuietly(() -> this.applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException);
        }
        closeTimer.update();
        if (this.backgroundEventReaper != null && this.backgroundEventQueue != null) {
            this.backgroundEventReaper.reap(this.backgroundEventQueue);
        }
        // Release metrics, deserializers and the telemetry reporter.
        Utils.closeQuietly((AutoCloseable)this.kafkaShareConsumerMetrics, "kafka share consumer metrics", firstException);
        Utils.closeQuietly((AutoCloseable)this.asyncConsumerMetrics, "kafka async consumer metrics", firstException);
        Utils.closeQuietly((AutoCloseable)this.metrics, "consumer metrics", firstException);
        Utils.closeQuietly(this.deserializers, "consumer deserializers", firstException);
        this.clientTelemetryReporter.ifPresent(reporter -> Utils.closeQuietly((AutoCloseable)reporter, "consumer telemetry reporter", firstException));
        AppInfoParser.unregisterAppInfo("kafka.consumer", this.clientId, this.metrics);
        this.log.debug("Kafka share consumer has been closed");
        Throwable exception = firstException.get();
        if (exception != null && !swallowException) {
            // InterruptException is rethrown unchanged; everything else is wrapped.
            if (exception instanceof InterruptException) {
                throw (InterruptException)exception;
            }
            throw new KafkaException("Failed to close Kafka share consumer", exception);
        }
    }

    /**
     * Enqueues a {@code StopFindCoordinatorOnCloseEvent}; does nothing when there is no
     * application event handler.
     */
    private void stopFindCoordinatorOnClose() {
        if (applicationEventHandler != null) {
            log.debug("Stop finding coordinator during consumer close");
            applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
        }
    }

    /**
     * Builds the timer used while closing, bounded by the smaller of the caller's timeout
     * and the request timeout.
     *
     * @param timeout the timeout requested by the caller of {@code close}
     * @return a timer from this consumer's clock, or from {@code Time.SYSTEM} if none is set
     */
    private Timer createTimerForCloseRequests(Duration timeout) {
        Time clock = this.time != null ? this.time : Time.SYSTEM;
        long boundedTimeoutMs = Math.min(timeout.toMillis(), (long) requestTimeoutMs);
        return clock.timer(boundedTimeoutMs);
    }

    /**
     * Sends any pending acknowledgements and then asks to leave the group, as part of close.
     *
     * <p>Returns immediately if the application event handler, background event processor,
     * background event reaper or background event queue is missing. Otherwise:
     * <ol>
     *   <li>a {@code ShareAcknowledgeOnCloseEvent} is sent via {@code addAndGet} inside
     *       {@code completeQuietly}, so a timeout is only logged and any other exception is
     *       recorded in {@code firstException};</li>
     *   <li>a {@code ShareUnsubscribeEvent} is enqueued and background events are processed
     *       until its future completes or {@code timer} expires. Group/topic authorization
     *       and invalid-topic errors raised by background events are ignored while waiting.</li>
     * </ol>
     * A timeout while waiting for the unsubscribe is logged at warn level and not propagated.
     *
     * <p>NOTE: the decompiler (CFR) flagged this method with "Removed try catching itself -
     * possible behaviour change"; compare with the upstream source before relying on
     * exact exception flow.
     *
     * @param timer          bounds both events' deadlines and the wait for the unsubscribe
     * @param firstException receives the first non-timeout failure from sending acknowledgements
     */
    private void sendAcknowledgementsAndLeaveGroup(Timer timer, AtomicReference<Throwable> firstException) {
        if (this.applicationEventHandler == null || this.backgroundEventProcessor == null || this.backgroundEventReaper == null || this.backgroundEventQueue == null) {
            return;
        }
        this.completeQuietly(() -> this.applicationEventHandler.addAndGet(new ShareAcknowledgeOnCloseEvent(this.acknowledgementsToSend(), CompletableEvent.calculateDeadlineMs(timer))), "Failed to send pending acknowledgements with a timeout(ms)=" + timer.timeoutMs(), firstException);
        // Refresh the timer so the unsubscribe deadline reflects the time already spent.
        timer.update();
        ShareUnsubscribeEvent unsubscribeEvent = new ShareUnsubscribeEvent(CompletableEvent.calculateDeadlineMs(timer));
        this.applicationEventHandler.add(unsubscribeEvent);
        try {
            this.processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException || e instanceof InvalidTopicException);
            this.log.info("Completed releasing assignment and leaving group to close consumer.");
        }
        catch (TimeoutException e2) {
            // Close continues even if leaving the group did not finish in time.
            this.log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't complete it within {} ms. It will proceed to close.", (Object)timer.timeoutMs());
        }
        finally {
            timer.update();
        }
    }

    /**
     * Delegates to the wakeup trigger. Unlike most public methods of this class, this one
     * does not call {@code acquire()}.
     */
    @Override
    public void wakeup() {
        wakeupTrigger.wakeup();
    }

    /**
     * Acquires the consumer for the calling thread and verifies that it is still open.
     * If it is closed, the acquisition is undone before the exception is thrown.
     *
     * @throws IllegalStateException if the consumer has already been closed
     */
    private void acquireAndEnsureOpen() {
        acquire();
        if (!closed) {
            return;
        }
        release();
        throw new IllegalStateException("This consumer has already been closed.");
    }

    /**
     * Marks the consumer as in use by the calling thread and bumps the re-entrancy count.
     *
     * <p>The call succeeds when the caller already owns the consumer or when ownership can
     * be claimed from the "no owner" value {@code -1}.
     *
     * @throws ConcurrentModificationException if another thread currently owns the consumer
     * @throws IllegalStateException if called while the acknowledgement commit callback
     *         handler reports it has entered the user callback
     */
    private void acquire() {
        Thread caller = Thread.currentThread();
        long callerId = caller.getId();
        boolean alreadyOwner = callerId == currentThread.get();
        if (!alreadyOwner && !currentThread.compareAndSet(-1L, callerId)) {
            throw new ConcurrentModificationException("KafkaShareConsumer is not safe for multi-threaded access. currentThread(name: " + caller.getName() + ", id: " + callerId + ") otherThread(id: " + currentThread.get() + ")");
        }
        if (acknowledgementCommitCallbackHandler != null && acknowledgementCommitCallbackHandler.hasEnteredCallback()) {
            throw new IllegalStateException("KafkaShareConsumer methods are not accessible from user-defined acknowledgement commit callback.");
        }
        refCount.incrementAndGet();
    }

    /**
     * Undoes one {@code acquire()}; when the count drops to zero the owner is reset to
     * {@code -1}.
     */
    private void release() {
        if (refCount.decrementAndGet() != 0) {
            return;
        }
        currentThread.set(-1L);
    }

    /**
     * Creates a log context whose prefix carries the client id and group id.
     *
     * @param clientId the client id to embed in the prefix
     * @param groupId  the group id to embed in the prefix
     * @return a new {@code LogContext} with prefix {@code "[ShareConsumer clientId=..., groupId=...] "}
     */
    public static LogContext createLogContext(String clientId, String groupId) {
        String prefix = "[ShareConsumer clientId=" + clientId + ", groupId=" + groupId + "] ";
        return new LogContext(prefix);
    }

    /**
     * Rejects a missing or empty group id.
     *
     * @throws InvalidGroupIdException if the group id is {@code null} or empty
     */
    private void maybeThrowInvalidGroupIdException() {
        boolean groupIdMissing = groupId == null || groupId.isEmpty();
        if (groupIdMissing) {
            throw new InvalidGroupIdException("You must provide a valid group.id in the consumer configuration.");
        }
    }

    /**
     * Drains acknowledgement events and passes the completed acknowledgements to the
     * commit callback handler, if one is installed.
     *
     * <p>Does nothing when the acknowledgement event queue or handler is missing. The
     * completed acknowledgements are cleared afterwards, even if the callback throws.
     */
    private void handleCompletedAcknowledgements() {
        if (acknowledgementEventQueue == null || acknowledgementEventHandler == null) {
            return;
        }
        processAcknowledgementEvents();
        if (completedAcknowledgements.isEmpty()) {
            return;
        }
        try {
            if (acknowledgementCommitCallbackHandler != null) {
                acknowledgementCommitCallbackHandler.onComplete(completedAcknowledgements);
            }
        } finally {
            completedAcknowledgements.clear();
        }
    }

    /**
     * In implicit acknowledgement mode, acknowledges every record of the current fetch
     * with {@code AcknowledgeType.ACCEPT}; otherwise does nothing.
     */
    private void acknowledgeBatchIfImplicitAcknowledgement() {
        if (acknowledgementMode != ShareAcknowledgementMode.IMPLICIT) {
            return;
        }
        currentFetch.acknowledgeAll(AcknowledgeType.ACCEPT);
    }

    /**
     * In explicit acknowledgement mode, verifies that the current fetch reports all
     * in-flight records as acknowledged.
     *
     * @throws IllegalStateException if explicit mode is active and the check fails
     */
    private void ensureInFlightAcknowledgedIfExplicitAcknowledgement() {
        if (acknowledgementMode != ShareAcknowledgementMode.EXPLICIT) {
            return;
        }
        if (!currentFetch.checkAllInFlightAreAcknowledged()) {
            throw new IllegalStateException("All records must be acknowledged in explicit acknowledgement mode.");
        }
    }

    /**
     * Takes the acknowledged records from the current fetch.
     *
     * @return the result of {@code currentFetch.takeAcknowledgedRecords()}
     */
    private Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsToSend() {
        return currentFetch.takeAcknowledgedRecords();
    }

    /**
     * Rejects the call when the consumer is in implicit acknowledgement mode.
     *
     * @throws IllegalStateException if the acknowledgement mode is {@code IMPLICIT}
     */
    private void ensureExplicitAcknowledgement() {
        boolean implicitMode = acknowledgementMode == ShareAcknowledgementMode.IMPLICIT;
        if (implicitMode) {
            throw new IllegalStateException("Implicit acknowledgement of delivery is being used.");
        }
    }

    /**
     * Reads {@code share.acknowledgement.mode} from the configuration and converts it to a
     * {@code ShareAcknowledgementMode}.
     *
     * @param config the consumer configuration to read from
     * @param log    not used by this method
     * @return the mode parsed by {@code ShareAcknowledgementMode.fromString}
     */
    private static ShareAcknowledgementMode initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
        return ShareAcknowledgementMode.fromString(config.getString("share.acknowledgement.mode"));
    }

    /**
     * Drains the acknowledgement event handler and processes each event in order.
     * A failure on one event is logged at warn level and does not stop the remaining events.
     */
    void processAcknowledgementEvents() {
        List<ShareAcknowledgementEvent> drained = acknowledgementEventHandler.drainEvents();
        for (ShareAcknowledgementEvent ackEvent : drained) {
            try {
                acknowledgementEventProcessor.process(ackEvent);
            } catch (Exception failure) {
                log.warn("An error occurred when processing the acknowledgement event: {}", failure.getMessage(), failure);
            }
        }
    }

    /**
     * Processes pending background events during close.
     *
     * <p>Does nothing when the background event queue or handler is missing. Group
     * authorization, topic authorization and invalid-topic errors are dropped; any other
     * exception propagates.
     */
    void processBackgroundEventsOnClose() {
        if (backgroundEventQueue == null || backgroundEventHandler == null) {
            return;
        }
        try {
            processBackgroundEvents();
        } catch (Exception e) {
            boolean ignorable = e instanceof GroupAuthorizationException
                || e instanceof TopicAuthorizationException
                || e instanceof InvalidTopicException;
            if (!ignorable) {
                throw e;
            }
        }
    }

    /**
     * Drains and processes all queued background events, then reaps expired events.
     *
     * <p>Every drained event is processed even if an earlier one failed. The first failure
     * is remembered and thrown after reaping; later failures are only logged at warn level.
     * Queue-time and processing-time metrics are recorded when at least one event was drained.
     *
     * @return {@code true} if at least one event was drained
     * @throws KafkaException the first failure raised while processing, wrapped if necessary
     */
    boolean processBackgroundEvents() {
        AtomicReference<KafkaException> firstError = new AtomicReference<>();
        List<BackgroundEvent> drained = backgroundEventHandler.drainEvents();

        if (!drained.isEmpty()) {
            long startMs = time.milliseconds();
            for (BackgroundEvent event : drained) {
                asyncConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - event.enqueuedMs());
                try {
                    // Completable events are handed to the reaper before being processed.
                    if (event instanceof CompletableEvent) {
                        backgroundEventReaper.add((CompletableEvent<?>) event);
                    }
                    backgroundEventProcessor.process(event);
                } catch (Throwable t) {
                    KafkaException wrapped = ConsumerUtils.maybeWrapAsKafkaException(t);
                    if (!firstError.compareAndSet(null, wrapped)) {
                        log.warn("An error occurred when processing the background event: {}", wrapped.getMessage(), wrapped);
                    }
                }
            }
            asyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds() - startMs);
        }

        backgroundEventReaper.reap(time.milliseconds());

        KafkaException failure = firstError.get();
        if (failure != null) {
            throw failure;
        }
        return !drained.isEmpty();
    }

    /**
     * Processes background events repeatedly until {@code future} completes or
     * {@code timer} expires.
     *
     * <p>On each pass the queued background events are processed first. If the future is
     * done, its result is returned via {@code ConsumerUtils.getResult}. If it is not done
     * and the pass saw no events, the method waits up to 100 ms on the future before trying
     * again. The timer is updated at the end of every pass.
     *
     * <p>NOTE: the decompiler (CFR) flagged this method with "Removed try catching itself -
     * possible behaviour change"; compare with the upstream source before relying on
     * exact exception flow.
     *
     * @param future                    the future to wait for
     * @param timer                     bounds the total wait
     * @param ignoreErrorEventException exceptions raised while processing background events
     *                                  are dropped when this predicate accepts them
     * @param <T>                       the future's result type
     * @return the future's result
     * @throws TimeoutException if the timer expires before the future completes
     */
    <T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException) {
        log.trace("Will wait up to {} ms for future {} to complete", timer.remainingMs(), future);
        do {
            boolean hadEvents = false;
            try {
                hadEvents = processBackgroundEvents();
            } catch (Exception e) {
                if (!ignoreErrorEventException.test(e)) {
                    throw e;
                }
            }

            try {
                if (future.isDone()) {
                    T result = ConsumerUtils.getResult(future);
                    log.trace("Future {} completed successfully", future);
                    return result;
                } else if (!hadEvents) {
                    // Nothing was processed this pass, so block briefly on the future itself.
                    Timer pollInterval = time.timer(100L);
                    log.trace("Waiting {} ms for future {} to complete", pollInterval.remainingMs(), future);
                    T result = ConsumerUtils.getResult(future, pollInterval);
                    log.trace("Future {} completed successfully", future);
                    return result;
                }
            } catch (TimeoutException ignored) {
                // The short wait elapsed; loop again until the overall timer expires.
            } finally {
                timer.update();
            }
        } while (timer.notExpired());

        log.trace("Future {} did not complete within timeout", future);
        throw new TimeoutException("Operation timed out before completion");
    }

    /**
     * Runs {@code function} without letting its failure escape.
     *
     * <p>A {@code TimeoutException} is logged at debug level only. Any other exception is
     * stored in {@code firstException} if nothing has been stored there yet.
     *
     * @param function       the action to run
     * @param msg            text inserted into the timeout log message
     * @param firstException holder for the first non-timeout failure
     */
    void completeQuietly(Utils.ThrowingRunnable function, String msg, AtomicReference<Throwable> firstException) {
        try {
            function.run();
        } catch (TimeoutException timedOut) {
            log.debug("Timeout expired before the {} operation could complete.", msg);
        } catch (Exception failure) {
            firstException.compareAndSet(null, failure);
        }
    }

    /**
     * @return the client id held by this consumer
     */
    @Override
    public String clientId() {
        return clientId;
    }

    /**
     * @return the {@code Metrics} registry held by this consumer
     */
    @Override
    public Metrics metricsRegistry() {
        return metrics;
    }

    /**
     * @return the {@code AsyncConsumerMetrics} instance held by this consumer
     */
    AsyncConsumerMetrics asyncConsumerMetrics() {
        return asyncConsumerMetrics;
    }

    /**
     * @return the {@code KafkaShareConsumerMetrics} instance held by this consumer
     */
    @Override
    public KafkaShareConsumerMetrics kafkaShareConsumerMetrics() {
        return kafkaShareConsumerMetrics;
    }

    /**
     * Factory for {@code ApplicationEventHandler} instances.
     *
     * <p>Parameter names were lost in decompilation and are derived from the parameter
     * types here. The meaning of the {@code int} argument is not visible in this file.
     */
    interface ApplicationEventHandlerFactory {
        ApplicationEventHandler build(
            LogContext logContext,
            Time time,
            int intValue,
            BlockingQueue<ApplicationEvent> applicationEventQueue,
            CompletableEventReaper completableEventReaper,
            Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier,
            Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
            Supplier<RequestManagers> requestManagersSupplier,
            AsyncConsumerMetrics asyncConsumerMetrics);
    }

    /**
     * Factory for {@code ShareFetchCollector} instances.
     *
     * <p>Parameter names were lost in decompilation and are derived from the parameter
     * types here.
     *
     * @param <K> key type of the collector and deserializers
     * @param <V> value type of the collector and deserializers
     */
    interface ShareFetchCollectorFactory<K, V> {
        ShareFetchCollector<K, V> build(
            LogContext logContext,
            ShareConsumerMetadata metadata,
            SubscriptionState subscriptions,
            ShareFetchConfig fetchConfig,
            Deserializers<K, V> deserializers);
    }

    /**
     * Processes {@code ShareAcknowledgementEvent}s for the enclosing consumer.
     */
    private class ShareAcknowledgementEventProcessor implements EventProcessor<ShareAcknowledgementEvent> {

        /**
         * Records the event's acknowledgements for the commit callback when a callback
         * handler is installed, and renews them on the current fetch when the event asks
         * for renewal.
         *
         * @param event the acknowledgement event to process
         */
        @Override
        public void process(ShareAcknowledgementEvent event) {
            boolean callbackInstalled = acknowledgementCommitCallbackHandler != null;
            if (callbackInstalled) {
                completedAcknowledgements.add(event.acknowledgementsMap());
            }
            if (event.checkForRenewAcknowledgements()) {
                currentFetch.renew(event.acknowledgementsMap(), event.acquisitionLockTimeoutMs());
            }
        }
    }

    /**
     * Processes background events for the enclosing consumer. Only events of type
     * {@code BackgroundEvent.Type.ERROR} are accepted.
     */
    private class BackgroundEventProcessor
    implements EventProcessor<BackgroundEvent> {
        /**
         * Dispatches an error event to {@link #process(ErrorEvent)}.
         *
         * @param event the background event to process
         * @throws IllegalArgumentException if the event's type is not {@code ERROR}
         */
        @Override
        public void process(BackgroundEvent event) {
            if (event.type() != BackgroundEvent.Type.ERROR) {
                throw new IllegalArgumentException("Background event type " + event.type() + " was not expected");
            }
            this.process((ErrorEvent)event);
        }

        /**
         * Rethrows the error carried by the event.
         *
         * <p>This is a private overload, not an override, so it must not carry
         * {@code @Override}; the annotation on a private method is a compile-time error.
         *
         * @param event the error event whose error is thrown
         */
        private void process(ErrorEvent event) {
            throw event.error();
        }
    }
}

