/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.graph.GracePeriodGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;

/**
 * Wires an aggregation processor node into the {@link InternalStreamsBuilder} graph and wraps
 * the result in a {@link KTableImpl}. When repartitioning is required, a repartition node is
 * inserted between the configured parent graph node and the aggregation node.
 *
 * @param <K> key type of the records being aggregated (type of the grouped key serde)
 * @param <V> value type of the records being aggregated (type of the grouped value serde)
 */
class GroupedStreamAggregateBuilder<K, V> {
    private final InternalStreamsBuilder builder;
    private final Serde<K> keySerde;
    private final Serde<V> valueSerde;
    private final boolean repartitionRequired;
    /** Name taken from {@code groupedInternal.name()}; may be {@code null}. */
    private final String userProvidedRepartitionTopicName;
    private final Set<String> subTopologySourceNodes;
    private final String name;
    private final GraphNode graphNode;
    /** Repartition node created by an earlier build call; {@code null} until one is built. */
    private GraphNode repartitionNode;

    /** Initializer producing the starting count {@code 0L}. */
    final Initializer<Long> countInitializer = () -> 0L;
    /** Aggregator that ignores key and value and adds one to the running count. */
    final Aggregator<K, V, Long> countAggregator = (aggKey, value, aggregate) -> aggregate + 1L;
    /** Initializer producing {@code null} as the starting aggregate. */
    final Initializer<V> reduceInitializer = () -> null;

    /**
     * Creates a builder bound to one grouped stream.
     *
     * @param builder                graph builder that receives the created nodes
     * @param groupedInternal        source of the key serde, value serde and optional name
     * @param repartitionRequired    whether a repartition node must precede the aggregation node
     * @param subTopologySourceNodes source node names used when no repartition source is created
     * @param name                   name compared against the effective source name in the build step
     * @param graphNode              parent graph node the new nodes are attached to
     */
    GroupedStreamAggregateBuilder(InternalStreamsBuilder builder, GroupedInternal<K, V> groupedInternal, boolean repartitionRequired, Set<String> subTopologySourceNodes, String name, GraphNode graphNode) {
        this.builder = builder;
        this.keySerde = groupedInternal.keySerde();
        this.valueSerde = groupedInternal.valueSerde();
        this.repartitionRequired = repartitionRequired;
        this.subTopologySourceNodes = subTopologySourceNodes;
        this.name = name;
        this.graphNode = graphNode;
        this.userProvidedRepartitionTopicName = groupedInternal.name();
    }

    /**
     * Builds the table for an aggregation backed by a plain {@link ProcessorGraphNode}.
     *
     * @param functionName       supplies the processor / table name via {@code name()}
     * @param storeName          state store name; also the repartition topic prefix when no
     *                           user-provided name exists
     * @param aggregateSupplier  processor supplier performing the aggregation
     * @param queryableStoreName {@code null}, or a value equal to {@code storeName}
     * @param keySerde           key serde handed to the resulting table
     * @param valueSerde         value serde handed to the resulting table
     * @param isOutputVersioned  forwarded to {@code setOutputVersioned} on the graph node
     * @return the resulting table
     * @throws IllegalStateException if {@code queryableStoreName} is non-null and differs from
     *                               {@code storeName}
     */
    <KR, VR> KTable<KR, VR> buildNonWindowed(NamedInternal functionName, String storeName, KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier, String queryableStoreName, Serde<KR> keySerde, Serde<VR> valueSerde, boolean isOutputVersioned) {
        final String aggFunctionName = functionName.name();
        // Diamond: let the compiler infer the parameter types from aggregateSupplier
        // instead of pinning type arguments by hand.
        final ProcessorGraphNode<K, V> aggProcessorNode = new ProcessorGraphNode<>(aggFunctionName, new ProcessorParameters<>(aggregateSupplier, aggFunctionName));
        aggProcessorNode.setOutputVersioned(isOutputVersioned);
        return this.build(aggFunctionName, storeName, aggregateSupplier, aggProcessorNode, queryableStoreName, keySerde, valueSerde);
    }

