/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft;

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.KafkaRaftClient;
import org.apache.kafka.raft.RaftMessage;
import org.apache.kafka.raft.RaftRequest;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.util.ShutdownableThread;
import org.slf4j.Logger;

public class KafkaRaftClientDriver<T>
extends ShutdownableThread {
    private final KafkaRaftClient<T> client;
    private final Logger log;
    private final FaultHandler fatalFaultHandler;

    public KafkaRaftClientDriver(KafkaRaftClient<T> client, String threadNamePrefix, FaultHandler fatalFaultHandler, LogContext logContext) {
        super(threadNamePrefix + "-io-thread", false);
        this.client = client;
        this.fatalFaultHandler = fatalFaultHandler;
        this.log = logContext.logger(KafkaRaftClientDriver.class);
    }

    public void doWork() {
        try {
            this.client.poll();
        }
        catch (Throwable t) {
            throw this.fatalFaultHandler.handleFault("Unexpected error in raft IO thread", t);
        }
    }

    public boolean initiateShutdown() {
        if (super.initiateShutdown()) {
            this.client.shutdown(5000).whenComplete((na, exception) -> {
                if (exception != null) {
                    this.log.error("Graceful shutdown of RaftClient failed", exception);
                } else {
                    this.log.info("Completed graceful shutdown of RaftClient");
                }
            });
            return true;
        }
        return false;
    }

    public void shutdown() throws InterruptedException {
        try {
            super.shutdown();
        }
        finally {
            this.client.close();
        }
    }

    public boolean isRunning() {
        return this.client.isRunning() && !this.isThreadFailed();
    }

    public CompletableFuture<ApiMessage> handleRequest(RequestContext context, RequestHeader header, ApiMessage request, long createdTimeMs) {
        RaftRequest.Inbound inboundRequest = new RaftRequest.Inbound(context.listenerName, header.correlationId(), header.apiVersion(), request, createdTimeMs);
        this.client.handle(inboundRequest);
        return inboundRequest.completion.thenApply(RaftMessage::data);
    }

    public KafkaRaftClient<T> client() {
        return this.client;
    }
}

