Blob Blame History Raw
# bugzilla - a commandline frontend for the python bugzilla module
# Copyright (C) 2007-2017 Red Hat Inc.
# Author: Will Woods <>
# Author: Cole Robinson <>
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.  See for
# the full text of the license.

from __future__ import print_function

import locale
from logging import getLogger, DEBUG, INFO, WARN, StreamHandler, Formatter
import argparse
import os
import re
import socket
import sys
import tempfile

# pylint: disable=import-error
if sys.version_info[0] >= 3:
    # pylint: disable=no-name-in-module,redefined-builtin
    from xmlrpc.client import Fault, ProtocolError
    from urllib.parse import urlparse
    basestring = (str, bytes)
    from xmlrpclib import Fault, ProtocolError
    from urlparse import urlparse
# pylint: enable=import-error

import requests.exceptions

import bugzilla


format_field_re = re.compile("%{([a-z0-9_]+)(?::([^}]*))?}")

log = getLogger(bugzilla.__name__)

# Util helpers #

def _is_unittest():
    return bool(os.getenv("__BUGZILLA_UNITTEST"))

def _is_unittest_debug():
    return bool(os.getenv("__BUGZILLA_UNITTEST_DEBUG"))

def to_encoding(ustring):
    string = ''
    if isinstance(ustring, basestring):
        string = ustring
    elif ustring is not None:
        string = str(ustring)

    if sys.version_info[0] >= 3:
        return string

    preferred = locale.getpreferredencoding()
    if _is_unittest():
        preferred = "UTF-8"
    return string.encode(preferred, 'replace')

def open_without_clobber(name, *args):
    Try to open the given file with the given mode; if that filename exists,
    try "name.1", "name.2", etc. until we find an unused filename.
    fd = None
    count = 1
    orig_name = name
    while fd is None:
            fd =, os.O_CREAT | os.O_EXCL, 0o666)
        except OSError as err:
            if err.errno == os.errno.EEXIST:
                name = "%s.%i" % (orig_name, count)
                count += 1
                raise IOError(err.errno, err.strerror, err.filename)
    fobj = open(name, *args)
    if fd != fobj.fileno():
    return fobj

def get_default_url():
    Grab a default URL from bugzillarc [DEFAULT] url=X
    from bugzilla.base import _open_bugzillarc
    cfg = _open_bugzillarc()
    if cfg:
        cfgurl = cfg.defaults().get("url", None)
        if cfgurl is not None:
            log.debug("bugzillarc: found cli url=%s", cfgurl)
            return cfgurl
    return DEFAULT_BZ

def setup_logging(debug, verbose):
    handler = StreamHandler(sys.stderr)
        "[%(asctime)s] %(levelname)s (%(module)s:%(lineno)d) %(message)s",

    if debug:
    elif verbose:

    if _is_unittest_debug():

# Option parsing #

def _setup_root_parser():
    epilog = 'Try "bugzilla COMMAND --help" for command-specific help.'
    p = argparse.ArgumentParser(epilog=epilog)

    default_url = get_default_url()

    # General bugzilla connection options
    p.add_argument('--bugzilla', default=default_url,
            help="bugzilla XMLRPC URI. default: %s" % default_url)
    p.add_argument("--nosslverify", dest="sslverify",
                 action="store_false", default=True,
                 help="Don't error on invalid bugzilla SSL certificate")
            help="client side certificate file needed by the webserver")

    p.add_argument('--login', action="store_true",
        help='Run interactive "login" before performing the '
             'specified command.')
    p.add_argument('--username', help="Log in with this username")
    p.add_argument('--password', help="Log in with this password")

    p.add_argument('--ensure-logged-in', action="store_true",
        help="Raise an error if we aren't logged in to bugzilla. "
             "Consider using this if you are depending on "
             "cached credentials, to ensure that when they expire the "
             "tool errors, rather than subtly change output.")
        action='store_false', default=True, dest='cache_credentials',
        help="Don't save any bugzilla cookies or tokens to disk, and "
             "don't use any pre-existing credentials.")

    p.add_argument('--cookiefile', default=None,
            help="cookie file to use for bugzilla authentication")
    p.add_argument('--tokenfile', default=None,
            help="token file to use for bugzilla authentication")

    p.add_argument('--verbose', action='store_true',
            help="give more info about what's going on")
    p.add_argument('--debug', action='store_true',
            help="output bunches of debugging info")
    p.add_argument('--version', action='version',

    # Allow user to specify BZClass to initialize. Kinda weird for the
    # CLI, I'd rather people file bugs about this so we can fix our detection.
    # So hide it from the help output but keep it for back compat
    p.add_argument('--bztype', default='auto', help=argparse.SUPPRESS)

    return p

def _parser_add_output_options(p):
    outg = p.add_argument_group("Output format options")
    outg.add_argument('--full', action='store_const', dest='output',
            const='full', default='normal',
            help="output detailed bug info")
    outg.add_argument('-i', '--ids', action='store_const', dest='output',
            const='ids', help="output only bug IDs")
    outg.add_argument('-e', '--extra', action='store_const',
            dest='output', const='extra',
            help="output additional bug information "
                 "(keywords, Whiteboards, etc.)")
    outg.add_argument('--oneline', action='store_const', dest='output',
            help="one line summary of the bug (useful for scripts)")
    outg.add_argument('--raw', action='store_const', dest='output',
            const='raw', help="raw output of the bugzilla contents")
            help="Print output in the form given. "
                 "You can use RPM-style tags that match bug "
                 "fields, e.g.: '%%{id}: %%{summary}'. See the man page "
                 "section 'Output options' for more details.")

