package alluxio.client.block.stream;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.client.file.FileSystemContext;
import alluxio.network.protocol.RPCMessageDecoder;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.Status;
import alluxio.network.protocol.databuffer.DataNettyBufferV2;
import alluxio.proto.dataserver.Protocol;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
@NotThreadSafe
/* loaded from: input_file:alluxio/client/block/stream/NettyPacketWriter.class */
public final class NettyPacketWriter implements PacketWriter {
    private static final Logger LOG = LoggerFactory.getLogger("alluxio.logger.type");
    private static final long PACKET_SIZE = Configuration.getBytes(PropertyKey.USER_NETWORK_NETTY_WRITER_PACKET_SIZE_BYTES);
    private static final int MAX_PACKETS_IN_FLIGHT = Configuration.getInt(PropertyKey.USER_NETWORK_NETTY_WRITER_BUFFER_SIZE_PACKETS);
    private static final long WRITE_TIMEOUT_MS = Configuration.getLong(PropertyKey.USER_NETWORK_NETTY_TIMEOUT_MS);
    private final FileSystemContext mContext;
    private final Channel mChannel;
    private final InetSocketAddress mAddress;
    private final long mId;
    private final long mSessionId;
    private final int mTier;
    private final Protocol.RequestType mRequestType;
    private final long mLength;
    private boolean mClosed = false;
    private ReentrantLock mLock = new ReentrantLock();

    @GuardedBy("mLock")
    private long mPosToWrite = 0;

    @GuardedBy("mLock")
    private long mPosToQueue = 0;

    @GuardedBy("mLock")
    private Throwable mPacketWriteException = null;

    @GuardedBy("mLock")
    private boolean mDone = false;

    @GuardedBy("mLock")
    private boolean mEOFSent = false;
    private Condition mDoneOrFailed = this.mLock.newCondition();
    private Condition mBufferNotFullOrFailed = this.mLock.newCondition();
    private Condition mBufferEmptyOrFailed = this.mLock.newCondition();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:alluxio/client/block/stream/NettyPacketWriter$PacketWriteHandler.class */
    public final class PacketWriteHandler extends ChannelInboundHandlerAdapter {
        public PacketWriteHandler() {
        }

        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws IOException {
            Preconditions.checkState(acceptMessage(obj), "Incorrect response type.");
            Protocol.Status status = ((Protocol.Response) ((RPCProtoMessage) obj).getMessage()).getStatus();
            if (!Status.isOk(status)) {
                throw new IOException(String.format("Failed to write block %d from %s with status %s.", Long.valueOf(NettyPacketWriter.this.mId), NettyPacketWriter.this.mAddress, status.toString()));
            }
            NettyPacketWriter.this.mLock.lock();
            try {
                NettyPacketWriter.this.mDone = true;
                NettyPacketWriter.this.mDoneOrFailed.signal();
                NettyPacketWriter.this.mLock.unlock();
            } catch (Throwable th) {
                NettyPacketWriter.this.mLock.unlock();
                throw th;
            }
        }

        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            NettyPacketWriter.LOG.error("Exception caught while reading response from netty channel {}.", th);
            NettyPacketWriter.this.mLock.lock();
            try {
                NettyPacketWriter.this.mPacketWriteException = th;
                NettyPacketWriter.this.mBufferNotFullOrFailed.signal();
                NettyPacketWriter.this.mDoneOrFailed.signal();
                NettyPacketWriter.this.mBufferEmptyOrFailed.signal();
                NettyPacketWriter.this.mLock.unlock();
                channelHandlerContext.close();
            } catch (Throwable th2) {
                NettyPacketWriter.this.mLock.unlock();
                throw th2;
            }
        }

        @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
        public void channelUnregistered(ChannelHandlerContext channelHandlerContext) {
            NettyPacketWriter.this.mLock.lock();
            try {
                if (NettyPacketWriter.this.mPacketWriteException == null) {
                    NettyPacketWriter.this.mPacketWriteException = new IOException("Channel closed.");
                }
                NettyPacketWriter.this.mBufferNotFullOrFailed.signal();
                NettyPacketWriter.this.mDoneOrFailed.signal();
                NettyPacketWriter.this.mBufferEmptyOrFailed.signal();
                NettyPacketWriter.this.mLock.unlock();
                channelHandlerContext.fireChannelUnregistered();
            } catch (Throwable th) {
                NettyPacketWriter.this.mLock.unlock();
                throw th;
            }
        }

