From ff1a3cbfc0b3c83b404a577958d6a36529a526d9 Mon Sep 17 00:00:00 2001
From: James Antill
Date: Jan 13 2015 14:19:05 +0000
Subject: Update to latest HEAD, into the correct patch file :-o

- Add Ian's parallel deltarpm support.
- allow 'sha' checksum type for modifyrepo
- Fix open_checksum and open_size calculation in RepoMetadata.add().
- Fix several AttributeErrors in RepoMetadata.add().

---

diff --git a/createrepo-HEAD.patch b/createrepo-HEAD.patch
index 59fffc1..cb60ef2 100644
--- a/createrepo-HEAD.patch
+++ b/createrepo-HEAD.patch
@@ -1,8 +1,31 @@
 diff --git a/createrepo/__init__.py b/createrepo/__init__.py
-index b1875f6..85f2a3d 100644
+index b1875f6..517ea04 100644
 --- a/createrepo/__init__.py
 +++ b/createrepo/__init__.py
-@@ -400,7 +400,9 @@ class MetaDataGenerator:
+@@ -28,6 +28,10 @@ import fcntl
+ import subprocess
+ from select import select
+ 
++# To support parallel deltarpms
++import multiprocessing
++import multiprocessing.managers
++
+ from yum import misc, Errors
+ from yum.repoMDObject import RepoMD, RepoData
+ from yum.sqlutils import executeSQL
+@@ -113,7 +117,10 @@ class MetaDataConfig(object):
+         #self.worker_cmd = './worker.py' # helpful when testing
+         self.retain_old_md = 0
+         self.compress_type = 'compat'
+-
++        # Parallel deltas additions
++        self.delta_workers = 1 # number of workers to fork when doing deltarpms
++        # Keep the combined payload size of all in-progress deltarpm creation below this number
++        self.max_concurrent_delta_rpm_size = self.max_delta_rpm_size
+ 
+ class SimpleMDCallBack(object):
+     def errorlog(self, thing):
+@@ -400,7 +407,9 @@ class MetaDataGenerator:
          if self.conf.update:
              self._setup_old_metadata_lookup()
          # rpms we're going to be dealing with
@@ -13,3 +36,399 @@ index b1875f6..85f2a3d 100644
          packages = []
          for pkg in self.conf.pkglist:
              if '://' in pkg: # remote
+@@ -716,19 +725,216 @@ class MetaDataGenerator:
+         if err:
+             raise MDError, "Failed to process %d package(s)."
% err + +- for pkgfile in pkgfiles: +- if self.conf.deltas: +- try: +- po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir) +- self._do_delta_rpm_package(po) +- except MDError, e: +- errorprint(e) +- continue +- self.read_pkgs.append(pkgfile) ++ if self.conf.delta_workers == 1: ++ for pkgfile in pkgfiles: ++ if self.conf.deltas: ++ try: ++ po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir) ++ self._do_delta_rpm_package(po) ++ except MDError, e: ++ errorprint(e) ++ continue ++ self.read_pkgs.append(pkgfile) ++ else: ++ self._parallel_deltas(pkgfiles, pkgpath, reldir) + + save_keptpkgs(None) # append anything left + return self.current_pkg + ++ def _parallel_deltas(self, pkgfiles, pkgpath, reldir): ++ class WrappedMDCallBack(object): ++ def __init__(self, log_queue): ++ self.log_queue = log_queue ++ def errorlog(self, thing): ++ self.log_queue.put([ "errorlog", os.getpid(), thing ]) ++ ++ def log(self, thing): ++ self.log_queue.put([ "log", os.getpid(), thing ]) ++ ++ def progress(self, item, current, total): ++ # progress messages in a multiprocess context are likely to just be a confusing mess ++ pass ++ ++ # Init a few things that we'd rather do in the main process and then ++ # inherit in the children ++ if not hasattr(self, 'tempdir'): ++ self.tempdir = tempfile.mkdtemp() ++ self._get_old_package_dict() ++ ++ # queue containing packages that are candidates for processing ++ # now within the memory constraints ++ work_queue = multiprocessing.Queue(1) ++ ++ # queue containing callback messages from the workers ++ log_queue = multiprocessing.Queue() ++ ++ # Event used to allow the manager, when needed, to block for a completed task in a worker ++ completion_event = multiprocessing.Event() ++ ++ # wrapped callback to pass in to workers ++ callback_wrap = WrappedMDCallBack(log_queue) ++ ++ # list containing the completed packages ++ # accessed in children via a Manager and proxy as each child proc ++ # will be appending as it finishes ++ manager = multiprocessing.Manager() ++ read_pkgs_proxy = manager.list() ++ ++ # lists used by the package size reading workers ++ pkgfiles_proxy = manager.list(pkgfiles) ++ pkgfiles_withsize_proxy = manager.list() ++ ++ # process-safe value - total size of RPM payloads being deltaed ++ # The lock for entry into this also functions as our critical section ++ # elsewhere, as changes in the "in-flight" size of deltas is the key ++ # decision point in our work queue ++ # 'L' is unsigned long ++ active_work_size = multiprocessing.Value('L',0) ++ ++ # Our candidate list is the packages sorted from largest to smallest ++ # Do this with workers as well because, parallel is good ++ # Seriously though, this is also CPU-bound ++ self.callback.log("Reading package sizes in preparation for deltarpm creation") ++ ++ def size_reader_entry(pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, repo_obj): ++ while True: ++ try: ++ pkgfile = pkgfiles_proxy.pop() ++ po = repo_obj.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir) ++ pkgfiles_withsize_proxy.append([ pkgfile, po.size ]) ++ except IndexError: ++ break ++ ++ sort_workers = [ ] ++ for i in range(0,self.conf.delta_workers): ++ sw = multiprocessing.Process(target = size_reader_entry, args = (pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, self)) ++ sort_workers.append(sw) ++ sw.start() ++ ++ for worker in sort_workers: ++ worker.join() ++ ++ self.callback.log("Sorting package files by size") ++ sorted_packages = sorted(pkgfiles_withsize_proxy, key=lambda package: 
package[1], reverse=True) ++ ++ def worker_entry(work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir): ++ # We are now a new process - replace the callback with the wrapper that pushes log messages ++ # to a queue for processing in the main process ++ repo_obj.callback = callback_wrap ++ while True: ++ try: ++ pkg = None ++ pkg = work_queue.get() ++ if not pkg: ++ # The manager feeds each worker a None to indicate we are finished ++ # this allows us to use a blocking get without fear - I think ++ break ++ po = repo_obj.read_in_package(pkg[0], pkgpath=pkgpath, reldir=reldir) ++ repo_obj._do_delta_rpm_package(po) ++ except Exception, e: ++ callback_wrap.errorlog(e) ++ continue ++ finally: ++ if pkg: ++ with active_work_size.get_lock(): ++ active_work_size.value -= pkg[1] ++ completion_event.set() ++ read_pkgs_proxy.append(pkg) ++ ++ def manager_entry(packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, repo_obj, pkgpath, reldir): ++ max_work_size = repo_obj.conf.max_concurrent_delta_rpm_size ++ num_workers = repo_obj.conf.delta_workers ++ workers = [ ] ++ callback_wrap.log("Starting %d workers to process deltarpms - max total work size (%d) bytes" % (num_workers, max_work_size)) ++ for i in range(0,repo_obj.conf.delta_workers): ++ wp = multiprocessing.Process(target = worker_entry, args = (work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir)) ++ workers.append(wp) ++ wp.start() ++ ++ pending_packages = 0 ++ while len(packages) > 0: ++ # Look through the package list and add things that fit under the max size limit ++ # until we reach the end of the list ++ ++ # Don't read shared state for every package - it is an expensive operation ++ work_size_snap = active_work_size.value ++ #log_queue.put("Entered main loop with package list of length %d and size snap %d" % (len(packages), work_size_snap)) ++ consumed = [ ] ++ for i in range(0,len(packages)): ++ package = packages[i] ++ if package[1] + work_size_snap < max_work_size: ++ with active_work_size.get_lock(): ++ # As long as we have the lock we may as well refresh our view of the actual size ++ active_work_size.value += package[1] ++ #Turn on profiling if you want to convince yourself that this really does keep the size sane ++ if self.conf.profile: ++ callback_wrap.log("Adding package (%s) of size %d to deltarpm work queue" % (package[0], package[1])) ++ callback_wrap.log("Current TOTAL in-flight work size: %d" % (active_work_size.value)) ++ callback_wrap.log("Packages remaining to process: %d" % (len(packages)-len(consumed)-1)) ++ work_size_snap = active_work_size.value ++ # Note that we block here if the queue is full ++ pending_packages = work_queue.qsize() + 1 ++ consumed.append(i) ++ # This can block - do it without the lock ++ work_queue.put(package) ++ # Now prune the added items from the list, going backwards to ensure that we don't ++ # shift the index and delete the wrong thing ++ for i in reversed(consumed): ++ del packages[i] ++ if len(packages) == 0: ++ break ++ ++ with active_work_size.get_lock(): ++ work_queue_size = work_queue.qsize() ++ if pending_packages > work_queue_size: ++ # Some work was started since we last touched the queue - try to add more ++ # Note that this guarantees there is at least one free slot in the work_queue ++ # This should also prevent us from constantly spinning in the package loop when ++ # we have space in the queue but not 
enough active_work_size to allow us to add any ++ # available package ++ pending_packages = work_queue_size ++ continue ++ else: ++ completion_event.clear() ++ ++ # We either have too many items on the work_queue or too much total work size ++ # Wait for a worker to finish and then try again ++ completion_event.wait() ++ ++ # We are done - tell the workers to stop ++ for worker in workers: ++ work_queue.put(None) ++ ++ for worker in workers: ++ worker.join() ++ ++ # Now signal to the main thread that we are done adding work ++ log_queue.put(None) ++ ++ manager = multiprocessing.Process(target = manager_entry, args = (sorted_packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, self, pkgpath, reldir)) ++ manager.start() ++ ++ def log_digest(callback, log_message): ++ if log_message[0] == "errorlog": ++ callback.errorlog("Worker PID(%d) - %s" % (log_message[1], log_message[2])) ++ elif log_message[0] == "log": ++ callback.log("Worker PID(%d) - %s" % (log_message[1], log_message[2])) ++ else: ++ callback.errorlog("Malformed error in queue (%s)" % (str(log_message))) ++ ++ # Process log messages until we get the finished signal "None" ++ while True: ++ log_message = log_queue.get() ++ if log_message is None: ++ break ++ log_digest(self.callback, log_message) ++ ++ # now empty our proxy list ++ for pkg in read_pkgs_proxy: ++ self.read_pkgs.append(pkg) ++ ++ # TODO: we may be able to explicitly stop the Manager at this point ++ + + def closeMetadataDocs(self): + # save them up to the tmp locations: +@@ -847,19 +1053,22 @@ class MetaDataGenerator: + # appending the output. for each of the keys in the dict, return + # the tag for the target + each of the drpm infos + closure for the target + # tag +- targets = {} + results = [] +- for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'): +- drpm_rel_fn = os.path.normpath(self.conf.delta_relative + +- '/' + drpm_fn) # this is annoying +- drpm_po = yumbased.CreateRepoPackage(self.ts, +- self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype) +- +- drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir, +- drpm_rel_fn) +- if not targets.has_key(drpm_po.pkgtup): +- targets[drpm_po.pkgtup] = [] +- targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata()) ++ if self.conf.delta_workers == 1: ++ targets = {} ++ for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'): ++ drpm_rel_fn = os.path.normpath(self.conf.delta_relative + ++ '/' + drpm_fn) # this is annoying ++ drpm_po = yumbased.CreateRepoPackage(self.ts, ++ self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype) ++ ++ drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir, ++ drpm_rel_fn) ++ if not targets.has_key(drpm_po.pkgtup): ++ targets[drpm_po.pkgtup] = [] ++ targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata()) ++ else: ++ targets = self._parallel_generate_delta_xml() + + for (n, a, e, v, r) in targets.keys(): + results.append(""" \n""" % ( +@@ -872,6 +1081,52 @@ class MetaDataGenerator: + + return ' '.join(results) + ++ def _parallel_generate_delta_xml(self): ++ drpm_fns = [ ] ++ for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'): ++ drpm_fns.append(drpm_fn) ++ ++ manager = multiprocessing.Manager() ++ drpm_fns_proxy = manager.list(drpm_fns) ++ targets_proxy = manager.dict() ++ targets_lock = manager.RLock() ++ ++ def drpm_xml_entry(drpm_fns_proxy, targets_proxy, targets_lock, repo_obj): ++ while True: ++ try: ++ drpm_fn = drpm_fns_proxy.pop() ++ drpm_rel_fn = 
os.path.normpath(repo_obj.conf.delta_relative + ++ '/' + drpm_fn) # this is annoying ++ drpm_po = yumbased.CreateRepoPackage(repo_obj.ts, ++ repo_obj.conf.deltadir + '/' + drpm_fn, sumtype=repo_obj.conf.sumtype) ++ ++ drpm = deltarpms.DeltaRPMPackage(drpm_po, repo_obj.conf.outputdir, ++ drpm_rel_fn) ++ ++ with targets_lock: ++ d_element = targets_proxy.get(drpm_po.pkgtup, [ ]) ++ d_element.append(drpm.xml_dump_metadata()) ++ # managed dict requires that we re-assign modified list rather than modify in place ++ targets_proxy[drpm_po.pkgtup] = d_element ++ except IndexError: ++ break ++ ++ xml_workers = [ ] ++ for i in range(0,self.conf.delta_workers): ++ xw = multiprocessing.Process(target = drpm_xml_entry, args = (drpm_fns_proxy, targets_proxy, targets_lock, self)) ++ xml_workers.append(xw) ++ xw.start() ++ ++ for worker in xml_workers: ++ worker.join() ++ ++ # I'm doing a copy in this way as I believe that prevents references to the manager from lingering ++ # TODO: Verify? ++ targets_copy = { } ++ for key in targets_proxy.keys(): ++ targets_copy[key] = targets_proxy[key] ++ return targets_copy ++ + def _createRepoDataObject(self, mdfile, mdtype, compress=True, + compress_type=None, attribs={}): + """return random metadata as RepoData object to be added to RepoMD +diff --git a/genpkgmetadata.py b/genpkgmetadata.py +index 35e7fc9..a684038 100755 +--- a/genpkgmetadata.py ++++ b/genpkgmetadata.py +@@ -128,9 +128,15 @@ def parse_args(args, conf): + parser.add_option("--max-delta-rpm-size", default=100000000, + dest='max_delta_rpm_size', type='int', + help="max size of an rpm that to run deltarpm against (in bytes)") ++ parser.add_option("--max-concurrent-delta-rpm-size", default=100000000, ++ dest='max_concurrent_delta_rpm_size', type='int', ++ help="max total payload size of concurrent deltarpm runs (in bytes)") + parser.add_option("--workers", default=def_workers, + dest='workers', type='int', + help="number of workers to spawn to read rpms") ++ parser.add_option("--delta-workers", default=1, ++ dest='delta_workers', type='int', ++ help="number of workers to spawn to create delta rpms") + parser.add_option("--xz", default=False, + action="store_true", + help=SUPPRESS_HELP) +@@ -155,6 +161,12 @@ def parse_args(args, conf): + if opts.workers >= 128: + errorprint(_('Warning: More than 128 workers is a lot. Limiting.')) + opts.workers = 128 ++ if opts.delta_workers > opts.workers: ++ errorprint(_('Warning: Requested more delta workers than workers. This is insane. Limiting.')) ++ opts.delta_workers = opts.workers ++ if opts.max_concurrent_delta_rpm_size < opts.max_delta_rpm_size: ++ errorprint(_('Warning: max_concurrent_delta_rpm_size < max_delta_rpm_size - this will deadlock. 
Setting them to the same value.')) ++ opts.max_concurrent_delta_rpm_size = opts.max_delta_rpm_size + if opts.sumtype == 'sha1': + errorprint(_('Warning: It is more compatible to use sha instead of sha1')) + +diff --git a/modifyrepo.py b/modifyrepo.py +index 3c8a8bd..34b0902 100755 +--- a/modifyrepo.py ++++ b/modifyrepo.py +@@ -37,6 +37,7 @@ from yum.misc import checksum, _available_checksums, AutoFileChecksums + from yum.repoMDObject import RepoMD, RepoMDError, RepoData + from xml.dom import minidom + from optparse import OptionParser ++from cStringIO import StringIO + + + class RepoMetadata: +@@ -46,6 +47,9 @@ class RepoMetadata: + self.repodir = os.path.abspath(repo) + self.repomdxml = os.path.join(self.repodir, 'repomd.xml') + self.compress_type = _available_compression[-1] # best available ++ self.compress = True ++ self.checksum_type = 'sha256' ++ self.unique_md_filenames = True + + if not os.path.exists(self.repomdxml): + raise MDError, '%s not found' % self.repomdxml +@@ -103,6 +107,8 @@ class RepoMetadata: + if isinstance(metadata, minidom.Document): + md = metadata.toxml() + mdname = 'updateinfo.xml' ++ oldmd = AutoFileChecksums(StringIO(md), [self.checksum_type]) ++ oldmd.read() + elif isinstance(metadata, str): + if os.path.exists(metadata): + mdname = os.path.basename(metadata) +@@ -147,7 +153,7 @@ class RepoMetadata: + new_rd.checksum = (self.checksum_type, csum) + new_rd.size = str(os.stat(destmd).st_size) + if self.compress: +- new_rd.openchecksum = oldmd.checksums.hexdigests().popitem() ++ new_rd.openchecksum = (self.checksum_type, oldmd.checksums.hexdigests().popitem()[1]) + new_rd.opensize = str(oldmd.checksums.length) + new_rd.timestamp = str(int(os.stat(destmd).st_mtime)) + self.repoobj.repoData[new_rd.type] = new_rd +@@ -236,7 +242,7 @@ def main(args): + if opts.compress_type not in _available_compression: + print "Compression %s not available: Please choose from: %s" % (opts.compress_type, ', '.join(_available_compression)) + return 1 +- if opts.sumtype not in _available_checksums: ++ if opts.sumtype != 'sha' and opts.sumtype not in _available_checksums: + print "Checksum %s not available: Please choose from: %s" % (opts.sumtype, ', '.join(_available_checksums)) + return 1 + repomd.compress_type = opts.compress_type diff --git a/createrepo-deltarpm.patch b/createrepo-deltarpm.patch deleted file mode 100644 index 45dcb6e..0000000 --- a/createrepo-deltarpm.patch +++ /dev/null @@ -1,374 +0,0 @@ -diff --git a/createrepo/__init__.py b/createrepo/__init__.py -index 85f2a3d..517ea04 100644 ---- a/createrepo/__init__.py -+++ b/createrepo/__init__.py -@@ -28,6 +28,10 @@ import fcntl - import subprocess - from select import select - -+# To support parallel deltarpms -+import multiprocessing -+import multiprocessing.managers -+ - from yum import misc, Errors - from yum.repoMDObject import RepoMD, RepoData - from yum.sqlutils import executeSQL -@@ -113,7 +117,10 @@ class MetaDataConfig(object): - #self.worker_cmd = './worker.py' # helpful when testing - self.retain_old_md = 0 - self.compress_type = 'compat' -- -+ # Parallel deltas additions -+ self.delta_workers = 1 # number of workers to fork when doing deltarpms -+ # Keep the combined payload size of all in-progress deltarpm creation below this number -+ self.max_concurrent_delta_rpm_size = self.max_delta_rpm_size - - class SimpleMDCallBack(object): - def errorlog(self, thing): -@@ -718,19 +725,216 @@ class MetaDataGenerator: - if err: - raise MDError, "Failed to process %d package(s)." 
% err - -- for pkgfile in pkgfiles: -- if self.conf.deltas: -- try: -- po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir) -- self._do_delta_rpm_package(po) -- except MDError, e: -- errorprint(e) -- continue -- self.read_pkgs.append(pkgfile) -+ if self.conf.delta_workers == 1: -+ for pkgfile in pkgfiles: -+ if self.conf.deltas: -+ try: -+ po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir) -+ self._do_delta_rpm_package(po) -+ except MDError, e: -+ errorprint(e) -+ continue -+ self.read_pkgs.append(pkgfile) -+ else: -+ self._parallel_deltas(pkgfiles, pkgpath, reldir) - - save_keptpkgs(None) # append anything left - return self.current_pkg - -+ def _parallel_deltas(self, pkgfiles, pkgpath, reldir): -+ class WrappedMDCallBack(object): -+ def __init__(self, log_queue): -+ self.log_queue = log_queue -+ def errorlog(self, thing): -+ self.log_queue.put([ "errorlog", os.getpid(), thing ]) -+ -+ def log(self, thing): -+ self.log_queue.put([ "log", os.getpid(), thing ]) -+ -+ def progress(self, item, current, total): -+ # progress messages in a multiprocess context are likely to just be a confusing mess -+ pass -+ -+ # Init a few things that we'd rather do in the main process and then -+ # inherit in the children -+ if not hasattr(self, 'tempdir'): -+ self.tempdir = tempfile.mkdtemp() -+ self._get_old_package_dict() -+ -+ # queue containing packages that are candidates for processing -+ # now within the memory constraints -+ work_queue = multiprocessing.Queue(1) -+ -+ # queue containing callback messages from the workers -+ log_queue = multiprocessing.Queue() -+ -+ # Event used to allow the manager, when needed, to block for a completed task in a worker -+ completion_event = multiprocessing.Event() -+ -+ # wrapped callback to pass in to workers -+ callback_wrap = WrappedMDCallBack(log_queue) -+ -+ # list containing the completed packages -+ # accessed in children via a Manager and proxy as each child proc -+ # will be appending as it finishes -+ manager = multiprocessing.Manager() -+ read_pkgs_proxy = manager.list() -+ -+ # lists used by the package size reading workers -+ pkgfiles_proxy = manager.list(pkgfiles) -+ pkgfiles_withsize_proxy = manager.list() -+ -+ # process-safe value - total size of RPM payloads being deltaed -+ # The lock for entry into this also functions as our critical section -+ # elsewhere, as changes in the "in-flight" size of deltas is the key -+ # decision point in our work queue -+ # 'L' is unsigned long -+ active_work_size = multiprocessing.Value('L',0) -+ -+ # Our candidate list is the packages sorted from largest to smallest -+ # Do this with workers as well because, parallel is good -+ # Seriously though, this is also CPU-bound -+ self.callback.log("Reading package sizes in preparation for deltarpm creation") -+ -+ def size_reader_entry(pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, repo_obj): -+ while True: -+ try: -+ pkgfile = pkgfiles_proxy.pop() -+ po = repo_obj.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir) -+ pkgfiles_withsize_proxy.append([ pkgfile, po.size ]) -+ except IndexError: -+ break -+ -+ sort_workers = [ ] -+ for i in range(0,self.conf.delta_workers): -+ sw = multiprocessing.Process(target = size_reader_entry, args = (pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, self)) -+ sort_workers.append(sw) -+ sw.start() -+ -+ for worker in sort_workers: -+ worker.join() -+ -+ self.callback.log("Sorting package files by size") -+ sorted_packages = sorted(pkgfiles_withsize_proxy, key=lambda package: 
package[1], reverse=True) -+ -+ def worker_entry(work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir): -+ # We are now a new process - replace the callback with the wrapper that pushes log messages -+ # to a queue for processing in the main process -+ repo_obj.callback = callback_wrap -+ while True: -+ try: -+ pkg = None -+ pkg = work_queue.get() -+ if not pkg: -+ # The manager feeds each worker a None to indicate we are finished -+ # this allows us to use a blocking get without fear - I think -+ break -+ po = repo_obj.read_in_package(pkg[0], pkgpath=pkgpath, reldir=reldir) -+ repo_obj._do_delta_rpm_package(po) -+ except Exception, e: -+ callback_wrap.errorlog(e) -+ continue -+ finally: -+ if pkg: -+ with active_work_size.get_lock(): -+ active_work_size.value -= pkg[1] -+ completion_event.set() -+ read_pkgs_proxy.append(pkg) -+ -+ def manager_entry(packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, repo_obj, pkgpath, reldir): -+ max_work_size = repo_obj.conf.max_concurrent_delta_rpm_size -+ num_workers = repo_obj.conf.delta_workers -+ workers = [ ] -+ callback_wrap.log("Starting %d workers to process deltarpms - max total work size (%d) bytes" % (num_workers, max_work_size)) -+ for i in range(0,repo_obj.conf.delta_workers): -+ wp = multiprocessing.Process(target = worker_entry, args = (work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir)) -+ workers.append(wp) -+ wp.start() -+ -+ pending_packages = 0 -+ while len(packages) > 0: -+ # Look through the package list and add things that fit under the max size limit -+ # until we reach the end of the list -+ -+ # Don't read shared state for every package - it is an expensive operation -+ work_size_snap = active_work_size.value -+ #log_queue.put("Entered main loop with package list of length %d and size snap %d" % (len(packages), work_size_snap)) -+ consumed = [ ] -+ for i in range(0,len(packages)): -+ package = packages[i] -+ if package[1] + work_size_snap < max_work_size: -+ with active_work_size.get_lock(): -+ # As long as we have the lock we may as well refresh our view of the actual size -+ active_work_size.value += package[1] -+ #Turn on profiling if you want to convince yourself that this really does keep the size sane -+ if self.conf.profile: -+ callback_wrap.log("Adding package (%s) of size %d to deltarpm work queue" % (package[0], package[1])) -+ callback_wrap.log("Current TOTAL in-flight work size: %d" % (active_work_size.value)) -+ callback_wrap.log("Packages remaining to process: %d" % (len(packages)-len(consumed)-1)) -+ work_size_snap = active_work_size.value -+ # Note that we block here if the queue is full -+ pending_packages = work_queue.qsize() + 1 -+ consumed.append(i) -+ # This can block - do it without the lock -+ work_queue.put(package) -+ # Now prune the added items from the list, going backwards to ensure that we don't -+ # shift the index and delete the wrong thing -+ for i in reversed(consumed): -+ del packages[i] -+ if len(packages) == 0: -+ break -+ -+ with active_work_size.get_lock(): -+ work_queue_size = work_queue.qsize() -+ if pending_packages > work_queue_size: -+ # Some work was started since we last touched the queue - try to add more -+ # Note that this guarantees there is at least one free slot in the work_queue -+ # This should also prevent us from constantly spinning in the package loop when -+ # we have space in the queue but not 
enough active_work_size to allow us to add any -+ # available package -+ pending_packages = work_queue_size -+ continue -+ else: -+ completion_event.clear() -+ -+ # We either have too many items on the work_queue or too much total work size -+ # Wait for a worker to finish and then try again -+ completion_event.wait() -+ -+ # We are done - tell the workers to stop -+ for worker in workers: -+ work_queue.put(None) -+ -+ for worker in workers: -+ worker.join() -+ -+ # Now signal to the main thread that we are done adding work -+ log_queue.put(None) -+ -+ manager = multiprocessing.Process(target = manager_entry, args = (sorted_packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, self, pkgpath, reldir)) -+ manager.start() -+ -+ def log_digest(callback, log_message): -+ if log_message[0] == "errorlog": -+ callback.errorlog("Worker PID(%d) - %s" % (log_message[1], log_message[2])) -+ elif log_message[0] == "log": -+ callback.log("Worker PID(%d) - %s" % (log_message[1], log_message[2])) -+ else: -+ callback.errorlog("Malformed error in queue (%s)" % (str(log_message))) -+ -+ # Process log messages until we get the finished signal "None" -+ while True: -+ log_message = log_queue.get() -+ if log_message is None: -+ break -+ log_digest(self.callback, log_message) -+ -+ # now empty our proxy list -+ for pkg in read_pkgs_proxy: -+ self.read_pkgs.append(pkg) -+ -+ # TODO: we may be able to explicitly stop the Manager at this point -+ - - def closeMetadataDocs(self): - # save them up to the tmp locations: -@@ -849,19 +1053,22 @@ class MetaDataGenerator: - # appending the output. for each of the keys in the dict, return - # the tag for the target + each of the drpm infos + closure for the target - # tag -- targets = {} - results = [] -- for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'): -- drpm_rel_fn = os.path.normpath(self.conf.delta_relative + -- '/' + drpm_fn) # this is annoying -- drpm_po = yumbased.CreateRepoPackage(self.ts, -- self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype) -- -- drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir, -- drpm_rel_fn) -- if not targets.has_key(drpm_po.pkgtup): -- targets[drpm_po.pkgtup] = [] -- targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata()) -+ if self.conf.delta_workers == 1: -+ targets = {} -+ for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'): -+ drpm_rel_fn = os.path.normpath(self.conf.delta_relative + -+ '/' + drpm_fn) # this is annoying -+ drpm_po = yumbased.CreateRepoPackage(self.ts, -+ self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype) -+ -+ drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir, -+ drpm_rel_fn) -+ if not targets.has_key(drpm_po.pkgtup): -+ targets[drpm_po.pkgtup] = [] -+ targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata()) -+ else: -+ targets = self._parallel_generate_delta_xml() - - for (n, a, e, v, r) in targets.keys(): - results.append(""" \n""" % ( -@@ -874,6 +1081,52 @@ class MetaDataGenerator: - - return ' '.join(results) - -+ def _parallel_generate_delta_xml(self): -+ drpm_fns = [ ] -+ for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'): -+ drpm_fns.append(drpm_fn) -+ -+ manager = multiprocessing.Manager() -+ drpm_fns_proxy = manager.list(drpm_fns) -+ targets_proxy = manager.dict() -+ targets_lock = manager.RLock() -+ -+ def drpm_xml_entry(drpm_fns_proxy, targets_proxy, targets_lock, repo_obj): -+ while True: -+ try: -+ drpm_fn = drpm_fns_proxy.pop() -+ drpm_rel_fn = 
os.path.normpath(repo_obj.conf.delta_relative + -+ '/' + drpm_fn) # this is annoying -+ drpm_po = yumbased.CreateRepoPackage(repo_obj.ts, -+ repo_obj.conf.deltadir + '/' + drpm_fn, sumtype=repo_obj.conf.sumtype) -+ -+ drpm = deltarpms.DeltaRPMPackage(drpm_po, repo_obj.conf.outputdir, -+ drpm_rel_fn) -+ -+ with targets_lock: -+ d_element = targets_proxy.get(drpm_po.pkgtup, [ ]) -+ d_element.append(drpm.xml_dump_metadata()) -+ # managed dict requires that we re-assign modified list rather than modify in place -+ targets_proxy[drpm_po.pkgtup] = d_element -+ except IndexError: -+ break -+ -+ xml_workers = [ ] -+ for i in range(0,self.conf.delta_workers): -+ xw = multiprocessing.Process(target = drpm_xml_entry, args = (drpm_fns_proxy, targets_proxy, targets_lock, self)) -+ xml_workers.append(xw) -+ xw.start() -+ -+ for worker in xml_workers: -+ worker.join() -+ -+ # I'm doing a copy in this way as I believe that prevents references to the manager from lingering -+ # TODO: Verify? -+ targets_copy = { } -+ for key in targets_proxy.keys(): -+ targets_copy[key] = targets_proxy[key] -+ return targets_copy -+ - def _createRepoDataObject(self, mdfile, mdtype, compress=True, - compress_type=None, attribs={}): - """return random metadata as RepoData object to be added to RepoMD -diff --git a/genpkgmetadata.py b/genpkgmetadata.py -index 35e7fc9..a684038 100755 ---- a/genpkgmetadata.py -+++ b/genpkgmetadata.py -@@ -128,9 +128,15 @@ def parse_args(args, conf): - parser.add_option("--max-delta-rpm-size", default=100000000, - dest='max_delta_rpm_size', type='int', - help="max size of an rpm that to run deltarpm against (in bytes)") -+ parser.add_option("--max-concurrent-delta-rpm-size", default=100000000, -+ dest='max_concurrent_delta_rpm_size', type='int', -+ help="max total payload size of concurrent deltarpm runs (in bytes)") - parser.add_option("--workers", default=def_workers, - dest='workers', type='int', - help="number of workers to spawn to read rpms") -+ parser.add_option("--delta-workers", default=1, -+ dest='delta_workers', type='int', -+ help="number of workers to spawn to create delta rpms") - parser.add_option("--xz", default=False, - action="store_true", - help=SUPPRESS_HELP) -@@ -155,6 +161,12 @@ def parse_args(args, conf): - if opts.workers >= 128: - errorprint(_('Warning: More than 128 workers is a lot. Limiting.')) - opts.workers = 128 -+ if opts.delta_workers > opts.workers: -+ errorprint(_('Warning: Requested more delta workers than workers. This is insane. Limiting.')) -+ opts.delta_workers = opts.workers -+ if opts.max_concurrent_delta_rpm_size < opts.max_delta_rpm_size: -+ errorprint(_('Warning: max_concurrent_delta_rpm_size < max_delta_rpm_size - this will deadlock. Setting them to the same value.')) -+ opts.max_concurrent_delta_rpm_size = opts.max_delta_rpm_size - if opts.sumtype == 'sha1': - errorprint(_('Warning: It is more compatible to use sha instead of sha1')) - diff --git a/createrepo.spec b/createrepo.spec index 00ef2bc..4ab8450 100644 --- a/createrepo.spec +++ b/createrepo.spec @@ -21,7 +21,6 @@ Group: System Environment/Base Source: http://createrepo.baseurl.org/download/%{name}-%{version}.tar.gz Patch1: ten-changelog-limit.patch Patch2: createrepo-HEAD.patch -Patch3: createrepo-deltarpm.patch URL: http://createrepo.baseurl.org/ BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n) @@ -38,7 +37,6 @@ packages. 
 %setup -q
 %patch1 -p0
 %patch2 -p1
-%patch3 -p1
 
 
 %build
@@ -62,6 +60,13 @@ rm -rf $RPM_BUILD_ROOT
 %{python_sitelib}/createrepo
 
 %changelog
+* Tue Jan 13 2015 James Antill - 0.10.3-5
+- Update to latest HEAD, into the correct patch file :-o
+- Add Ian's parallel deltarpm support.
+- allow 'sha' checksum type for modifyrepo
+- Fix open_checksum and open_size calculation in RepoMetadata.add().
+- Fix several AttributeErrors in RepoMetadata.add().
+
 * Mon Jan 12 2015 Ian McLeod - 0.10.3-4
 - Pull in missing deltarpm patches (upstream fa0520 and bd5577)
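
Rough usage sketch (not taken from the commit itself; the repository path and worker counts below are assumptions): the parallel-delta support added above is reached either through the new --delta-workers and --max-concurrent-delta-rpm-size options in genpkgmetadata.py, or by setting the corresponding MetaDataConfig attributes when driving createrepo from Python, roughly like this:

    # Illustrative sketch only - values are assumptions, not from this commit.
    import createrepo

    conf = createrepo.MetaDataConfig()
    conf.directory = '/srv/repo/x86_64'     # assumed repository path
    conf.outputdir = conf.directory
    conf.deltas = True                      # existing switch: build deltarpms
    conf.delta_workers = 4                  # new: fork 4 deltarpm workers
    # new: cap the combined payload size of in-flight deltarpm jobs
    conf.max_concurrent_delta_rpm_size = 4 * conf.max_delta_rpm_size

    gen = createrepo.MetaDataGenerator(config_obj=conf)
    gen.doPkgMetadata()
    gen.doRepoMetadata()
    gen.doFinalMove()

As the genpkgmetadata.py warnings above note, delta_workers should not exceed workers, and max_concurrent_delta_rpm_size must be at least max_delta_rpm_size or the work queue can deadlock.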