package reactor.core.publisher;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/reactor-core-3.6.13.jar:reactor/core/publisher/FluxRefCountGrace.class */
public final class FluxRefCountGrace<T> extends Flux<T> implements Scannable, Fuseable {
    final ConnectableFlux<T> source;
    final int n;
    final Duration gracePeriod;
    final Scheduler scheduler;
    RefConnection connection;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/reactor-core-3.6.13.jar:reactor/core/publisher/FluxRefCountGrace$RefConnection.class */
    public static final class RefConnection implements Runnable, Consumer<Disposable> {
        final FluxRefCountGrace<?> parent;
        Disposable timer;
        long subscriberCount;
        boolean connected;
        boolean terminated;
        volatile Disposable sourceDisconnector;
        static final AtomicReferenceFieldUpdater<RefConnection, Disposable> SOURCE_DISCONNECTOR = AtomicReferenceFieldUpdater.newUpdater(RefConnection.class, Disposable.class, "sourceDisconnector");

        RefConnection(FluxRefCountGrace<?> fluxRefCountGrace) {
            this.parent = fluxRefCountGrace;
        }

        boolean isTerminated() {
            Disposable disposable = this.sourceDisconnector;
            return this.terminated || (disposable != null && disposable.isDisposed());
        }

        @Override // java.lang.Runnable
        public void run() {
            this.parent.timeout(this);
        }

