/*
 * Decompiled with CFR 0.152.
 */
package reactor.ipc.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPool;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.SucceededFuture;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.MonoSink;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.ReactorNetty;
import reactor.ipc.netty.channel.AbortedException;
import reactor.ipc.netty.channel.ChannelOperations;
import reactor.ipc.netty.channel.ContextHandler;
import reactor.ipc.netty.options.ClientOptions;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

final class PooledClientContextHandler<CHANNEL extends Channel>
extends ContextHandler<CHANNEL>
implements GenericFutureListener<Future<CHANNEL>> {
    static final Logger log = Loggers.getLogger(PooledClientContextHandler.class);
    final ClientOptions clientOptions;
    final boolean secure;
    final ChannelPool pool;
    final DirectProcessor<Void> onReleaseEmitter;
    volatile Future<CHANNEL> future;
    static final AtomicReferenceFieldUpdater<PooledClientContextHandler, Future> FUTURE = AtomicReferenceFieldUpdater.newUpdater(PooledClientContextHandler.class, Future.class, "future");
    static final Future<?> DISPOSED = new SucceededFuture(null, null);

    PooledClientContextHandler(ChannelOperations.OnNew<CHANNEL> channelOpFactory, ClientOptions options, MonoSink<NettyContext> sink, LoggingHandler loggingHandler, boolean secure, SocketAddress providedAddress, ChannelPool pool) {
        super(channelOpFactory, options, sink, loggingHandler, providedAddress);
        this.clientOptions = options;
        this.secure = secure;
        this.pool = pool;
        this.onReleaseEmitter = DirectProcessor.create();
    }

    @Override
    public void fireContextActive(NettyContext context) {
        if (!this.fired) {
            this.fired = true;
            if (context != null) {
                this.sink.success((Object)context);
            } else {
                this.sink.success();
            }
        }
    }

    @Override
    public void setFuture(Future<?> future) {
        Future<CHANNEL> f;
        Objects.requireNonNull(future, "future");
        do {
            if ((f = this.future) != DISPOSED) continue;
            if (log.isDebugEnabled()) {
                log.debug("Cancelled existing channel from pool: {}", new Object[]{this.pool.toString()});
            }
            this.sink.success();
            return;
        } while (!FUTURE.compareAndSet(this, f, future));
        if (log.isDebugEnabled()) {
            log.debug("Acquiring existing channel from pool: {} {}", new Object[]{future, this.pool.toString()});
        }
        future.addListener((GenericFutureListener)this);
    }

    @Override
    protected void terminateChannel(Channel channel) {
        this.release(channel);
    }

    public void operationComplete(Future<CHANNEL> future) throws Exception {
        if (future.isCancelled()) {
            if (log.isDebugEnabled()) {
                log.debug("Cancelled {}", new Object[]{future.toString()});
            }
            return;
        }
        if (DISPOSED == this.future) {
            if (log.isDebugEnabled()) {
                log.debug("Dropping acquisition {} because of asynchronous user cancellation", new Object[]{future});
            }
            if (future.isSuccess()) {
                this.disposeOperationThenRelease((Channel)future.get());
            }
            this.sink.success();
            return;
        }
        if (!future.isSuccess()) {
            if (future.cause() != null) {
                this.fireContextError(future.cause());
            } else {
                this.fireContextError(new AbortedException("error while acquiring connection"));
            }
            return;
        }
        Channel c = (Channel)future.get();
        if (c.eventLoop().inEventLoop()) {
            this.connectOrAcquire(c);
        } else {
            c.eventLoop().execute(() -> this.connectOrAcquire(c));
        }
    }

    @Override
    protected Publisher<Void> onCloseOrRelease(Channel channel) {
        return this.onReleaseEmitter;
    }

    final void connectOrAcquire(CHANNEL c) {
        if (DISPOSED == this.future) {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(c, "Dropping acquisition because of asynchronous user cancellation"));
            }
            this.disposeOperationThenRelease(c);
            this.sink.success();
            return;
        }
        if (!c.isActive()) {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(c, "Immediately aborted pooled channel, re-acquiring new channel"));
            }
            this.setFuture(this.pool.acquire());
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(c, "Acquired active channel"));
        }
        if (this.createOperations((Channel)c, null) == null) {
            this.setFuture(this.pool.acquire());
        }
    }

    public void dispose() {
        Future<?> f = FUTURE.getAndSet(this, DISPOSED);
        if (f == null || f == DISPOSED) {
            return;
        }
        if (!f.isDone()) {
            return;
        }
        try {
            Channel c = (Channel)f.get();
            if (!c.eventLoop().inEventLoop()) {
                c.eventLoop().execute(() -> this.disposeOperationThenRelease(c));
            } else {
                this.disposeOperationThenRelease(c);
            }
        }
        catch (Exception e) {
            log.error("Failed releasing channel", (Throwable)e);
            this.onReleaseEmitter.onError((Throwable)e);
        }
    }

    final void disposeOperationThenRelease(CHANNEL c) {
        ChannelOperations<?, ?> ops = ChannelOperations.get(c);
        if (ops != null) {
            ops.inbound.cancel();
            return;
        }
        this.release(c);
    }

    final void release(CHANNEL c) {
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(c, "Releasing channel"));
        }
        if (!NettyContext.isPersistent(c) && c.isActive()) {
            c.close();
            this.onReleaseEmitter.onComplete();
            return;
        }
        if (!c.isActive()) {
            this.onReleaseEmitter.onComplete();
            return;
        }
        this.pool.release(c).addListener(f -> {
            if (log.isDebugEnabled() && !f.isSuccess()) {
                log.debug(ReactorNetty.format(c, "Failed cleaning the channel from pool"), f.cause());
            }
            this.onReleaseEmitter.onComplete();
        });
    }

    @Override
    protected void doDropped(Channel channel) {
        this.dispose();
        this.fireContextError(new AbortedException("Channel has been dropped"));
    }

    @Override
    protected void doPipeline(Channel ch) {
    }

    @Override
    protected Tuple2<String, Integer> getSNI() {
        if (this.providedAddress instanceof InetSocketAddress) {
            InetSocketAddress ipa = (InetSocketAddress)this.providedAddress;
            return Tuples.of((Object)ipa.getHostString(), (Object)ipa.getPort());
        }
        return null;
    }
}

