# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright(c)2013 NTT corp. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

""" Unit tests for websockifyserver """
import errno
import os
import socket
import ssl
from unittest.mock import patch
import sys
import tempfile
import unittest
from io import StringIO
from io import BytesIO

from websockify import websockifyserver


def raise_oserror(*args, **kwargs):
    raise OSError('fake error')


class FakeSocket:
    def __init__(self, data=b''):
        self._data = data

    def recv(self, amt, flags=None):
        res = self._data[0:amt]
        if not (flags & socket.MSG_PEEK):
            self._data = self._data[amt:]

        return res

    def makefile(self, mode='r', buffsize=None):
        if 'b' in mode:
            return BytesIO(self._data)
        else:
            return StringIO(self._data.decode('latin_1'))


class WebSockifyRequestHandlerTestCase(unittest.TestCase):
    def setUp(self):
        super().setUp()
        self.tmpdir = tempfile.mkdtemp('-websockify-tests')
        # Mock this out cause it screws tests up
        patch('os.chdir').start()

    def tearDown(self):
        """Called automatically after each test."""
        patch.stopall()
        os.rmdir(self.tmpdir)
        super().tearDown()

    def _get_server(self, handler_class=websockifyserver.WebSockifyRequestHandler,
                    **kwargs):
        web = kwargs.pop('web', self.tmpdir)
        return websockifyserver.WebSockifyServer(
            handler_class, listen_host='localhost',
            listen_port=80, key=self.tmpdir, web=web,
            record=self.tmpdir, daemon=False, ssl_only=0, idle_timeout=1,
            **kwargs)

    @patch('websockify.websockifyserver.WebSockifyRequestHandler.send_error')
    def test_normal_get_with_only_upgrade_returns_error(self, send_error):
        server = self._get_server(web=None)
        handler = websockifyserver.WebSockifyRequestHandler(
            FakeSocket(b'GET /tmp.txt HTTP/1.1'), '127.0.0.1', server)

        handler.do_GET()
        send_error.assert_called_with(405)

    @patch('websockify.websockifyserver.WebSockifyRequestHandler.send_error')
    def test_head_with_only_upgrade_returns_error(self, send_error):
        server = self._get_server(web=None)
        handler = websockifyserver.WebSockifyRequestHandler(
            FakeSocket(b'HEAD /tmp.txt HTTP/1.1'), '127.0.0.1', server)

        handler.do_HEAD()
        send_error.assert_called_with(405)

    @patch('websockify.websockifyserver.WebSockifyRequestHandler.send_error')
    def test_post_returns_error(self, send_error):
        server = self._get_server(web=None)
        handler = websockifyserver.WebSockifyRequestHandler(
            FakeSocket(b'POST / HTTP/1.1'), '127.0.0.1', server)

        handler.do_POST()
        send_error.assert_called_with(405)

    @patch('websockify.websockifyserver.WebSockifyRequestHandler.send_error')
    def test_put_returns_error(self, send_error):
        server = self._get_server(web=None)
        handler = websockifyserver.WebSockifyRequestHandler(
            FakeSocket(b'PUT / HTTP/1.1'), '127.0.0.1', server)

        handler.do_PUT()
        send_error.assert_called_with(405)

    @patch('websockify.websockifyserver.WebSockifyRequestHandler.send_error')
    def test_patch_returns_error(self, send_error):
        server = self._get_server(web=None)
        handler = websockifyserver.WebSockifyRequestHandler(
            FakeSocket(b'PATCH / HTTP/1.1'), '127.0.0.1', server)

        handler.do_PATCH()
        send_error.assert_called_with(405)

    @patch('websockify.websockifyserver.WebSockifyRequestHandler.send_error')
    def test_delete_returns_error(self, send_error):
        server = self._get_server(web=None)
        handler = websockifyserver.WebSockifyRequestHandler(
            FakeSocket(b'DELETE / HTTP/1.1'), '127.0.0.1', server)

        handler.do_DELETE()
        send_error.assert_called_with(405)

    @patch('websockify.websockifyserver.WebSockifyRequestHandler.send_error')
    def test_options_returns_error(self, send_error):
        server = self._get_server(web=None)
        handler = websockifyserver.WebSockifyRequestHandler(
            FakeSocket(b'OPTIONS / HTTP/1.1'), '127.0.0.1', server)

        handler.do_OPTIONS()
        send_error.assert_called_with(405)

    @patch('websockify.websockifyserver.WebSockifyRequestHandler.send_error')
    def test_list_dir_with_file_only_returns_error(self, send_error):
        server = self._get_server(file_only=True)
        handler = websockifyserver.WebSockifyRequestHandler(
            FakeSocket(b'GET / HTTP/1.1'), '127.0.0.1', server)

        handler.path = '/'
        handler.do_GET()
        send_error.assert_called_with(404)


