/*
 * Decompiled with CFR 0.152.
 */
package com.igormaznitsa.zxpoly.streamer;

import com.igormaznitsa.zxpoly.Version;
import com.igormaznitsa.zxpoly.streamer.AbstractTcpSingleThreadServer;
import com.igormaznitsa.zxpoly.streamer.FemtoHttpServer;
import com.igormaznitsa.zxpoly.streamer.TcpReader;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.logging.Logger;
import org.apache.commons.io.IOUtils;

public class HttpProcessor
implements Version {
    private static final Logger LOGGER = Logger.getLogger(HttpProcessor.class.getName());
    private static final String STREAM_RESOURCE = "stream.ts";
    private static final String WS_STREAM_RESOURCE = "wsstream.ts";
    private final String mime;
    private final InetAddress tcpReaderAddress;
    private final InetAddress httpServerAddress;
    private final int tcpReaderPort;
    private final int httpServerPort;
    private final AtomicReference<FemtoHttpServer> httpServerRef = new AtomicReference();
    private final AtomicReference<TcpReader> tcpReaderRef = new AtomicReference();
    private final Consumer<HttpProcessor> stopConsumer;
    private final ExecutorService executorService = new ThreadPoolExecutor(3, 10, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5), r -> {
        Thread thread = Thread.ofVirtual().name("zxpoly-http-stream-" + System.nanoTime()).unstarted(r);
        thread.setDaemon(true);
        return thread;
    });
    private final List<ThreadDataBuffer> threadDataBuffers = new CopyOnWriteArrayList<ThreadDataBuffer>();
    private volatile boolean stopped;

    public HttpProcessor(String mime, InetAddress addressIn, int portIn, InetAddress addressOut, int portOut, Consumer<HttpProcessor> stopConsumer) {
        this.mime = mime;
        this.tcpReaderAddress = addressIn;
        this.httpServerAddress = addressOut;
        this.tcpReaderPort = portIn;
        this.httpServerPort = portOut;
        this.stopConsumer = stopConsumer;
    }

    public static String makeAcceptKey(String key) {
        try {
            return Base64.getEncoder().encodeToString(MessageDigest.getInstance("SHA-1").digest((key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").getBytes(StandardCharsets.UTF_8)));
        }
        catch (NoSuchAlgorithmException ex) {
            throw new Error("Can't find SHA-1 provider");
        }
    }

    public void start() throws IOException {
        this.startTcpServer();
        this.startHttpServer();
    }

    private void startTcpServer() {
        final TcpReader newReader = new TcpReader("tcp-reader", 65536, 10, InetAddress.getLoopbackAddress(), 0, (source, data) -> {
            this.threadDataBuffers.forEach(b -> b.offer(data));
            return false;
        });
        if (this.tcpReaderRef.compareAndSet(null, newReader)) {
            newReader.addListener(new AbstractTcpSingleThreadServer.TcpServerListener(){
                final /* synthetic */ HttpProcessor this$0;
                {
                    this.this$0 = this$0;
                }

                @Override
                public void onConnectionDone(AbstractTcpSingleThreadServer source, Socket socket) {
                    if (source == newReader) {
                        LOGGER.info("TCP reader connection lost");
                        this.this$0.stop();
                    }
                }
            });
            newReader.start();
        }
    }

    private void startHttpServer() throws IOException {
        FemtoHttpServer server = new FemtoHttpServer(this.executorService, new InetSocketAddress(this.httpServerAddress, this.httpServerPort), 3);
        if (this.httpServerRef.compareAndSet(null, server)) {
            server.createContext("/stream.ts", this::handleStreamData);
            server.createContext("/wsstream.ts", this::handleWsStreamData);
            server.createContext("/", this::handleStaticResource);
            server.start();
        }
    }

    private byte[] readResource(String path) {
        try {
            return IOUtils.resourceToByteArray("/com/igormaznitsa/zxpoly/streamer" + path);
        }
        catch (IOException ex) {
            LOGGER.warning("Can't find resource for path: " + path);
            return null;
        }
    }

    public String getHttpAddress() {
        FemtoHttpServer server = this.httpServerRef.get();
        InetSocketAddress address = server == null ? null : server.getAddress();
        return address == null ? "none" : address.getAddress().getHostAddress() + ":" + address.getPort();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleStaticResource(FemtoHttpServer.HttpExchange exchange) {
        try {
            String linkToVideoStream = "http://" + this.getHttpAddress() + "/stream.ts";
            String linkToWsVideoStream = "ws://" + this.getHttpAddress() + "/wsstream.ts";
            String linkToPlaylist = "http://" + this.getHttpAddress() + "/playlist.m3u8";
            String path = exchange.getRequestURI().getPath();
            LOGGER.info("Incoming request for resource: " + path);
            if ("/".equals(path)) {
                path = "/streamer.html";
            }
            String mime = "text/plain";
            boolean binary = false;
            if (path.endsWith(".css")) {
                mime = "text/css";
            } else if (path.endsWith(".js")) {
                mime = "text/javascript";
            } else if (path.endsWith(".htm") || path.endsWith(".html")) {
                mime = "text/html";
            } else if (path.endsWith(".png")) {
                mime = "image/png";
                binary = true;
            } else if (path.endsWith(".ico")) {
                mime = "image/x-icon";
                binary = true;
            } else if (path.endsWith(".m3u8")) {
                mime = "application/x-mpegURL";
            } else if (path.endsWith(".js.map")) {
                mime = "application/octet-stream";
            }
            exchange.getResponseHeaders().add("Origin", "http://" + this.getHttpAddress());
            exchange.getResponseHeaders().add("Content-Type", mime);
            exchange.getResponseHeaders().add("Access-Control-Allow-Origin ", "*");
            byte[] data = this.readResource(path);
            if (data == null) {
                exchange.sendResponseHeaders(404, -1L);
            } else {
                if (!binary) {
                    String text = new String(data, StandardCharsets.UTF_8).replace("${version.major}", Integer.toString(2)).replace("${version.minor}", Integer.toString(3)).replace("${version.build}", Integer.toString(5)).replace("${video.link}", linkToVideoStream).replace("${wsvideo.link}", linkToWsVideoStream).replace("${playlist.link}", linkToPlaylist).replace("${video.mime}", this.mime);
                    data = text.getBytes(StandardCharsets.UTF_8);
                }
                exchange.sendResponseHeaders(200, data.length);
                OutputStream out = exchange.getResponseBody();
                out.write(data);
            }
        }
        catch (IOException ex) {
            LOGGER.warning("IOException during resource processing: " + ex.getMessage());
        }
        finally {
            exchange.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleWsStreamData(FemtoHttpServer.HttpExchange exchange) {
        ThreadDataBuffer buffer = new ThreadDataBuffer(Thread.currentThread().getName(), 32);
        this.threadDataBuffers.add(buffer);
        try {
            if ("get".equalsIgnoreCase(exchange.getRequestMethod())) {
                Optional<String> connection = exchange.getRequestHeaders().getFirst("Connection");
                Optional<String> upgrade = exchange.getRequestHeaders().getFirst("Upgrade");
                Optional<String> webSocketKey = exchange.getRequestHeaders().getFirst("Sec-WebSocket-Key");
                Optional<String> webSocketProtocol = exchange.getRequestHeaders().getFirst("Sec-WebSocket-Protocol");
                if (!(connection.isPresent() && connection.get().toLowerCase(Locale.ENGLISH).contains("upgrade") && upgrade.isPresent() && upgrade.get().equalsIgnoreCase("WebSocket") && webSocketKey.isPresent())) {
                    exchange.sendResponseHeaders(400, -1L);
                } else {
                    exchange.getResponseHeaders().add("Connection", "Upgrade");
                    exchange.getResponseHeaders().add("Upgrade", "websocket");
                    exchange.getResponseHeaders().add("Sec-WebSocket-Version", 13L);
                    exchange.getResponseHeaders().add("Origin", "http://" + this.getHttpAddress());
                    exchange.getResponseHeaders().add("Access-Control-Allow-Origin ", "*");
                    exchange.getResponseHeaders().add("Sec-WebSocket-Accept", HttpProcessor.makeAcceptKey(webSocketKey.get()));
                    webSocketProtocol.ifPresent(x -> exchange.getResponseHeaders().add("Sec-WebSocket-Protocol", (String)x));
                    exchange.sendResponseHeaders(101, -1L);
                    InputStream inputStream = exchange.getRequestBody();
                    OutputStream outputStream = exchange.getResponseBody();
                    final AtomicBoolean wsChannelActive = new AtomicBoolean(true);
                    WebSocketStreamWrapper wrapper = new WebSocketStreamWrapper(new WebSocketStreamWrapper.WsSignalReceiver(){

                        @Override
                        public void onClose(WebSocketStreamWrapper source, byte[] data) {
                            wsChannelActive.set(false);
                        }
                    }, inputStream, outputStream);
                    wrapper.start();
                    while (!Thread.currentThread().isInterrupted() && wsChannelActive.get()) {
                        byte[] data = buffer.poll();
                        if (data == null) continue;
                        wrapper.writeBinary(false, data);
                    }
                }
            } else {
                exchange.sendResponseHeaders(405, -1L);
            }
        }
        catch (IOException ex) {
            LOGGER.warning("IOException during WebSocket stream: " + ex.getMessage());
        }
        finally {
            LOGGER.info("Streaming thread completed");
            this.threadDataBuffers.remove(buffer);
            exchange.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleStreamData(FemtoHttpServer.HttpExchange exchange) {
        if ("head".equalsIgnoreCase(exchange.getRequestMethod())) {
            LOGGER.info("Incoming HEAD request for video stream: " + String.valueOf(exchange.getRequestURI()));
            FemtoHttpServer.Headers headers = exchange.getResponseHeaders();
            headers.add("Content-Type", this.mime);
            headers.add("Cache-Control", "no-cache, no-store");
            headers.add("Pragma", "no-cache");
            headers.add("Expires", "0");
            headers.add("Connection", "Keep-Alive");
            headers.add("Keep-Alive", "max");
            headers.add("Accept-Ranges", "none");
            try {
                exchange.sendResponseHeaders(200, -1L);
            }
            catch (IOException ex) {
                exchange.close();
            }
        } else if ("get".equalsIgnoreCase(exchange.getRequestMethod())) {
            ThreadDataBuffer buffer = new ThreadDataBuffer(Thread.currentThread().getName(), 32);
            this.threadDataBuffers.add(buffer);
            try {
                LOGGER.info("Incoming GET request for video stream: " + String.valueOf(exchange.getRequestURI()));
                exchange.getResponseHeaders().add("Content-Type", this.mime);
                exchange.getResponseHeaders().add("Cache-Control", "no-cache, no-store");
                exchange.getResponseHeaders().add("Pragma", "no-cache");
                exchange.getResponseHeaders().add("Expires", "0");
                exchange.getResponseHeaders().add("Connection", "Keep-Alive");
                exchange.getResponseHeaders().add("Keep-Alive", "max");
                exchange.getResponseHeaders().add("Accept-Ranges", "none");
                exchange.getResponseHeaders().add("Origin", "http://" + this.getHttpAddress());
                exchange.getResponseHeaders().add("Content-Type", this.mime);
                exchange.getResponseHeaders().add("Access-Control-Allow-Origin ", "*");
                exchange.sendResponseHeaders(200, Long.MAX_VALUE);
                try (OutputStream responseStream = exchange.getResponseBody();){
                    while (!this.stopped && !Thread.currentThread().isInterrupted()) {
                        byte[] data = buffer.poll();
                        if (data == null) continue;
                        responseStream.write(data);
                    }
                }
            }
            catch (Exception ex) {
                LOGGER.warning("Exception during streaming: " + ex.getMessage());
            }
            finally {
                LOGGER.info("Streaming thread completed");
                this.threadDataBuffers.remove(buffer);
                exchange.close();
            }
        } else {
            LOGGER.warning("Incoming unsupported " + exchange.getRequestMethod() + " request for video stream: " + String.valueOf(exchange.getRequestURI()));
            try {
                exchange.sendResponseHeaders(405, -1L);
            }
            catch (Exception exception) {
            }
            finally {
                exchange.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        this.stopped = true;
        FemtoHttpServer server = this.httpServerRef.getAndSet(null);
        if (server != null) {
            TcpReader reader = this.tcpReaderRef.getAndSet(null);
            if (reader != null) {
                reader.stop();
            }
            try {
                LOGGER.info("Stopping server");
                server.stop();
            }
            catch (Exception ex) {
                LOGGER.warning("Error on server stop: " + ex.getMessage());
            }
            finally {
                this.executorService.shutdownNow();
            }
            if (this.stopConsumer != null) {
                this.stopConsumer.accept(this);
            }
        }
    }

    public String getTcpAddress() {
        TcpReader reader = this.tcpReaderRef.get();
        return reader == null ? "none" : reader.getServerAddress();
    }

    private static final class ThreadDataBuffer {
        private final BlockingQueue<byte[]> buffer;
        private final String id;

        ThreadDataBuffer(String id, int bufferSize) {
            this.buffer = new ArrayBlockingQueue<byte[]>(bufferSize);
            this.id = id;
        }

        public int hashCode() {
            return this.id.hashCode();
        }

        public boolean equals(Object obj) {
            return obj instanceof ThreadDataBuffer && Objects.equals(this.id, ((ThreadDataBuffer)obj).id);
        }

        byte[] poll() {
            return (byte[])this.buffer.poll();
        }

        boolean offer(byte[] data) {
            if (data == null) {
                return false;
            }
            return this.buffer.offer(data);
        }
    }

    public static class WebSocketStreamWrapper {
        private static final byte[] EMPTY_ARRAY = new byte[0];
        private final Random rnd = new Random();
        private final AtomicReference<Thread> thread = new AtomicReference();
        private final InputStream inputStream;
        private final OutputStream outputStream;
        private final WsSignalReceiver receiver;
        private final Lock writeLock = new ReentrantLock();

        public WebSocketStreamWrapper(WsSignalReceiver receiver, InputStream inputStream, OutputStream outputStream) {
            this.receiver = Objects.requireNonNull(receiver);
            this.inputStream = inputStream;
            this.outputStream = outputStream;
        }

        static void writeWebSocketFrame(OutputStream stream, int opCode, OptionalInt frameMask, byte[] data) throws IOException {
            int j;
            int i;
            byte[] mask;
            int maskBit;
            stream.write(0x80 | opCode & 0xF);
            if (frameMask.isPresent()) {
                maskBit = 128;
                mask = new byte[4];
                int value = frameMask.getAsInt();
                mask[0] = (byte)(value >> 24);
                mask[1] = (byte)(value >> 16);
                mask[2] = (byte)(value >> 8);
                mask[3] = (byte)value;
            } else {
                maskBit = 0;
                mask = null;
            }
            if (data.length < 126) {
                stream.write(maskBit | data.length);
            } else if (data.length < 65536) {
                stream.write(maskBit | 0x7E);
                WebSocketStreamWrapper.writePayloadLength(stream, data.length);
            } else {
                stream.write(maskBit | 0x7F);
                WebSocketStreamWrapper.writePayloadLength(stream, data.length);
            }
            if (mask != null) {
                stream.write(mask);
                for (i = 0; i < data.length; ++i) {
                    j = i % 4;
                    data[i] = (byte)(data[i] ^ mask[j]);
                }
            }
            stream.write(data);
            if (mask != null) {
                for (i = 0; i < data.length; ++i) {
                    j = i % 4;
                    data[i] = (byte)(data[i] ^ mask[j]);
                }
            }
            stream.flush();
        }

        private static long readPayloadLengthBytes(InputStream in, int bytes) throws IOException {
            long result = 0L;
            int count = bytes;
            while (--count > 0) {
                int next = in.read();
                if (next < 0) {
                    throw new EOFException();
                }
                result |= (long)next << 8 * count;
            }
            return result;
        }

        private static void writePayloadLength(OutputStream out, long length) throws IOException {
            if (length < 65536L) {
                out.write((byte)(length >>> 8));
                out.write((byte)length);
            } else {
                out.write((byte)(length >>> 56));
                out.write((byte)(length >>> 48));
                out.write((byte)(length >>> 40));
                out.write((byte)(length >>> 32));
                out.write((byte)(length >>> 24));
                out.write((byte)(length >>> 16));
                out.write((byte)(length >>> 8));
                out.write((byte)length);
            }
        }

        public void start() {
            Thread newThread = Thread.ofVirtual().name("ws-wrapper-read-thread-" + System.nanoTime()).unstarted(this::readRun);
            if (!this.thread.compareAndSet(null, newThread)) {
                throw new IllegalStateException("Already started");
            }
            newThread.start();
        }

        private void assertStarted() {
            if (this.thread.get() == null) {
                throw new IllegalStateException("Not started");
            }
        }

        private OptionalInt generateMask() {
            int result = 0;
            while ((result = this.rnd.nextInt()) == 0) {
            }
            return OptionalInt.of(result);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void writeFrame(int opCode, boolean masked, byte[] data) throws IOException {
            this.assertStarted();
            this.writeLock.lock();
            try {
                WebSocketStreamWrapper.writeWebSocketFrame(this.outputStream, opCode, masked ? this.generateMask() : OptionalInt.empty(), data);
            }
            finally {
                this.writeLock.unlock();
            }
        }

        public void writeText(boolean masked, String text) throws IOException {
            this.writeFrame(1, masked, text.getBytes(StandardCharsets.UTF_8));
        }

        public void writeBinary(boolean masked, byte[] data) throws IOException {
            this.writeFrame(2, masked, data);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void readRun() {
            this.receiver.onStart(this);
            try {
                block5: while (this.thread.get() != null && !Thread.currentThread().isInterrupted()) {
                    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                    int opcode = 0;
                    while (this.thread.get() != null && !Thread.currentThread().isInterrupted()) {
                        int maskOpcode = this.inputStream.read();
                        if (maskOpcode >= 0) {
                            boolean fin = (maskOpcode & 0x80) != 0;
                            opcode |= maskOpcode & 0xF;
                            int maskPayloadLen = this.inputStream.read();
                            if (maskPayloadLen < 0) {
                                throw new EOFException();
                            }
                            boolean mask = (maskPayloadLen & 0x80) != 0;
                            long payloadLen = maskPayloadLen & 0x7F;
                            if (payloadLen == 126L) {
                                payloadLen = WebSocketStreamWrapper.readPayloadLengthBytes(this.inputStream, 2);
                            } else if (payloadLen == 127L) {
                                payloadLen = WebSocketStreamWrapper.readPayloadLengthBytes(this.inputStream, 8);
                            }
                            byte[] maskValue = mask ? IOUtils.readFully(this.inputStream, 4) : EMPTY_ARRAY;
                            byte[] payload = IOUtils.readFully(this.inputStream, (int)payloadLen);
                            if (mask) {
                                for (int i = 0; i < payload.length; ++i) {
                                    int j = i % 4;
                                    payload[i] = (byte)(payload[i] ^ maskValue[j]);
                                }
                            }
                            buffer.write(payload);
                            if (!fin) continue;
                            this.processIncomingPacket(opcode, buffer.toByteArray());
                            continue block5;
                        }
                        throw new EOFException("WebSocket input stream is closed");
                    }
                }
            }
            catch (Exception ex) {
                LOGGER.warning("Web socket read error: " + ex.getMessage());
            }
            finally {
                this.receiver.onStop(this);
            }
        }

        private void processIncomingPacket(int opCode, byte[] data) throws IOException {
            switch (opCode) {
                case 0: {
                    this.receiver.onUnexpected(this, opCode, data);
                    break;
                }
                case 1: {
                    this.receiver.onText(this, new String(data, StandardCharsets.UTF_8));
                    break;
                }
                case 2: {
                    this.receiver.onBinary(this, data);
                    break;
                }
                case 8: {
                    this.receiver.onClose(this, data);
                    break;
                }
                case 9: {
                    this.writeFrame(10, false, EMPTY_ARRAY);
                    break;
                }
                case 10: {
                    break;
                }
                default: {
                    this.receiver.onReserved(this, opCode, data);
                }
            }
        }

        public void close(boolean closeStreams) {
            Thread startedThread = this.thread.getAndSet(null);
            if (startedThread != null) {
                IOUtils.closeQuietly(this.inputStream);
                IOUtils.closeQuietly(this.outputStream);
                try {
                    startedThread.interrupt();
                    startedThread.join();
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        public static interface WsSignalReceiver {
            default public void onText(WebSocketStreamWrapper source, String text) {
            }

            default public void onBinary(WebSocketStreamWrapper source, byte[] data) {
            }

            default public void onClose(WebSocketStreamWrapper source, byte[] data) {
            }

            default public void onUnexpected(WebSocketStreamWrapper source, int code, byte[] data) {
            }

            default public void onReserved(WebSocketStreamWrapper source, int code, byte[] data) {
            }

            default public void onStart(WebSocketStreamWrapper source) {
            }

            default public void onStop(WebSocketStreamWrapper source) {
            }
        }
    }
}