        private boolean acceptMessage(Object obj) {
            if (obj instanceof RPCProtoMessage) {
                return ((RPCProtoMessage) obj).getMessage() instanceof Protocol.Response;
            }
            return false;
        }
    }

    /* loaded from: input_file:alluxio/client/block/stream/NettyPacketWriter$WriteListener.class */
    private final class WriteListener implements ChannelFutureListener {
        private final long mPosToWriteUncommitted;

        public WriteListener(long j) {
            this.mPosToWriteUncommitted = j;
        }

        @Override // io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) {
            if (!channelFuture.isSuccess()) {
                channelFuture.channel().close();
            }
            boolean z = false;
            NettyPacketWriter.this.mLock.lock();
            try {
                Preconditions.checkState(this.mPosToWriteUncommitted - NettyPacketWriter.this.mPosToWrite <= NettyPacketWriter.PACKET_SIZE, "Some packet is not acked.");
                Preconditions.checkState(this.mPosToWriteUncommitted <= NettyPacketWriter.this.mLength);
                NettyPacketWriter.access$902(NettyPacketWriter.this, this.mPosToWriteUncommitted);
                if (channelFuture.cause() != null) {
                    NettyPacketWriter.this.mPacketWriteException = channelFuture.cause();
                    NettyPacketWriter.this.mDoneOrFailed.signal();
                    NettyPacketWriter.this.mBufferNotFullOrFailed.signal();
                    NettyPacketWriter.this.mBufferEmptyOrFailed.signal();
                    NettyPacketWriter.this.mLock.unlock();
                    return;
                }
                if (NettyPacketWriter.this.mPosToWrite == NettyPacketWriter.this.mPosToQueue) {
                    NettyPacketWriter.this.mBufferEmptyOrFailed.signal();
                }
                if (!NettyPacketWriter.this.tooManyPacketsInFlight()) {
                    NettyPacketWriter.this.mBufferNotFullOrFailed.signal();
                }
                if (NettyPacketWriter.this.mPosToWrite == NettyPacketWriter.this.mLength) {
                    z = true;
                }
                if (z) {
                    NettyPacketWriter.this.sendEOF();
                }
            } finally {
                NettyPacketWriter.this.mLock.unlock();
            }
        }
    }

    public NettyPacketWriter(FileSystemContext fileSystemContext, InetSocketAddress inetSocketAddress, long j, long j2, long j3, int i, Protocol.RequestType requestType) throws IOException {
        this.mContext = fileSystemContext;
        this.mAddress = inetSocketAddress;
        this.mSessionId = j3;
        this.mId = j;
        this.mRequestType = requestType;
        this.mLength = j2;
        this.mTier = i;
        this.mChannel = fileSystemContext.acquireNettyChannel(inetSocketAddress);
        ChannelPipeline pipeline = this.mChannel.pipeline();
        if (!(pipeline.last() instanceof RPCMessageDecoder)) {
            throw new RuntimeException(String.format("Channel pipeline has unexpected handlers %s (expects RPCMessageDecoder).", pipeline.last().getClass().getCanonicalName()));
        }
        this.mChannel.pipeline().addLast(new PacketWriteHandler());
    }

    @Override // alluxio.client.block.stream.PacketWriter
    public long pos() {
        this.mLock.lock();
        try {
            long j = this.mPosToQueue;
            this.mLock.unlock();
            return j;
        } catch (Throwable th) {
            this.mLock.unlock();
            throw th;
        }
    }

    @Override // alluxio.client.block.stream.PacketWriter
    public void writePacket(ByteBuf byteBuf) throws IOException {
        this.mLock.lock();
        try {
            try {
                Preconditions.checkState((this.mClosed || this.mEOFSent) ? false : true);
                Preconditions.checkArgument(((long) byteBuf.readableBytes()) <= PACKET_SIZE);
                while (this.mPacketWriteException == null) {
                    if (!tooManyPacketsInFlight()) {
                        long j = this.mPosToQueue;
                        this.mPosToQueue += byteBuf.readableBytes();
                        this.mChannel.writeAndFlush(new RPCProtoMessage(Protocol.WriteRequest.newBuilder().setId(this.mId).setOffset(j).setSessionId(this.mSessionId).setTier(this.mTier).setType(this.mRequestType).build(), new DataNettyBufferV2(byteBuf))).addListener2((GenericFutureListener<? extends Future<? super Void>>) new WriteListener(j + byteBuf.readableBytes()));
                        return;
                    } else {
                        try {
                            if (!this.mBufferNotFullOrFailed.await(WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                                throw new IOException(String.format("Timeout to write packet to %d @ %s.", Long.valueOf(this.mId), this.mAddress));
                            }
                        } catch (InterruptedException e) {
                            throw Throwables.propagate(e);
                        }
                    }
                }
                throw new IOException(this.mPacketWriteException);
            } catch (Throwable th) {
                byteBuf.release();
                throw th;
            }
        } finally {
            this.mLock.unlock();
        }
    }

    @Override // alluxio.client.Cancelable
    public void cancel() throws IOException {
        if (this.mClosed) {
            return;
        }
        this.mLock.lock();
        try {
            try {
                this.mPacketWriteException = new IOException("PacketWriter is cancelled.");
                this.mBufferEmptyOrFailed.signal();
                this.mBufferNotFullOrFailed.signal();
                this.mDoneOrFailed.signal();
                Future<Void> sync2 = this.mChannel.close().sync2();
                if (sync2.cause() != null) {
                    throw new IOException(sync2.cause());
                }
            } catch (InterruptedException e) {
                throw Throwables.propagate(e);
            }
        } finally {
            this.mLock.unlock();
        }
    }

    @Override // alluxio.client.block.stream.PacketWriter
    public void flush() throws IOException {
        this.mChannel.flush();
        this.mLock.lock();
        do {
            try {
                try {
                    if (this.mPosToWrite == this.mPosToQueue) {
                        return;
                    }
                    if (this.mPacketWriteException != null) {
                        throw new IOException(this.mPacketWriteException);
                    }
                } catch (InterruptedException e) {
                    throw Throwables.propagate(e);
                }
            } finally {
                this.mLock.unlock();
            }
        } while (this.mBufferEmptyOrFailed.await(WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        throw new IOException(String.format("Timeout to flush packets to %d @ %s.", Long.valueOf(this.mId), this.mAddress));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        sendEOF();
        this.mLock.lock();
        do {
            try {
                if (this.mDone) {
                    return;
                }
                try {
                    if (this.mPacketWriteException != null) {
                        this.mChannel.close().sync2();
                        throw new IOException(this.mPacketWriteException);
                    }
                } catch (InterruptedException e) {
                    throw Throwables.propagate(e);
                }
            } finally {
                this.mLock.unlock();
                if (this.mChannel.isOpen()) {
                    Preconditions.checkState(this.mChannel.pipeline().last() instanceof PacketWriteHandler);
                    this.mChannel.pipeline().removeLast();
                }
                this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
                this.mClosed = true;
            }
        } while (this.mDoneOrFailed.await(WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        this.mChannel.close().sync2();
        throw new IOException(String.format("Timeout to close the NettyPacketWriter (block: %d, address: %s).", Long.valueOf(this.mId), this.mAddress));
    }

    public boolean tooManyPacketsInFlight() {
        return this.mPosToQueue - this.mPosToWrite >= ((long) MAX_PACKETS_IN_FLIGHT) * PACKET_SIZE;
    }

    public void sendEOF() {
        this.mLock.lock();
        try {
            if (this.mEOFSent) {
                return;
            }
            this.mEOFSent = true;
            long j = this.mPosToQueue;
            this.mLock.unlock();
            this.mChannel.writeAndFlush(new RPCProtoMessage(Protocol.WriteRequest.newBuilder().setId(this.mId).setOffset(j).setSessionId(this.mSessionId).setTier(this.mTier).setType(this.mRequestType).build(), null)).addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelFutureListener.CLOSE_ON_FAILURE);
        } finally {
            this.mLock.unlock();
        }
    }

    @Override // alluxio.client.block.stream.PacketWriter
    public int packetSize() {
        return (int) PACKET_SIZE;
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: alluxio.client.block.stream.NettyPacketWriter.access$902(alluxio.client.block.stream.NettyPacketWriter, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$902(alluxio.client.block.stream.NettyPacketWriter r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.mPosToWrite = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: alluxio.client.block.stream.NettyPacketWriter.access$902(alluxio.client.block.stream.NettyPacketWriter, long):long");
    }

    static {
    }
}
