/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.operators.multi;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.subscription.BackPressureFailure;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Operator that buffers the signals received from the upstream and re-emits the items,
 * the failure and the completion to the downstream from a task submitted to an {@link Executor}.
 *
 * @param <T> the type of the items
 */
public class MultiEmitOnOp<T> extends AbstractMultiOperator<T, T> {
    private final Executor executor;
    private final int bufferSize;

    /**
     * Creates the operator.
     *
     * @param upstream the upstream {@link Multi}
     * @param executor the executor on which the drain task is submitted
     * @param bufferSize the capacity passed to the intermediate queue; it is also the size of
     *        the initial upstream request and of every replenishing request
     */
    public MultiEmitOnOp(Multi<? extends T> upstream, Executor executor, int bufferSize) {
        super(upstream);
        this.executor = executor;
        this.bufferSize = bufferSize;
    }

    /**
     * Subscribes a new {@link MultiEmitOnProcessor} wrapping {@code downstream} to the upstream.
     *
     * @param downstream the downstream subscriber, validated with
     *        {@code ParameterValidation.nonNullNpe}
     */
    @Override
    public void subscribe(MultiSubscriber<? super T> downstream) {
        ParameterValidation.nonNullNpe(downstream, "subscriber");
        MultiEmitOnProcessor<T> processor = new MultiEmitOnProcessor<T>(downstream, executor, bufferSize);
        upstream.subscribe().withSubscriber(processor);
    }

    /**
     * Processor sitting between the upstream and the downstream. Upstream signals are recorded
     * (items in {@link #queue}, terminal state in {@link #done} / {@link #failure}) and the
     * actual downstream emission happens in {@link #run()}, which is handed to the executor by
     * {@link #schedule()}.
     *
     * @param <T> the type of the items
     */
    static final class MultiEmitOnProcessor<T> extends MultiOperatorProcessor<T, T> implements Runnable {
        private final Executor executor;
        private final int bufferSize;
        /** Items received from the upstream and not yet delivered to the downstream. */
        private final Queue<T> queue;
        /** Set once the downstream has called {@link #cancel()}. */
        private volatile boolean cancelled;
        /** Set once a terminal signal (completion or failure) has been recorded. */
        private volatile boolean done;
        /** Failure to forward to the downstream, if any. */
        private final AtomicReference<Throwable> failure = new AtomicReference<>();
        /** Work-in-progress counter: only the caller moving it from 0 submits the drain task. */
        private final AtomicInteger wip = new AtomicInteger();
        /** Outstanding downstream demand. */
        private final AtomicLong requested = new AtomicLong();
        /** Emission count carried over from one execution of {@link #run()} to the next. */
        private long produced;

        /**
         * @param downstream the downstream subscriber
         * @param executor the executor running the drain task
         * @param bufferSize the queue capacity and upstream request batch size
         */
        MultiEmitOnProcessor(MultiSubscriber<? super T> downstream, Executor executor, int bufferSize) {
            super(downstream);
            this.executor = executor;
            this.bufferSize = bufferSize;
            this.queue = Queues.createMpscArrayQueue(bufferSize);
        }

        /**
         * Registers the upstream subscription, passes this processor to the downstream as its
         * subscription and requests {@code bufferSize} items from the upstream. If an upstream
         * subscription was already registered, the new one is cancelled.
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (!compareAndSetUpstreamSubscription(null, subscription)) {
                subscription.cancel();
                return;
            }
            downstream.onSubscribe(this);
            subscription.request(bufferSize);
        }

        /**
         * Enqueues the item and schedules a drain. If the queue rejects the item, the upstream
         * is cancelled and a {@link BackPressureFailure} is recorded for the downstream.
         */
        @Override
        public void onItem(T t) {
            if (done) {
                return;
            }
            if (queue.offer(t)) {
                schedule();
                return;
            }
            // The queue was sized to the amount requested, so a rejected offer means the
            // upstream emitted more than what was requested.
            cancelUpstream();
            onFailure(new BackPressureFailure("Queue is full, the upstream didn't enforce the requests"));
            done = true;
        }

        /**
         * Records the failure, marks the stream as done and schedules a drain.
         */
        @Override
        public void onFailure(Throwable throwable) {
            // NOTE(review): this guard only rejects the signal when BOTH flags are set; confirm
            // that `||` rather than `&&` is what is intended here.
            if (!done || !cancelled) {
                done = true;
                failure.set(throwable);
                schedule();
            }
        }

        /**
         * Marks the stream as done and schedules a drain.
         */
        @Override
        public void onCompletion() {
            // NOTE(review): same guard as onFailure; only skipped when done AND cancelled.
            if (!done || !cancelled) {
                done = true;
                schedule();
            }
        }

