Blob Blame History Raw
# Copyright 2012,2018 Red Hat Inc.
# Author: Kushal Das <kdas@redhat.com>

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.  See
# http://www.gnu.org/copyleft/gpl.html for the full text of the
# license.
#

"""
Helper functions for ksc.
"""

import os
import re
import sys
import time
import getpass
import subprocess
import locale

from bugzilla import Bugzilla, BugzillaError

# whitelist directory
WHPATH = '/lib/modules'
# Module.symvers directory
SRCPATH = '/usr/src/kernels'

def query_user(query, max_tries=10, is_valid=lambda x: len(x) > 0):
    """
    Queries user for a value.

    :arg query:     query string
    :arg max_tries: maximal number of times user will be prompted to give a
                    valid reply (avoid cycling)
    :arg is_valid:  lambda function that determines if and when user supplied
                    input is valid

    :return         response     if valid
    :return         ""           if received max_tries invalid reponses
    :return         ""           if we couldn't read data from stdin
    """
    tries_left = max_tries
    response = ""
    while not is_valid(response):
        if tries_left < max_tries:
            if response == "":
                print("Empty response received. Please try again.")
            else:
                print("Option `%s' is invalid. Please try again." % response)

        if tries_left == 0:
            print("Reached maximum number of invalid responses.")
            return ""

        try:
            tries_left = tries_left - 1
            response = input(query)
        except EOFError:
            print("Reached early EOF.")
            return ""

    return response

def query_user_bool(query):
    """
    Queries user for a Y/N value

    :arg query:     query string
    :return         response     if valid
    :return         ""           if received max_tries invalid reponses
    :return         ""           if we couldn't read data from stdin
    """
    check_fx = lambda x: x.lower() in ['y', 'n']
    return query_user(query, is_valid=check_fx)

def get_release_name():
    if not os.path.isfile('/etc/redhat-release'):
        print('This tool needs to run on Red Hat Enterprise Linux')
        return None

    with open('/etc/redhat-release', 'r') as fptr:
        release = fptr.read().split(' ')
        if len(release) <= 6:
            print('This tool needs to run on Red Hat Enterprise Linux')
            return None
    for rel in release:
        if re.match("\d.\d+",rel):
            return rel
    print('This tool needs to run on Red Hat Enterprise Linux')
    return None

def read_list(arch, kabipath, verbose=False):
    """
    Reads a whitelist file and returns the symbols
    """
    result = []
    fpath = os.path.join(WHPATH, kabipath, "kabi_whitelist_%s" % arch)
    if not os.path.isfile(fpath):  # pragma: no cover
        print("File not found:", fpath)
        return [], False
    try:
        if verbose:  # pragma: no cover
            print("Reading %s" % fpath)
        fptr = open(fpath)
        for line in fptr.readlines():
            if line.startswith("["):
                continue
            result.append(line.strip("\n\t"))
        fptr.close()
    except IOError as err:  # pragma: no cover
        print(err)
        print("whitelist missing")

    return result, True


def read_total_list(symvers=None):
    """
    Reads total symbol list and returns the list
    """
    if not symvers:
        release = os.uname()[2]
        symvers = os.path.join(SRCPATH, release, "Module.symvers")
    if not os.path.isfile(symvers):  # pragma: no cover
        print("File not found:", symvers)
        print("Do you have current kernel-devel package installed?")
        sys.exit(1)
    result = []
    try:
        with open(symvers, "r") as fptr:
            for line in fptr.readlines():
                if line.startswith("["):
                    continue  # pragma: no cover
                result.append(line.split()[1])
    except IOError as err:  # pragma: no cover
        print(err)
        print("Missing all symbol list")
    return result


def run(command):
    """
    runs the given command
    """
    ret = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           close_fds=True)
    out, err = ret.communicate()
    if err:
        errs = err.decode(locale.getpreferredencoding()).split(':', 1)
        raise IOError(errs[1].strip() if len(errs) > 1 else err)
    return out.decode(locale.getpreferredencoding())


def getconfig(path='/etc/ksc.conf', mock=False):
    """
    Returns the bugzilla config
    """
    result = {}
    result['partner'] = ''

    if not os.path.isfile(path):
        path = '/etc/ksc.conf'
    try:
        fptr = open(path)
        lines = fptr.readlines()
        fptr.close()
        for line in lines:
            if line.startswith("user="):
                result["user"] = line[5:-1]
            elif line.startswith("partner="):
                result["partner"] = line[8:-1]
            elif line.startswith("server="):
                result["server"] = line[7:-1]
            elif line.startswith("partnergroup="):
                result["group"] = line[13:-1]
            elif line.startswith("api_key="):
                result["api_key"] = line[8:-1]
        if 'user' not in result and 'api_key' not in result:
            print("Either user name or api_key must be specified in configuration.")
            return False
        if ('server' not in result or
                not result['server'].endswith('xmlrpc.cgi')):
            print("Servername is not valid in configuration.")
            return False
        if not mock:  # pragma: no cover
            if not result['partner'] or result['partner'] == 'partner-name':
                result["partner"] = input("Partner name: ")
            if not result['group'] or result['group'] == 'partner-group':
                result['group'] = input("Partner group: ")
            if "api_key" not in result or not result['api_key'] or result['api_key'] == 'api_key':
                print('Current Bugzilla user: %s' % result['user'])
                result['password'] = getpass.getpass('Please enter password: ')
            else:
                print('Using API Key for authentication')
        else:
            result['password'] = 'mockpassword'
        if not result['user']:
            print("Error: missing values in configuration file.")
            print("Bug not submitted")
            sys.exit(1)
    except Exception as err:
        print("Error reading %s" % path)
        sys.exit(1)
    return result