def _parser_add_bz_fields(rootp, command):
    cmd_new = (command == "new")
    cmd_query = (command == "query")
    cmd_modify = (command == "modify")
    if cmd_new:
        comment_help = "Set initial bug comment/description"
    elif cmd_query:
        comment_help = "Search all bug comments"
        comment_help = "Add new bug comment"

    p = rootp.add_argument_group("Standard bugzilla options")

    p.add_argument('-p', '--product', help="Product name")
    p.add_argument('-v', '--version', help="Product version")
    p.add_argument('-c', '--component', help="Component name")
    p.add_argument('-t', '--summary', '--short_desc', help="Bug summary")
    p.add_argument('-l', '--comment', '--long_desc', help=comment_help)
    if not cmd_query:
        p.add_argument("--comment-tag", action="append",
                help="Comment tag for the new comment")
    p.add_argument("--sub-component", action="append",
        help="RHBZ sub component field")
    p.add_argument('-o', '--os', help="Operating system")
    p.add_argument('--arch', help="Arch this bug occurs on")
    p.add_argument('-x', '--severity', help="Bug severity")
    p.add_argument('-z', '--priority', help="Bug priority")
    p.add_argument('--alias', help='Bug alias (name)')
    p.add_argument('-s', '--status', '--bug_status',
        help='Bug status (NEW, ASSIGNED, etc.)')
    p.add_argument('-u', '--url', help="URL field")
    p.add_argument('-m', '--target_milestone', help="Target milestone")
    p.add_argument('--target_release', help="RHBZ Target release")

    p.add_argument('--blocked', action="append",
        help="Bug IDs that this bug blocks")
    p.add_argument('--dependson', action="append",
        help="Bug IDs that this bug depends on")
    p.add_argument('--keywords', action="append",
        help="Bug keywords")
    p.add_argument('--groups', action="append",
        help="Which user groups can view this bug")

    p.add_argument('--cc', action="append", help="CC list")
    p.add_argument('-a', '--assigned_to', '--assignee', help="Bug assignee")
    p.add_argument('-q', '--qa_contact', help='QA contact')

    if not cmd_new:
        p.add_argument('-f', '--flag', action='append',
            help="Bug flags state. Ex:\n"
                 "  --flag needinfo?\n"
                 "  --flag dev_ack+ \n"
                 "  clear with --flag needinfoX")
        p.add_argument("--tags", action="append",
                help="Tags/Personal Tags field.")

        p.add_argument('-w', "--whiteboard", '--status_whiteboard',
            action="append", help='Whiteboard field')
        p.add_argument("--devel_whiteboard", action="append",
            help='RHBZ devel whiteboard field')
        p.add_argument("--internal_whiteboard", action="append",
            help='RHBZ internal whiteboard field')
        p.add_argument("--qa_whiteboard", action="append",
            help='RHBZ QA whiteboard field')
        p.add_argument('-F', '--fixed_in',
            help="RHBZ 'Fixed in version' field")

    # Put this at the end, so it sticks out more
        metavar="FIELD=VALUE", action="append", dest="fields",
        help="Manually specify a bugzilla XMLRPC field. FIELD is "
        "the raw name used by the bugzilla instance. For example if your "
        "bugzilla instance has a custom field cf_my_field, do:\n"
        "  --field cf_my_field=VALUE")

    # Used by unit tests, not for end user consumption
    p.add_argument('--__test-return-result', action="store_true",
        dest="test_return_result", help=argparse.SUPPRESS)

    if not cmd_modify:

