Blame org_fedora_oscap/content_handling.py

Packit 792a06
#
Packit 792a06
# Copyright (C) 2013  Red Hat, Inc.
Packit 792a06
#
Packit 792a06
# This copyrighted material is made available to anyone wishing to use,
Packit 792a06
# modify, copy, or redistribute it subject to the terms and conditions of
Packit 792a06
# the GNU General Public License v.2, or (at your option) any later version.
Packit 792a06
# This program is distributed in the hope that it will be useful, but WITHOUT
Packit 792a06
# ANY WARRANTY expressed or implied, including the implied warranties of
Packit 792a06
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
Packit 792a06
# Public License for more details.  You should have received a copy of the
Packit 792a06
# GNU General Public License along with this program; if not, write to the
Packit 792a06
# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
Packit 792a06
# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
Packit 792a06
# source code or documentation are not subject to the GNU General Public
Packit 792a06
# License and may only be used or replicated with the express permission of
Packit 792a06
# Red Hat, Inc.
Packit 792a06
#
Packit 792a06
# Red Hat Author(s): Vratislav Podzimek <vpodzime@redhat.com>
Packit 792a06
#
Packit 792a06
Packit 792a06
"""
Packit 792a06
Module with various classes for SCAP content processing and retrieving data
Packit 792a06
from it.
Packit 792a06
Packit 792a06
"""
Packit 792a06
Packit 792a06
import os.path
Packit 792a06
Packit 792a06
from collections import namedtuple, OrderedDict
Packit 792a06
from openscap_api import OSCAP
Packit 792a06
from pyanaconda.core.util import execReadlines
Packit 792a06
try:
Packit 792a06
    from html.parser import HTMLParser
Packit 792a06
except ImportError:
Packit 792a06
    from HTMLParser import HTMLParser
Packit 792a06
Packit 792a06
Packit 792a06
class ContentHandlingError(Exception):
    """Common base class for errors related to SCAP content handling."""
Packit 792a06
Packit 792a06
Packit 792a06
class DataStreamHandlingError(ContentHandlingError):
    """Error raised when handling of a data stream (collection) fails."""
Packit 792a06
Packit 792a06
Packit 792a06
class BenchmarkHandlingError(ContentHandlingError):
    """Error raised when handling of an XCCDF benchmark fails."""
Packit 792a06
Packit 792a06
Packit 792a06
class ContentCheckError(ContentHandlingError):
    """Error related to checking of the content (its integrity,...)."""
Packit 792a06
Packit 792a06
Packit 792a06
class ParseHTMLContent(HTMLParser):
    """Minimal HTML-to-text converter for strings found in SCAP content.

    Text nodes are appended (stripped of surrounding whitespace) to the
    ``content`` attribute. The ``html:ul``, ``html:li`` and ``html:br`` tags
    are turned into newlines; all other tags are dropped.

    """

    def __init__(self):
        HTMLParser.__init__(self)
        # plain text gathered so far
        self.content = ""

    def handle_starttag(self, tag, attrs):
        # opening a list, a list item or a line break starts a new line
        if tag in ("html:ul", "html:li", "html:br"):
            self.content += "\n"

    def handle_endtag(self, tag):
        # closing a list or a list item ends the current line
        if tag in ("html:ul", "html:li"):
            self.content += "\n"

    def handle_data(self, data):
        # whitespace around a text node is not preserved
        self.content = self.content + data.strip()

    def get_content(self):
        """Return the text gathered from the data fed to the parser."""

        return self.content
Packit 792a06
Packit 792a06
Packit 792a06
def parse_HTML_from_content(content):
    """This is a very simple HTML to text parser.

    HTML tags will be removed while trying to maintain readability
    of content.

    :param content: content whose HTML tags will be parsed
    :return: content without HTML tags
    """

    parser = ParseHTMLContent()
    parser.feed(content)
    # feed() may keep the trailing text buffered (for instance when it ends
    # with something that could be an unfinished character reference, like
    # "R&D"); close() forces the parser to process whatever is left so that
    # the end of the content is not silently dropped
    parser.close()
    return parser.get_content()
