#!/usr/bin/python3 # Copyright 2017 Facebook. # Licensed under the same terms as memcached itself. import argparse import socket import sys import re import traceback from time import sleep, time parser = argparse.ArgumentParser(description="daemon for rebalancing slabs") parser.add_argument("--host", help="host to connect to", default="localhost:11211", metavar="HOST:PORT") parser.add_argument("-s", "--sleep", help="seconds between runs", type=int, default="1") parser.add_argument("-v", "--verbose", action="store_true") parser.add_argument("-a", "--automove", action="store_true", default=False, help="enable automatic page rebalancing") parser.add_argument("-w", "--window", type=int, default="30", help="rolling window size for decision history") parser.add_argument("-r", "--ratio", type=float, default=0.8, help="ratio limiting distance between low/high class ages") parser.add_argument("-f", "--free", type=float, default=0.005, help="free chunks/pages buffer ratio") parser.add_argument("-z", "--size", type=int, default=512, help="item size cutoff for storage") args = parser.parse_args() host, port = args.host.split(':') MIN_PAGES_FOR_SOURCE = 2 MIN_PAGES_FOR_RECLAIM = 2.5 MIN_PAGES_FREE = 1.5 MEMCHECK_PERIOD = 60 def window_check(history, sid, key): total = 0 for window in history['w']: s = window.get(sid) if s and s.get(key): total += s.get(key) return total def window_key_check(history, key): total = 0 for window in history['w']: v = window.get(key) if v: total += v return total def determine_move(history, stats, diffs, memfree): """ Figure out of a page move is in order. - Use as much memory as possible to hold items, reducing the load on flash. - tries to keep the global free page pool inbetween poolmin/poolmax. - avoids flapping as much as possible: - only pull pages off of a class if it hasn't recently evicted or allocated pages. - only pull pages off if a sufficient number of free chunks are available. - if global pool is below minimum remove pages from oldest large class. - if global pool is above maximum, move pages to youngest large class. - extstore manages a desired number of free chunks in each slab class. - automover adjusts above limits once per minute based on current sizes. - if youngest is below the age ratio limit of oldest, move a page to it. """ # rotate windows history['w'].append({}) if (len(history['w']) > args.window): history['w'].pop(0) w = history['w'][-1] oldest = (-1, 0) youngest = (-1, sys.maxsize) too_free = False # Most bytes free decision = (-1, -1) if int(stats['slab_global_page_pool']) < memfree[0] / 2: w['slab_pool_low'] = 1 if int(stats['slab_global_page_pool']) > memfree[0]: w['slab_pool_high'] = 1 if args.verbose: print("global pool: [{}]".format(stats['slab_global_page_pool'])) pool_low = window_key_check(history, 'slab_pool_low') for sid, slab in diffs.items(): small_slab = False free_enough = False # Only balance larger slab classes if slab['chunk_size'] < args.size: small_slab = True w[sid] = {} if 'evicted_d' not in slab or 'total_pages_d' not in slab: continue # mark this window as dirty if total pages increases or evictions # happened if slab['total_pages_d'] > 0: w[sid]['dirty'] = 1 if slab['evicted_d'] > 0: w[sid]['dirty'] = 1 w[sid]['ev'] = 1 if slab['free_chunks'] > memfree[sid]: free_enough = True if memfree[sid] > 0 and slab['free_chunks'] > (memfree[sid] * 2): w[sid]['excess_free'] = 1 w[sid]['age'] = slab['age'] age = window_check(history, sid, 'age') / len(history['w']) # if > 2.5 pages free, and not dirty, reassign to global page pool if slab['free_chunks'] > slab['chunks_per_page'] * MIN_PAGES_FOR_RECLAIM and too_free == False: dirt = window_check(history, sid, 'dirty') excess = window_check(history, sid, 'excess_free') if small_slab == True and dirt == 0: # If we're a small slab, don't hang on to free memory forever. decision = (sid, 0) too_free = True elif small_slab == False and dirt == 0 \ and excess >= len(history['w']): decision = (sid, 0) too_free = True # are we the oldest slab class? (and a valid target) # don't consider for young if we've recently given it unused memory if small_slab == False: if age > oldest[1] and slab['total_pages'] > MIN_PAGES_FOR_SOURCE: oldest = (sid, age) if age < youngest[1] and slab['total_pages'] > 0 \ and window_check(history, sid, 'excess_free') < len(history['w']) \ and not (window_check(history, sid, 'relaxed') and free_enough): youngest = (sid, age) if w.get('slab_pool_high') and youngest[0] != -1: # if global pool is too high, feed youngest large class. if slab['free_chunks'] <= memfree[youngest[0]]: decision = (0, youngest[0]) w[youngest[0]]['relaxed'] = 1 elif too_free == False and pool_low and oldest[0] != -1: # if pool is too low, take from oldest large class. if args.verbose: print("oldest: [class: {}] [age: {:.2f}]".format(int(oldest[0]), oldest[1])) decision = (oldest[0], 0) elif too_free == False and youngest[0] != -1 and oldest[0] != -1 and youngest[0] != oldest[0]: # youngest is outside of the tolerance ratio, move a page around. if args.verbose: print("old: [class: {}] [age: {:.2f}]\nyoung: [class: {}] [age: {:.2f}]".format( int(oldest[0]), oldest[1], int(youngest[0]), youngest[1])) slab = diffs[youngest[0]] #print("F:{} L:{} Y:{} R:{}".format(slab['free_chunks'], memfree[youngest[0]], int(youngest[1]), int(oldest[1] * args.ratio))) if youngest[1] < oldest[1] * args.ratio: w[youngest[0]]['relaxed'] = 1 if slab['free_chunks'] <= memfree[youngest[0]]: decision = (0, youngest[0]) if (len(history['w']) >= args.window): return decision return (-1, -1) def run_move(s, decision): s.write("slabs reassign " + str(decision[0]) + " " + str(decision[1]) + "\r\n") line = s.readline().rstrip() if args.verbose: print("move result:", line) def diff_stats(before, after): """ fills out "diffs" as deltas between before/after, and "totals" as the sum of all slab classes. "_d" postfix to keys means the delta between before/after. non-postfix keys are total as of 'after's reading. """ diffs = {} totals = {} for slabid in after.keys(): sb = before.get(slabid) sa = after.get(slabid) if not (sb and sa): continue slab = sa.copy() for k in sa.keys(): if k not in sb: continue if k not in totals: totals[k] = 0 totals[k + '_d'] = 0 if k + '_d' not in slab: slab[k + '_d'] = 0 if re.search(r"^\d+$", sa[k]): totals[k] += int(sa[k]) slab[k] = int(sa[k]) slab[k + '_d'] = int(sa[k]) - int(sb[k]) totals[k + '_d'] += int(sa[k]) - int(sb[k]) slab['slab'] = slabid diffs[slabid] = slab return (diffs, totals) def read_slab_stats(s): slabs = {} for statcmd in ['items', 'slabs']: #print("stat cmd: " + statcmd) # FIXME: Formatting s.write("stats " + statcmd + "\r\n") while True: line = s.readline().rstrip() if line.startswith("END"): break m = re.match(r"^STAT (?:items:)?(\d+):(\S+) (\S+)", line) if m: (slab, var, val) = m.groups() if slab not in slabs: slabs[slab] = {} slabs[slab][var] = val #print("line: " + line) return slabs # HACK: lets look at 'evictions' being nonzero to indicate memory filled at some point. def read_stats(s): stats = {} s.write("stats\r\n") while True: line = s.readline().rstrip() if line.startswith("END"): break m = re.match(r"^STAT (\S+) (\S+)", line) if m: (key, val) = m.groups() stats[key] = val return stats def pct(num, divisor): if not divisor: return 0 return (num / divisor) def show_detail(diffs, totals): """ just a pretty printer for some extra data """ print("\n {:2s}: {:8s} (pct ) {:10s} (pct ) {:6s} (pct) {:6s}".format('sb', 'evicted', 'items', 'pages', 'age')) for sid, slab in diffs.items(): if 'evicted_d' not in slab: continue print(" {:2d}: {:8d} ({:.2f}%) {:10d} ({:.4f}%) {:6d} ({:.2f}%) {:6d}".format( int(sid), slab['evicted_d'], pct(slab['evicted_d'], totals['evicted_d']), slab['number'], pct(slab['number'], totals['number']), slab['total_pages'], pct(slab['total_pages'], totals['total_pages']), slab['age'])) def memfree_check(s, diffs, totals): info = {} # manage about this many free chunks in each slab class. for sid, slab in diffs.items(): if sid == 0: continue hold_free = int((slab['used_chunks'] + slab['free_chunks']) * args.free) # Hold a minimum of 1.5 pages so page moves are unlikely to lose items. if slab['chunks_per_page'] * MIN_PAGES_FREE > hold_free: hold_free = int(slab['chunks_per_page'] * MIN_PAGES_FREE) info[sid] = hold_free # TODO: only adjust if different? s.write("extstore free_memchunks {} {}\r\n".format(sid, hold_free)) s.readline() # how many pages to leave in the global pool. info[0] = int(totals['total_pages'] * args.free) return info stats_pre = {} history = { 'w': [{}] } memfree = { 0: 2 } last_memfree_check = 0 while True: try: with socket.create_connection((host, port), 5) as c: s = c.makefile(mode="rw", buffering=1) s.write("slabs automove 0\r\n") print(s.readline().rstrip()) while True: stats_post = read_slab_stats(s) stats = read_stats(s) (diffs, totals) = diff_stats(stats_pre, stats_post) #if args.verbose: # show_detail(diffs, totals) if int(stats['evictions']) > 0: if (last_memfree_check < time() - 60) and totals.get('total_pages'): memfree = memfree_check(s, diffs, totals) last_memfree_check = time() decision = (-1, -1) decision = determine_move(history, stats, diffs, memfree) if int(decision[0]) > 0 and int(decision[1]) >= 0: print("moving page from, to:", decision) if args.automove: run_move(s, decision) # Minimize sleeping if we just moved a page to global pool. # Improves responsiveness during flushes/quick changes. if decision[1] == 0: continue else: sleep(args.sleep) stats_pre = stats_post except: err = sys.exc_info() print("disconnected:", err[0], err[1]) traceback.print_exc() stats_pre = {} history = { 'w': [{}] } sleep(args.sleep)