def _setup_action_new_parser(subparsers):
    description = ("Create a new bug report. "
        "--product, --component, --version, --summary, and --comment "
        "must be specified. "
        "Options that take multiple values accept comma separated lists, "
        "including --cc, --blocks, --dependson, --groups, and --keywords.")
    p = subparsers.add_parser("new", description=description)

    _parser_add_bz_fields(p, "new")

def _setup_action_query_parser(subparsers):
    description = ("List bug reports that match the given criteria. "
        "Certain options can accept a comma separated list to query multiple "
        "values, including --status, --component, --product, --version, --id.")
    epilog = ("Note: querying via explicit command line options will only "
        "get you so far. See the --from-url option for a way to use powerful "
        "Web UI queries from the command line.")
    p = subparsers.add_parser("query",
        description=description, epilog=epilog)

    _parser_add_bz_fields(p, "query")

    g = p.add_argument_group("'query' specific options")
    g.add_argument('-b', '--id', '--bug_id',
        help="specify individual bugs by IDs, separated with commas")
    g.add_argument('-r', '--reporter',
        help="Email: search reporter email for given address")
        help="Search using bugzilla's quicksearch functionality.")
        help="Name of a bugzilla saved search. If you don't own this "
            "saved search, you must passed --savedsearch_sharer_id.")
        help="Owner ID of the --savedsearch. You can get this ID from "
            "the URL bugzilla generates when running the saved search "
            "from the web UI.")

    # Keep this at the end so it sticks out more
    g.add_argument('--from-url', metavar="WEB_QUERY_URL",
        help="Make a working query via bugzilla's 'Advanced search' web UI, "
             "grab the url from your browser (the string with query.cgi or "
             "buglist.cgi in it), and --from-url will run it via the "
             "bugzilla API. Don't forget to quote the string! "
             "This only works for Bugzilla 5 and Red Hat bugzilla")

    # Deprecated options
    p.add_argument('-E', '--emailtype', help=argparse.SUPPRESS)
    p.add_argument('--components_file', help=argparse.SUPPRESS)
    p.add_argument('-U', '--url_type',
    p.add_argument('-K', '--keywords_type',
    p.add_argument('-W', '--status_whiteboard_type',
    p.add_argument('-B', '--booleantype',
    p.add_argument('--boolean_query', action="append",
    p.add_argument('--fixed_in_type', help=argparse.SUPPRESS)

def _setup_action_info_parser(subparsers):
    description = ("List products or component information about the "
        "bugzilla server.")
    p = subparsers.add_parser("info", description=description)

    x = p.add_mutually_exclusive_group(required=True)
    x.add_argument('-p', '--products', action='store_true',
            help='Get a list of products')
    x.add_argument('-c', '--components', metavar="PRODUCT",
            help='List the components in the given product')
    x.add_argument('-o', '--component_owners', metavar="PRODUCT",
            help='List components (and their owners)')
    x.add_argument('-v', '--versions', metavar="PRODUCT",
            help='List the versions for the given product')
    p.add_argument('--active-components', action="store_true",
            help='Only show active components. Combine with --components*')

def _setup_action_modify_parser(subparsers):
    usage = ("bugzilla modify [options] BUGID [BUGID...]\n"
        "Fields that take multiple values have a special input format.\n"
        "Options that accept this format: --cc, --blocked, --dependson,\n"
        "    --groups, --tags, whiteboard fields.")
    p = subparsers.add_parser("modify", usage=usage)

    _parser_add_bz_fields(p, "modify")

    g = p.add_argument_group("'modify' specific options")
    g.add_argument("ids", nargs="+", help="Bug IDs to modify")
    g.add_argument('-k', '--close', metavar="RESOLUTION",
        help='Close with the given resolution (WONTFIX, NOTABUG, etc.)')
    g.add_argument('-d', '--dupeid', metavar="ORIGINAL",
        help='ID of original bug. Implies --close DUPLICATE')
    g.add_argument('--private', action='store_true', default=False,
        help='Mark new comment as private')
    g.add_argument('--reset-assignee', action="store_true",
        help='Reset assignee to component default')
    g.add_argument('--reset-qa-contact', action="store_true",
        help='Reset QA contact to component default')

def _setup_action_attach_parser(subparsers):
    usage = """
bugzilla attach --file=FILE --desc=DESC [--type=TYPE] BUGID [BUGID...]
bugzilla attach --get=ATTACHID --getall=BUGID [...]
bugzilla attach --type=TYPE BUGID [BUGID...]"""
    description = "Attach files or download attachments."
    p = subparsers.add_parser("attach", description=description, usage=usage)

    p.add_argument("ids", nargs="*", help="BUGID references")
    p.add_argument('-f', '--file', metavar="FILENAME",
            help='File to attach, or filename for data provided on stdin')
    p.add_argument('-d', '--description', '--summary',
            metavar="SUMMARY", dest='desc',
            help="A short summary of the file being attached")
    p.add_argument('-t', '--type', metavar="MIMETYPE",
            help="Mime-type for the file being attached")
    p.add_argument('-g', '--get', metavar="ATTACHID", action="append",
            default=[], help="Download the attachment with the given ID")
    p.add_argument("--getall", "--get-all", metavar="BUGID", action="append",
            default=[], help="Download all attachments on the given bug")
    p.add_argument('-l', '--comment', '--long_desc',
            help="Add comment with attachment")

def _setup_action_login_parser(subparsers):
    usage = 'bugzilla login [username [password]]'
    description = "Log into bugzilla and save a login cookie or token."
    p = subparsers.add_parser("login", description=description, usage=usage)
    p.add_argument("pos_username", nargs="?", help="Optional username",
    p.add_argument("pos_password", nargs="?", help="Optional password",

def setup_parser():
    rootparser = _setup_root_parser()
    subparsers = rootparser.add_subparsers(dest="command")
    subparsers.required = True
    return rootparser

# Command routines #

def _merge_field_opts(query, opt, parser):
    # Add any custom fields if specified
    if opt.fields is None:

    for f in opt.fields:
            f, v = f.split('=', 1)
            query[f] = v
        except Exception:
            parser.error("Invalid field argument provided: %s" % (f))

def _do_query(bz, opt, parser):
    q = {}

    # Parse preconstructed queries.
    u = opt.from_url
    if u:
        q = bz.url_to_query(u)

    if opt.components_file:
        # Components slurped in from file (one component per line)
        # This can be made more robust
        clist = []
        f = open(opt.components_file, 'r')
        for line in f.readlines():
            line = line.rstrip("\n")
        opt.component = clist

    if opt.status:
        val = opt.status
        stat = val
        if val == 'ALL':
            # leaving this out should return bugs of any status
            stat = None
        elif val == 'DEV':
            # Alias for all development bug statuses
            stat = ['NEW', 'ASSIGNED', 'NEEDINFO', 'ON_DEV',
                'MODIFIED', 'POST', 'REOPENED']
        elif val == 'QE':
            # Alias for all QE relevant bug statuses
            stat = ['ASSIGNED', 'ON_QA', 'FAILS_QA', 'PASSES_QA']
        elif val == 'EOL':
            # Alias for EndOfLife bug statuses
            stat = ['VERIFIED', 'RELEASE_PENDING', 'CLOSED']
        elif val == 'OPEN':
            # non-Closed statuses
            stat = ['NEW', 'ASSIGNED', 'MODIFIED', 'ON_DEV', 'ON_QA',
                'VERIFIED', 'RELEASE_PENDING', 'POST']
        opt.status = stat

    # Convert all comma separated list parameters to actual lists,
    # which is what bugzilla wants
    # According to bugzilla docs, any parameter can be a list, but
    # let's only do this for options we explicitly mention can be
    # comma separated.
    for optname in ["severity", "id", "status", "component",
                    "priority", "product", "version"]:
        val = getattr(opt, optname, None)
        if not isinstance(val, str):
        setattr(opt, optname, val.split(","))

    include_fields = None
    if opt.output == 'raw':
        # 'raw' always does a getbug() call anyways, so just ask for ID back
        include_fields = ['id']

    elif opt.outputformat:
        include_fields = []
        for fieldname, rest in format_field_re.findall(opt.outputformat):
            if fieldname == "whiteboard" and rest:
                fieldname = rest + "_" + fieldname
            elif fieldname == "flag":
                fieldname = "flags"
            elif fieldname == "cve":
                fieldname = ["keywords", "blocks"]
            elif fieldname == "__unicode__":
                # Needs to be in sync with bug.__unicode__
                fieldname = ["id", "status", "assigned_to", "summary"]

            flist = isinstance(fieldname, list) and fieldname or [fieldname]
            for f in flist:
                if f not in include_fields:

    if include_fields is not None:

    built_query = bz.build_query(
        product=opt.product or None,
        component=opt.component or None,
        sub_component=opt.sub_component or None,
        version=opt.version or None,
        reporter=opt.reporter or None, or None,
        short_desc=opt.summary or None,
        long_desc=opt.comment or None, or None,
        assigned_to=opt.assigned_to or None,
        qa_contact=opt.qa_contact or None,
        status=opt.status or None,
        blocked=opt.blocked or None,
        dependson=opt.dependson or None,
        keywords=opt.keywords or None,
        keywords_type=opt.keywords_type or None,
        url=opt.url or None,
        url_type=opt.url_type or None,
        status_whiteboard=opt.whiteboard or None,
        status_whiteboard_type=opt.status_whiteboard_type or None,
        fixed_in=opt.fixed_in or None,
        fixed_in_type=opt.fixed_in_type or None,
        flag=opt.flag or None,
        alias=opt.alias or None,
        qa_whiteboard=opt.qa_whiteboard or None,
        devel_whiteboard=opt.devel_whiteboard or None,
        boolean_query=opt.boolean_query or None,
        bug_severity=opt.severity or None,
        priority=opt.priority or None,
        target_release=opt.target_release or None,
        target_milestone=opt.target_milestone or None,
        emailtype=opt.emailtype or None,
        booleantype=opt.booleantype or None,
        quicksearch=opt.quicksearch or None,
        savedsearch=opt.savedsearch or None,
        savedsearch_sharer_id=opt.savedsearch_sharer_id or None,
        tags=opt.tags or None)

    _merge_field_opts(built_query, opt, parser)

    q = built_query

    if not q:
        parser.error("'query' command requires additional arguments")
    if opt.test_return_result:
        return q
    return bz.query(q)

def _do_info(bz, opt):
    Handle the 'info' subcommand
    # All these commands call getproducts internally, so do it up front
    # with minimal include_fields for speed
    def _filter_components(compdetails):
        ret = {}
        for k, v in compdetails.items():
            if v.get("is_active", True):
                ret[k] = v
        return ret

    productname = (opt.components or opt.component_owners or opt.versions)
    include_fields = ["name", "id"]
    fastcomponents = (opt.components and not opt.active_components)
    if opt.versions:
        include_fields += ["versions"]
    if opt.component_owners:
        include_fields += [
    if (opt.active_components and
        any(["components" in i for i in include_fields])):
        include_fields += ["components.is_active"]

    bz.refresh_products(names=productname and [productname] or None,

    if opt.products:
        for name in sorted([p["name"] for p in bz.getproducts()]):

    elif fastcomponents:
        for name in sorted(bz.getcomponents(productname)):

    elif opt.components:
        details = bz.getcomponentsdetails(productname)
        for name in sorted(_filter_components(details)):

    elif opt.versions:
        proddict = bz.getproducts()[0]
        for v in proddict['versions']:

    elif opt.component_owners:
        details = bz.getcomponentsdetails(productname)
        for c in sorted(_filter_components(details)):
            print(to_encoding(u"%s: %s" % (c,

def _convert_to_outputformat(output):
    fmt = ""

    if output == "normal":
        fmt = "%{__unicode__}"

    elif output == "ids":
        fmt = "%{id}"

    elif output == 'full':
        fmt += "%{__unicode__}\n"
        fmt += "Component: %{component}\n"
        fmt += "CC: %{cc}\n"
        fmt += "Blocked: %{blocks}\n"
        fmt += "Depends: %{depends_on}\n"
        fmt += "%{comments}\n"

    elif output == 'extra':
        fmt += "%{__unicode__}\n"
        fmt += " +Keywords: %{keywords}\n"
        fmt += " +QA Whiteboard: %{qa_whiteboard}\n"
        fmt += " +Status Whiteboard: %{status_whiteboard}\n"
        fmt += " +Devel Whiteboard: %{devel_whiteboard}\n"

    elif output == 'oneline':
        fmt += "#%{bug_id} %{status} %{assigned_to} %{component}\t"
        fmt += "[%{target_milestone}] %{flags} %{cve}"

        raise RuntimeError("Unknown output type '%s'" % output)

    return fmt

def _format_output(bz, opt, buglist):
    if opt.output == 'raw':
        buglist = bz.getbugs([b.bug_id for b in buglist])
        for b in buglist:
            print("Bugzilla %s: " % b.bug_id)
            for attrname in sorted(b.__dict__):
                print(to_encoding(u"ATTRIBUTE[%s]: %s" %
                                  (attrname, b.__dict__[attrname])))

    def bug_field(matchobj):
        # whiteboard and flag allow doing
        #   %{whiteboard:devel} and %{flag:needinfo}
        # That's what 'rest' matches
        (fieldname, rest) = matchobj.groups()

        if fieldname == "whiteboard" and rest:
            fieldname = rest + "_" + fieldname

        if fieldname == "flag" and rest:
            val = b.get_flag_status(rest)

        elif fieldname == "flags" or fieldname == "flags_requestee":
            tmpstr = []
            for f in getattr(b, "flags", []):
                requestee = f.get('requestee', "")
                if fieldname == "flags":
                    requestee = ""
                if fieldname == "flags_requestee":
                    if requestee == "":
                    tmpstr.append("%s" % requestee)
                    tmpstr.append("%s%s%s" %
                            (f['name'], f['status'], requestee))

            val = ",".join(tmpstr)

        elif fieldname == "cve":
            cves = []
            for key in getattr(b, "keywords", []):
                # grab CVE from keywords and blockers
                if key.find("Security") == -1:
                for bl in b.blocks:
                    cvebug = bz.getbug(bl)
                    for cb in cvebug.alias:
                        if cb.find("CVE") == -1:
                        if cb.strip() not in cves:
            val = ",".join(cves)

        elif fieldname == "comments":
            val = ""
            for c in getattr(b, "comments", []):
                val += ("\n* %s - %s:\n%s\n" % (c['time'],
                         c.get("creator", c.get("author", "")), c['text']))

        elif fieldname == "external_bugs":
            val = ""
            for e in getattr(b, "external_bugs", []):
                url = e["type"]["full_url"].replace("%id%", e["ext_bz_bug_id"])
                if not val:
                    val += "\n"
                val += "External bug: %s\n" % url

        elif fieldname == "__unicode__":
            val = b.__unicode__()
            val = getattr(b, fieldname, "")

        vallist = isinstance(val, list) and val or [val]
        val = ','.join([to_encoding(v) for v in vallist])

        return val

    for b in buglist:
        print(format_field_re.sub(bug_field, opt.outputformat))

def _parse_triset(vallist, checkplus=True, checkminus=True, checkequal=True,
    add_val = []
    rm_val = []
    set_val = None

    def make_list(v):
        if not v:
            return []
        if splitcomma:
            return v.split(",")
        return [v]

    for val in isinstance(vallist, list) and vallist or [vallist]:
        val = val or ""

        if val.startswith("+") and checkplus:
            add_val += make_list(val[1:])
        elif val.startswith("-") and checkminus:
            rm_val += make_list(val[1:])
        elif val.startswith("=") and checkequal:
            # Intentionally overwrite this
            set_val = make_list(val[1:])
            add_val += make_list(val)

    return add_val, rm_val, set_val

def _do_new(bz, opt, parser):
    # Parse options that accept comma separated list
    def parse_multi(val):
        return _parse_triset(val, checkplus=False, checkminus=False,
                             checkequal=False, splitcomma=True)[0]

    ret = bz.build_createbug(
        blocks=parse_multi(opt.blocked) or None,
        cc=parse_multi( or None,
        component=opt.component or None,
        depends_on=parse_multi(opt.dependson) or None,
        description=opt.comment or None,
        groups=parse_multi(opt.groups) or None,
        keywords=parse_multi(opt.keywords) or None,
        op_sys=opt.os or None,
        platform=opt.arch or None,
        priority=opt.priority or None,
        product=opt.product or None,
        severity=opt.severity or None,
        summary=opt.summary or None,
        url=opt.url or None,
        version=opt.version or None,
        assigned_to=opt.assigned_to or None,
        qa_contact=opt.qa_contact or None,
        sub_component=opt.sub_component or None,
        alias=opt.alias or None,
        comment_tags=opt.comment_tag or None,

    _merge_field_opts(ret, opt, parser)

    if opt.test_return_result:
        return ret

    b = bz.createbug(ret)
    return [b]

def _do_modify(bz, parser, opt):
    bugid_list = [bugid for a in opt.ids for bugid in a.split(',')]

    add_wb, rm_wb, set_wb = _parse_triset(opt.whiteboard)
    add_devwb, rm_devwb, set_devwb = _parse_triset(opt.devel_whiteboard)
    add_intwb, rm_intwb, set_intwb = _parse_triset(opt.internal_whiteboard)
    add_qawb, rm_qawb, set_qawb = _parse_triset(opt.qa_whiteboard)

    add_blk, rm_blk, set_blk = _parse_triset(opt.blocked, splitcomma=True)
    add_deps, rm_deps, set_deps = _parse_triset(opt.dependson, splitcomma=True)
    add_key, rm_key, set_key = _parse_triset(opt.keywords)
    add_cc, rm_cc, ignore = _parse_triset(,
    add_groups, rm_groups, ignore = _parse_triset(opt.groups,
    add_tags, rm_tags, ignore = _parse_triset(opt.tags, checkequal=False)

    status = opt.status or None
    if opt.dupeid is not None:
        opt.close = "DUPLICATE"
    if opt.close:
        status = "CLOSED"

    flags = []
    if opt.flag:
        # Convert "foo+" to tuple ("foo", "+")
        for f in opt.flag:
            flags.append({"name": f[:-1], "status": f[-1]})

    update = bz.build_update(
        assigned_to=opt.assigned_to or None,
        comment=opt.comment or None,
        comment_private=opt.private or None,
        component=opt.component or None,
        product=opt.product or None,
        blocks_add=add_blk or None,
        blocks_remove=rm_blk or None,
        url=opt.url or None,
        cc_add=add_cc or None,
        cc_remove=rm_cc or None,
        depends_on_add=add_deps or None,
        depends_on_remove=rm_deps or None,
        groups_add=add_groups or None,
        groups_remove=rm_groups or None,
        keywords_add=add_key or None,
        keywords_remove=rm_key or None,
        op_sys=opt.os or None,
        platform=opt.arch or None,
        priority=opt.priority or None,
        qa_contact=opt.qa_contact or None,
        severity=opt.severity or None,
        summary=opt.summary or None,
        version=opt.version or None,
        reset_assigned_to=opt.reset_assignee or None,
        reset_qa_contact=opt.reset_qa_contact or None,
        resolution=opt.close or None,
        target_release=opt.target_release or None,
        target_milestone=opt.target_milestone or None,
        dupe_of=opt.dupeid or None,
        fixed_in=opt.fixed_in or None,
        whiteboard=set_wb and set_wb[0] or None,
        devel_whiteboard=set_devwb and set_devwb[0] or None,
        internal_whiteboard=set_intwb and set_intwb[0] or None,
        qa_whiteboard=set_qawb and set_qawb[0] or None,
        sub_component=opt.sub_component or None,
        alias=opt.alias or None,
        flags=flags or None,
        comment_tags=opt.comment_tag or None,

    # We make this a little convoluted to facilitate unit testing
    wbmap = {
        "whiteboard": (add_wb, rm_wb),
        "internal_whiteboard": (add_intwb, rm_intwb),
        "qa_whiteboard": (add_qawb, rm_qawb),
        "devel_whiteboard": (add_devwb, rm_devwb),

    for k, v in wbmap.copy().items():
        if not v[0] and not v[1]:

    _merge_field_opts(update, opt, parser)

    log.debug("update bug dict=%s", update)
    log.debug("update whiteboard dict=%s", wbmap)

    if not any([update, wbmap, add_tags, rm_tags]):
        parser.error("'modify' command requires additional arguments")

    if opt.test_return_result:
        return (update, wbmap, add_tags, rm_tags)

    if add_tags or rm_tags:
        ret = bz.update_tags(bugid_list,
            tags_add=add_tags, tags_remove=rm_tags)
        log.debug("bz.update_tags returned=%s", ret)
    if update:
        ret = bz.update_bugs(bugid_list, update)
        log.debug("bz.update_bugs returned=%s", ret)

    if not wbmap:

    # Now for the things we can't blindly batch.
    # Being able to prepend/append to whiteboards, which are just
    # plain string values, is an old rhbz semantic that we try to maintain
    # here. This is a bit weird for traditional bugzilla XMLRPC
    log.debug("Adjusting whiteboard fields one by one")
    for bug in bz.getbugs(bugid_list):
        for wb, (add_list, rm_list) in wbmap.items():
            for tag in add_list:
                newval = getattr(bug, wb) or ""
                if newval:
                    newval += " "
                newval += tag
                               bz.build_update(**{wb: newval}))

            for tag in rm_list:
                newval = (getattr(bug, wb) or "").split()
                for t in newval[:]:
                    if t == tag:
                               bz.build_update(**{wb: " ".join(newval)}))

def _do_get_attach(bz, opt):
    for bug in bz.getbugs(opt.getall):
        opt.get += bug.get_attachment_ids()

    for attid in set(opt.get):
        att = bz.openattachment(attid)
        outfile = open_without_clobber(, "wb")
        data =
        while data:
            data =
        print("Wrote %s" %


def _do_set_attach(bz, opt, parser):
    if not opt.ids:
        parser.error("Bug ID must be specified for setting attachments")

    if sys.stdin.isatty():
        if not opt.file:
            parser.error("--file must be specified")
        fileobj = open(opt.file, "rb")
        # piped input on stdin
        if not opt.desc:
            parser.error("--description must be specified if passing "
                         "file on stdin")

        fileobj = tempfile.NamedTemporaryFile(prefix="bugzilla-attach.")
        data =

        while data:
            data =

    kwargs = {}
    if opt.file:
        kwargs["filename"] = os.path.basename(opt.file)
    if opt.type:
        kwargs["contenttype"] = opt.type
    if opt.type in ["text/x-patch"]:
        kwargs["ispatch"] = True
    if opt.comment:
        kwargs["comment"] = opt.comment
    desc = opt.desc or os.path.basename(

    # Upload attachments
    for bugid in opt.ids:
        attid = bz.attachfile(bugid, fileobj, desc, **kwargs)
        print("Created attachment %i on bug %s" % (attid, bugid))

# Main handling #

def _make_bz_instance(opt):
    Build the Bugzilla instance we will use
    if opt.bztype != 'auto':"Explicit --bztype is no longer supported, ignoring")

    cookiefile = None
    tokenfile = None
    if opt.cache_credentials:
        cookiefile = opt.cookiefile or -1
        tokenfile = opt.tokenfile or -1

    bz = bugzilla.Bugzilla(
    return bz

def _handle_login(opt, action, bz):
    Handle all login related bits
    is_login_command = (action == 'login')

    do_interactive_login = (is_login_command or
        opt.login or opt.username or opt.password)
    username = getattr(opt, "pos_username", None) or opt.username
    password = getattr(opt, "pos_password", None) or opt.password

        if do_interactive_login:
            if bz.url:
                print("Logging into %s" % urlparse(bz.url)[1])
            bz.interactive_login(username, password)
    except bugzilla.BugzillaError as e:

    if opt.ensure_logged_in and not bz.logged_in:
        print("--ensure-logged-in passed but you aren't logged in to %s" %

    if is_login_command:
        msg = "Login successful."
        if bz.cookiefile or bz.tokenfile:
            msg = "Login successful, token cache updated."


def _main(unittest_bz_instance):
    parser = setup_parser()
    opt = parser.parse_args()
    action = opt.command
    setup_logging(opt.debug, opt.verbose)

    log.debug("Launched with command line: %s", " ".join(sys.argv))
    log.debug("Bugzilla module: %s", bugzilla)

    # Connect to bugzilla'Connecting to %s', opt.bugzilla)

    if unittest_bz_instance:
        bz = unittest_bz_instance
        bz = _make_bz_instance(opt)

    # Handle login options
    _handle_login(opt, action, bz)

    # Run the actual commands #

    if hasattr(opt, "outputformat"):
        if not opt.outputformat and opt.output not in ['raw', None]:
            opt.outputformat = _convert_to_outputformat(opt.output)

    buglist = []
    if action == 'info':
        if not (opt.products or
                opt.components or
                opt.component_owners or
            parser.error("'info' command requires additional arguments")

        _do_info(bz, opt)

    elif action == 'query':
        buglist = _do_query(bz, opt, parser)
        if opt.test_return_result:
            return buglist

    elif action == 'new':
        buglist = _do_new(bz, opt, parser)
        if opt.test_return_result:
            return buglist

    elif action == 'attach':
        if opt.get or opt.getall:
            if opt.ids:
                parser.error("Bug IDs '%s' not used for "
                    "getting attachments" % opt.ids)
            _do_get_attach(bz, opt)
            _do_set_attach(bz, opt, parser)

    elif action == 'modify':
        modout = _do_modify(bz, parser, opt)
        if opt.test_return_result:
            return modout
        raise RuntimeError("Unexpected action '%s'" % action)

    # If we're doing new/query/modify, output our results
    if action in ['new', 'query']:
        _format_output(bz, opt, buglist)

def main(unittest_bz_instance=None):
            return _main(unittest_bz_instance)
        except (Exception, KeyboardInterrupt):
            log.debug("", exc_info=True)
    except (Fault, bugzilla.BugzillaError) as e:
        print("\nServer error: %s" % str(e))
    except requests.exceptions.SSLError as e:
        # Give SSL recommendations
        print("SSL error: %s" % e)
        print("\nIf you trust the remote server, you can work "
              "around this error with:\n"
              "  bugzilla --nosslverify ...")
    except (socket.error,
            ProtocolError) as e:
        print("\nConnection lost/failed: %s" % str(e))

def cli():
    except KeyboardInterrupt:
        log.debug("", exc_info=True)
        print("\nExited at user request.")