Packit 792a06
Packit 792a06
Packit 792a06
# record type describing a single XCCDF profile (a class, not a constant,
# hence the pylint override below)
# pylint: disable-msg=C0103
ProfileInfo = namedtuple("ProfileInfo", ("id", "title", "description"))
Packit 792a06
Packit 792a06
# record type holding the paths of the content files that were found
# pylint: disable-msg=C0103
ContentFiles = namedtuple("ContentFiles", ("xccdf", "cpe", "tailoring"))
Packit 792a06
Packit 792a06
Packit 792a06
def oscap_text_itr_get_text(itr):
    """
    Concatenate the texts of all items provided by an oscap_text_iterator.

    The iterator is only consumed by this function, it is not freed here.

    :param itr: oscap_text_iterator to get the text from
    :type itr: oscap_text_iterator
    :return: texts of all the items joined together (no separator)
    :rtype: str

    """

    pieces = []
    while OSCAP.oscap_text_iterator_has_more(itr):
        item = OSCAP.oscap_text_iterator_next(itr)
        pieces.append(OSCAP.oscap_text_get_text(item))

    return "".join(pieces)
Packit 792a06
Packit 792a06
Packit 792a06
def explore_content_files(fpaths):
    """
    Find the content files among the given file paths.

    Only the first usable file of every type is picked, with the exception
    that a data stream is preferred over a standalone benchmark -- the first
    data stream found replaces a previously found XCCDF checklist.

    :param fpaths: a list of file paths to search for content files in
    :type fpaths: [str]
    :return: a tuple containing the content handling class (None if neither
             a data stream nor a benchmark was found) and a ContentFiles
             instance containing the file names of the XCCDF file, CPE
             dictionary and tailoring file or "" in place of those items if
             not found
    :rtype: (class, ContentFiles)

    """

    def detect_doc_type(path):
        """Return the document type 'oscap info' reports for path or None."""

        try:
            for output_line in execReadlines("oscap", ["info", path]):
                if not output_line.startswith("Document type:"):
                    continue
                return output_line.partition(":")[2].strip()
        except OSError:
            # 'oscap info' exited with a non-zero exit code -> unknown doc
            # type
            return None

        # no "Document type:" line in the output
        return None

    xccdf_path = cpe_path = tailoring_path = ""
    handler_class = None
    ds_found = False

    for candidate in fpaths:
        kind = detect_doc_type(candidate)
        if not kind:
            continue

        if kind == "Source Data Stream":
            # prefer DS over standalone XCCDF, but keep the first DS found
            if not (xccdf_path and ds_found):
                xccdf_path = candidate
                handler_class = DataStreamHandler
                ds_found = True
        elif kind == "XCCDF Checklist":
            if not xccdf_path:
                xccdf_path = candidate
                handler_class = BenchmarkHandler
        elif kind == "CPE Dictionary":
            if not cpe_path:
                cpe_path = candidate
        elif kind == "XCCDF Tailoring":
            if not tailoring_path:
                tailoring_path = candidate

    # TODO: raise exception if no xccdf_file is found?
    return (handler_class, ContentFiles(xccdf_path, cpe_path, tailoring_path))
