#! /usr/libexec/platform-python
""" 
    Utility for converting *_clat_hist* files generated by fio into latency statistics.
    
    Example usage:
    
            $ fiologparser_hist.py *_clat_hist*
            end-time, samples, min, avg, median, 90%, 95%, 99%, max
            1000, 15, 192, 1678.107, 1788.859, 1856.076, 1880.040, 1899.208, 1888.000
            2000, 43, 152, 1642.368, 1714.099, 1816.659, 1845.552, 1888.131, 1888.000
            4000, 39, 1152, 1546.962, 1545.785, 1627.192, 1640.019, 1691.204, 1744
            ...
    
    @author Karl Cronburg <karl.cronburg@gmail.com>
"""
import os
import sys
import pandas
import re
import numpy as np

runascmd = False

err = sys.stderr.write

class HistFileRdr():
    """ Class to read a hist file line by line, buffering
        a value array for the latest line, and allowing a preview
        of the next timestamp in next line
        Note: this does not follow a generator pattern, but must explicitly
        get next bin array.
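
        Illustrative usage (hypothetical file name), mirroring
        output_interval_data():

            rdr = HistFileRdr('example_clat_hist.1.log')
            while rdr.curTS is not None:
                bins = rdr.curBins    # histogram bins for the current line
                rdr.nextData()        # advance; the file is closed at EOF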
    """
    def __init__(self, file):
        self.fp = open(file, 'r')
        self.data = self.nextData()

    def close(self):
        self.fp.close()
        self.fp = None

    def nextData(self):
        self.data = None
        if self.fp:
            line = self.fp.readline()
            if line == "":
                self.close()
            else:
                self.data = [int(x) for x in line.replace(' ', '').rstrip().split(',')]

        return self.data

    @property
    def curTS(self):
        ts = None
        if self.data:
            ts = self.data[0]
        return ts

    @property
    def curDir(self):
        d = None
        if self.data:
            d = self.data[1]
        return d

    @property
    def curBins(self):
        return self.data[3:]

def weighted_percentile(percs, vs, ws):
    """ Use linear interpolation to calculate the weighted percentile.
        
        Value and weight arrays are first sorted by value. The cumulative
        distribution function (cdf) is then computed, after which np.interp
        finds the two values closest to our desired weighted percentile(s)
        and linearly interpolates them.
        
        percs  :: List of percentiles we want to calculate
        vs     :: Array of values we are computing the percentile of
        ws     :: Array of weights for our corresponding values
        return :: Array of percentiles
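
        Illustrative example (values assumed): with vs = [1., 2., 3.] and equal
        weights ws = [1., 1., 1.], the cdf evaluates to approximately
        [16.7, 50.0, 83.3], so

            weighted_percentile([50.0], np.array([1., 2., 3.]),
                                np.array([1., 1., 1.]))

        returns array([2.0]).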
    """
    idx = np.argsort(vs)
    vs, ws = vs[idx], ws[idx] # weights and values sorted by value
    cdf = 100 * (ws.cumsum() - ws / 2.0) / ws.sum()
    return np.interp(percs, cdf, vs) # linear interpolation

def weights(start_ts, end_ts, start, end):
    """ Calculate weights based on fraction of sample falling in the
        given interval [start,end]. Weights computed using vector / array
        computation instead of for-loops.

        Note that samples with zero time length are effectively ignored
        (we set their weight to zero).

        start_ts :: Array of start times for a set of samples
        end_ts   :: Array of end times for a set of samples
        start    :: int
        end      :: int
        return   :: Array of weights
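
        Illustrative example (values assumed): a single sample spanning
        [900, 1100] ms overlaps the interval [1000, 2000] for half of its
        200 ms length, so

            weights(np.array([900]), np.array([1100]), 1000, 2000)

        returns array([0.5]).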
    """
    sbounds = np.maximum(start_ts, start).astype(float)
    ebounds = np.minimum(end_ts,   end).astype(float)
    ws = (ebounds - sbounds) / (end_ts - start_ts)
    if np.any(np.isnan(ws)):
        err("WARNING: zero-length sample(s) detected. Log file corrupt"
            " / bad time values? Ignoring these samples.\n")
    ws[np.where(np.isnan(ws))] = 0.0
    return ws

def weighted_average(vs, ws):
    return np.sum(vs * ws) / np.sum(ws)
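
# For example (values assumed): weighted_average(np.array([1., 3.]), np.array([1., 3.]))
# evaluates to (1*1 + 3*3) / (1 + 3) == 2.5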


percs = None
columns = None

def gen_output_columns(ctx):
    global percs,columns
    strpercs = re.split('[,:]', ctx.percentiles)
    percs = [50.0]  # always print 50% in 'median' column
    percs.extend(list(map(float,strpercs)))
    if ctx.directions:
        columns = ["end-time", "dir", "samples", "min", "avg", "median"]
    else:
        columns = ["end-time", "samples", "min", "avg", "median"]
    columns.extend(list(map(lambda x: x+'%', strpercs)))
    columns.append("max")
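
# For example (assumed options): with ctx.percentiles == "90,99" and --directions
# unset, gen_output_columns() sets
#   percs   = [50.0, 90.0, 99.0]
#   columns = ['end-time', 'samples', 'min', 'avg', 'median', '90%', '99%', 'max']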

def fmt_float_list(ctx, num=1):
  """ Return a comma separated list of float formatters to the required number
      of decimal places. For instance:

        fmt_float_list(ctx.decimals=4, num=3) == "%.4f, %.4f, %.4f"
  """
  return ', '.join(["%%.%df" % ctx.decimals] * num)

# Default values - see beginning of main() for how we detect the number of
# columns in the input files:
__HIST_COLUMNS = 1216
__NON_HIST_COLUMNS = 3
__TOTAL_COLUMNS = __HIST_COLUMNS + __NON_HIST_COLUMNS

def read_chunk(rdr, sz):
    """ Read the next chunk of size sz from the given reader. """
    try:
        """ StopIteration occurs when the pandas reader is empty, and AttributeError
            occurs if rdr is None due to the file being empty. """
        new_arr = rdr.read().values
    except (StopIteration, AttributeError):
        return None

    # Let's leave the array as is, and let later code ignore the block size
    return new_arr

    #""" Extract array of the times, directions wo times, and histograms matrix without times column. """
    #times, rws, szs = new_arr[:,0], new_arr[:,1], new_arr[:,2]
    #hists = new_arr[:,__NON_HIST_COLUMNS:]
    #times = times.reshape((len(times),1))
    #dirs  = rws.reshape((len(rws),1))
    #arr = np.append(times, hists, axis=1)
    #return arr

def get_min(fps, arrs):
    """ Find the file with the current first row with the smallest start time """
    return min([fp for fp in fps if not arrs[fp] is None], key=lambda fp: arrs.get(fp)[0][0])

def histogram_generator(ctx, fps, sz):
    
    # Create a chunked pandas reader for each of the files:
    rdrs = {}
    for fp in fps:
        try:
            rdrs[fp] = pandas.read_csv(fp, dtype=int, header=None, chunksize=sz)
        except ValueError as e:
            if str(e) == 'No columns to parse from file':
                if ctx.warn: sys.stderr.write("WARNING: Empty input file encountered.\n")
                rdrs[fp] = None
            else:
                raise

    # Initial histograms from disk:
    arrs = {fp: read_chunk(rdr, sz) for fp,rdr in rdrs.items()}
    while True:

        try:
            """ ValueError occurs when nothing more to read """
            fp = get_min(fps, arrs)
        except ValueError:
            return
        arr = arrs[fp]
        arri = np.insert(arr[0], 1, fps.index(fp))
        yield arri
        arrs[fp] = arr[1:]

        if arrs[fp].shape[0] == 0:
            arrs[fp] = read_chunk(rdrs[fp], sz)

def _plat_idx_to_val(idx, edge=0.5, FIO_IO_U_PLAT_BITS=6, FIO_IO_U_PLAT_VAL=64):
    """ Taken from fio's stat.c for calculating the latency value of a bin
        from that bin's index.
        
            idx  : the value of the index into the histogram bins
            edge : fractional value in the range [0,1]** indicating how far into
            the bin we wish to compute the latency value of.
        
        ** edge = 0.0 and 1.0 computes the lower and upper latency bounds
           respectively of the given bin index. """
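
    # Worked example with the default parameters (index values chosen for
    # illustration):
    #   idx = 127 -> below FIO_IO_U_PLAT_VAL << 1 (128), returned unchanged: 127
    #   idx = 128 -> error_bits = 1, base = 128, k = 0,
    #                value = 128 + (0 + 0.5) * 2 = 129.0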

    # MSB <= (FIO_IO_U_PLAT_BITS-1), cannot be rounded off. Use
    # all bits of the sample as index
    if (idx < (FIO_IO_U_PLAT_VAL << 1)):
        return idx 

    # Find the group and compute the minimum value of that group
    error_bits = (idx >> FIO_IO_U_PLAT_BITS) - 1 
    base = 1 << (error_bits + FIO_IO_U_PLAT_BITS)

    # Find the bucket number within the group
    k = idx % FIO_IO_U_PLAT_VAL

    # Return the mean (if edge=0.5) of the range of the bucket
    return base + ((k + edge) * (1 << error_bits))
    
def plat_idx_to_val_coarse(idx, coarseness, edge=0.5):
    """ Converts the given *coarse* index into a non-coarse index as used by fio
        in stat.h:plat_idx_to_val(), subsequently computing the appropriate
        latency value for that bin.
        """

    # Multiply the index by the power-of-2 coarseness to get the fine-grained
    # bin index, with a max of 1536 bins (FIO_IO_U_PLAT_GROUP_NR = 24 in stat.h)
    stride = 1 << coarseness
    idx = idx * stride
    lower = _plat_idx_to_val(idx, edge=0.0)
    upper = _plat_idx_to_val(idx + stride, edge=1.0)
    return lower + (upper - lower) * edge

def print_all_stats(ctx, end, mn, ss_cnt, vs, ws, mx, dir=None):
    ps = weighted_percentile(percs, vs, ws)

    avg = weighted_average(vs, ws)
    values = [mn, avg] + list(ps) + [mx]
    if ctx.directions:
        row = [end, dir, ss_cnt]
        fmt = "%d, %s, %d, "
    else:
        row = [end, ss_cnt]
        fmt = "%d, %d, "
    row = row + [float(x) / ctx.divisor for x in values]
    if ctx.divisor > 1:
        fmt = fmt + fmt_float_list(ctx, len(percs)+3)
    else:
        # min and max are printed as integers when there is no divisor
        fmt = fmt + "%d, " + fmt_float_list(ctx, len(percs)+1) + ", %d"

    print(fmt % tuple(row))

def update_extreme(val, fncn, new_val):
    """ Calculate min / max in the presence of None values """
    if val is None: return new_val
    else: return fncn(val, new_val)

# See beginning of main() for how bin_vals are computed
bin_vals = []
lower_bin_vals = [] # lower edge of each bin
upper_bin_vals = [] # upper edge of each bin 

def process_interval(ctx, iHist, iEnd, dir):
    """ print estimated percentages for the given merged sample
    """
    ss_cnt = 0 # number of samples affecting this interval
    mn_bin_val, mx_bin_val = None, None

    # Update total number of samples affecting current interval histogram:
    ss_cnt += np.sum(iHist)

    # Update min and max bin values
    idxs = np.nonzero(iHist != 0)[0]
    if idxs.size > 0:
        mn_bin_val = bin_vals[idxs[0]]
        mx_bin_val = bin_vals[idxs[-1]]

    if ss_cnt > 0: print_all_stats(ctx, iEnd, mn_bin_val, ss_cnt, bin_vals, iHist, mx_bin_val, dir=dir)


dir_map = ['r', 'w', 't']  # map of directional value in log to textual representation
def process_weighted_interval(ctx, samples, iStart, iEnd, printdirs):
    """ Construct the weighted histogram for the given interval by scanning
        through all the histograms and figuring out which of their bins have
        samples with latencies which overlap with the given interval
        [iStart,iEnd].
    """

    times, files, dirs, sizes, hists = samples[:,0], samples[:,1], samples[:,2], samples[:,3], samples[:,4:]
    iHist={}; ss_cnt = {}; mn_bin_val={}; mx_bin_val={}
    for dir in printdirs:
        iHist[dir] = np.zeros(__HIST_COLUMNS, dtype=float)
        ss_cnt[dir] = 0 # number of samples affecting this interval
        mn_bin_val[dir] = None
        mx_bin_val[dir] = None

    for end_time,file,dir,hist in zip(times,files,dirs,hists):

        # Only look at bins of the current histogram sample which
        # started before the end of the current time interval [iStart,iEnd]
        start_times = (end_time - 0.5 * ctx.interval) - bin_vals / ctx.time_divisor
        idx = np.where(start_times < iEnd)
        s_ts, l_bvs, u_bvs, hs = start_times[idx], lower_bin_vals[idx], upper_bin_vals[idx], hist[idx]

        # Increment current interval histogram by weighted values of future histogram
        # total number of samples
        # and min and max values as necessary
        textdir = dir_map[dir]
        ws = hs * weights(s_ts, end_time, iStart, iEnd)
        mmidx = np.where(hs != 0)[0]
        if 'm' in printdirs:
            iHist['m'][idx] += ws
            ss_cnt['m'] += np.sum(hs)
            if mmidx.size > 0:
                mn_bin_val['m'] = update_extreme(mn_bin_val['m'], min, l_bvs[max(0,           mmidx[0]  - 1)])
                mx_bin_val['m'] = update_extreme(mx_bin_val['m'], max, u_bvs[min(len(hs) - 1, mmidx[-1] + 1)])
        if textdir in printdirs:
            iHist[textdir][idx] += ws
            ss_cnt[textdir] += np.sum(hs)  # Update total number of samples affecting current interval histogram:
            if mmidx.size > 0:
                mn_bin_val[textdir] = update_extreme(mn_bin_val[textdir], min, l_bvs[max(0,           mmidx[0]  - 1)])
                mx_bin_val[textdir] = update_extreme(mx_bin_val[textdir], max, u_bvs[min(len(hs) - 1, mmidx[-1] + 1)])

    for textdir in sorted(printdirs):
        if ss_cnt[textdir] > 0: print_all_stats(ctx, iEnd, mn_bin_val[textdir], ss_cnt[textdir], bin_vals, iHist[textdir], mx_bin_val[textdir], dir=textdir)

def guess_max_from_bins(ctx, hist_cols):
    """ Try to guess the GROUP_NR from given # of histogram
        columns seen in an input file """
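    # For example: with the default --group_nr of 29, bins == [1856], so an
    # input file with 1856 histogram columns corresponds to coarseness 0 and
    # one with 928 columns to coarseness 1; both cases return 1856.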
    max_coarse = 8
    if ctx.group_nr < 19 or ctx.group_nr > 26:
        bins = [ctx.group_nr * (1 << 6)]
    else:
        bins = [1216,1280,1344,1408,1472,1536,1600,1664]
    coarses = range(max_coarse + 1)
    fncn = lambda z: list(map(lambda x: z/2**x if z % 2**x == 0 else -10, coarses))
    
    arr = np.transpose(list(map(fncn, bins)))
    idx = np.where(arr == hist_cols)
    if len(idx[1]) == 0:
        table = repr(arr.astype(int)).replace('-10', 'N/A').replace('array','     ')
        errmsg = ("Unable to determine bin values from input clat_hist files. Namely \n"
            "the first line of file '%s' " % ctx.FILE[0] + "has %d \n" % (__TOTAL_COLUMNS,) +
            "columns of which we assume %d " % (hist_cols,) + "correspond to histogram bins. \n"
            "This number needs to be equal to one of the following numbers:\n\n"
            + table + "\n\n"
            "Possible reasons and corresponding solutions:\n"
            "  - Input file(s) does not contain histograms.\n"
            "  - You recompiled fio with a different GROUP_NR. If so please specify this\n"
            "    new GROUP_NR on the command line with --group_nr\n")
        if runascmd:
            err(errmsg)
            exit(1)
        else:
            raise RuntimeError(errmsg) 

    return bins[idx[1][0]]

def output_weighted_interval_data(ctx,printdirs):

    fps = [open(f, 'r') for f in ctx.FILE]
    gen = histogram_generator(ctx, fps, ctx.buff_size)

    print(', '.join(columns))

    try:
        start, end = 0, ctx.interval
        arr = np.empty(shape=(0,__TOTAL_COLUMNS + 1),dtype=int)
        more_data = True
        while more_data or len(arr) > 0:

            # Read up to ctx.max_latency (default 20 seconds) of data from end of current interval.
            while len(arr) == 0 or arr[-1][0] < ctx.max_latency * 1000 + end:
                try:
                    new_arr = next(gen)
                except StopIteration:
                    more_data = False
                    break
                nashape  = new_arr.reshape((1,__TOTAL_COLUMNS + 1))
                arr = np.append(arr, nashape, axis=0)
            #arr = arr.astype(int)
            
            if arr.size > 0:
                # Jump immediately to the start of the input, rounding
                # down to the nearest multiple of the interval (useful when --log_unix_epoch
                # was used to create these histograms):
                if start == 0 and arr[0][0] - ctx.max_latency > end:
                    start = arr[0][0] - ctx.max_latency
                    start = start - (start % ctx.interval)
                    end = start + ctx.interval

                process_weighted_interval(ctx, arr, start, end, printdirs)
                
                # Update arr to throw away samples we no longer need - samples which
                # end before the start of the next interval, i.e. the end of the
                # current interval:
                idx = np.where(arr[:,0] > end)
                arr = arr[idx]
            
            start += ctx.interval
            end = start + ctx.interval
    finally:
        for fp in fps:
            fp.close()

def output_interval_data(ctx,directions):
    fps = [HistFileRdr(f) for f in ctx.FILE]

    print(', '.join(columns))

    start = 0
    end = ctx.interval
    while True:

        more_data = False

        # add bins from all files in target intervals
        arr = None
        numSamples = 0
        while True:
            foundSamples = False
            for fp in fps:
                ts = fp.curTS
                if ts and ts+10 < end:  # shift sample time when very close to an end time
                    curdirect = fp.curDir
                    numSamples += 1
                    foundSamples = True
                    if arr is None:
                        arr = {}
                        for d in directions:
                            arr[d] = np.zeros(shape=(__HIST_COLUMNS), dtype=int)
                    if 'm' in arr:
                        arr['m'] = np.add(arr['m'], fp.curBins)
                    if 'r' in arr and curdirect == 0:
                        arr['r'] = np.add(arr['r'], fp.curBins)
                    if 'w' in arr and curdirect == 1:
                        arr['w'] = np.add(arr['w'], fp.curBins)
                    if 't' in arr and curdirect == 2:
                        arr['t'] = np.add(arr['t'], fp.curBins)

                    more_data = True
                    fp.nextData()
                elif ts:
                    more_data = True

            # reached end of all files
            # or gone through all files without finding a sample in the interval
            if not more_data or not foundSamples:
                break

        if arr is not None:
            #print("{} size({}) samples({}) nonzero({}):".format(end, arr.size, numSamples, np.count_nonzero(arr)), str(arr), )
            for d in sorted(arr.keys()):
                aval = arr[d]
                process_interval(ctx, aval, end, d)

        # reached end of all files
        if not more_data:
            break

        start += ctx.interval
        end = start + ctx.interval

def main(ctx):

    if ctx.job_file:
        try:
            from configparser import ConfigParser, NoOptionError
        except ImportError:  # Python 2 fallback
            from ConfigParser import SafeConfigParser as ConfigParser, NoOptionError

        cp = ConfigParser(allow_no_value=True)
        with open(ctx.job_file, 'r') as fp:
            if hasattr(cp, 'read_file'):
                cp.read_file(fp)
            else:
                cp.readfp(fp)

        if ctx.interval is None:
            # Auto detect --interval value
            for s in cp.sections():
                try:
                    hist_msec = cp.get(s, 'log_hist_msec')
                    if hist_msec is not None:
                        ctx.interval = int(hist_msec)
                except NoOptionError:
                    pass

    if not hasattr(ctx, 'percentiles'):
        ctx.percentiles = "90,95,99"

    if ctx.directions:
        ctx.directions = ctx.directions.lower()

    if ctx.interval is None:
        ctx.interval = 1000

    if ctx.usbin:
        ctx.time_divisor = 1000.0        # bins are in us
    else:
        ctx.time_divisor = 1000000.0     # bins are in ns

    gen_output_columns(ctx)


    # Automatically detect how many columns are in the input files,
    # calculate the corresponding 'coarseness' parameter used to generate
    # those files, and calculate the appropriate bin latency values:
    with open(ctx.FILE[0], 'r') as fp:
        global bin_vals,lower_bin_vals,upper_bin_vals,__HIST_COLUMNS,__TOTAL_COLUMNS
        __TOTAL_COLUMNS = len(fp.readline().split(','))
        __HIST_COLUMNS = __TOTAL_COLUMNS - __NON_HIST_COLUMNS

        max_cols = guess_max_from_bins(ctx, __HIST_COLUMNS)
        coarseness = int(np.log2(float(max_cols) / __HIST_COLUMNS))
        bin_vals = np.array([plat_idx_to_val_coarse(x, coarseness) for x in np.arange(__HIST_COLUMNS)], dtype=float)
        lower_bin_vals = np.array([plat_idx_to_val_coarse(x, coarseness, 0.0) for x in np.arange(__HIST_COLUMNS)], dtype=float)
        upper_bin_vals = np.array([plat_idx_to_val_coarse(x, coarseness, 1.0) for x in np.arange(__HIST_COLUMNS)], dtype=float)
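
    # For example (assumed file layout): a first line with 1859 comma-separated
    # fields gives __HIST_COLUMNS = 1856; with the default --group_nr of 29,
    # guess_max_from_bins() also returns 1856, so coarseness = log2(1856/1856) = 0
    # and the full-resolution bin values are used (a 928-column file would give
    # coarseness 1).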

    # indicate which directions to output: read (0), write (1), trim (2); 'm' merges all directions
    directions = set()
    if not ctx.directions or 'm' in ctx.directions: directions.add('m')
    if ctx.directions and 'r' in ctx.directions:    directions.add('r')
    if ctx.directions and 'w' in ctx.directions:    directions.add('w')
    if ctx.directions and 't' in ctx.directions:    directions.add('t')

    if ctx.noweight:
        output_interval_data(ctx, directions)
    else:
        output_weighted_interval_data(ctx, directions)


if __name__ == '__main__':
    import argparse
    runascmd = True
    p = argparse.ArgumentParser()
    arg = p.add_argument
    arg("FILE", help='space separated list of latency log filenames', nargs='+')
    arg('--buff_size',
        default=10000,
        type=int,
        help='number of samples to buffer into numpy at a time')

    arg('--max_latency',
        default=20,
        type=float,
        help='number of seconds of data to process at a time')

    arg('-i', '--interval',
        type=int,
        help='interval width (ms), default 1000 ms')

    arg('--noweight',
        action='store_true',
        default=False,
        help='do not perform weighting of samples between output intervals')

    arg('-d', '--divisor',
        required=False,
        type=int,
        default=1,
        help='divide the results by this value.')

    arg('--decimals',
        default=3,
        type=int,
        help='number of decimal places to print floats to')

    arg('--warn',
        dest='warn',
        action='store_true',
        default=False,
        help='print warning messages to stderr')

    arg('--group_nr',
        default=29,
        type=int,
        help='FIO_IO_U_PLAT_GROUP_NR as defined in stat.h')

    arg('--job-file',
        default=None,
        type=str,
        help='Optional argument pointing to the job file used to create the '
             'given histogram files. Useful for auto-detecting --log_hist_msec and '
             '--log_unix_epoch (in fio) values.')

    arg('--percentiles',
        default="90:95:99",
        type=str,
        help='Optional argument of comma- or colon-separated percentiles to print. '
             'The default is "90:95:99". min, median (50%%) and max are always printed')

    arg('--usbin',
        default=False,
        action='store_true',
        help='histogram bin latencies are in microseconds (fio versions < 2.99; fio uses nanoseconds for versions >= 2.99)')

    arg('--directions',
        default=None,
        type=str,
        help='Optionally split results output by reads, writes, trims or mixed. '
             'Value may be any combination of "rwtm" characters. '
             'By default, only "mixed" results are output without a "dir" field. '
             'But, specifying the --directions option '
             'adds a "dir" field to the output content, and separate rows for each of the indicated '
             'directions.')

    main(p.parse_args())