/*
 * Decompiled with CFR 0.152.
 */
package com.igormaznitsa.zxpoly.streamer;

import com.igormaznitsa.zxpoly.streamer.AbstractTcpSingleThreadServer;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Arrays;

public class TcpReader
extends AbstractTcpSingleThreadServer {
    private final int maxChunkSize;
    private final TcpReaderDataProcessor[] processors;

    public TcpReader(String id, int maxChunkSize, int bufferSize, InetAddress address, int port, TcpReaderDataProcessor ... processors) {
        super(id, bufferSize, address, port);
        this.processors = (TcpReaderDataProcessor[])processors.clone();
        this.maxChunkSize = maxChunkSize;
    }

    public void write(byte[] data) {
        this.buffer.offer(data);
    }

    @Override
    protected void doBusiness(Socket socket) throws Exception {
        byte[] chunk = new byte[this.maxChunkSize];
        InputStream inputStream = socket.getInputStream();
        while (!this.isStopped() && !Thread.currentThread().isInterrupted()) {
            int available = Math.max(256, Math.min(this.maxChunkSize, inputStream.available()));
            int read = inputStream.read(chunk, 0, available);
            if (read < 0) {
                throw new IOException("input stream is closed");
            }
            if (read <= 0) continue;
            byte[] data = Arrays.copyOfRange(chunk, 0, read);
            boolean placeIntoBuffer = true;
            for (TcpReaderDataProcessor l : this.processors) {
                placeIntoBuffer &= l.onIncomingData(this, data);
            }
            if (!placeIntoBuffer) continue;
            this.buffer.put(data);
        }
    }

    public byte[] read() {
        return (byte[])this.buffer.poll();
    }

    public static interface TcpReaderDataProcessor {
        public boolean onIncomingData(TcpReader var1, byte[] var2);
    }
}

