package arc.io.reliable;

import arc.utils.BufferPool;
import arc.utils.ListUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang.StringUtils;

/* loaded from: input_file:arc/io/reliable/ReliableTransport.class */
public class ReliableTransport implements Transport, DataAcknowledgeHandler {
    public static final int DEFAULT_MAX_OUTSTANDING_ACKS = Integer.MAX_VALUE;
    private Transport _t;
    private int _id;
    private DataAcknowledgeHandler _ack;
    private LinkedList<Data> _sent;
    private int _maxOutstandingACKs;

    public ReliableTransport(int i, Transport transport) {
        this(i, transport, Integer.MAX_VALUE);
    }

    public ReliableTransport(int i, Transport transport, int i2) {
        this._sent = null;
        this._id = i;
        this._t = transport;
        this._t.setDataAcknowledgeHandler(this);
        if (i2 > 0) {
            this._maxOutstandingACKs = i2;
            this._sent = new LinkedList<>();
        }
    }

    @Override // arc.io.reliable.Consumer
    public synchronized boolean connected() {
        return this._t != null;
    }

    @Override // arc.io.reliable.Transport
    public int id() {
        return this._id;
    }

    @Override // arc.io.reliable.Consumer
    public void setDataAcknowledgeHandler(DataAcknowledgeHandler dataAcknowledgeHandler) {
        this._ack = dataAcknowledgeHandler;
    }

    @Override // arc.io.reliable.Transport
    public void setTransportMonitor(TransportMonitor transportMonitor) {
        this._t.setTransportMonitor(transportMonitor);
    }

    @Override // arc.io.reliable.Consumer
    public long maxSentOffset(long j) {
        if (this._t == null) {
            return -1L;
        }
        return this._t.maxSentOffset(j);
    }

    @Override // arc.io.reliable.Consumer
    public synchronized boolean send(Log log, Data data) throws IOException {
        checkForTransport();
        sent(log, data);
        try {
            if (this._t.send(log, data)) {
                return true;
            }
            unsent(data);
            return false;
        } catch (IOException e) {
            unsent(data);
            throw e;
        }
    }

    private synchronized void sent(Log log, Data data) throws IOException {
        if (this._sent != null) {
            Iterator<Data> it = this._sent.iterator();
            while (it.hasNext()) {
                if (it.next().equals(data)) {
                    return;
                }
            }
            while (this._sent.size() > this._maxOutstandingACKs) {
                if (nextDataOffset(log, true) == -2) {
                    throw new IOException("Transport has been disconnected");
                }
            }
            this._sent.add(data);
        }
    }

    private void unsent(Data data) {
        if (this._sent != null) {
            this._sent.remove(data);
        }
    }

    private void checkForTransport() throws IOException {
        if (this._t == null) {
            throw new IOException("Transport has been disconnected");
        }
    }

    @Override // arc.io.reliable.Consumer
    public synchronized void flush() throws IOException {
        checkForTransport();
        this._t.flush();
    }

    @Override // arc.io.reliable.Consumer
    public synchronized void disconnect(Log log) throws IOException {
        if (this._t != null) {
            if (log != null) {
                log.println("[transport=" + this._id + "]: DISCONNECT");
            }
            this._t.disconnect(log);
            this._t = null;
        }
    }

    @Override // arc.io.reliable.Producer
    public synchronized void reset(Log log) throws IOException {
        if (this._t != null) {
            this._t.reset(log);
        }
    }

    @Override // arc.io.reliable.Producer
    public synchronized long nextDataOffset(Log log, boolean z) throws IOException {
        checkForTransport();
        long nextDataOffset = this._t.nextDataOffset(log, z);
        if (nextDataOffset >= 0 && !ListUtil.isEmpty((List) this._sent)) {
            this._t.messageId();
            Iterator<Data> it = this._sent.iterator();
            while (it.hasNext()) {
                if (it.next().offset() == Long.MAX_VALUE) {
                }
            }
            this._sent.clear();
        }
        return nextDataOffset;
    }

    @Override // arc.io.reliable.Producer
    public synchronized long messageId() throws IOException {
        checkForTransport();
        return this._t.messageId();
    }

    @Override // arc.io.reliable.Producer
    public synchronized int dataLength() throws IOException {
        checkForTransport();
        return this._t.dataLength();
    }

    @Override // arc.io.reliable.Producer
    public synchronized Data nextData(Log log, int i, BufferPool bufferPool, DataDiscardListener dataDiscardListener) throws IOException {
        checkForTransport();
        Data nextData = this._t.nextData(log, i, bufferPool, dataDiscardListener);
        nextData.setTransport(this);
        return nextData;
    }

    @Override // arc.io.reliable.DataAcknowledgeHandler
    public synchronized void acknowledge(long j, long j2, long j3, int i) {
        if (this._maxOutstandingACKs > 0) {
            Iterator<Data> it = this._sent.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Data next = it.next();
                if (next.channel() == j && next.message() == j2 && next.offset() == j3) {
                    it.remove();
                    break;
                }
            }
        }
        if (this._ack != null) {
            this._ack.acknowledge(j, j2, j3, i);
        }
    }

    @Override // arc.io.reliable.Consumer
    public synchronized int numberOutstanding() {
        if (this._sent == null) {
            return 0;
        }
        return this._sent.size();
    }

    @Override // arc.io.reliable.Consumer
    public synchronized List<Data> outstanding() {
        if (ListUtil.isEmpty((List) this._sent)) {
            return null;
        }
        return new ArrayList(this._sent);
    }

    @Override // arc.io.reliable.Consumer
    public synchronized void writeError(Throwable th) {
        if (this._t != null) {
            this._t.writeError(th);
        }
    }

    @Override // arc.io.reliable.Producer
    public void setBufferPool(BufferPool bufferPool) {
        this._t.setBufferPool(bufferPool);
    }

    public String toString() {
        return String.valueOf(this._id);
    }

    @Override // arc.io.reliable.Consumer
    public void waitToDrain(Log log, long j, long j2, long j3) throws IOException {
        if (this._sent.size() > 0) {
            if (log != null) {
                String str = StringUtils.EMPTY;
                Iterator<Data> it = this._sent.iterator();
                while (it.hasNext()) {
                    str = (str + "\n    ") + it.next();
                }
                log.println("[channel=" + j + ", transport=" + this._id + "]: EOD WAIT TO DRAIN for messsage " + j2 + ": outstanding:" + str);
            }
            while (this._sent.size() > 0) {
                this._t.waitForACK(log, j, j2);
            }
        }
    }

    @Override // arc.io.reliable.Consumer
    public void writeEndOfData(Log log, long j, long j2, long j3) throws IOException {
        this._t.writeEndOfData(log, j, j2, j3);
    }

    @Override // arc.io.reliable.Producer
    public long waitForACK(Log log, long j, long j2) throws IOException {
        return this._t.waitForACK(log, j, j2);
    }
}