def createbug(filename, arch, mock=False, path='/etc/ksc.conf',
              releasename='7.0', module=None):
    """
    Opens a bug in the Bugzilla
    """

    if releasename.startswith('6.'):
        bughash = {'product': 'Red Hat Enterprise Linux 6'}
    elif releasename.startswith('7.'):
        bughash = {'product': 'Red Hat Enterprise Linux 7'}
    elif releasename.startswith('8.'):
        bughash = {'product': 'Red Hat Enterprise Linux 8'}
    else:
        print("Invalid releasename: Bug not created")
        return
    bughash["component"] = 'kernel'
    bughash["sub_component"] = 'kabi-whitelists'
    bughash["summary"] = "kABI Symbol Usage"
    bughash["version"] = releasename
    bughash["platform"] = arch
    bughash["severity"] = "medium"
    bughash["priority"] = "medium"
    bughash["description"] = "Creating the bug to attach the symbol " + \
                             "usage details."
    bughash["qa_contact"] = "kernel-qe@redhat.com"
    groups = []

    if module:
        bughash["summary"] += " ({})".format(str(module))

    # We change the path if only it is mock
    if mock:
        print("Using local config file data/ksc.conf")
        path = './data/ksc.conf'

    try:
        conf = getconfig(path, mock=mock)
    except Exception as err:
        print("Problem in parsing the configuration.")
        print(err)
        return

    if not conf:
        return
    if 'group' in conf:
        if conf['group'] != 'partner-group':
            groups.append(conf['group'])

    groups = list(filter(lambda x: len(x) > 0, groups))
    if not groups:
        print("Error: Please specify a non-empty partner-group config " +\
              "option in your ksc.conf config file or in the prompt above. " +\
              "Bug was not filed!")
        return

    bughash["groups"] = groups

    if 'api_key' in conf and conf['api_key'] != 'api_key':
        bughash["Bugzilla_api_key"] = conf["api_key"]
    else:
        bughash["Bugzilla_login"] = conf["user"]
        bughash["Bugzilla_password"] = conf["password"]
    bughash["cf_partner"] = [conf["partner"], ]

    bugid = 0
    try:
        if 'api_key' in conf and conf['api_key'] != 'api_key':
            bz = Bugzilla(
                url=conf['server'],
                api_key=conf["api_key"]
            )
        else:
            bz = Bugzilla(
                url=conf['server'],
                user=conf["user"],
                password=conf["password"]
            )

        if not mock:  # pragma: no cover
            print("Creating a new bug")

        try:
            ret = bz.build_createbug(
                product=bughash['product'],
                component=bughash['component'],
                sub_component=bughash['sub_component'],
                summary=bughash['summary'],
                version=bughash['version'],
                platform=bughash['platform'],
                qa_contact=bughash['qa_contact'],
                severity=bughash['severity'],
                priority=bughash['priority'],
                description=bughash['description'],
                groups=bughash['groups']
            )
            ret['cf_partner'] = bughash['cf_partner']
            bug = bz.createbug(ret)

            bugid = bug.id

            if not mock:  # pragma: no cover
                print("Bug URL %s/show_bug.cgi?id=%s" % \
                      (conf['server'][:-11], bugid))
                print("Attaching the report")

            dhash = {}
            dhash["filename"] = "ksc-result.txt"
            dhash["contenttype"] = "text/plain"
            desc = "kABI symbol usage."

            for _ in range(3):
                with open(filename, "r") as fptr:
                    attachment_id = bz.attachfile(bugid, fptr, desc, **dhash)

                if not mock:  # pragma: no cover
                    if not attachment_id:
                        time.sleep(1)
                    else:
                        print("Attached successfully as %s on bug %s" % (attachment_id, bugid))
                        break
            else:
                print("Failed to attach symbol usage result")
                sys.exit()

        except Exception as err:  # pragma: no cover
            print("Could not create bug. %s" % err)
            if not mock:
                sys.exit(1)
    except BugzillaError as err:
        print("Bug not submitted. %s" % err)
        if not mock:
            sys.exit(1)

    return bugid