diff --git a/createrepo/__init__.py b/createrepo/__init__.py
index b1875f6..517ea04 100644
--- a/createrepo/__init__.py
+++ b/createrepo/__init__.py
@@ -28,6 +28,10 @@ import fcntl
 import subprocess
 from select import select
 
+# To support parallel deltarpms
+import multiprocessing
+import multiprocessing.managers
+
 from yum import misc, Errors
 from yum.repoMDObject import RepoMD, RepoData
 from yum.sqlutils import executeSQL
@@ -113,7 +117,10 @@ class MetaDataConfig(object):
         #self.worker_cmd = './worker.py' # helpful when testing
         self.retain_old_md = 0
         self.compress_type = 'compat'
-
+        # Parallel deltas additions
+        self.delta_workers = 1 # number of workers to fork when doing deltarpms
+        # Keep the combined payload size of all in-progress deltarpm creation below this number
+        self.max_concurrent_delta_rpm_size = self.max_delta_rpm_size
         
 class SimpleMDCallBack(object):
     def errorlog(self, thing):
@@ -400,7 +407,9 @@ class MetaDataGenerator:
         if self.conf.update:
             self._setup_old_metadata_lookup()
         # rpms we're going to be dealing with
-        if self.conf.pkglist:
+        if isinstance(self.conf.pkglist, MetaSack):
+            packages = self.conf.pkglist
+        elif self.conf.pkglist:
             packages = []
             for pkg in self.conf.pkglist:
                 if '://' in pkg: # remote
@@ -716,19 +725,216 @@ class MetaDataGenerator:
             if err:
                 raise MDError, "Failed to process %d package(s)." % err
 
-            for pkgfile in pkgfiles:
-                if self.conf.deltas:
-                    try:
-                        po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
-                        self._do_delta_rpm_package(po)
-                    except MDError, e:
-                        errorprint(e)
-                        continue
-                self.read_pkgs.append(pkgfile)
+            if self.conf.delta_workers <= 1:
+                for pkgfile in pkgfiles:
+                    if self.conf.deltas:
+                        try:
+                            po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+                            self._do_delta_rpm_package(po)
+                        except MDError, e:
+                            errorprint(e)
+                            continue
+                    self.read_pkgs.append(pkgfile)
+            else:
+                self._parallel_deltas(pkgfiles, pkgpath, reldir)
 
         save_keptpkgs(None) # append anything left
         return self.current_pkg
 
+    def _parallel_deltas(self, pkgfiles, pkgpath, reldir):
+        class WrappedMDCallBack(object):
+            def __init__(self, log_queue):
+                self.log_queue = log_queue
+            def errorlog(self, thing):
+                self.log_queue.put([ "errorlog", os.getpid(), thing ])
+
+            def log(self, thing):
+                self.log_queue.put([ "log", os.getpid(), thing ])
+
+            def progress(self, item, current, total):
+                # progress messages in a multiprocess context are likely to just be a confusing mess
+                pass
+
+        # Init a few things that we'd rather do in the main process and then
+        # inherit in the children
+        if not hasattr(self, 'tempdir'):
+            self.tempdir = tempfile.mkdtemp()
+        self._get_old_package_dict()
+
+        # queue containing packages that are candidates for processing
+        # now within the memory constraints
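+        # a maxsize of 1 means put() blocks until a worker has taken the previous package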
+        work_queue = multiprocessing.Queue(1)
+
+        # queue containing callback messages from the workers
+        log_queue = multiprocessing.Queue()
+
+        # Event used to allow the manager, when needed, to block for a completed task in a worker
+        completion_event = multiprocessing.Event()
+
+        # wrapped callback to pass in to workers
+        callback_wrap = WrappedMDCallBack(log_queue)
+
+        # list containing the completed packages
+        # accessed in children via a Manager and proxy as each child proc
+        # will be appending as it finishes
+        manager = multiprocessing.Manager()
+        read_pkgs_proxy = manager.list()
+
+        # lists used by the package size reading workers
+        pkgfiles_proxy = manager.list(pkgfiles)
+        pkgfiles_withsize_proxy = manager.list()
+
+        # process-safe value - total size of RPM payloads being deltaed
+        # The lock for entry into this also functions as our critical section
+        # elsewhere, as a change in the "in-flight" size of deltas is the key
+        # decision point in our work queue
+        # 'L' is unsigned long
+        active_work_size = multiprocessing.Value('L',0)
+
+        # Our candidate list is the packages sorted from largest to smallest
+        # Do this with workers as well - like the delta creation itself, reading
+        # the packages in is CPU-bound
+        self.callback.log("Reading package sizes in preparation for deltarpm creation")
+
+        def size_reader_entry(pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, repo_obj):
+            while True:
+                try:
+                    pkgfile = pkgfiles_proxy.pop()
+                    po = repo_obj.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+                    pkgfiles_withsize_proxy.append([ pkgfile, po.size ])
+                except IndexError:
+                    break
+
+        sort_workers = [ ]
+        for i in range(0,self.conf.delta_workers):
+            sw = multiprocessing.Process(target = size_reader_entry, args = (pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, self))
+            sort_workers.append(sw)
+            sw.start()
+
+        for worker in sort_workers:
+            worker.join()
+
+        self.callback.log("Sorting package files by size")
+        sorted_packages = sorted(pkgfiles_withsize_proxy, key=lambda package: package[1], reverse=True)
+
+        def worker_entry(work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir):
+            # We are now a new process - replace the callback with the wrapper that pushes log messages
+            # to a queue for processing in the main process
+            repo_obj.callback = callback_wrap
+            while True:
+                try:
+                    pkg = None
+                    pkg = work_queue.get()
+                    if not pkg:
+                        # The manager feeds each worker a None to indicate we are finished,
+                        # so a blocking get() here cannot hang once all the work has been queued
+                        break
+                    po = repo_obj.read_in_package(pkg[0], pkgpath=pkgpath, reldir=reldir)
+                    repo_obj._do_delta_rpm_package(po)
+                except Exception, e:
+                    callback_wrap.errorlog(e)
+                    continue
+                else:
+                    # match the serial path - record just the filename, and only
+                    # when the delta was created successfully
+                    read_pkgs_proxy.append(pkg[0])
+                finally:
+                    if pkg:
+                        with active_work_size.get_lock():
+                            active_work_size.value -= pkg[1]
+                            completion_event.set()
+
+        def manager_entry(packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, repo_obj, pkgpath, reldir):
+            max_work_size = repo_obj.conf.max_concurrent_delta_rpm_size
+            num_workers = repo_obj.conf.delta_workers
+            workers = [ ]
+            callback_wrap.log("Starting %d workers to process deltarpms - max total work size (%d) bytes" % (num_workers, max_work_size))
+            for i in range(0,repo_obj.conf.delta_workers):
+                wp = multiprocessing.Process(target = worker_entry, args = (work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir))
+                workers.append(wp)
+                wp.start()
+
+            pending_packages = 0
+            while len(packages) > 0:
+                # Look through the package list and add things that fit under the max size limit
+                # until we reach the end of the list
+
+                # Don't read shared state for every package - it is an expensive operation
+                work_size_snap = active_work_size.value
+                #log_queue.put("Entered main loop with package list of length %d and size snap %d" % (len(packages), work_size_snap))
+                consumed = [ ]
+                for i in range(0,len(packages)):
+                    package = packages[i]
+                    if package[1] + work_size_snap < max_work_size:
+                        with active_work_size.get_lock():
+                            # As long as we have the lock we may as well refresh our view of the actual size
+                            active_work_size.value += package[1]
+                            # Turn on profiling if you want to convince yourself that this really does keep the size sane
+                            if repo_obj.conf.profile:
+                                callback_wrap.log("Adding package (%s) of size %d to deltarpm work queue" % (package[0], package[1]))
+                                callback_wrap.log("Current TOTAL in-flight work size: %d" % (active_work_size.value))
+                                callback_wrap.log("Packages remaining to process: %d" % (len(packages)-len(consumed)-1))
+                            work_size_snap = active_work_size.value
+                            # Remember the expected queue depth after this put - used below to detect worker progress
+                            pending_packages = work_queue.qsize() + 1
+                            consumed.append(i)
+                        # This can block - do it without the lock
+                        work_queue.put(package)
+                # Now prune the added items from the list, going backwards to ensure that we don't
+                # shift the index and delete the wrong thing
+                for i in reversed(consumed):
+                    del packages[i]
+                if len(packages) == 0:
+                    break
+
+                with active_work_size.get_lock():
+                    work_queue_size = work_queue.qsize()
+                    if pending_packages > work_queue_size:
+                        # Some work was started since we last touched the queue - try to add more
+                        # Note that this guarantees there is at least one free slot in the work_queue
+                        # This should also prevent us from constantly spinning in the package loop when
+                        # we have space in the queue but not enough active_work_size to allow us to add any
+                        # available package
+                        pending_packages = work_queue_size
+                        continue
+                    else:
+                        completion_event.clear()
+
+                # We either have too many items on the work_queue or too much total work size
+                # Wait for a worker to finish and then try again
+                completion_event.wait()
+
+            # We are done - tell the workers to stop
+            for worker in workers:
+                work_queue.put(None)
+
+            for worker in workers:
+                worker.join()
+
+            # Now signal to the main process that we are done adding work
+            log_queue.put(None)
+
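+        # run the scheduling loop in its own process - the main process stays free to drain the log queue below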
+        manager = multiprocessing.Process(target = manager_entry, args = (sorted_packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, self, pkgpath, reldir))
+        manager.start()
+
+        def log_digest(callback, log_message):
+            if log_message[0] == "errorlog":
+                callback.errorlog("Worker PID(%d) - %s" % (log_message[1], log_message[2]))
+            elif log_message[0] == "log":
+                callback.log("Worker PID(%d) - %s" % (log_message[1], log_message[2]))
+            else:
+                callback.errorlog("Malformed log message in queue (%s)" % (str(log_message)))
+
+        # Process log messages until we get the finished signal "None"
+        while True:
+            log_message = log_queue.get()
+            if log_message is None:
+                break
+            log_digest(self.callback, log_message)
+
+        # copy the completed package list back out of the proxy
+        for pkg in read_pkgs_proxy:
+            self.read_pkgs.append(pkg)
+
+        # TODO: we may be able to explicitly stop the Manager at this point
+
 
     def closeMetadataDocs(self):
         # save them up to the tmp locations:
@@ -847,19 +1053,22 @@ class MetaDataGenerator:
         # appending the output. for each of the keys in the dict, return
         # the tag for the target + each of the drpm infos + closure for the target
         # tag
-        targets = {}
         results = []
-        for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
-            drpm_rel_fn = os.path.normpath(self.conf.delta_relative +
-                                           '/' + drpm_fn) # this is annoying
-            drpm_po = yumbased.CreateRepoPackage(self.ts,
-                 self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype)
-
-            drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir,
-                                             drpm_rel_fn)
-            if not targets.has_key(drpm_po.pkgtup):
-                targets[drpm_po.pkgtup] = []
-            targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata())
+        if self.conf.delta_workers <= 1:
+            targets = {}
+            for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
+                drpm_rel_fn = os.path.normpath(self.conf.delta_relative +
+                                               '/' + drpm_fn) # this is annoying
+                drpm_po = yumbased.CreateRepoPackage(self.ts,
+                     self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype)
+
+                drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir,
+                                                 drpm_rel_fn)
+                if not targets.has_key(drpm_po.pkgtup):
+                    targets[drpm_po.pkgtup] = []
+                targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata())
+        else:
+            targets = self._parallel_generate_delta_xml()
 
         for (n, a, e, v, r) in targets.keys():
             results.append("""  <newpackage name="%s" epoch="%s" version="%s" release="%s" arch="%s">\n""" % (
@@ -872,6 +1081,52 @@ class MetaDataGenerator:
 
         return ' '.join(results)
 
+    def _parallel_generate_delta_xml(self):
+        drpm_fns = [ ]
+        for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
+            drpm_fns.append(drpm_fn)
+
+        manager = multiprocessing.Manager()
+        drpm_fns_proxy = manager.list(drpm_fns)
+        targets_proxy = manager.dict()
+        targets_lock = manager.RLock()
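+        # guards the read-modify-write of each pkgtup's list in targets_proxy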
+
+        def drpm_xml_entry(drpm_fns_proxy, targets_proxy, targets_lock, repo_obj):
+            while True:
+                try:
+                    drpm_fn = drpm_fns_proxy.pop()
+                    drpm_rel_fn = os.path.normpath(repo_obj.conf.delta_relative +
+                                                   '/' + drpm_fn) # this is annoying
+                    drpm_po = yumbased.CreateRepoPackage(repo_obj.ts,
+                         repo_obj.conf.deltadir + '/' + drpm_fn, sumtype=repo_obj.conf.sumtype)
+
+                    drpm = deltarpms.DeltaRPMPackage(drpm_po, repo_obj.conf.outputdir,
+                                                     drpm_rel_fn)
+
+                    with targets_lock:
+                        d_element = targets_proxy.get(drpm_po.pkgtup, [ ])
+                        d_element.append(drpm.xml_dump_metadata())
+                        # managed dict requires that we re-assign modified list rather than modify in place
+                        targets_proxy[drpm_po.pkgtup] = d_element
+                except IndexError:
+                    break
+
+        xml_workers = [ ]
+        for i in range(0,self.conf.delta_workers):
+            xw = multiprocessing.Process(target = drpm_xml_entry, args = (drpm_fns_proxy, targets_proxy, targets_lock, self))
+            xml_workers.append(xw)
+            xw.start()
+
+        for worker in xml_workers:
+            worker.join()
+
+        # Copy into a plain dict - this should keep references to the manager from lingering
+        # TODO: Verify?
+        targets_copy = { }
+        for key in targets_proxy.keys():
+            targets_copy[key] = targets_proxy[key]
+        return targets_copy
+
     def _createRepoDataObject(self, mdfile, mdtype, compress=True, 
                               compress_type=None, attribs={}):
         """return random metadata as RepoData object to be  added to RepoMD
diff --git a/genpkgmetadata.py b/genpkgmetadata.py
index 35e7fc9..a684038 100755
--- a/genpkgmetadata.py
+++ b/genpkgmetadata.py
@@ -128,9 +128,15 @@ def parse_args(args, conf):
     parser.add_option("--max-delta-rpm-size", default=100000000,
         dest='max_delta_rpm_size', type='int',
         help="max size of an rpm that to run deltarpm against (in bytes)")
+    parser.add_option("--max-concurrent-delta-rpm-size", default=100000000,
+        dest='max_concurrent_delta_rpm_size', type='int',
+        help="max total payload size of concurrent deltarpm runs (in bytes)")
     parser.add_option("--workers", default=def_workers,
         dest='workers', type='int',
         help="number of workers to spawn to read rpms")
+    parser.add_option("--delta-workers", default=1,
+        dest='delta_workers', type='int',
+        help="number of workers to spawn to create delta rpms")
     parser.add_option("--xz", default=False,
         action="store_true",
         help=SUPPRESS_HELP)
@@ -155,6 +161,12 @@ def parse_args(args, conf):
     if opts.workers >= 128:
         errorprint(_('Warning: More than 128 workers is a lot. Limiting.'))
         opts.workers = 128
+    if opts.delta_workers > opts.workers:
+        errorprint(_('Warning: Requested more delta workers than workers. This is insane. Limiting.'))
+        opts.delta_workers = opts.workers
+    if opts.max_concurrent_delta_rpm_size < opts.max_delta_rpm_size:
+        errorprint(_('Warning: max_concurrent_delta_rpm_size < max_delta_rpm_size - this will deadlock. Setting them to the same value.'))
+        opts.max_concurrent_delta_rpm_size = opts.max_delta_rpm_size
     if opts.sumtype == 'sha1':
         errorprint(_('Warning: It is more compatible to use sha instead of sha1'))
 
diff --git a/modifyrepo.py b/modifyrepo.py
index 3c8a8bd..34b0902 100755
--- a/modifyrepo.py
+++ b/modifyrepo.py
@@ -37,6 +37,7 @@ from yum.misc import checksum, _available_checksums, AutoFileChecksums
 from yum.repoMDObject import RepoMD, RepoMDError, RepoData
 from xml.dom import minidom
 from optparse import OptionParser
+from cStringIO import StringIO
 
 
 class RepoMetadata:
@@ -46,6 +47,9 @@ class RepoMetadata:
         self.repodir = os.path.abspath(repo)
         self.repomdxml = os.path.join(self.repodir, 'repomd.xml')
         self.compress_type = _available_compression[-1] # best available
+        self.compress = True
+        self.checksum_type = 'sha256'
+        self.unique_md_filenames = True
 
         if not os.path.exists(self.repomdxml):
             raise MDError, '%s not found' % self.repomdxml
@@ -103,6 +107,8 @@ class RepoMetadata:
         if isinstance(metadata, minidom.Document):
             md = metadata.toxml()
             mdname = 'updateinfo.xml'
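+            # read the generated XML through AutoFileChecksums so the open (uncompressed) checksum and size are captured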
+            oldmd = AutoFileChecksums(StringIO(md), [self.checksum_type])
+            oldmd.read()
         elif isinstance(metadata, str):
             if os.path.exists(metadata):
                 mdname = os.path.basename(metadata)
@@ -147,7 +153,7 @@ class RepoMetadata:
         new_rd.checksum = (self.checksum_type, csum)
         new_rd.size = str(os.stat(destmd).st_size)
         if self.compress:
-            new_rd.openchecksum = oldmd.checksums.hexdigests().popitem()
+            new_rd.openchecksum = (self.checksum_type, oldmd.checksums.hexdigests().popitem()[1])
             new_rd.opensize = str(oldmd.checksums.length)
         new_rd.timestamp = str(int(os.stat(destmd).st_mtime))
         self.repoobj.repoData[new_rd.type] = new_rd
@@ -236,7 +242,7 @@ def main(args):
     if opts.compress_type not in _available_compression:
         print "Compression %s not available: Please choose from: %s" % (opts.compress_type, ', '.join(_available_compression))
         return 1
-    if opts.sumtype not in _available_checksums:
+    if opts.sumtype != 'sha' and opts.sumtype not in _available_checksums:
         print "Checksum %s not available: Please choose from: %s" % (opts.sumtype, ', '.join(_available_checksums))
         return 1
     repomd.compress_type = opts.compress_type