Packit 792a06
Packit 792a06
Packit 792a06
class DataStreamHandler(object):
    """
    Class for handling data streams in the data stream collection and
    retrieving data from it. For example a list of data stream indices,
    checklists in a given data stream or profiles.

    """

    def __init__(self, dsc_file_path, tailoring_file_path=""):
        """
        Constructor for the DataStreamHandler class.

        :param dsc_file_path: path to a file with a data stream collection
        :type dsc_file_path: str
        :param tailoring_file_path: path to a tailoring file
        :type tailoring_file_path: str
        :raise DataStreamHandlingError: if the path doesn't exist, the file
                                        cannot be loaded as SCAP content or
                                        it is not a data stream collection

        """

        # is used to speed up getting lists of profiles
        self._profiles_cache = dict()

        # defined up front so that the destructor can rely on the attribute
        # even if the constructor fails early
        self._session = None

        if not os.path.exists(dsc_file_path):
            msg = "Invalid file path: '%s'" % dsc_file_path
            raise DataStreamHandlingError(msg)

        self._dsc_file_path = dsc_file_path

        # create an XCCDF session for the file
        self._session = OSCAP.xccdf_session_new(dsc_file_path)
        if not self._session:
            msg = "'%s' is not a valid SCAP content file" % dsc_file_path
            raise DataStreamHandlingError(msg)
        if OSCAP.xccdf_session_load(self._session) != 0:
            raise DataStreamHandlingError(OSCAP.oscap_err_desc())

        # NOTE(review): the tailoring file is registered only after the
        # session has been loaded and get_profiles() creates a fresh session
        # without it -- confirm whether it is expected to have any effect
        if tailoring_file_path:
            OSCAP.xccdf_session_set_user_tailoring_file(self._session,
                                                        tailoring_file_path)

        if not OSCAP.xccdf_session_is_sds(self._session):
            msg = "'%s' is not a data stream collection" % dsc_file_path
            raise DataStreamHandlingError(msg)

        # dictionary holding the items gathered from DSC processing
        # (data stream ID -> list of checklist IDs)
        self._items = OrderedDict()

        # create an sds index for the content
        self._sds_idx = OSCAP.xccdf_session_get_sds_idx(self._session)

        # iterate over streams and get checklists from each stream
        streams_itr = OSCAP.ds_sds_index_get_streams(self._sds_idx)
        while OSCAP.ds_stream_index_iterator_has_more(streams_itr):
            stream_idx = OSCAP.ds_stream_index_iterator_next(streams_itr)

            # will be used to store the checklists for streams
            stream_id = OSCAP.ds_stream_index_get_id(stream_idx)
            checklists = []

            # iterate over checklists and append their ids to the list
            chklist_itr = OSCAP.ds_stream_index_get_checklists(stream_idx)
            while OSCAP.oscap_string_iterator_has_more(chklist_itr):
                checklists.append(OSCAP.oscap_string_iterator_next(chklist_itr))

            # store the list of checklist for the current stream
            self._items[stream_id] = checklists

            OSCAP.oscap_string_iterator_free(chklist_itr)

        OSCAP.ds_stream_index_iterator_free(streams_itr)

    def __del__(self):
        """Destructor for the DataStreamHandler class."""

        # the attribute lives on the instance (not among the local variables
        # of this method) and may be missing or None if the constructor
        # failed early
        session = getattr(self, "_session", None)
        if session:
            # we should free the session; forget it first so that it can
            # never be freed twice
            self._session = None
            OSCAP.xccdf_session_free(session)

    def get_data_streams(self):
        """
        Method to get a list of data streams found in the data stream
        collection.

        :return: list of data stream IDs
        :rtype: list of strings

        """

        return list(self._items.keys())

    def get_data_streams_checklists(self):
        """
        Method to get data streams and their checklists found in the data
        stream collection.

        :return: a dictionary consisting of the IDs of the data streams as keys
                 and lists of their checklists' IDs as values
        :rtype: dict(str -> list of strings)

        """

        # easy, we already have exactly what should be returned, just create a
        # (shallow) copy, so that the caller cannot add or remove items of our
        # internal dictionary
        return dict(self._items)

    def get_checklists(self, data_stream_id):
        """
        Method to get a list of checklists found in the data stream given by
        the data_stream_id.

        :param data_stream_id: ID of the data stream to get checklists from
        :type data_stream_id: str
        :return: list of checklist IDs found in the data stream given by the ID
        :rtype: list of strings
        :raise DataStreamHandlingError: if there is no such data stream

        """

        if data_stream_id not in self._items:
            msg = "Invalid data stream id given: '%s'" % data_stream_id
            raise DataStreamHandlingError(msg)

        return self._items[data_stream_id]

    def get_profiles(self, data_stream_id, checklist_id):
        """
        Method to get a list of profiles defined in the checklist given by the
        checklist_id that is defined in the data stream given by the
        data_stream_id.

        The result is cached, the content is only processed on the first call
        for a given pair of IDs.

        :param data_stream_id: ID of the data stream to get checklists from
        :type data_stream_id: str
        :param checklist_id: ID of the checklist to get profiles from
        :type checklist_id: str
        :return: list of profiles found in the checklist
        :rtype: list of ProfileInfo instances
        :raise DataStreamHandlingError: if the content cannot be loaded with
                                        the given data stream and checklist

        """

        cache_id = "%s;%s" % (data_stream_id, checklist_id)
        if cache_id in self._profiles_cache:
            # found in cache, return the value
            return self._profiles_cache[cache_id]

        # not found in the cache, needs to be gathered

        # a fresh session is needed to set the data stream and component
        # (checklist); drop the reference to the old one right after freeing
        # it so that a failure below cannot lead to a double free
        if self._session:
            OSCAP.xccdf_session_free(self._session)
            self._session = None

        self._session = OSCAP.xccdf_session_new(self._dsc_file_path)
        if not self._session:
            msg = "'%s' is not a valid SCAP content file" % self._dsc_file_path
            raise DataStreamHandlingError(msg)

        OSCAP.xccdf_session_set_datastream_id(self._session, data_stream_id)
        OSCAP.xccdf_session_set_component_id(self._session, checklist_id)
        if OSCAP.xccdf_session_load(self._session) != 0:
            raise DataStreamHandlingError(OSCAP.oscap_err_desc())

        # get the benchmark (checklist)
        policy_model = OSCAP.xccdf_session_get_policy_model(self._session)

        # NOTE(review): default_policy is never released here -- confirm
        # whether it needs an explicit free
        default_policy = OSCAP.xccdf_policy_new(policy_model, None)
        default_rules_count = OSCAP.xccdf_policy_get_selected_rules_count(default_policy)

        # will hold items for the profiles for the specified DS and checklist
        profiles = []

        # the implicit profile is only offered if it selects some rules
        if default_rules_count > 0:
            profiles.append(ProfileInfo("default", "Default",
                            "The implicit XCCDF profile. Usually, the default contains no rules."))

        benchmark = OSCAP.xccdf_policy_model_get_benchmark(policy_model)

        # iterate over the profiles in the benchmark and store them
        profile_itr = OSCAP.xccdf_benchmark_get_profiles(benchmark)
        try:
            while OSCAP.xccdf_profile_iterator_has_more(profile_itr):
                profile = OSCAP.xccdf_profile_iterator_next(profile_itr)

                id_ = OSCAP.xccdf_profile_get_id(profile)
                title = oscap_text_itr_get_text(OSCAP.xccdf_profile_get_title(profile))
                desc = parse_HTML_from_content(
                    oscap_text_itr_get_text(OSCAP.xccdf_profile_get_description(profile)))
                info = ProfileInfo(id_, title, desc)

                profiles.append(info)
        finally:
            # free the iterator even if reading one of the profiles fails
            OSCAP.xccdf_profile_iterator_free(profile_itr)

        # cache the result
        self._profiles_cache[cache_id] = profiles

        return profiles
