package arc.io.reliable;

import arc.exception.ThrowableUtil;
import arc.io.DataIo;
import arc.streams.StreamUtil;
import arc.utils.BufferPool;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

/* loaded from: input_file:arc/io/reliable/TransportStream.class */
/**
 * A {@link Transport} that frames messages with a fixed {@value #HEADER_SIZE}-byte header and
 * exchanges them over an {@link InputStream}/{@link OutputStream} pair.
 *
 * <p>The {@code OFFSET_*} constants give the byte position of each header field. This class reads
 * and writes the channel id, message id, version, type, data offset, payload length and magic
 * fields; the flags and CRC slots are declared but never touched here.
 *
 * <p>One send buffer and one receive buffer are reused for every header. Apart from
 * {@link #messageId()} being declared {@code synchronized}, no method synchronizes access to them.
 */
public class TransportStream implements Transport {
    /** Header byte offset of the channel id (unsigned 32-bit). */
    public static final int OFFSET_CHANNEL_ID = 0;
    /** Header byte offset of the message id (unsigned 32-bit). */
    public static final int OFFSET_MESSAGE_ID = 4;
    /** Header byte offset of the protocol version (16-bit). */
    public static final int OFFSET_VERSION = 8;
    /** Header byte offset of the message type (one of the {@code TYPE_*} bytes). */
    public static final int OFFSET_TYPE = 10;
    /** Header byte offset of the flags byte (not used by this class). */
    public static final int OFFSET_FLAGS = 11;
    /** Header byte offset of the 64-bit data offset. */
    public static final int OFFSET_OFFSET = 12;
    /** Header byte offset of the payload length. */
    public static final int OFFSET_LENGTH = 20;
    /** Header byte offset of the CRC slot (not used by this class). */
    public static final int OFFSET_CRC = 24;
    /** Header byte offset of the magic number. */
    public static final int OFFSET_MAGIC = 28;

    /** Header type: a data packet; the payload follows the header. */
    public static final byte TYPE_DATA = 1;
    /** Header type: acknowledgement of a data packet. */
    public static final byte TYPE_ACK = 2;
    /** Header type: the peer is disconnecting. */
    public static final byte TYPE_DISCONNECT = 3;
    /** Header type: an error report; a UTF-8 message of {@code length} bytes follows the header. */
    public static final byte TYPE_ERROR = 4;
    /** Header type: first step of the end-of-data handshake, written by the sending side. */
    public static final byte TYPE_DATA_END_SEND = 5;
    /** Header type: the receiver's reply to {@link #TYPE_DATA_END_SEND}. */
    public static final byte TYPE_DATA_END_SEND_ACK = 6;
    /** Header type: the receiver's own end-of-data marker, written after the SEND_ACK. */
    public static final byte TYPE_DATA_END_RECV = 7;
    /** Header type: the sender's reply to {@link #TYPE_DATA_END_RECV}. Wire value; do not renumber. */
    public static final byte TYPE_DATA_END_RECV_ACK = 68;

    public static final byte FLAG_RELIABLE = 1;
    /** Magic number stamped into every outgoing header and verified on incoming ones. */
    public static final int MAGIC = -1346243638;
    /** Protocol version stamped into every outgoing header. */
    public static final short VERSION = 1;
    /** Size in bytes of the fixed message header. */
    public static final int HEADER_SIZE = 32;

    /** {@link #nextDataOffset} result: end of data (also returned for a data packet at offset {@code Long.MAX_VALUE}). */
    private static final long RESULT_END_OF_DATA = -1L;
    /** {@link #nextDataOffset} result: disconnect received, or the end-of-data handshake failed. */
    private static final long RESULT_DISCONNECTED = -2L;
    /** {@link #nextDataOffset} result: non-blocking read and no complete header is available yet. */
    private static final long RESULT_NOT_READY = -3L;
    /** {@link #nextDataOffset} result: an ACK carrying offset {@code Long.MAX_VALUE} was received. */
    private static final long RESULT_END_ACK = -4L;

