package alluxio.client.block.stream;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.client.block.stream.PacketReader;
import alluxio.client.file.FileSystemContext;
import alluxio.network.protocol.RPCMessageDecoder;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.Status;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataNettyBufferV2;
import alluxio.proto.dataserver.Protocol;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/client/block/stream/NettyPacketReader.class */
public final class NettyPacketReader implements PacketReader {
    private static final Logger LOG = LoggerFactory.getLogger("alluxio.logger.type");
    private static final boolean CANCEL_ENABLED = Configuration.getBoolean(PropertyKey.USER_NETWORK_NETTY_READER_CANCEL_ENABLED);
    private static final int MAX_PACKETS_IN_FLIGHT = Configuration.getInt(PropertyKey.USER_NETWORK_NETTY_READER_BUFFER_SIZE_PACKETS);
    private static final long READ_TIMEOUT_MS = Configuration.getLong(PropertyKey.USER_NETWORK_NETTY_TIMEOUT_MS);
    private final FileSystemContext mContext;
    private final Channel mChannel;
    private final Protocol.RequestType mRequestType;
    private final InetSocketAddress mAddress;
    private final long mId;
    private final long mStart;
    private final long mBytesToRead;
    private final ReentrantLock mLock;

    @GuardedBy("mLock")
    private final Queue<ByteBuf> mPackets;

    @GuardedBy("mLock")
    private Throwable mPacketReaderException;
    private final Condition mNotEmptyOrFailed;
    private long mPosToRead;
    private boolean mDone;
    private boolean mClosed;

    /* loaded from: input_file:alluxio/client/block/stream/NettyPacketReader$Factory.class */
    public static class Factory implements PacketReader.Factory {
        private final FileSystemContext mContext;
        private final InetSocketAddress mAddress;
        private final long mId;
        private final long mLockId;
        private final long mSessionId;
        private final Protocol.RequestType mRequestType;

        public Factory(FileSystemContext fileSystemContext, InetSocketAddress inetSocketAddress, long j, long j2, long j3, Protocol.RequestType requestType) {
            this.mContext = fileSystemContext;
            this.mAddress = inetSocketAddress;
            this.mId = j;
            this.mLockId = j2;
            this.mSessionId = j3;
            this.mRequestType = requestType;
        }

        @Override // alluxio.client.block.stream.PacketReader.Factory
        public PacketReader create(long j, long j2) throws IOException {
            return new NettyPacketReader(this.mContext, this.mAddress, this.mId, j, j2, this.mLockId, this.mSessionId, this.mRequestType);
        }
    }

    /* loaded from: input_file:alluxio/client/block/stream/NettyPacketReader$PacketReadHandler.class */
    private class PacketReadHandler extends ChannelInboundHandlerAdapter {
        static final /* synthetic */ boolean $assertionsDisabled;