    /**
     * Builds the table for an aggregation backed by a {@link GracePeriodGraphNode}.
     *
     * @param functionName       supplies the processor / table name via {@code name()}
     * @param storeName          state store name; also the repartition topic prefix when no
     *                           user-provided name exists
     * @param gracePeriod        grace period handed to the graph node (unit is not visible
     *                           here; presumably milliseconds, confirm against callers)
     * @param aggregateSupplier  processor supplier performing the aggregation
     * @param queryableStoreName {@code null}, or a value equal to {@code storeName}
     * @param keySerde           key serde handed to the resulting table
     * @param valueSerde         value serde handed to the resulting table
     * @param isOutputVersioned  forwarded to {@code setOutputVersioned} on the graph node
     * @return the resulting table
     * @throws IllegalStateException if {@code queryableStoreName} is non-null and differs from
     *                               {@code storeName}
     */
    <KR, VR> KTable<KR, VR> buildWindowed(NamedInternal functionName, String storeName, long gracePeriod, KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier, String queryableStoreName, Serde<KR> keySerde, Serde<VR> valueSerde, boolean isOutputVersioned) {
        final String aggFunctionName = functionName.name();
        // Diamond: let the compiler infer the parameter types from aggregateSupplier
        // instead of pinning type arguments by hand.
        final GracePeriodGraphNode<K, V> gracePeriodAggProcessorNode = new GracePeriodGraphNode<>(aggFunctionName, new ProcessorParameters<>(aggregateSupplier, aggFunctionName), gracePeriod);
        gracePeriodAggProcessorNode.setOutputVersioned(isOutputVersioned);
        return this.build(aggFunctionName, storeName, aggregateSupplier, gracePeriodAggProcessorNode, queryableStoreName, keySerde, valueSerde);
    }

    /**
     * Shared implementation: validates the store names, optionally inserts a repartition node,
     * attaches the aggregation node and creates the resulting table.
     *
     * @throws IllegalStateException if {@code queryableStoreName} is non-null and differs from
     *                               {@code storeName}
     */
    private <KR, VR> KTable<KR, VR> build(String aggFunctionName, String storeName, KStreamAggProcessorSupplier<K, V, KR, VR> aggregateSupplier, ProcessorGraphNode<K, V> aggProcessorNode, String queryableStoreName, Serde<KR> keySerde, Serde<VR> valueSerde) {
        if (queryableStoreName != null && !queryableStoreName.equals(storeName)) {
            throw new IllegalStateException(String.format("queryableStoreName should be null or equal to storeName but got storeName='%s' and queryableStoreName='%s'", storeName, queryableStoreName));
        }
        String sourceName = this.name;
        GraphNode parentNode = this.graphNode;
        if (this.repartitionRequired) {
            final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
            final boolean hasUserProvidedName = this.userProvidedRepartitionTopicName != null;
            final String repartitionTopicPrefix = hasUserProvidedName ? this.userProvidedRepartitionTopicName : storeName;
            // The prefix is flagged as user-provided when either the grouped name or a
            // queryable store name was supplied.
            sourceName = this.createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder, hasUserProvidedName || queryableStoreName != null);
            // A previously built repartition node is reused only when the user named the
            // repartition topic; otherwise every call builds a fresh node.
            if (this.repartitionNode == null || !hasUserProvidedName) {
                this.repartitionNode = repartitionNodeBuilder.build();
            }
            this.builder.addGraphNode(parentNode, this.repartitionNode);
            parentNode = this.repartitionNode;
        }
        this.builder.addGraphNode(parentNode, aggProcessorNode);
        // Without a repartition source the original sub-topology sources still apply;
        // otherwise the repartition source is the only source node.
        final Set<String> sourceNodes = sourceName.equals(this.name) ? this.subTopologySourceNodes : Collections.singleton(sourceName);
        return new KTableImpl<>(aggFunctionName, keySerde, valueSerde, sourceNodes, queryableStoreName, aggregateSupplier, aggProcessorNode, this.builder);
    }

    /**
     * Delegates to {@code KStreamImpl.createRepartitionedSource} using this builder's key and
     * value serdes, passing {@code null} for the fifth argument.
     *
     * @param repartitionTopicNamePrefix           prefix for the repartition topic name
     * @param optimizableRepartitionNodeBuilder    node builder handed to the delegate
     * @param isRepartitionTopicNameProvidedByUser whether the prefix counts as user-provided
     * @return the string returned by the delegate, used by the caller as the new source name
     */
    private String createRepartitionSource(String repartitionTopicNamePrefix, OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder, boolean isRepartitionTopicNameProvidedByUser) {
        return KStreamImpl.createRepartitionedSource(this.builder, this.keySerde, this.valueSerde, repartitionTopicNamePrefix, null, optimizableRepartitionNodeBuilder, isRepartitionTopicNameProvidedByUser);
    }
}