    private int _id;
    private InputStream _is;
    private OutputStream _os;
    private DataAcknowledgeHandler _ack;
    private TransportMonitor _tm;
    /** Reusable outgoing header; magic and version are written once in the constructor. */
    private byte[] _send;
    /** Most recently received header. */
    private byte[] _recv;
    /** When false, {@code _recv} holds a header that has not been consumed yet. */
    private boolean _needToReadHeader;
    /** Message id of the last data packet sent, or -1 if none. */
    private long _message;
    /** Offset of the last data packet sent, or -1 if none (or since the last {@link #reset}). */
    private long _offset;
    /** Set once the end-of-data handshake has completed on the receiving side. */
    private boolean _eod;
    private BufferPool _bp;

    /**
     * Creates a transport without an acknowledgement handler.
     *
     * @param i transport id, returned by {@link #id()}
     * @param inputStream stream that headers and payloads are read from
     * @param outputStream stream that headers and payloads are written to
     */
    public TransportStream(int i, InputStream inputStream, OutputStream outputStream) {
        this(i, inputStream, outputStream, null);
    }

    /**
     * Creates a transport and pre-fills the constant part of the outgoing header.
     *
     * @param i transport id, returned by {@link #id()}
     * @param inputStream stream that headers and payloads are read from
     * @param outputStream stream that headers and payloads are written to
     * @param dataAcknowledgeHandler notified for every ACK received; may be {@code null}
     */
    public TransportStream(int i, InputStream inputStream, OutputStream outputStream, DataAcknowledgeHandler dataAcknowledgeHandler) {
        this._needToReadHeader = true;
        this._message = -1L;
        this._offset = -1L;
        this._eod = false;
        this._id = i;
        this._is = inputStream;
        this._os = outputStream;
        this._ack = dataAcknowledgeHandler;
        this._send = new byte[HEADER_SIZE];
        this._recv = new byte[HEADER_SIZE];
        // Magic and version never change, so they are stamped into the send buffer once.
        DataIo.writeInt(this._send, OFFSET_MAGIC, (long) MAGIC);
        DataIo.writeShort(this._send, OFFSET_VERSION, VERSION);
    }

    /** Returns the transport id supplied at construction. */
    @Override // arc.io.reliable.Transport
    public int id() {
        return this._id;
    }

    /** Replaces the handler notified when an ACK is received; {@code null} disables notification. */
    @Override // arc.io.reliable.Consumer
    public void setDataAcknowledgeHandler(DataAcknowledgeHandler dataAcknowledgeHandler) {
        this._ack = dataAcknowledgeHandler;
    }

    /** Sets the monitor informed of sent, received and acknowledged data; may be {@code null}. */
    @Override // arc.io.reliable.Transport
    public void setTransportMonitor(TransportMonitor transportMonitor) {
        this._tm = transportMonitor;
    }

    /** Sets the pool that {@link #nextData} takes buffers from for fully-buffered packets. */
    @Override // arc.io.reliable.Producer
    public void setBufferPool(BufferPool bufferPool) {
        this._bp = bufferPool;
    }

    /**
     * Clears the last-sent offset, the end-of-data flag and any cached header.
     *
     * <p>NOTE(review): the last-sent message id is left untouched - confirm this is intended.
     */
    @Override // arc.io.reliable.Producer
    public void reset(Log log) throws IOException {
        this._offset = -1L;
        this._eod = false;
        this._needToReadHeader = true;
    }

    /**
     * Returns the offset of the last data packet sent for a message.
     *
     * @param j message id
     * @return the last sent offset if {@code j} is the message most recently sent, otherwise -1
     */
    @Override // arc.io.reliable.Consumer
    public long maxSentOffset(long j) {
        if (this._message != j) {
            return -1L;
        }
        return this._offset;
    }