        public PacketReadHandler() {
        }

        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
            ByteBuf byteBuf;
            Preconditions.checkState(acceptMessage(obj), "Incorrect response type %s, %s.", obj.getClass().getCanonicalName(), obj);
            RPCProtoMessage rPCProtoMessage = (RPCProtoMessage) obj;
            Protocol.Status status = ((Protocol.Response) rPCProtoMessage.getMessage()).getStatus();
            if (!Status.isOk(status)) {
                channelHandlerContext.fireExceptionCaught(new IOException(String.format("Failed to read block %d from %s with status %s.", Long.valueOf(NettyPacketReader.this.mId), NettyPacketReader.this.mAddress, status.toString())));
            }
            NettyPacketReader.this.mLock.lock();
            try {
                Preconditions.checkState(NettyPacketReader.this.mPacketReaderException == null);
                DataBuffer payloadDataBuffer = rPCProtoMessage.getPayloadDataBuffer();
                if (payloadDataBuffer == null) {
                    byteBuf = channelHandlerContext.alloc().buffer(0, 0);
                } else {
                    Preconditions.checkState(payloadDataBuffer.getLength() > 0);
                    if (!$assertionsDisabled && !(payloadDataBuffer.getNettyOutput() instanceof ByteBuf)) {
                        throw new AssertionError();
                    }
                    byteBuf = (ByteBuf) payloadDataBuffer.getNettyOutput();
                }
                NettyPacketReader.this.mPackets.offer(byteBuf);
                NettyPacketReader.this.mNotEmptyOrFailed.signal();
                if (NettyPacketReader.this.tooManyPacketsPending()) {
                    NettyPacketReader.this.pause();
                }
            } finally {
                NettyPacketReader.this.mLock.unlock();
            }
        }

        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            NettyPacketReader.LOG.error("Exception caught while reading response from netty channel.", th);
            NettyPacketReader.this.mLock.lock();
            try {
                NettyPacketReader.this.mPacketReaderException = th;
                NettyPacketReader.this.mNotEmptyOrFailed.signal();
                NettyPacketReader.this.mLock.unlock();
                channelHandlerContext.close();
            } catch (Throwable th2) {
                NettyPacketReader.this.mLock.unlock();
                throw th2;
            }
        }

        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
        public void channelUnregistered(ChannelHandlerContext channelHandlerContext) {
            NettyPacketReader.this.mLock.lock();
            try {
                if (NettyPacketReader.this.mPacketReaderException == null) {
                    NettyPacketReader.this.mPacketReaderException = new IOException("ChannelClosed");
                }
                NettyPacketReader.this.mNotEmptyOrFailed.signal();
                NettyPacketReader.this.mLock.unlock();
                channelHandlerContext.fireChannelUnregistered();
            } catch (Throwable th) {
                NettyPacketReader.this.mLock.unlock();
                throw th;
            }
        }

        private boolean acceptMessage(Object obj) {
            if (obj instanceof RPCProtoMessage) {
                return ((RPCProtoMessage) obj).getMessage() instanceof Protocol.Response;
            }
            return false;
        }

        static {
            $assertionsDisabled = !NettyPacketReader.class.desiredAssertionStatus();
        }
    }

    private NettyPacketReader(FileSystemContext fileSystemContext, InetSocketAddress inetSocketAddress, long j, long j2, long j3, long j4, long j5, Protocol.RequestType requestType) throws IOException {
        this.mLock = new ReentrantLock();
        this.mPackets = new LinkedList();
        this.mPacketReaderException = null;
        this.mNotEmptyOrFailed = this.mLock.newCondition();
        this.mDone = false;
        this.mClosed = false;
        Preconditions.checkArgument(j2 >= 0 && j3 > 0);
        this.mContext = fileSystemContext;
        this.mAddress = inetSocketAddress;
        this.mId = j;
        this.mStart = j2;
        this.mPosToRead = j2;
        this.mBytesToRead = j3;
        this.mRequestType = requestType;
        this.mChannel = fileSystemContext.acquireNettyChannel(inetSocketAddress);
        ChannelPipeline pipeline = this.mChannel.pipeline();
        if (!(pipeline.last() instanceof RPCMessageDecoder)) {
            throw new RuntimeException(String.format("Channel pipeline has unexpected handlers %s.", pipeline.last().getClass().getCanonicalName()));
        }
        this.mChannel.pipeline().addLast(new PacketReadHandler());
        this.mChannel.writeAndFlush(new RPCProtoMessage(Protocol.ReadRequest.newBuilder().setId(j).setOffset(j2).setLength(j3).setLockId(j4).setSessionId(j5).setType(requestType).build())).addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelFutureListener.CLOSE_ON_FAILURE);
    }

    @Override // alluxio.client.block.stream.PacketReader
    public long pos() {
        return this.mPosToRead;
    }

    @Override // alluxio.client.block.stream.PacketReader
    public DataBuffer readPacket() throws IOException {
        Preconditions.checkState(!this.mClosed, "PacketReader is closed while reading packets.");
        ByteBuf byteBuf = null;
        this.mLock.lock();
        do {
            try {
                try {
                    if (this.mDone) {
                        return null;
                    }
                    if (this.mPacketReaderException != null) {
                        throw new IOException(this.mPacketReaderException);
                    }
                    byteBuf = this.mPackets.poll();
                    if (!tooManyPacketsPending()) {
                        resume();
                    }
                    if (byteBuf != null) {
                        if (byteBuf.readableBytes() == 0) {
                            byteBuf.release();
                            this.mDone = true;
                            this.mLock.unlock();
                            return null;
                        }
                        this.mPosToRead += byteBuf.readableBytes();
                        Preconditions.checkState(this.mPosToRead - this.mStart <= this.mBytesToRead);
                        DataNettyBufferV2 dataNettyBufferV2 = new DataNettyBufferV2(byteBuf);
                        this.mLock.unlock();
                        return dataNettyBufferV2;
                    }
                    try {
                    } catch (InterruptedException e) {
                        throw Throwables.propagate(e);
                    }
                } catch (Throwable th) {
                    if (byteBuf != null) {
                        byteBuf.release();
                    }
                    throw th;
                }
            } finally {
                this.mLock.unlock();
            }
        } while (this.mNotEmptyOrFailed.await(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        throw new IOException(String.format("Timeout while reading packet from block %d @ %s.", Long.valueOf(this.mId), this.mAddress));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            if (this.mDone) {
                if (this.mChannel.isOpen()) {
                    Preconditions.checkState(this.mChannel.pipeline().last() instanceof PacketReadHandler);
                    this.mChannel.pipeline().removeLast();
                    resume();
                }
                this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
                this.mClosed = true;
                return;
            }
            if (!this.mChannel.isOpen()) {
                if (this.mChannel.isOpen()) {
                    Preconditions.checkState(this.mChannel.pipeline().last() instanceof PacketReadHandler);
                    this.mChannel.pipeline().removeLast();
                    resume();
                }
                this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
                this.mClosed = true;
                return;
            }
            try {
                if (!CANCEL_ENABLED) {
                    this.mChannel.close().sync2();
                    if (this.mChannel.isOpen()) {
                        Preconditions.checkState(this.mChannel.pipeline().last() instanceof PacketReadHandler);
                        this.mChannel.pipeline().removeLast();
                        resume();
                    }
                    this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
                    this.mClosed = true;
                    return;
                }
                if (remaining() > 0) {
                    this.mChannel.writeAndFlush(new RPCProtoMessage(Protocol.ReadRequest.newBuilder().setId(this.mId).setCancel(true).setType(this.mRequestType).build())).addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelFutureListener.CLOSE_ON_FAILURE);
                }
                while (true) {
                    try {
                        DataBuffer readPacket = readPacket();
                        if (readPacket == null) {
                            break;
                        } else {
                            readPacket.release();
                        }
                    } catch (IOException e) {
                        LOG.warn("Failed to close the NettyBlockReader (block: {}, address: {}).", Long.valueOf(this.mId), this.mAddress, e);
                        try {
                            this.mChannel.close().sync2();
                            if (this.mChannel.isOpen()) {
                                Preconditions.checkState(this.mChannel.pipeline().last() instanceof PacketReadHandler);
                                this.mChannel.pipeline().removeLast();
                                resume();
                            }
                            this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
                            this.mClosed = true;
                            return;
                        } catch (InterruptedException e2) {
                            throw Throwables.propagate(e2);
                        }
                    }
                }
                if (this.mChannel.isOpen()) {
                    Preconditions.checkState(this.mChannel.pipeline().last() instanceof PacketReadHandler);
                    this.mChannel.pipeline().removeLast();
                    resume();
                }
                this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
                this.mClosed = true;
            } catch (InterruptedException e3) {
                this.mChannel.close();
                throw Throwables.propagate(e3);
            }
        } catch (Throwable th) {
            if (this.mChannel.isOpen()) {
                Preconditions.checkState(this.mChannel.pipeline().last() instanceof PacketReadHandler);
                this.mChannel.pipeline().removeLast();
                resume();
            }
            this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
            this.mClosed = true;
            throw th;
        }
    }

    private long remaining() {
        return (this.mStart + this.mBytesToRead) - this.mPosToRead;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean tooManyPacketsPending() {
        return this.mPackets.size() >= MAX_PACKETS_IN_FLIGHT;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void pause() {
        this.mChannel.config().setAutoRead(false);
    }

    private void resume() {
        if (this.mChannel.config().isAutoRead()) {
            return;
        }
        this.mChannel.config().setAutoRead(true);
        this.mChannel.read();
    }
}
