Blob Blame History Raw
#
# Copyright (c) 2020 Red Hat, Inc.
#
# This file is part of nmstate
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 2.1 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#

from contextlib import contextmanager
import errno
import logging
import os

import libnmstate
import libnmstate.error as libnmError

try:
    import varlink
except ModuleNotFoundError:
    raise libnmError.NmstateDependencyError("python3 varlink module not found")


class NmstateVarlinkLogHandler(logging.Handler):
    def __init__(self):
        self._log_records = list()
        super().__init__()

    def filter(self, record):
        return True

    def emit(self, record):
        self._log_records.append(record)

    @property
    def logs(self):
        """
        Return a list of dict, example:
            [
                {
                    "time": "2003-07-08 16:49:45,896",
                    "level": "DEBUG",
                    "message": "foo is changed",
                }
            ]
        """
        return [
            {
                "time": record.asctime,
                "level": record.levelname,
                "message": record.message,
            }
            for record in self._log_records
        ]


@contextmanager
def nmstate_varlink_logger():
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    handler = NmstateVarlinkLogHandler()
    handler.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    try:
        yield handler
    finally:
        logger.removeHandler(handler)


def validate_method_arguments(user_args, method_args):
    """
    Returns dictionary with validated arguments values.
    """
    kwargs = {}
    for arg in user_args.keys():
        if arg not in method_args or user_args[arg] == {}:
            raise varlink.InvalidParameter(arg)
        if user_args[arg] is not None:
            kwargs[arg] = user_args[arg]
    return kwargs


def gen_varlink_server(address):
    """
    Returns varlink server object
    Checks if the varlink address already in use.
    """
    address = "unix:" + address
    server = varlink.ThreadingServer(
        address, ServiceRequestHandler, bind_and_activate=False
    )
    server.allow_reuse_address = True
    try:
        server.server_bind()
        server.server_activate()
    except OSError as exception:
        server.shutdown()
        server.server_close()
        if exception.errno == errno.EADDRINUSE:
            server = varlink.ThreadingServer(
                address, ServiceRequestHandler, bind_and_activate=True
            )
        else:
            logging.error(exception.strerror)
            raise exception
    return server


def start_varlink_server(address):
    """
    Runs the varlink server in the specified the file path
    """
    varlink_server = gen_varlink_server(address)
    try:
        with varlink_server as server:
            server.serve_forever()
    except Exception as exception:
        logging.error(str(exception))
        varlink_server.shutdown()
    finally:
        varlink_server.server_close()


class NmstateError(varlink.VarlinkError):
    def __init__(self, message, logs):
        varlink.VarlinkError.__init__(
            self,
            {
                "error": self.__class__.__name__,
                "parameters": {
                    "error_message": message,
                    "log": logs,
                },
            },
        )


class NmstateValueError(NmstateError):
    pass


class NmstatePermissionError(NmstateError):
    pass


class NmstateConflictError(NmstateError):
    pass


class NmstateLibnmError(NmstateError):
    pass


class NmstateVerificationError(NmstateError):
    pass


class NmstateNotImplementedError(NmstateError):
    pass


class NmstateInternalError(NmstateError):
    pass


class NmstateDependencyError(NmstateError):
    pass


class ServiceRequestHandler(varlink.RequestHandler):
    service = varlink.Service(
        vendor="Red Hat",
        product="Nmstate",
        version=libnmstate.__version__,
        url="https://www.nmstate.io",
        interface_dir=os.path.dirname(__file__),
        namespaced=False,
    )


@ServiceRequestHandler.service.interface("io.nmstate")
class NmstateVarlinkService:
    def Show(self, arguments):
        """
        Reports the state data on the system
        """
        with nmstate_varlink_logger() as log_handler:
            method_args = ["include_status_data"]
            show_kwargs = validate_method_arguments(arguments, method_args)
            try:
                configured_state = libnmstate.show(**show_kwargs)
                return {"state": configured_state, "log": log_handler.logs}
            except libnmstate.error.NmstateValueError as exception:
                logging.error(str(exception))
                raise NmstateValueError(str(exception), log_handler.logs)

    def ShowRunningConfig(self, arguments):
        with nmstate_varlink_logger() as log_handler:
            method_args = []
            validate_method_arguments(arguments, method_args)
            try:
                configured_state = libnmstate.show_running_config()
                return {"state": configured_state, "log": log_handler.logs}
            except libnmstate.error.NmstateValueError as exception:
                logging.error(str(exception))
                raise NmstateValueError(str(exception), log_handler.logs)

    def Apply(self, arguments):
        """
        Apply desired state declared in json format
        which is parsed as dictionary
        """
        with nmstate_varlink_logger() as log_handler:
            method_args = [
                "desired_state",
                "verify_change",
                "commit",
                "rollback_timeout",
                "save_to_disk",
            ]
            apply_kwargs = validate_method_arguments(arguments, method_args)
            if "desired_state" not in apply_kwargs.keys():
                logging.error("Desired_state not specified")
                raise NmstateValueError(
                    "desired_state: No state specified", log_handler.logs
                )
            try:
                libnmstate.apply(**apply_kwargs)
                return {"log": log_handler.logs}
            except TypeError as exception:
                logging.error(str(exception), log_handler.logs)
                raise varlink.InvalidParameter(exception)
            except libnmstate.error.NmstatePermissionError as exception:
                logging.error(str(exception))
                raise NmstatePermissionError(str(exception), log_handler.logs)
            except libnmstate.error.NmstateValueError as exception:
                logging.error(str(exception))
                raise NmstateValueError(str(exception), log_handler.logs)
            except libnmstate.error.NmstateConflictError as exception:
                logging.error(str(exception))
                raise NmstateConflictError(str(exception), log_handler.logs)
            except libnmstate.error.NmstateVerificationError as exception:
                logging.error(str(exception))
                raise NmstateVerificationError(
                    str(exception), log_handler.logs
                )

    def Commit(self, arguments):
        """
        Commits the checkpoint
        """
        with nmstate_varlink_logger() as log_handler:
            method_args = ["checkpoint"]
            commit_kwargs = validate_method_arguments(arguments, method_args)
            try:
                libnmstate.commit(**commit_kwargs)
                return {"log": log_handler.logs}
            except libnmstate.error.NmstateValueError as exception:
                logging.error(str(exception))
                raise NmstateValueError(str(exception), log_handler.logs)

    def Rollback(self, arguments):
        """
        Roll back to the checkpoint
        """
        with nmstate_varlink_logger() as log_handler:
            method_args = ["checkpoint"]
            rollback_kwargs = validate_method_arguments(arguments, method_args)
            try:
                libnmstate.rollback(**rollback_kwargs)
                return {"log": log_handler.logs}
            except libnmstate.error.NmstateValueError as exception:
                logging.error(str(exception))
                raise NmstateValueError(str(exception), log_handler.logs)