    /**
     * Writes one data packet (header followed by payload) to the output stream.
     *
     * <p>The stream is flushed only when the packet's offset is {@code Long.MAX_VALUE}.
     *
     * @return {@code false}, without writing anything, if the packet's message id or offset is
     *     lower than that of the last packet sent; {@code true} otherwise
     */
    @Override // arc.io.reliable.Consumer
    public boolean send(Log log, Data data) throws IOException {
        // NOTE(review): the offset comparison applies even when the message id has advanced -
        // confirm offsets are expected to be monotonic across messages.
        if (data.message() < this._message || data.offset() < this._offset) {
            return false;
        }
        DataIo.writeByte(this._send, OFFSET_TYPE, TYPE_DATA);
        DataIo.writeUnsignedInt(this._send, OFFSET_CHANNEL_ID, data.channel());
        DataIo.writeUnsignedInt(this._send, OFFSET_MESSAGE_ID, data.message());
        DataIo.writeLong(this._send, OFFSET_OFFSET, data.offset());
        DataIo.writeInt(this._send, OFFSET_LENGTH, data.bufferLength());
        this._os.write(this._send);
        data.copyTo(this._os);
        this._message = data.message();
        this._offset = data.offset();
        if (data.offset() == Long.MAX_VALUE) {
            this._os.flush();
        }
        if (this._tm != null) {
            this._tm.sent(data.offset(), data.bufferLength());
        }
        return true;
    }

    /** No-op in this implementation. */
    @Override // arc.io.reliable.Consumer
    public void waitToDrain(Log log, long j, long j2, long j3) throws IOException {
    }

    /**
     * Runs the sender's side of the end-of-data handshake: writes DATA_END_SEND, reads
     * DATA_END_SEND_ACK and DATA_END_RECV, then writes DATA_END_RECV_ACK.
     *
     * <p>A failure to write the final DATA_END_RECV_ACK is logged and ignored; any earlier
     * failure is logged and rethrown.
     *
     * @param log optional log; may be {@code null}
     * @param j channel id
     * @param j2 message id
     * @param j3 offset written into the marker headers
     */
    @Override // arc.io.reliable.Consumer
    public void writeEndOfData(Log log, long j, long j2, long j3) throws IOException {
        if (log != null) {
            log.println("[channel=" + j + ", transport=" + id() + "] EOD SENT for message " + j2);
        }
        try {
            writeEndOfDataMarker(log, TYPE_DATA_END_SEND, j, j2, j3);
            readEndOfDataMarker(log, TYPE_DATA_END_SEND_ACK, j, j2, j3);
            readEndOfDataMarker(log, TYPE_DATA_END_RECV, j, j2, j3);
            try {
                writeEndOfDataMarker(log, TYPE_DATA_END_RECV_ACK, j, j2, j3);
            } catch (IOException e) {
                // Both preceding markers were already read, so failing to send the last
                // acknowledgement is not treated as an error.
                if (log != null) {
                    log.println("[channel=" + j + ", transport=" + id() + "] FAILED EOD (RECV ACK): ignoring: for message " + j2 + ": " + e.getMessage());
                }
            }
        } catch (IOException e2) {
            if (log != null) {
                log.println("[channel=" + j + ", transport=" + id() + "] FAILED EOD for message " + j2 + ": " + e2.getMessage());
            }
            throw e2;
        }
    }

    /** Writes a zero-length header of the given type for the channel/message/offset and flushes. */
    private void writeEndOfDataMarker(Log log, byte type, long channel, long message, long offset) throws IOException {
        DataIo.writeByte(this._send, OFFSET_TYPE, type);
        DataIo.writeUnsignedInt(this._send, OFFSET_CHANNEL_ID, channel);
        DataIo.writeUnsignedInt(this._send, OFFSET_MESSAGE_ID, message);
        DataIo.writeLong(this._send, OFFSET_OFFSET, offset);
        DataIo.writeInt(this._send, OFFSET_LENGTH, 0L);
        this._os.write(this._send);
        this._os.flush();
    }