Packit 792a06
Packit 792a06
Packit 792a06
class BenchmarkHandler(object):
    """
    Class for handling XCCDF benchmark and retrieving data from it (mainly the
    list of profiles).

    """

    def __init__(self, xccdf_file_path, tailoring_file_path=""):
        """
        Constructor for the BenchmarkHandler class.

        All the data is gathered in the constructor, the XCCDF session used
        for that is freed before the constructor returns or raises.

        :param xccdf_file_path: path to a file with an XCCDF benchmark
        :type xccdf_file_path: str
        :param tailoring_file_path: path to a tailoring file
        :type tailoring_file_path: str
        :raise BenchmarkHandlingError: if the path doesn't exist or the file
                                       cannot be loaded as a benchmark
        """

        if not os.path.exists(xccdf_file_path):
            msg = "Invalid file path: '%s'" % xccdf_file_path
            raise BenchmarkHandlingError(msg)

        session = OSCAP.xccdf_session_new(xccdf_file_path)
        if not session:
            msg = "'%s' is not a valid SCAP content file" % xccdf_file_path
            raise BenchmarkHandlingError(msg)

        # will hold the profiles found in the benchmark (and tailoring)
        profiles = []

        try:
            # the tailoring file has to be set before the session is loaded
            if tailoring_file_path:
                OSCAP.xccdf_session_set_user_tailoring_file(session,
                                                            tailoring_file_path)
            if OSCAP.xccdf_session_load(session) != 0:
                raise BenchmarkHandlingError(OSCAP.oscap_err_desc())

            # get the benchmark object
            policy_model = OSCAP.xccdf_session_get_policy_model(session)
            benchmark = OSCAP.xccdf_policy_model_get_benchmark(policy_model)
            if not benchmark:
                msg = "Not a valid benchmark file: '%s'" % xccdf_file_path
                raise BenchmarkHandlingError(msg)

            # NOTE(review): default_policy is never released here -- confirm
            # whether it needs an explicit free
            default_policy = OSCAP.xccdf_policy_new(policy_model, None)
            default_rules_count = OSCAP.xccdf_policy_get_selected_rules_count(default_policy)

            # the implicit profile is only offered if it selects some rules
            if default_rules_count > 0:
                profiles.append(
                    ProfileInfo(
                        "default", "Default",
                        "The implicit XCCDF profile. Usually, the default contains no rules."))

            # profiles defined in the benchmark itself
            profiles.extend(self._read_profiles(
                OSCAP.xccdf_benchmark_get_profiles(benchmark)))

            # profiles defined in the tailoring file (if any)
            if tailoring_file_path:
                tailoring = OSCAP.xccdf_policy_model_get_tailoring(policy_model)
                if tailoring:
                    profiles.extend(self._read_profiles(
                        OSCAP.xccdf_tailoring_get_profiles(tailoring)))
        finally:
            # the session is not needed any more, no matter if the processing
            # succeeded or failed
            OSCAP.xccdf_session_free(session)

        # stores a list of profiles in the benchmark
        self._profiles = profiles

    @staticmethod
    def _read_profiles(profile_itr):
        """
        Gather info about all profiles from the iterator and free it.

        :param profile_itr: xccdf_profile_iterator to get the profiles from
        :return: info about the profiles provided by the iterator
        :rtype: list of ProfileInfo instances

        """

        infos = []
        try:
            while OSCAP.xccdf_profile_iterator_has_more(profile_itr):
                profile = OSCAP.xccdf_profile_iterator_next(profile_itr)

                id_ = OSCAP.xccdf_profile_get_id(profile)
                title = oscap_text_itr_get_text(OSCAP.xccdf_profile_get_title(profile))
                desc = parse_HTML_from_content(
                    oscap_text_itr_get_text(OSCAP.xccdf_profile_get_description(profile)))

                infos.append(ProfileInfo(id_, title, desc))
        finally:
            # every iterator is freed exactly once, even on failure
            OSCAP.xccdf_profile_iterator_free(profile_itr)

        return infos

    @property
    def profiles(self):
        """Property for the list of profiles defined in the benchmark."""

        return self._profiles