# Blob Blame History Raw
################################################################################
# BSD LICENSE
#
# Copyright(c) 2019-2020 Intel Corporation. All rights reserved.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#   * Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#   * Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in
#     the documentation and/or other materials provided with the
#     distribution.
#   * Neither the name of Intel Corporation nor the names of its
#     contributors may be used to endorse or promote products derived
#     from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
################################################################################

import os
import re
import subprocess
import pexpect

# Path of the resctrl filesystem root; used as the default root by Resctrl
# and as the directory scanned by Resctrl.get_ctrl_group_count().
RESCTRL_ROOT_PATH = "/sys/fs/resctrl"


class ResctrlSchemata:
    """Parsed contents of a resctrl ``schemata`` file.

    Each line of the form ``<LABEL>:<id>=<value>;<id>=<value>...`` is stored
    in ``self.data`` as ``data[LABEL] = [value, value, ...]`` in the order the
    entries appear on the line. Values under the ``MB`` label are parsed as
    decimal numbers, values under any other label as hexadecimal.

    NOTE(review): values are stored by position, not by the parsed ``<id>``,
    so ``get(label, resource_id)`` is only correct when ids are contiguous
    and start at 0 -- confirm this holds on the target platforms.
    """

    # "<LABEL>:<entries>" with optional leading padding.
    _LINE_RE = re.compile(r"^\s*([A-Z23]*):(.*)$")
    # "<id>=<value>"; both parts must be non-empty so int() never sees "".
    _ENTRY_RE = re.compile(r"^([0-9]+)=\s*([0-9a-f]+)$")

    def __init__(self, path):
        """Remember *path* and parse it immediately.

        Args:
            path: Location of the schemata file to read.

        Raises:
            OSError: If the file cannot be opened.
        """
        self.path = path
        self.data = {}

        self.parse()

    def parse(self):
        """Read ``self.path`` and (re)fill ``self.data`` label by label.

        Lines that do not look like ``LABEL:...`` are ignored. Within a line,
        entries that are not a well-formed ``id=value`` pair (after stripping
        surrounding whitespace) are skipped rather than aborting the parse.
        """
        with open(self.path, 'r') as schema_file:
            for line in schema_file:
                line_match = self._LINE_RE.search(line)
                if not line_match:
                    continue

                label = line_match.group(1)
                # MB entries are decimal, everything else is a hex bitmask.
                base = 10 if label == "MB" else 16

                values = []
                for entry in line_match.group(2).split(";"):
                    entry_match = self._ENTRY_RE.search(entry.strip())
                    if entry_match:
                        values.append(int(entry_match.group(2), base))

                self.data[label] = values

    def get(self, label, resource_id):
        """Return the value stored for *label* at position *resource_id*.

        Raises:
            KeyError: If *label* was not present in the file.
            IndexError: If *resource_id* is past the last parsed entry.
        """
        return self.data[label][resource_id]


class Resctrl:
    """Helper for mounting, unmounting and inspecting the resctrl filesystem.

    Instance methods build their paths from ``self.root``, which is
    initialised to ``RESCTRL_ROOT_PATH``.
    """

    def __init__(self):
        self.root = RESCTRL_ROOT_PATH

    def run_cmd(self, command):
        """Run *command* and capture its output.

        The command string is split on whitespace and executed without a
        shell, so quoting and shell operators are not interpreted.

        Args:
            command: Command line to execute.

        Returns:
            Tuple ``(stdout, stderr, returncode)``; both streams are decoded
            as UTF-8.
        """
        child = subprocess.Popen(command.split(), stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (stdout, stderr) = child.communicate()

        if stdout is not None:
            stdout = stdout.decode("utf-8")
        if stderr is not None:
            stderr = stderr.decode("utf-8")

        return stdout, stderr, child.returncode

    def is_mounted(self):
        """Return True when ``mount -t resctrl`` reports a mount at the root.

        The check is a plain substring test on the command output for
        "resctrl", ``self.root`` and "rw".
        """
        command = "mount -t resctrl"
        output, _, _ = self.run_cmd(command)
        return "resctrl" in output and self.root in output and "rw" in output

    def mount(self, cdp=False, cdpl2=False, mba_mbps=False):
        """Mount resctrl at ``self.root`` via ``sudo mount``.

        Args:
            cdp: Add the ``cdp`` mount option.
            cdpl2: Add the ``cdpl2`` mount option.
            mba_mbps: Add the ``mba_MBps`` mount option.

        Returns:
            Result of ``is_mounted()`` after the mount command has run; the
            command's own exit status is not inspected.
        """
        options_str = ''
        options = []

        if cdp:
            options.append('cdp')
        if cdpl2:
            options.append('cdpl2')
        if mba_mbps:
            options.append('mba_MBps')

        if options:
            options_str = ' -o ' + ','.join(options)

        command = "sudo mount -t resctrl resctrl%s %s" % (options_str, self.root)
        self.run_cmd(command)
        return self.is_mounted()

    def umount(self):
        """Unmount all resctrl mounts via ``sudo umount -a -t resctrl``.

        Returns:
            True when ``is_mounted()`` reports no mount afterwards.
        """
        command = "sudo umount -a -t resctrl"
        self.run_cmd(command)
        return not self.is_mounted()

    def info_dir_exists(self, dir_name):
        """Return True when ``<root>/info/<dir_name>`` exists."""
        dir_path = os.path.join(self.root, "info", dir_name)
        return os.path.exists(dir_path)

    def get_mon_features(self):
        """Return the lines of ``<root>/info/L3_MON/mon_features``, stripped.

        Returns:
            List with one entry per line of the file, or an empty list when
            the file does not exist.
        """
        mon_feat_path = os.path.join(self.root, "info", "L3_MON", "mon_features")
        if not os.path.exists(mon_feat_path):
            return []

        with open(mon_feat_path, 'r') as feat_file:
            return [line.strip() for line in feat_file]

    def get_schemata_path(self, cos=0):
        """Return the schemata file path for class of service *cos*.

        ``cos == 0`` maps to ``<root>/schemata``; any other value maps to
        ``<root>/COS<cos>/schemata``.
        """
        if cos == 0:
            return os.path.join(self.root, "schemata")

        return os.path.join(self.root, "COS%s" % str(cos), "schemata")

    def get_schemata(self, cos=0):
        """Return a ``ResctrlSchemata`` parsed from the file for *cos*."""
        path = self.get_schemata_path(cos)
        return ResctrlSchemata(path)

    @staticmethod
    def get_ctrl_group_count():
        """Count control groups found under ``RESCTRL_ROOT_PATH``.

        The root directory itself counts as one group; every non-hidden entry
        in it whose name contains "COS" adds one more.

        Returns:
            Number of groups, or 0 when the root directory cannot be listed
            (missing, not a directory, or not readable).
        """
        try:
            entries = os.listdir(RESCTRL_ROOT_PATH)
        except OSError:
            return 0

        # Dot entries are skipped so the count matches what a plain `ls` of
        # the directory would show.
        cos_entries = [name for name in entries
                       if "COS" in name and not name.startswith(".")]

        return 1 + len(cos_entries)