    /**
     * Reads one header and checks that its type byte equals {@code expectedType}.
     *
     * <p>Only the type is validated; the channel, message and offset in the header are not
     * compared with the arguments.
     */
    private void readEndOfDataMarker(Log log, byte expectedType, long channel, long message, long offset) throws IOException {
        StreamUtil.readFully(this._is, this._recv, 0, HEADER_SIZE);
        if (expectedType != DataIo.readByte(this._recv, OFFSET_TYPE)) {
            throw new ExNonRecoverableFailure("Expected EOD ACK, but found data for: " + offset);
        }
    }

    /** Blocks for the next message and requires it to be the ACK for offset {@code Long.MAX_VALUE}. */
    private void readEndOfDataMarkerACK(Log log, long channel, long message, long offset) throws IOException {
        long next = nextDataOffset(log, true);
        if (next == RESULT_END_ACK) {
            return;
        }
        if (next != RESULT_DISCONNECTED) {
            throw new ExNonRecoverableFailure("Expected EOD ACK, but found data for: " + next);
        }
        throw new IOException("Disconnected");
    }

    /**
     * Receiver's side of the end-of-data handshake: writes DATA_END_SEND_ACK and DATA_END_RECV,
     * then waits for DATA_END_RECV_ACK.
     *
     * @return {@code true} if the final acknowledgement arrived; {@code false} if reading it
     *     failed with an {@link IOException}
     */
    private boolean handleEndOfDataSend(Log log, long channel, long message, long offset) throws IOException {
        writeEndOfDataMarker(log, TYPE_DATA_END_SEND_ACK, channel, message, offset);
        writeEndOfDataMarker(log, TYPE_DATA_END_RECV, channel, message, offset);
        try {
            readEndOfDataMarker(log, TYPE_DATA_END_RECV_ACK, channel, message, offset);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Writes a DATA_END_RECV_ACK marker. */
    private void handleEndOfDataRecv(Log log, long channel, long message, long offset) throws IOException {
        writeEndOfDataMarker(log, TYPE_DATA_END_RECV_ACK, channel, message, offset);
    }

    /**
     * Writes a DISCONNECT header and flushes. Only the type byte is updated; the remaining header
     * fields keep whatever the previous write left in the send buffer.
     */
    @Override // arc.io.reliable.Consumer
    public void disconnect(Log log) throws IOException {
        DataIo.writeByte(this._send, OFFSET_TYPE, TYPE_DISCONNECT);
        this._os.write(this._send);
        this._os.flush();
    }

    /**
     * Ensures the receive buffer holds an unconsumed header.
     *
     * @param blocking if {@code true}, pending output is flushed and the call blocks until a full
     *     header has been read; if {@code false}, a header is read only when the input stream
     *     reports at least {@value #HEADER_SIZE} bytes available
     * @return {@code true} if a header is present in the receive buffer, {@code false} if none
     *     could be read without blocking
     */
    private boolean readHeader(Log log, boolean blocking) throws IOException {
        if (!this._needToReadHeader) {
            return true;
        }
        if (!blocking && this._is.available() < HEADER_SIZE) {
            return false;
        }
        if (blocking) {
            // Push out anything buffered so the peer is not left waiting on us while we wait on it.
            this._os.flush();
        }
        StreamUtil.readFully(this._is, this._recv, 0, HEADER_SIZE);
        readHeader(log, DataIo.readUnsignedInt(this._recv, OFFSET_CHANNEL_ID), DataIo.readUnsignedInt(this._recv, OFFSET_MESSAGE_ID));
        this._needToReadHeader = false;
        return true;
    }

    /**
     * Consumes the ACK header currently in the receive buffer: marks the header as consumed,
     * invokes the {@link #readACK} hook and notifies the acknowledge handler and monitor.
     *
     * @return the offset carried by the ACK
     */
    private long handleAck(Log log) throws IOException {
        this._needToReadHeader = true;
        long channel = DataIo.readUnsignedInt(this._recv, OFFSET_CHANNEL_ID);
        long message = DataIo.readUnsignedInt(this._recv, OFFSET_MESSAGE_ID);
        long offset = DataIo.readLong(this._recv, OFFSET_OFFSET);
        int length = DataIo.readInt(this._recv, OFFSET_LENGTH);
        readACK(log, channel, message, offset, length);
        if (this._ack != null) {
            this._ack.acknowledge(channel, message, offset, length);
        }
        if (this._tm != null) {
            this._tm.acknowledged(offset, length);
        }
        return offset;
    }

    /**
     * Reads headers until one determines a result, dispatching on the message type.
     *
     * <ul>
     *   <li>DATA: returns the packet offset, or -1 if that offset is {@code Long.MAX_VALUE}. The
     *       header stays cached until {@link #writeACK} is called.
     *   <li>ACK: notifies the handlers; returns -4 if the acknowledged offset is
     *       {@code Long.MAX_VALUE}, otherwise moves on to the next header.
     *   <li>DISCONNECT: returns -2.
     *   <li>ERROR: reads the error text and throws.
     *   <li>DATA_END_SEND: runs the receiver's end-of-data handshake; returns -1 on success
     *       (after which every later call also returns -1), or -2 if the handshake failed.
     * </ul>
     *
     * @param log passed through to the hook methods
     * @param z {@code true} to block for a header; {@code false} to return -3 when a complete
     *     header is not yet available
     * @return a non-negative data offset, or one of the negative results described above
     * @throws IOException on I/O failure or an unexpected message type
     */
    @Override // arc.io.reliable.Producer
    public long nextDataOffset(Log log, boolean z) throws IOException {
        if (this._eod) {
            return RESULT_END_OF_DATA;
        }
        while (readHeader(log, z)) {
            if (DataIo.readInt(this._recv, OFFSET_MAGIC) != MAGIC) {
                throw new ExNonRecoverableFailure("Invalid magic number found in transport message");
            }
            byte type = DataIo.readByte(this._recv, OFFSET_TYPE);
            switch (type) {
                case TYPE_DATA: {
                    long channel = DataIo.readUnsignedInt(this._recv, OFFSET_CHANNEL_ID);
                    long message = DataIo.readUnsignedInt(this._recv, OFFSET_MESSAGE_ID);
                    long offset = DataIo.readLong(this._recv, OFFSET_OFFSET);
                    readData(log, channel, message, offset, DataIo.readInt(this._recv, OFFSET_LENGTH));
                    if (offset == Long.MAX_VALUE) {
                        return RESULT_END_OF_DATA;
                    }
                    return offset;
                }
                case TYPE_ACK: {
                    if (handleAck(log) == Long.MAX_VALUE) {
                        return RESULT_END_ACK;
                    }
                    // An ordinary ACK carries no data: go round the loop for the next header.
                    // Without this break the ACK fell through into the DISCONNECT case and was
                    // reported as a disconnect.
                    break;
                }
                case TYPE_DISCONNECT:
                    this._needToReadHeader = true;
                    return RESULT_DISCONNECTED;
                case TYPE_ERROR: {
                    this._needToReadHeader = true;
                    byte[] text = new byte[DataIo.readInt(this._recv, OFFSET_LENGTH)];
                    if (StreamUtil.readFully(this._is, text, 0, text.length, false)) {
                        throw new ExNonRecoverableFailure(new String(text, "UTF-8"));
                    }
                    throw new ExNonRecoverableFailure("Error was generated whilst recieving data");
                }
                case TYPE_DATA_END_SEND: {
                    long channel = DataIo.readUnsignedInt(this._recv, OFFSET_CHANNEL_ID);
                    long message = DataIo.readUnsignedInt(this._recv, OFFSET_MESSAGE_ID);
                    long offset = DataIo.readLong(this._recv, OFFSET_OFFSET);
                    this._needToReadHeader = true;
                    if (!handleEndOfDataSend(log, channel, message, offset)) {
                        return RESULT_DISCONNECTED;
                    }
                    this._eod = true;
                    return RESULT_END_OF_DATA;
                }
                default:
                    throw new IOException("Expected an acknowledgment, found message of type: " + ((int) type));
            }
        }
        return RESULT_NOT_READY;
    }

    /** Returns the message id field of the header currently in the receive buffer. */
    @Override // arc.io.reliable.Producer
    public synchronized long messageId() throws IOException {
        return DataIo.readUnsignedInt(this._recv, OFFSET_MESSAGE_ID);
    }

    /** Returns the payload length field of the header currently in the receive buffer. */
    @Override // arc.io.reliable.Producer
    public int dataLength() throws IOException {
        return DataIo.readInt(this._recv, OFFSET_LENGTH);
    }

    /**
     * Blocks for the next message and wraps its payload as a {@link Data} object.
     *
     * <p>If {@code i} is at least the payload length, the payload is left on the input stream and
     * a {@code StreamedData} over it is returned. Otherwise the payload is read into a pooled
     * buffer, acknowledged immediately via {@link #writeACK}, and returned as {@code BufferedData}.
     * In both cases {@code dataDiscardListener}, if non-null, is notified after the data object is
     * discarded.
     *
     * <p>NOTE(review): only the -1 result of {@link #nextDataOffset} is treated as "no data"; the
     * -2 and -4 results are passed on as if a data header were present - confirm against callers.
     *
     * @param log passed through to hooks and to the streamed data object
     * @param i size threshold compared with the payload length
     * @param bufferPool pool handed to the streamed data object; also used for buffered payloads
     *     when no pool was set through {@link #setBufferPool}
     * @param dataDiscardListener notified when the returned data is discarded; may be {@code null}
     * @return the next data object, or {@code null} at end of data
     */
    @Override // arc.io.reliable.Producer
    public Data nextData(Log log, int i, BufferPool bufferPool, final DataDiscardListener dataDiscardListener) throws IOException {
        long offset = nextDataOffset(log, true);
        if (offset == RESULT_END_OF_DATA) {
            return null;
        }
        long channel = DataIo.readUnsignedInt(this._recv, OFFSET_CHANNEL_ID);
        long message = DataIo.readUnsignedInt(this._recv, OFFSET_MESSAGE_ID);
        int length = DataIo.readInt(this._recv, OFFSET_LENGTH);
        if (i >= length) {
            StreamedData streamed = new StreamedData(log, channel, message, offset, this, this._is, length, bufferPool) {
                @Override // arc.io.reliable.StreamedData, arc.io.reliable.Data
                public void discard() throws IOException {
                    super.discard();
                    if (dataDiscardListener != null) {
                        dataDiscardListener.discarded(this);
                    }
                }
            };
            streamed.setTransport(this);
            return streamed;
        }
        // Prefer the configured pool; fall back to the caller's pool rather than failing with a
        // NullPointerException when setBufferPool was never called.
        BufferPool pool = this._bp != null ? this._bp : bufferPool;
        BufferPool.Buffer buffer = pool.get(length);
        try {
            // NOTE(review): this fills the whole backing array; presumably the pool returns an
            // array of exactly `length` bytes - verify.
            StreamUtil.readFully(this._is, buffer.array());
            writeACK(log, channel, message, offset, length);
            BufferedData buffered = new BufferedData(channel, message, offset, buffer, length) {
                @Override // arc.io.reliable.BufferedData, arc.io.reliable.Data
                public void discard() {
                    super.discard();
                    if (dataDiscardListener != null) {
                        dataDiscardListener.discarded(this);
                    }
                }
            };
            buffered.setTransport(this);
            return buffered;
        } catch (IOException e) {
            // Hand the buffer back before propagating the failure.
            buffer.discard();
            throw e;
        }
    }

    /**
     * Blocks for the next header and requires it to be an ACK.
     *
     * <p>The {@code j} and {@code j2} arguments are not used by this implementation.
     *
     * @return the offset carried by the ACK
     * @throws IOException if a DISCONNECT header is received instead
     */
    @Override // arc.io.reliable.Producer
    public long waitForACK(Log log, long j, long j2) throws IOException {
        readHeader(log, true);
        if (DataIo.readInt(this._recv, OFFSET_MAGIC) != MAGIC) {
            throw new ExNonRecoverableFailure("Invalid magic number found in transport message");
        }
        byte type = DataIo.readByte(this._recv, OFFSET_TYPE);
        switch (type) {
            case TYPE_ACK:
                return handleAck(log);
            case TYPE_DISCONNECT:
                throw new IOException("Disconnect");
            default:
                throw new ExNonRecoverableFailure("Excpected ACK, found: " + ((int) type));
        }
    }

    /**
     * Writes and flushes an ACK header, invokes the {@link #wroteACK} hook, marks the cached
     * header as consumed and tells the monitor the data was received.
     *
     * @param log passed through to the hook
     * @param j channel id
     * @param j2 message id
     * @param j3 offset being acknowledged
     * @param i length being acknowledged
     */
    public final void writeACK(Log log, long j, long j2, long j3, int i) throws IOException {
        DataIo.writeByte(this._send, OFFSET_TYPE, TYPE_ACK);
        DataIo.writeUnsignedInt(this._send, OFFSET_CHANNEL_ID, j);
        DataIo.writeUnsignedInt(this._send, OFFSET_MESSAGE_ID, j2);
        DataIo.writeLong(this._send, OFFSET_OFFSET, j3);
        DataIo.writeInt(this._send, OFFSET_LENGTH, i);
        this._os.write(this._send);
        this._os.flush();
        wroteACK(log, j, j2, j3, i);
        this._needToReadHeader = true;
        if (this._tm != null) {
            this._tm.received(j3, i);
        }
    }

    /** Flushes the output stream. */
    @Override // arc.io.reliable.Consumer
    public void flush() throws IOException {
        this._os.flush();
    }

    /**
     * Always returns {@code null}.
     *
     * <p>NOTE(review): callers must handle {@code null}; left as-is because they may rely on it.
     */
    @Override // arc.io.reliable.Consumer
    public List<Data> outstanding() {
        return null;
    }

    /**
     * Best-effort: sends the throwable's string form (with stack) to the peer as an ERROR
     * message. Does nothing if there is no output stream, and never throws.
     */
    @Override // arc.io.reliable.Consumer
    public void writeError(Throwable th) {
        if (this._os == null) {
            return;
        }
        try {
            byte[] text = ThrowableUtil.toStringWithStack(th).getBytes("UTF-8");
            DataIo.writeByte(this._send, OFFSET_TYPE, TYPE_ERROR);
            DataIo.writeLong(this._send, OFFSET_OFFSET, -1L);
            DataIo.writeInt(this._send, OFFSET_LENGTH, text.length);
            this._os.write(this._send);
            this._os.write(text);
            this._os.flush();
        } catch (Throwable ignored) {
            // Deliberately swallowed: this runs while another failure is already being reported,
            // so a second failure here must not mask the first.
        }
    }

    /**
     * Hook invoked after a header has been read; does nothing by default.
     *
     * @param j channel id from the header
     * @param j2 message id from the header
     */
    protected void readHeader(Log log, long j, long j2) {
    }

    /**
     * Hook invoked when a DATA header is seen by {@link #nextDataOffset}; does nothing by default.
     *
     * @param j channel id
     * @param j2 message id
     * @param j3 data offset
     * @param i payload length
     */
    protected void readData(Log log, long j, long j2, long j3, int i) {
    }

    /**
     * Hook invoked after an ACK has been written and flushed; does nothing by default.
     *
     * @param j channel id
     * @param j2 message id
     * @param j3 acknowledged offset
     * @param i acknowledged length
     */
    protected void wroteACK(Log log, long j, long j2, long j3, int i) {
    }

    /**
     * Hook invoked when an ACK is received, before the handler and monitor are notified; does
     * nothing by default.
     *
     * @param j channel id
     * @param j2 message id
     * @param j3 acknowledged offset
     * @param i acknowledged length
     */
    protected void readACK(Log log, long j, long j2, long j3, int i) {
    }

    /** Always reports {@code true}. */
    @Override // arc.io.reliable.Consumer
    public boolean connected() {
        return true;
    }

    /** Always reports zero. */
    @Override // arc.io.reliable.Consumer
    public int numberOutstanding() {
        return 0;
    }
}