class WebSockifyServerTestCase(unittest.TestCase):
    def setUp(self):
        super().setUp()
        self.tmpdir = tempfile.mkdtemp('-websockify-tests')
        # Mock this out cause it screws tests up
        patch('os.chdir').start()

    def tearDown(self):
        """Called automatically after each test."""
        patch.stopall()
        os.rmdir(self.tmpdir)
        super().tearDown()

    def _get_server(self, handler_class=websockifyserver.WebSockifyRequestHandler,
                    **kwargs):
        return websockifyserver.WebSockifyServer(
            handler_class, listen_host='localhost',
            listen_port=80, key=self.tmpdir, web=self.tmpdir,
            record=self.tmpdir, **kwargs)

    def test_daemonize_raises_error_while_closing_fds(self):
        server = self._get_server(daemon=True, ssl_only=1, idle_timeout=1)
        patch('os.fork').start().return_value = 0
        patch('signal.signal').start()
        patch('os.setsid').start()
        patch('os.close').start().side_effect = raise_oserror
        self.assertRaises(OSError, server.daemonize, keepfd=None, chdir='./')

    def test_daemonize_ignores_ebadf_error_while_closing_fds(self):
        def raise_oserror_ebadf(fd):
            raise OSError(errno.EBADF, 'fake error')

        server = self._get_server(daemon=True, ssl_only=1, idle_timeout=1)
        patch('os.fork').start().return_value = 0
        patch('signal.signal').start()
        patch('os.setsid').start()
        patch('os.close').start().side_effect = raise_oserror_ebadf
        patch('os.open').start().side_effect = raise_oserror
        self.assertRaises(OSError, server.daemonize, keepfd=None, chdir='./')

    def test_handshake_fails_on_not_ready(self):
        server = self._get_server(daemon=True, ssl_only=0, idle_timeout=1)

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([], [], [])

        patch('select.select').start().side_effect = fake_select
        self.assertRaises(
            websockifyserver.WebSockifyServer.EClose, server.do_handshake,
            FakeSocket(), '127.0.0.1')

    def test_empty_handshake_fails(self):
        server = self._get_server(daemon=True, ssl_only=0, idle_timeout=1)

        sock = FakeSocket('')

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        patch('select.select').start().side_effect = fake_select
        self.assertRaises(
            websockifyserver.WebSockifyServer.EClose, server.do_handshake,
            sock, '127.0.0.1')

    def test_handshake_policy_request(self):
        # TODO(directxman12): implement
        pass

    def test_handshake_ssl_only_without_ssl_raises_error(self):
        server = self._get_server(daemon=True, ssl_only=1, idle_timeout=1)

        sock = FakeSocket(b'some initial data')

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        patch('select.select').start().side_effect = fake_select
        self.assertRaises(
            websockifyserver.WebSockifyServer.EClose, server.do_handshake,
            sock, '127.0.0.1')

    def test_do_handshake_no_ssl(self):
        class FakeHandler:
            CALLED = False

            def __init__(self, *args, **kwargs):
                type(self).CALLED = True

        FakeHandler.CALLED = False

        server = self._get_server(
            handler_class=FakeHandler, daemon=True,
            ssl_only=0, idle_timeout=1)

        sock = FakeSocket(b'some initial data')

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        patch('select.select').start().side_effect = fake_select
        self.assertEqual(server.do_handshake(sock, '127.0.0.1'), sock)
        self.assertTrue(FakeHandler.CALLED, True)

    def test_do_handshake_ssl(self):
        # TODO(directxman12): implement this
        pass

    def test_do_handshake_ssl_without_ssl_raises_error(self):
        # TODO(directxman12): implement this
        pass

    def test_do_handshake_ssl_without_cert_raises_error(self):
        server = self._get_server(daemon=True, ssl_only=0, idle_timeout=1,
                                  cert='afdsfasdafdsafdsafdsafdas')

        sock = FakeSocket(b"\x16some ssl data")

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        patch('select.select').start().side_effect = fake_select
        self.assertRaises(
            websockifyserver.WebSockifyServer.EClose, server.do_handshake,
            sock, '127.0.0.1')

    def test_do_handshake_ssl_error_eof_raises_close_error(self):
        server = self._get_server(daemon=True, ssl_only=0, idle_timeout=1)

        sock = FakeSocket(b"\x16some ssl data")

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        def fake_wrap_socket(*args, **kwargs):
            raise ssl.SSLError(ssl.SSL_ERROR_EOF)

        class fake_create_default_context():
            def __init__(self, purpose):
                self.verify_mode = None
                self.options = 0

            def load_cert_chain(self, certfile, keyfile, password):
                pass

            def set_default_verify_paths(self):
                pass

            def load_verify_locations(self, cafile):
                pass

            def wrap_socket(self, *args, **kwargs):
                raise ssl.SSLError(ssl.SSL_ERROR_EOF)

        patch('select.select').start().side_effect = fake_select
        patch('ssl.create_default_context').start().side_effect = fake_create_default_context
        self.assertRaises(
            websockifyserver.WebSockifyServer.EClose, server.do_handshake,
            sock, '127.0.0.1')

    def test_do_handshake_ssl_sets_ciphers(self):
        test_ciphers = 'TEST-CIPHERS-1:TEST-CIPHER-2'

        class FakeHandler:
            def __init__(self, *args, **kwargs):
                pass

        server = self._get_server(handler_class=FakeHandler, daemon=True,
                                  idle_timeout=1, ssl_ciphers=test_ciphers)
        sock = FakeSocket(b"\x16some ssl data")

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        class fake_create_default_context():
            CIPHERS = ''

            def __init__(self, purpose):
                self.verify_mode = None
                self.options = 0

            def load_cert_chain(self, certfile, keyfile, password):
                pass

            def set_default_verify_paths(self):
                pass

            def load_verify_locations(self, cafile):
                pass

            def wrap_socket(self, *args, **kwargs):
                pass

            def set_ciphers(self, ciphers_to_set):
                fake_create_default_context.CIPHERS = ciphers_to_set

        patch('select.select').start().side_effect = fake_select
        patch('ssl.create_default_context').start().side_effect = fake_create_default_context
        server.do_handshake(sock, '127.0.0.1')
        self.assertEqual(fake_create_default_context.CIPHERS, test_ciphers)

    def test_do_handshake_ssl_sets_opions(self):
        test_options = 0xCAFEBEEF

        class FakeHandler:
            def __init__(self, *args, **kwargs):
                pass

        server = self._get_server(handler_class=FakeHandler, daemon=True,
                                  idle_timeout=1, ssl_options=test_options)
        sock = FakeSocket(b"\x16some ssl data")

        def fake_select(rlist, wlist, xlist, timeout=None):
            return ([sock], [], [])

        class fake_create_default_context:
            OPTIONS = 0

            def __init__(self, purpose):
                self.verify_mode = None
                self._options = 0

            def load_cert_chain(self, certfile, keyfile, password):
                pass

            def set_default_verify_paths(self):
                pass

            def load_verify_locations(self, cafile):
                pass

            def wrap_socket(self, *args, **kwargs):
                pass

            def get_options(self):
                return self._options

            def set_options(self, val):
                fake_create_default_context.OPTIONS = val
            options = property(get_options, set_options)

        patch('select.select').start().side_effect = fake_select
        patch('ssl.create_default_context').start().side_effect = fake_create_default_context
        server.do_handshake(sock, '127.0.0.1')
        self.assertEqual(fake_create_default_context.OPTIONS, test_options)

    def test_fallback_sigchld_handler(self):
        # TODO(directxman12): implement this
        pass

    def test_start_server_error(self):
        server = self._get_server(daemon=False, ssl_only=1, idle_timeout=1)

        def fake_select(rlist, wlist, xlist, timeout=None):
            raise Exception("fake error")

        patch('websockify.websockifyserver.WebSockifyServer.socket').start()
        patch('websockify.websockifyserver.WebSockifyServer.daemonize').start()
        patch('select.select').start().side_effect = fake_select
        server.start_server()

    def test_start_server_keyboardinterrupt(self):
        server = self._get_server(daemon=False, ssl_only=0, idle_timeout=1)

        def fake_select(rlist, wlist, xlist, timeout=None):
            raise KeyboardInterrupt

        patch('websockify.websockifyserver.WebSockifyServer.socket').start()
        patch('websockify.websockifyserver.WebSockifyServer.daemonize').start()
        patch('select.select').start().side_effect = fake_select
        server.start_server()

    def test_start_server_systemexit(self):
        server = self._get_server(daemon=False, ssl_only=0, idle_timeout=1)

        def fake_select(rlist, wlist, xlist, timeout=None):
            sys.exit()

        patch('websockify.websockifyserver.WebSockifyServer.socket').start()
        patch('websockify.websockifyserver.WebSockifyServer.daemonize').start()
        patch('select.select').start().side_effect = fake_select
        server.start_server()

    def test_socket_set_keepalive_options(self):
        keepcnt = 12
        keepidle = 34
        keepintvl = 56

        server = self._get_server(daemon=False, ssl_only=0, idle_timeout=1)
        sock = server.socket('localhost',
                             tcp_keepcnt=keepcnt,
                             tcp_keepidle=keepidle,
                             tcp_keepintvl=keepintvl)

        if hasattr(socket, 'TCP_KEEPCNT'):
            self.assertEqual(sock.getsockopt(socket.SOL_TCP,
                                             socket.TCP_KEEPCNT), keepcnt)
        self.assertEqual(sock.getsockopt(socket.SOL_TCP,
                                         socket.TCP_KEEPIDLE), keepidle)
        self.assertEqual(sock.getsockopt(socket.SOL_TCP,
                                         socket.TCP_KEEPINTVL), keepintvl)

        sock = server.socket('localhost',
                             tcp_keepalive=False,
                             tcp_keepcnt=keepcnt,
                             tcp_keepidle=keepidle,
                             tcp_keepintvl=keepintvl)

        if hasattr(socket, 'TCP_KEEPCNT'):
            self.assertNotEqual(sock.getsockopt(socket.SOL_TCP,
                                                socket.TCP_KEEPCNT), keepcnt)
        self.assertNotEqual(sock.getsockopt(socket.SOL_TCP,
                                            socket.TCP_KEEPIDLE), keepidle)
        self.assertNotEqual(sock.getsockopt(socket.SOL_TCP,
                                            socket.TCP_KEEPINTVL), keepintvl)