        /**
         * Adds {@code n} to the downstream demand and schedules a drain. A non-positive
         * {@code n} is reported through {@link #onFailure(Throwable)} with the exception
         * returned by {@code Subscriptions.getInvalidRequestException()}.
         */
        @Override
        public void request(long n) {
            if (n <= 0L) {
                onFailure(Subscriptions.getInvalidRequestException());
                return;
            }
            // Demand must still be accepted after the upstream is done, since buffered items
            // may remain to be drained.
            if (!done || !cancelled) {
                Subscriptions.add(requested, n);
                schedule();
            }
        }

        /**
         * Marks the processor as cancelled and cancels the upstream. The queue is cleared here
         * only when no drain is in progress; otherwise the running drain observes
         * {@link #cancelled} and clears it.
         */
        @Override
        public void cancel() {
            if (cancelled) {
                return;
            }
            cancelled = true;
            cancelUpstream();
            if (wip.getAndIncrement() == 0) {
                queue.clear();
            }
        }

        /**
         * Submits the drain task to the executor unless a drain is already pending or running.
         * If the executor rejects the task, the upstream is cancelled, the queue is cleared and
         * the {@link RejectedExecutionException} is passed to the downstream on the calling
         * thread.
         */
        void schedule() {
            if (wip.getAndIncrement() != 0) {
                return;
            }
            try {
                executor.execute(this);
            } catch (RejectedExecutionException rejected) {
                Flow.Subscription subscription = getAndSetUpstreamSubscription(Subscriptions.CANCELLED);
                if (subscription == Subscriptions.CANCELLED) {
                    // Already cancelled: nothing left to tear down or to notify.
                    return;
                }
                done = true;
                // The subscription was just swapped out for the CANCELLED marker, so it has to
                // be cancelled through the returned reference. NOTE(review): this assumes
                // cancelUpstream() works on that same slot and would therefore no longer reach
                // it - confirm against MultiOperatorProcessor. A redundant cancel() is a no-op
                // for a compliant subscription.
                if (subscription != null) {
                    subscription.cancel();
                }
                cancelUpstream();
                queue.clear();
                downstream.onFailure(rejected);
                super.cancel();
            }
        }

        /**
         * Drain loop: delivers queued items to the downstream while there is demand, requests
         * a new batch from the upstream every {@code bufferSize} emissions, and emits the
         * terminal signal once it applies. When the loop exits because the stream is
         * terminated or cancelled, {@link #wip} is left non-zero, so no further drain is
         * scheduled.
         */
        @Override
        public void run() {
            int missed = 1;
            final Queue<T> q = queue;
            long emitted = produced;
            while (true) {
                long requests = requested.get();
                while (emitted != requests) {
                    // Read `done` before polling so that an empty poll combined with `done`
                    // really means that there is nothing left to emit.
                    final boolean wasDone = done;
                    final T item = q.poll();
                    final boolean empty = item == null;
                    if (isDoneOrCancelled(wasDone, empty)) {
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    downstream.onItem(item);
                    emitted++;
                    if (emitted == (long) bufferSize) {
                        // A full batch has been delivered: settle the demand and replenish.
                        if (requests != Long.MAX_VALUE) {
                            requests = requested.addAndGet(-emitted);
                        }
                        super.request(emitted);
                        emitted = 0L;
                    }
                }
                if (emitted == requests && isDoneOrCancelled(done, q.isEmpty())) {
                    return;
                }
                final int w = wip.get();
                if (missed == w) {
                    // No new work was signalled during this pass: save progress and try to leave.
                    produced = emitted;
                    missed = wip.addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                } else {
                    missed = w;
                }
            }
        }

        /**
         * Checks whether the drain loop must stop, emitting the terminal signal when needed.
         *
         * @param upstreamDone whether a terminal signal has been recorded
         * @param queueEmpty whether the queue had no item left
         * @return {@code true} if the processor is cancelled (the queue is cleared), if a
         *         recorded failure was forwarded, or if the completion was forwarded;
         *         {@code false} otherwise
         */
        boolean isDoneOrCancelled(boolean upstreamDone, boolean queueEmpty) {
            if (cancelled) {
                queue.clear();
                return true;
            }
            if (!upstreamDone) {
                return false;
            }
            // A failure is forwarded right away, even if items are still queued.
            Throwable maybeFailure = failure.get();
            if (maybeFailure != null) {
                downstream.onFailure(maybeFailure);
                return true;
            }
            if (queueEmpty) {
                downstream.onCompletion();
                return true;
            }
            return false;
        }
    }
}