        @Override // java.util.function.Consumer
        public void accept(Disposable disposable) {
            OperatorDisposables.replace(SOURCE_DISCONNECTOR, this, disposable);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/reactor-core-3.6.13.jar:reactor/core/publisher/FluxRefCountGrace$RefCountInner.class */
    static final class RefCountInner<T> implements Fuseable.QueueSubscription<T>, InnerOperator<T, T> {
        final CoreSubscriber<? super T> actual;
        final FluxRefCountGrace<T> parent;
        RefConnection connection;
        Subscription s;
        Fuseable.QueueSubscription<T> qs;
        Throwable error;
        static final int MONITOR_SET_FLAG = 536870912;
        static final int TERMINATED_FLAG = 1073741824;
        static final int CANCELLED_FLAG = Integer.MIN_VALUE;
        volatile int state;
        static final AtomicIntegerFieldUpdater<RefCountInner> STATE = AtomicIntegerFieldUpdater.newUpdater(RefCountInner.class, "state");

        RefCountInner(CoreSubscriber<? super T> coreSubscriber, FluxRefCountGrace<T> fluxRefCountGrace) {
            this.actual = coreSubscriber;
            this.parent = fluxRefCountGrace;
        }

        void setRefConnection(RefConnection refConnection) {
            int i;
            this.connection = refConnection;
            this.actual.onSubscribe(this);
            do {
                i = this.state;
                if (isCancelled(i)) {
                    return;
                }
                if (isTerminated(i)) {
                    this.parent.terminated(refConnection);
                    Throwable th = this.error;
                    if (th != null) {
                        this.actual.onError(th);
                        return;
                    } else {
                        this.actual.onComplete();
                        return;
                    }
                }
            } while (!STATE.compareAndSet(this, i, i | 536870912));
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(T t) {
            this.actual.onNext(t);
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            int i;
            this.error = th;
            do {
                i = this.state;
                if (isTerminated(i) || isCancelled(i)) {
                    Operators.onErrorDropped(th, this.actual.currentContext());
                    return;
                }
            } while (!STATE.compareAndSet(this, i, i | 1073741824));
            if (isMonitorSet(i)) {
                this.parent.terminated(this.connection);
                this.actual.onError(th);
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            int i;
            do {
                i = this.state;
                if (isTerminated(i) || isCancelled(i)) {
                    return;
                }
            } while (!STATE.compareAndSet(this, i, i | 1073741824));
            if (isMonitorSet(i)) {
                this.parent.terminated(this.connection);
                this.actual.onComplete();
            }
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            this.s.request(j);
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            this.s.cancel();
            int i = this.state;
            if (isTerminated(i) || isCancelled(i) || !STATE.compareAndSet(this, i, i | Integer.MIN_VALUE)) {
                return;
            }
            this.parent.cancel(this.connection);
        }

        @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            if (Operators.validate(this.s, subscription)) {
                this.s = subscription;
            }
        }

        @Override // reactor.core.Fuseable.QueueSubscription
        public int requestFusion(int i) {
            if (!(this.s instanceof Fuseable.QueueSubscription)) {
                return 0;
            }
            this.qs = (Fuseable.QueueSubscription) this.s;
            return this.qs.requestFusion(i);
        }

        @Override // java.util.Queue
        @Nullable
        public T poll() {
            return this.qs.poll();
        }

        @Override // java.util.Collection
        public int size() {
            return this.qs.size();
        }

        @Override // java.util.Collection
        public boolean isEmpty() {
            return this.qs.isEmpty();
        }

        @Override // java.util.Collection
        public void clear() {
            this.qs.clear();
        }

        @Override // reactor.core.publisher.InnerProducer
        public CoreSubscriber<? super T> actual() {
            return this.actual;
        }

        @Override // reactor.core.Scannable
        public Object scanUnsafe(Scannable.Attr attr) {
            return attr == Scannable.Attr.RUN_STYLE ? Scannable.Attr.RunStyle.SYNC : attr == Scannable.Attr.PARENT ? this.s : attr == Scannable.Attr.TERMINATED ? Boolean.valueOf(isTerminated(this.state)) : attr == Scannable.Attr.CANCELLED ? Boolean.valueOf(isCancelled(this.state)) : super.scanUnsafe(attr);
        }

        static boolean isTerminated(int i) {
            return (i & 1073741824) == 1073741824;
        }

        static boolean isCancelled(int i) {
            return (i & Integer.MIN_VALUE) == Integer.MIN_VALUE;
        }

        static boolean isMonitorSet(int i) {
            return (i & 536870912) == 536870912;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FluxRefCountGrace(ConnectableFlux<T> connectableFlux, int i, Duration duration, Scheduler scheduler) {
        this.source = ConnectableFlux.from((ConnectableFlux) Objects.requireNonNull(connectableFlux, "source"));
        this.n = i;
        this.gracePeriod = duration;
        this.scheduler = scheduler;
    }

    @Override // reactor.core.publisher.Flux
    public int getPrefetch() {
        return this.source.getPrefetch();
    }

    @Override // reactor.core.Scannable
    @Nullable
    public Object scanUnsafe(Scannable.Attr attr) {
        return attr == Scannable.Attr.PREFETCH ? Integer.valueOf(getPrefetch()) : attr == Scannable.Attr.PARENT ? this.source : attr == Scannable.Attr.RUN_STYLE ? Scannable.Attr.RunStyle.SYNC : attr == InternalProducerAttr.INSTANCE ? true : null;
    }

    @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        RefConnection refConnection;
        RefCountInner refCountInner = new RefCountInner(coreSubscriber, this);
        this.source.subscribe((CoreSubscriber) refCountInner);
        boolean z = false;
        synchronized (this) {
            refConnection = this.connection;
            if (refConnection == null || refConnection.isTerminated()) {
                refConnection = new RefConnection(this);
                this.connection = refConnection;
            }
            long j = refConnection.subscriberCount;
            if (j == 0 && refConnection.timer != null) {
                refConnection.timer.dispose();
            }
            refConnection.subscriberCount = j + 1;
            if (!refConnection.connected && j + 1 == this.n) {
                z = true;
                refConnection.connected = true;
            }
        }
        refCountInner.setRefConnection(refConnection);
        if (z) {
            this.source.connect(refConnection);
        }
    }

    void cancel(RefConnection refConnection) {
        boolean z = false;
        Disposable disposable = null;
        Disposable.Swap swap = null;
        synchronized (this) {
            if (refConnection.terminated) {
                return;
            }
            long j = refConnection.subscriberCount - 1;
            refConnection.subscriberCount = j;
            if (j == 0 && refConnection.connected) {
                if (!this.gracePeriod.isZero()) {
                    swap = Disposables.swap();
                    refConnection.timer = swap;
                    z = true;
                } else if (refConnection == this.connection) {
                    this.connection = null;
                    disposable = RefConnection.SOURCE_DISCONNECTOR.getAndSet(refConnection, Disposables.disposed());
                }
                if (z) {
                    swap.replace(this.scheduler.schedule(refConnection, this.gracePeriod.toNanos(), TimeUnit.NANOSECONDS));
                } else if (disposable != null) {
                    disposable.dispose();
                }
            }
        }
    }

    void terminated(RefConnection refConnection) {
        synchronized (this) {
            if (!refConnection.terminated) {
                refConnection.terminated = true;
                this.connection = null;
            }
        }
    }

    void timeout(RefConnection refConnection) {
        Disposable disposable = null;
        synchronized (this) {
            if (refConnection.subscriberCount == 0 && refConnection == this.connection) {
                this.connection = null;
                disposable = RefConnection.SOURCE_DISCONNECTOR.getAndSet(refConnection, Disposables.disposed());
            }
        }
        if (disposable != null) {
            disposable.dispose();
        }
    }
}
