#!/usr/libexec/platform-python
# Copyright 2012,2018 Red Hat Inc.
# Author: Kushal Das <kdas@redhat.com>

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.  See
# http://www.gnu.org/copyleft/gpl.html for the full text of the
# license.
#
import os
import re
import sys
from optparse import OptionParser, SUPPRESS_HELP
from utils import run, read_list
from utils import read_total_list, get_release_name
from utils import createbug
from utils import query_user, query_user_bool

KSCVERSION = "ksc - Version 1.6"


class Ksc(object):

    # RE to detect ksc cli execution
    HEADER_RE                 = re.compile(r"\[command: (?P<cmd>.*)\]")

    # RE to extract KO basename
    SECTION_KO_RE             = re.compile(r'{(?P<ko_file>.*)}')

    # RE to match with KO-body section
    LISTS_RE                  = re.compile(r'.*\[WHITELISTUSAGE\](?P<wl>.*)\[NONWHITELISTUSAGE\]\s*(?P<gl>.*)', re.S)

    # RE to extract symbol and its justification
    JUSTIFICATION_RE          = re.compile(r'#*\s*\((?P<symbol>.*)\)\s*(?P<justification>[^#]*)')

    # Non-whitelisted symbols justification free-form entries
    JUSTIFICATION_PLACEHOLDER = "ENTER JUSTIFICATION TEXT HERE"
    JUSTIFICATION_REFERENCE   = "JUSTIFICATION STATED UNDER `%s' SECTION"
    JUSTIFICATION_REF_DETECT  = re.compile(r"JUSTIFICATION STATED UNDER `(.*)' SECTION")
    JUSTIFICATION_SEPARATOR   = '#' * 10
    JUSTIFICATION_BODY        = '\n(%s)\n\n%s\n\n'

    # Sections
    SECTION_WHITELISTS        = "[WHITELISTUSAGE]\n"
    SECTION_CO_WHITELISTS     = "[NONWHITELISTUSAGE]\n"

    def __init__(self, mock=False):
        """
        Init call
        """
        self.all_symbols_used = {}
        self.nonwhite_symbols_used = {}
        self.white_symbols = {}
        self.defined_symbols = {}
        self.justified_symbols = {}
        self.justifications = {}
        self.matchdata = None
        self.total = None
        self.verbose = False
        self.mock = mock
        self.releasedir = None
        self.symvers = None
        self.arch = None
        self.vermagic = {}
        if mock:
            self.releasename = '7.0'
        else:
            self.releasename = None

    def clean(self):
        self.all_symbols_used = {}
        self.nonwhite_symbols_used = {}
        self.white_symbols = {}
        self.defined_symbols = {}
        self.justified_symbols = {}
        self.justifications = {}
        self.matchdata = None
        self.total = None
        self.vermagic = {}

    def main(self, mock_options=None):
        """
        Command-line entry point: parse the options, analyse the given
        kernel modules and, on request, submit the result to Bugzilla.

        Terminates the process through sys.exit() on invalid options, when
        no module is given or when the whitelist data cannot be read.

        :arg mock_options: list of arguments parsed instead of sys.argv[1:]
                           when the object was created with mock=True.
        """
        # Result file later handed to get_justification() and submit()
        filename = os.path.join(os.path.expanduser("~"), "ksc-result.txt")
        # default architecture
        self.arch = "x86_64"

        parser = OptionParser()
        parser.add_option("-c", "--config", dest="config",
                          help="path to configuration file", metavar="CONFIG")
        parser.add_option("-k", "--ko", action="append", dest="ko",
                          help="path to the ko file", metavar="KO")
        parser.add_option("-K", "--ko-dependency", action="append",
                          dest="ko_dependency", help="path to a dependency",
                          metavar="DEPENDENCY")
        parser.add_option("-n", "--name", dest="releasename",
                          help="Red Hat release to file the bug against, "
                               "e.g '6.7'", metavar="RELEASENAME")
        parser.add_option("-p", "--previous", dest="previous",
                          help="path to previous resultset to submit as bug")
        parser.add_option("-r", "--release", dest="release",
                          help="RHEL whitelist release to compare against, "
                               "e.g '6.7'", metavar="RELEASE")
        parser.add_option("-y", "--symvers", dest="symvers",
                          help="Path to the Module.symvers file. "
                               "The current kernel path is used if "
                               "not specified.",
                          metavar="SYMVERS")
        parser.add_option("-s", "--submit",
                          action="store_true", dest="submit", default=False,
                          help="Submit to Red Hat Bugzilla")
        parser.add_option("-v", "--version",
                          action="store_true", dest="VERSION", default=False,
                          help="Prints KSC version number")
        parser.add_option("-j", "--justification-from", action="append",
                          dest="justification_from", metavar="JUSTIFICATION",
                          help="read justification from previous ksc-result")

        # Mock mode parses the caller-supplied argument list instead of argv
        if not self.mock:  # pragma: no cover
            (options, args) = parser.parse_args(sys.argv[1:])
        else:  # pragma: no cover
            (options, args) = parser.parse_args(mock_options)

        if options.VERSION:
            print(KSCVERSION)
            sys.exit(0)

        # Create the ksc.conf config path
        if options.config:
            path = os.path.abspath(os.path.expanduser(options.config))
        else:
            path = os.path.expanduser("~/ksc.conf")

        # valid_release_version() prints the reason itself when it rejects
        if options.releasename:
            self.releasename = options.releasename
            if not self.valid_release_version(self.releasename):
                sys.exit(1)

        if options.release:
            if not self.valid_release_version(options.release):
                sys.exit(1)

        # When both -n and -r are given they have to name the same release
        if options.releasename and options.release and \
                options.release != options.releasename:
            print("Release and release name do not match.")
            sys.exit(1)

        if options.previous:  # Submit the result of previous run
            filename = os.path.abspath(os.path.expanduser(options.previous))
            if os.path.basename(filename) != 'ksc-result.txt':
                print("Please point to the ksc-result.txt file in -p option.")
                sys.exit(1)

            # Nothing is analysed in this mode; the old result is submitted
            self.submit(filename, path)
            return

        # Whitelist directory name passed to read_data(); -r overrides it
        self.releasedir = 'kabi-current'
        if options.release:
            # NOTE(review): options.release was already validated above, so
            # this second check looks redundant
            if not self.valid_release_version(options.release):
                sys.exit(1)

            self.releasedir = 'kabi-rhel' + options.release.replace('.', '')

        if options.symvers:
            self.symvers = options.symvers

        if options.justification_from:
            for file in options.justification_from:
                self.read_justifications(file)

        # Dependencies are parsed without whitelist processing: they are not
        # scored themselves
        if options.ko_dependency:
            for kmod_path in options.ko_dependency:
                self.parse_ko(kmod_path, process_whitelists=False)

        if options.ko:
            # Sets self.arch from the modules (exits on unsupported/mixed)
            self.find_arch(options.ko)

            exists = self.read_data(self.arch, self.releasedir, self.symvers)
            # Return if there is any issue in reading whitelists
            if not exists:
                print(("Release %s for arch %s was not found.\n"
                      "Do you have right kernel-abi-whitelist installed ?" %
                       (self.releasedir, self.arch)))
                sys.exit(1)

            for kmod_path in options.ko:
                self.parse_ko(kmod_path, process_whitelists=True)

            # Must run after every module has been parsed, because it
            # subtracts the symbols defined by the *other* parsed modules
            self.remove_internal_symbols()

            for kmod_path in options.ko:
                self.print_result(kmod_path)

            self.save_result()

        else:  # pragma: no cover
            print("You need to provide a path to at least one .ko file.")
            sys.exit(1)

        # The result has been saved; what follows only applies to --submit

        if not options.submit:
            return

        # Mock runs skip the interactive justification editing
        if not self.mock:  # pragma: no cover
            self.get_justification(filename)
        self.submit(filename, path)

    def read_justifications(self, filepath):
        filepath = os.path.abspath(os.path.expanduser(filepath))

        if not os.path.isfile(filepath):
            print("Filename `%s' does not exist!" % filepath)
            return

        with open(filepath, "r") as fd:

            filename_section = ""

            for file_contents in re.split("({[^}]*})", fd.read()):

                filename_match = self.SECTION_KO_RE.match(file_contents)
                if filename_match:
                    filename_section = filename_match.group('ko_file')

                match = self.LISTS_RE.match(file_contents)
                if not match:
                    continue

                for symbol, justification in \
                        self.JUSTIFICATION_RE.findall(file_contents):
                    symbol = symbol.strip()
                    justification = justification.strip()

                    if justification == self.JUSTIFICATION_PLACEHOLDER:
                        continue

                    if self.JUSTIFICATION_REF_DETECT.match(justification):
                        continue

                    if filename_section not in self.justifications:
                        self.justifications[filename_section] = {}

                    self.justifications[filename_section][symbol] = \
                            justification

                    if symbol not in self.justified_symbols:
                        self.justified_symbols[symbol] = \
                                os.path.basename(filename_section)

    def valid_release_version(self, release):
        rels = release.split(".")
        if len(rels) != 2:
            print("Invalid release: %s" % release)
            return False
        if not rels[0].isdigit() or int(rels[0]) <= 5:
            print("Invalid release: %s" % release)
            return False
        return True

    def submit_get_consent(self):

        """
        Part of the submission process. User gets queried for Red Hat's
        receipt and internal use. Consent is mandatory: anything but an
        explicit 'y' terminates the process with exit status 1.
        """

        consent_string = "By using ksc to upload your data to Red Hat, " \
            "you consent to Red Hat's receipt use and analysis of this " \
            "data. Do you agree? [y/N] "

        consent = query_user_bool(consent_string)

        # submit_get_release() already treats a falsy reply from
        # query_user_bool() as "no usable answer"; do the same here rather
        # than calling .lower() on it.
        if not consent or consent.lower() != 'y':
            print("Cannot proceed without consent. Qutting.")
            sys.exit(1)

    def submit_get_release(self):

        """
        Part of the submission process. User gets queried for release if
        not explicitly provided via argv.

        On return ``self.releasename`` holds the release to file the bug
        against: the system release when the user confirmed it, otherwise
        the release the user typed in. The process exits with status 1 when
        no usable answer could be obtained.
        """

        # Release has been specified as argv, no need to query user at this time
        if self.releasename is not None:
            return

        print("RHEL release not specified with -n flag. Defaulting to your "
            "system's RHEL release.")

        self.releasename = get_release_name()
        use_sys_release = ""
        if not self.releasename:
            print("Unable to determine system's release name. Please specify.")

        else:
            query = "File bug against RHEL release %s?" % self.releasename
            query += "\n"
            query += "y/N: "

            use_sys_release = query_user_bool(query)

            if not use_sys_release:
                print("Unable to determine user option. Qutting.")
                sys.exit(1)

        # Either system-determine RHEL release is not what user wishes to file
        # against, or ksc couldn't determine the release; query user to specify
        if use_sys_release.lower() == 'n' or not self.releasename:
            release_name = query_user(
                "Please enter valid RHEL release to file bug against: ",
                is_valid=self.valid_release_version
            )

            if not release_name:
                print("Unable to determine a valid RHEL release. Qutting.")
                sys.exit(1)

            print("Using RHEL %s release." % release_name)
            # Actually use the answer: without this assignment the bug would
            # still be filed against the rejected (or undetermined) system
            # release.
            self.releasename = release_name

    def submit(self, filename, path):
        """
        Submits the resultset into Red Hat bugzilla.
        Asks user for Red Hat internal processing consent.
        If not set, determines and asks which RHEL release to use.

        :arg filename: Full path the ksc-result.txt file.
        :arg path: Path to the config file.
        """
        try:
            with open(filename, "r") as fptr:
                line = fptr.readline().strip()
                module_name = self.get_module_name(line)

        except IOError as err:
            print("Unable to read previous result: {}".format(err))
            sys.exit(1)

        if not self.mock:  # Ask for user permission
            self.submit_get_consent()
            self.submit_get_release()

        createbug(filename, self.arch, mock=self.mock, path=path,
                  releasename=self.releasename, module=module_name)

    def get_justification(self, filename):
        """
        Let the user enter justifications for non-whitelist symbols.

        Prints a short explanation, waits for ENTER and then opens the
        result file in the editor named by $EDITOR (``vi`` when unset or
        empty).

        :arg filename: path of the result file to edit
        :return: always True
        """
        import shlex

        bold = "\033[1m"
        reset = "\033[0;0m"

        print(bold)
        print('On the next screen, the result log will be opened to allow')
        print('you to provide technical justification on why these symbols')
        print('need to be included in the KABI whitelist.')
        print('Please provide sufficient information in the log, marked with ')
        print('the line below:')

        print(("\n%s\n" % self.JUSTIFICATION_PLACEHOLDER) + reset)
        print(bold + 'Press ENTER for next screen.' + reset)
        try:
            input()
        except EOFError:
            print("Warning. Running in a non-interactive mode.")

        editor = os.getenv('EDITOR') or 'vi'
        # The command goes through the shell so that $EDITOR may carry its
        # own arguments; the file name is quoted because the path may
        # contain spaces or shell metacharacters.
        os.system(editor + ' ' + shlex.quote(filename))
        return True

    def save_result(self):
        """
        Save the result in a text file
        """
        output_filename = os.path.expanduser("~/ksc-result.txt")
        if os.path.isfile(output_filename):

            overwrite_result_query = "ksc-result.txt already exists. " \
                    "Overwrite? [y/N]: "
            overwrite = query_user_bool(overwrite_result_query)

            if overwrite.lower() != 'y':
                print("Unable to get an explicit overwrite acknowledgement. "
                        "Quitting.")
                sys.exit(1)

        if os.path.isfile(output_filename):
            with open(output_filename, 'w+') as f:
                f.truncate()

        try:
            with open(output_filename, "a") as f:
                command = "[command: %s]\n" % " ".join(sys.argv)

                f.write(command)
                for ko_file in self.all_symbols_used:
                    f.write("\n{%s@%s}\n\n" % (
                        os.path.basename(ko_file),
                        self.vermagic[ko_file].strip()
                    ))
                    self.write_result(f, ko_file)

            if not self.mock:
                print("A copy of the report is saved in %s" % output_filename)

        except Exception as e:
            print("Error in saving the result file at %s" % output_filename)
            print(e)
            sys.exit(1)

    def print_result(self, kmod_path):
        """
        Print the result (score)
        """

        print("Processing %s" % kmod_path)

        for name in self.nonwhite_symbols_used[kmod_path]:
            if name not in self.total:
                print("WARNING: External symbol in %s does not "
                      "exist in current kernel: %s" \
                      % (os.path.basename(kmod_path),name))

        total_len = len(self.all_symbols_used[kmod_path])
        non_white = len(self.nonwhite_symbols_used[kmod_path])
        white_len = float(len(self.white_symbols[kmod_path]))

        if total_len == 0:  # pragma: no cover
            print("No kernel symbol usage found in %s." % kmod_path)
            return

        score = (white_len / total_len) * 100

        if not self.mock:
            print("Checking against architecture %s" % self.arch)
            print("Total symbol usage: %s\t"
                  "Total Non white list symbol usage: %s"
                  % (total_len, non_white))
            print("Score: %0.2f%%\n" % score)

    def find_arch(self, kmod_list):
        """
        Determine the architecture of the given kernel modules and store it
        in ``self.arch``.

        For every module the ``Data`` and ``Machine`` lines of
        ``readelf -h`` are condensed by a shell pipeline into a single key
        that is looked up in the table below.

        The process exits with status 1 when a module cannot be examined,
        has an unsupported architecture, or when the modules do not all
        share one architecture.

        :arg kmod_list: paths of the kernel modules to examine
        """
        import shlex

        rset = {'littleendianIntel80386': 'i686',
                'bigendianPowerPC64': 'ppc64',
                'littleendianPowerPC64': 'ppc64le',
                'littleendianAdvancedMicroDevicesX86-64': 'x86_64',
                'bigendianIBMS/390': 's390x',
                'littleendianAArch64': 'aarch64'}
        found = set()
        for kmod_path in kmod_list:
            try:
                # The path ends up in a shell command line: quote it so
                # that spaces and shell metacharacters are taken literally.
                data = run("readelf -h %s | grep -e Data -e Machine | awk -F "
                        "':' '{print $2}' | paste -d ' '  - - | awk -F ',' "
                        "'{print $2}' | sed 's/[ \t]*//g'"
                        % shlex.quote(kmod_path))
                found.add(rset[data.strip()])
            except IOError as e:
                print(e, end=' ')
                print(("(Only kernel object files are supported)")
                    if "No such file" not in str(e)
                    else "")
                sys.exit(1)
            except KeyError:
                print("%s: Invalid architecture. (only %s are supported)"
                    % (kmod_path, ', '.join(sorted(rset.values()))))
                sys.exit(1)

        arch = sorted(found)
        if len(arch) > 1:
            print("Object files for multiple architectures were provided (%s)."
                % ', '.join(arch))
            sys.exit(1)

        self.arch = arch[0]

    def write_result(self, f, ko_file):
        """
        Save the result set in the given file
        """
        try:
            ko_basename = os.path.basename(ko_file)

            f.write("[%s]\n" % self.arch)

            # Write whitelisted symbols
            f.write(self.SECTION_WHITELISTS)
            for name in sorted(self.white_symbols[ko_file]):
                f.write(name + '\n')

            # Write non-whitelisted symbols as well as their justification
            # Justification can be one of:
            #  - free-form entry
            #  - reference to a different kernel module section (if exists)
            #  - justification placeholder later to be specified by hand
            f.write(self.SECTION_CO_WHITELISTS)
            for name in sorted(self.nonwhite_symbols_used[ko_file]):

                justification=""
                if name in self.justified_symbols \
                        and ko_basename != self.justified_symbols[name]:
                    justification=self.JUSTIFICATION_REFERENCE % \
                            self.justified_symbols[name]
                elif ko_basename in self.justifications and \
                        name in self.justifications[ko_basename]:
                    justification=self.justifications[ko_basename][name]
                elif "" in self.justifications and \
                        name in self.justifications[ko_basename]:
                    justification=self.justifications[ko_basename][name]
                else:
                    justification=self.JUSTIFICATION_PLACEHOLDER
                    self.justified_symbols[name] = os.path.basename(ko_file)

                f.write(self.JUSTIFICATION_SEPARATOR)
                f.write(self.JUSTIFICATION_BODY % (name, justification))

            if self.nonwhite_symbols_used[ko_file]:
                f.write(self.JUSTIFICATION_SEPARATOR)
                f.write('\n')
        except Exception as err:
            print(err)

    def read_data(self, arch, releasedir, symvers):
        """
        Load the whitelist and the kernel symbol list.

        The whitelist returned by read_list() is kept in ``self.matchdata``
        and the result of read_total_list() in ``self.total``.

        :arg arch: architecture passed on to read_list()
        :arg releasedir: whitelist release directory passed to read_list()
        :arg symvers: Module.symvers path passed to read_total_list()
        :return: the second value returned by read_list(); main() treats a
                 false value as "whitelist not found"
        """
        whitelist, found = read_list(arch, releasedir, self.verbose)
        self.matchdata = whitelist
        self.total = read_total_list(symvers)
        return found

    def parse_ko(self, path, process_whitelists=True):
        """
        Collect the symbols of one kernel module.

        The vermagic string is read with ``modinfo`` and the symbol table
        with ``nm``. Undefined symbols (``U`` entries) are classified with
        find_if() when whitelist processing is enabled; every other symbol
        is recorded in ``self.defined_symbols[path]``.

        The process exits with status 1 when either command fails.

        :arg path: path to the kernel module
        :arg process_whitelists: False for dependency modules, whose own
                                 symbol usage is not scored
        """
        import shlex

        if process_whitelists:
            self.all_symbols_used[path] = set()
            self.nonwhite_symbols_used[path] = set()
            self.white_symbols[path] = set()

        self.defined_symbols[path] = set()

        # Quote the path for the shell; plain '...' quoting would break on
        # a path that itself contains a single quote.
        quoted_path = shlex.quote(path)

        try:
            self.vermagic[path] = run("modinfo -F vermagic %s" % quoted_path)
        except Exception as e:
            print(e)
            sys.exit(1)

        try:
            out = run("nm %s" % quoted_path)
        except Exception as e:
            print(e)
            sys.exit(1)

        for line in out.split("\n"):
            data = line.split(" ")
            if len(data) < 2:
                continue

            symbol = data[-1]
            if "U " in line:
                # An undefined symbol is one the module needs, never one it
                # provides. For dependencies it is therefore skipped
                # instead of being recorded as defined, which used to make
                # remove_internal_symbols() hide the same symbol in the
                # analysed modules.
                if process_whitelists:
                    self.find_if(path, symbol)
            else:
                self.defined_symbols[path].add(symbol)

    def remove_internal_symbols(self):
        for i in self.nonwhite_symbols_used:
            for j in self.defined_symbols:
                if i == j:
                    continue
                self.nonwhite_symbols_used[i] -= self.defined_symbols[j]

    def find_if(self, path, name):
        """
        Find if the symbol is in whitelist or not
        """
        self.all_symbols_used[path].add(name)
        if name in self.matchdata:
            self.white_symbols[path].add(name)
        else:
            self.nonwhite_symbols_used[path].add(name)

    def get_module_name(self, command_line):
        try:
            match = self.HEADER_RE.match(command_line)
            if not match:
                return None
            commands = match.group("cmd").split()

            # Ignore undefined options in parser instead of throwing error
            class IOptParse(OptionParser):
                def error(self, msg):
                    pass

            parser = IOptParse()
            parser.add_option("-k", "--ko")
            opts, _ = parser.parse_args(commands[0:])
            return opts.ko
        except Exception:
            return None


if __name__ == '__main__':
    # Script entry point: run the tool, then exit with a success status
    Ksc().main()
    sys.exit(0)