diff --git a/createrepo/__init__.py b/createrepo/__init__.py
index b1875f6..517ea04 100644
--- a/createrepo/__init__.py
+++ b/createrepo/__init__.py
@@ -28,6 +28,10 @@ import fcntl
import subprocess
from select import select
+# To support parallel deltarpms
+import multiprocessing
+import multiprocessing.managers
+
from yum import misc, Errors
from yum.repoMDObject import RepoMD, RepoData
from yum.sqlutils import executeSQL
@@ -113,7 +117,10 @@ class MetaDataConfig(object):
#self.worker_cmd = './worker.py' # helpful when testing
self.retain_old_md = 0
self.compress_type = 'compat'
-
+ # Parallel deltas additions
+ self.delta_workers = 1 # number of workers to fork when doing deltarpms
+ # Keep the combined payload size of all in-progress deltarpm creation below this number
+ self.max_concurrent_delta_rpm_size = self.max_delta_rpm_size
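+        # genpkgmetadata.py exposes these as --delta-workers and
+        # --max-concurrent-delta-rpm-size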
class SimpleMDCallBack(object):
def errorlog(self, thing):
@@ -400,7 +407,9 @@ class MetaDataGenerator:
if self.conf.update:
self._setup_old_metadata_lookup()
# rpms we're going to be dealing with
- if self.conf.pkglist:
+ if isinstance(self.conf.pkglist, MetaSack):
+ packages = self.conf.pkglist
+ elif self.conf.pkglist:
packages = []
for pkg in self.conf.pkglist:
if '://' in pkg: # remote
@@ -716,19 +725,216 @@ class MetaDataGenerator:
if err:
raise MDError, "Failed to process %d package(s)." % err
- for pkgfile in pkgfiles:
- if self.conf.deltas:
- try:
- po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
- self._do_delta_rpm_package(po)
- except MDError, e:
- errorprint(e)
- continue
- self.read_pkgs.append(pkgfile)
+        if self.conf.delta_workers == 1 or not self.conf.deltas:
+ for pkgfile in pkgfiles:
+ if self.conf.deltas:
+ try:
+ po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+ self._do_delta_rpm_package(po)
+ except MDError, e:
+ errorprint(e)
+ continue
+ self.read_pkgs.append(pkgfile)
+ else:
+ self._parallel_deltas(pkgfiles, pkgpath, reldir)
save_keptpkgs(None) # append anything left
return self.current_pkg
+ def _parallel_deltas(self, pkgfiles, pkgpath, reldir):
+ class WrappedMDCallBack(object):
+ def __init__(self, log_queue):
+ self.log_queue = log_queue
+ def errorlog(self, thing):
+ self.log_queue.put([ "errorlog", os.getpid(), thing ])
+
+ def log(self, thing):
+ self.log_queue.put([ "log", os.getpid(), thing ])
+
+ def progress(self, item, current, total):
+                # progress output from several processes would interleave into a
+                # confusing mess, so drop it
+ pass
+
+ # Init a few things that we'd rather do in the main process and then
+ # inherit in the children
+ if not hasattr(self, 'tempdir'):
+ self.tempdir = tempfile.mkdtemp()
+ self._get_old_package_dict()
+
+ # queue containing packages that are candidates for processing
+ # now within the memory constraints
+ work_queue = multiprocessing.Queue(1)
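+        # maxsize 1: at most one package sits in the queue at a time, so put()
+        # blocks until a worker has picked up the previously queued package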
+
+ # queue containing callback messages from the workers
+ log_queue = multiprocessing.Queue()
+
+ # Event used to allow the manager, when needed, to block for a completed task in a worker
+ completion_event = multiprocessing.Event()
+
+ # wrapped callback to pass in to workers
+ callback_wrap = WrappedMDCallBack(log_queue)
+
+        # list of the package files that were successfully processed, shared with
+        # the workers via a Manager proxy since each child appends as it finishes
+ manager = multiprocessing.Manager()
+ read_pkgs_proxy = manager.list()
+
+ # lists used by the package size reading workers
+ pkgfiles_proxy = manager.list(pkgfiles)
+ pkgfiles_withsize_proxy = manager.list()
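+        # the size readers turn the entries of pkgfiles into [pkgfile, size]
+        # pairs in pkgfiles_withsize_proxy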
+
+        # process-safe counter - total payload size of the RPMs currently being
+        # deltaed.  Its lock also serves as our critical section elsewhere, since
+        # the in-flight delta size is the key decision point for feeding the
+        # work queue
+        # 'L' is unsigned long
+ active_work_size = multiprocessing.Value('L',0)
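+        # Invariant: active_work_size.value is the combined size of every package
+        # currently sitting on the work queue or being deltaed by a worker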
+
+        # Our candidate list is the packages sorted from largest to smallest.
+        # Reading the package headers to get the sizes is CPU-bound, so spread
+        # that across workers as well
+ self.callback.log("Reading package sizes in preparation for deltarpm creation")
+
+        def size_reader_entry(pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, repo_obj):
+            while True:
+                try:
+                    pkgfile = pkgfiles_proxy.pop()
+                except IndexError:
+                    # no more package files left to size up
+                    break
+                try:
+                    po = repo_obj.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+                    pkgfiles_withsize_proxy.append([ pkgfile, po.size ])
+                except MDError, e:
+                    errorprint(e)
+                    continue
+
+ sort_workers = [ ]
+ for i in range(0,self.conf.delta_workers):
+ sw = multiprocessing.Process(target = size_reader_entry, args = (pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, self))
+ sort_workers.append(sw)
+ sw.start()
+
+ for worker in sort_workers:
+ worker.join()
+
+ self.callback.log("Sorting package files by size")
+ sorted_packages = sorted(pkgfiles_withsize_proxy, key=lambda package: package[1], reverse=True)
+
+ def worker_entry(work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir):
+ # We are now a new process - replace the callback with the wrapper that pushes log messages
+ # to a queue for processing in the main process
+ repo_obj.callback = callback_wrap
+ while True:
+ try:
+ pkg = None
+ pkg = work_queue.get()
+ if not pkg:
+                        # The manager feeds each worker a None to indicate that we
+                        # are finished, so a blocking get() here is safe
+ break
+                    po = repo_obj.read_in_package(pkg[0], pkgpath=pkgpath, reldir=reldir)
+                    repo_obj._do_delta_rpm_package(po)
+                    # Match the serial path: record only the package file name, and
+                    # only once the delta has been produced without error
+                    read_pkgs_proxy.append(pkg[0])
+                except Exception, e:
+                    callback_wrap.errorlog(e)
+                    continue
+                finally:
+                    if pkg:
+                        # Always release this package's share of the in-flight payload
+                        # size and wake the manager, even if the delta failed
+                        with active_work_size.get_lock():
+                            active_work_size.value -= pkg[1]
+                            completion_event.set()
+
+ def manager_entry(packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, repo_obj, pkgpath, reldir):
+ max_work_size = repo_obj.conf.max_concurrent_delta_rpm_size
+ num_workers = repo_obj.conf.delta_workers
+ workers = [ ]
+ callback_wrap.log("Starting %d workers to process deltarpms - max total work size (%d) bytes" % (num_workers, max_work_size))
+ for i in range(0,repo_obj.conf.delta_workers):
+ wp = multiprocessing.Process(target = worker_entry, args = (work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir))
+ workers.append(wp)
+ wp.start()
+
+ pending_packages = 0
+ while len(packages) > 0:
+ # Look through the package list and add things that fit under the max size limit
+ # until we reach the end of the list
+
+ # Don't read shared state for every package - it is an expensive operation
+ work_size_snap = active_work_size.value
+ #log_queue.put("Entered main loop with package list of length %d and size snap %d" % (len(packages), work_size_snap))
+ consumed = [ ]
+ for i in range(0,len(packages)):
+ package = packages[i]
+ if package[1] + work_size_snap < max_work_size:
+ with active_work_size.get_lock():
+ # As long as we have the lock we may as well refresh our view of the actual size
+ active_work_size.value += package[1]
+                                # Turn on profiling if you want to convince yourself
+                                # that the in-flight size really does stay bounded
+                                if repo_obj.conf.profile:
+ callback_wrap.log("Adding package (%s) of size %d to deltarpm work queue" % (package[0], package[1]))
+ callback_wrap.log("Current TOTAL in-flight work size: %d" % (active_work_size.value))
+ callback_wrap.log("Packages remaining to process: %d" % (len(packages)-len(consumed)-1))
+ work_size_snap = active_work_size.value
+                                # Remember how deep the work queue will be once this
+                                # package is queued, so we can tell later whether a
+                                # worker has taken anything off it since
+ pending_packages = work_queue.qsize() + 1
+ consumed.append(i)
+ # This can block - do it without the lock
+ work_queue.put(package)
+ # Now prune the added items from the list, going backwards to ensure that we don't
+ # shift the index and delete the wrong thing
+ for i in reversed(consumed):
+ del packages[i]
+ if len(packages) == 0:
+ break
+
+ with active_work_size.get_lock():
+ work_queue_size = work_queue.qsize()
+ if pending_packages > work_queue_size:
+ # Some work was started since we last touched the queue - try to add more
+ # Note that this guarantees there is at least one free slot in the work_queue
+ # This should also prevent us from constantly spinning in the package loop when
+ # we have space in the queue but not enough active_work_size to allow us to add any
+ # available package
+ pending_packages = work_queue_size
+ continue
+ else:
+ completion_event.clear()
+
+ # We either have too many items on the work_queue or too much total work size
+ # Wait for a worker to finish and then try again
+ completion_event.wait()
+
+ # We are done - tell the workers to stop
+ for worker in workers:
+ work_queue.put(None)
+
+ for worker in workers:
+ worker.join()
+
+ # Now signal to the main thread that we are done adding work
+ log_queue.put(None)
+
+        # The manager process feeds the work queue and joins the delta workers
+        manager_proc = multiprocessing.Process(target = manager_entry, args = (sorted_packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, self, pkgpath, reldir))
+        manager_proc.start()
+
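+        # Each entry on log_queue is [level, worker_pid, message]; the manager
+        # process puts a final None once all delta workers have been joined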
+ def log_digest(callback, log_message):
+ if log_message[0] == "errorlog":
+ callback.errorlog("Worker PID(%d) - %s" % (log_message[1], log_message[2]))
+ elif log_message[0] == "log":
+ callback.log("Worker PID(%d) - %s" % (log_message[1], log_message[2]))
+ else:
+ callback.errorlog("Malformed error in queue (%s)" % (str(log_message)))
+
+ # Process log messages until we get the finished signal "None"
+ while True:
+ log_message = log_queue.get()
+ if log_message is None:
+ break
+ log_digest(self.callback, log_message)
+
+        # The manager process put the final None only after joining its workers,
+        # so it is safe to reap it now and copy the results out of the proxy list
+        manager_proc.join()
+        for pkg in read_pkgs_proxy:
+            self.read_pkgs.append(pkg)
+
+ # TODO: we may be able to explicitly stop the Manager at this point
+
def closeMetadataDocs(self):
# save them up to the tmp locations:
@@ -847,19 +1053,22 @@ class MetaDataGenerator:
# appending the output. for each of the keys in the dict, return
# the tag for the target + each of the drpm infos + closure for the target
# tag
- targets = {}
results = []
- for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
- drpm_rel_fn = os.path.normpath(self.conf.delta_relative +
- '/' + drpm_fn) # this is annoying
- drpm_po = yumbased.CreateRepoPackage(self.ts,
- self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype)
-
- drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir,
- drpm_rel_fn)
- if not targets.has_key(drpm_po.pkgtup):
- targets[drpm_po.pkgtup] = []
- targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata())
+ if self.conf.delta_workers == 1:
+ targets = {}
+ for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
+ drpm_rel_fn = os.path.normpath(self.conf.delta_relative +
+ '/' + drpm_fn) # this is annoying
+ drpm_po = yumbased.CreateRepoPackage(self.ts,
+ self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype)
+
+ drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir,
+ drpm_rel_fn)
+ if not targets.has_key(drpm_po.pkgtup):
+ targets[drpm_po.pkgtup] = []
+ targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata())
+ else:
+ targets = self._parallel_generate_delta_xml()
for (n, a, e, v, r) in targets.keys():
results.append(""" <newpackage name="%s" epoch="%s" version="%s" release="%s" arch="%s">\n""" % (
@@ -872,6 +1081,52 @@ class MetaDataGenerator:
return ' '.join(results)
+ def _parallel_generate_delta_xml(self):
+ drpm_fns = [ ]
+ for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
+ drpm_fns.append(drpm_fn)
+
+ manager = multiprocessing.Manager()
+ drpm_fns_proxy = manager.list(drpm_fns)
+ targets_proxy = manager.dict()
+ targets_lock = manager.RLock()
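+        # targets_lock guards the read-modify-write of each targets_proxy entry;
+        # a get() followed by re-assignment is not atomic across processes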
+
+ def drpm_xml_entry(drpm_fns_proxy, targets_proxy, targets_lock, repo_obj):
+ while True:
+ try:
+ drpm_fn = drpm_fns_proxy.pop()
+ drpm_rel_fn = os.path.normpath(repo_obj.conf.delta_relative +
+ '/' + drpm_fn) # this is annoying
+ drpm_po = yumbased.CreateRepoPackage(repo_obj.ts,
+ repo_obj.conf.deltadir + '/' + drpm_fn, sumtype=repo_obj.conf.sumtype)
+
+ drpm = deltarpms.DeltaRPMPackage(drpm_po, repo_obj.conf.outputdir,
+ drpm_rel_fn)
+
+ with targets_lock:
+ d_element = targets_proxy.get(drpm_po.pkgtup, [ ])
+ d_element.append(drpm.xml_dump_metadata())
+ # managed dict requires that we re-assign modified list rather than modify in place
+ targets_proxy[drpm_po.pkgtup] = d_element
+ except IndexError:
+ break
+
+ xml_workers = [ ]
+ for i in range(0,self.conf.delta_workers):
+ xw = multiprocessing.Process(target = drpm_xml_entry, args = (drpm_fns_proxy, targets_proxy, targets_lock, self))
+ xml_workers.append(xw)
+ xw.start()
+
+ for worker in xml_workers:
+ worker.join()
+
+        # Copy into a plain dict so the caller is not handed proxy objects that
+        # keep a reference to the Manager alive
+        # TODO: verify whether this copy is strictly necessary
+ targets_copy = { }
+ for key in targets_proxy.keys():
+ targets_copy[key] = targets_proxy[key]
+ return targets_copy
+
def _createRepoDataObject(self, mdfile, mdtype, compress=True,
compress_type=None, attribs={}):
"""return random metadata as RepoData object to be added to RepoMD
diff --git a/genpkgmetadata.py b/genpkgmetadata.py
index 35e7fc9..a684038 100755
--- a/genpkgmetadata.py
+++ b/genpkgmetadata.py
@@ -128,9 +128,15 @@ def parse_args(args, conf):
parser.add_option("--max-delta-rpm-size", default=100000000,
dest='max_delta_rpm_size', type='int',
help="max size of an rpm that to run deltarpm against (in bytes)")
+ parser.add_option("--max-concurrent-delta-rpm-size", default=100000000,
+ dest='max_concurrent_delta_rpm_size', type='int',
+ help="max total payload size of concurrent deltarpm runs (in bytes)")
parser.add_option("--workers", default=def_workers,
dest='workers', type='int',
help="number of workers to spawn to read rpms")
+ parser.add_option("--delta-workers", default=1,
+ dest='delta_workers', type='int',
+ help="number of workers to spawn to create delta rpms")
parser.add_option("--xz", default=False,
action="store_true",
help=SUPPRESS_HELP)
@@ -155,6 +161,12 @@ def parse_args(args, conf):
if opts.workers >= 128:
errorprint(_('Warning: More than 128 workers is a lot. Limiting.'))
opts.workers = 128
+ if opts.delta_workers > opts.workers:
+        errorprint(_('Warning: More delta workers requested than rpm read workers. Limiting delta workers to the --workers value.'))
+ opts.delta_workers = opts.workers
+ if opts.max_concurrent_delta_rpm_size < opts.max_delta_rpm_size:
+        errorprint(_('Warning: max_concurrent_delta_rpm_size is less than max_delta_rpm_size, which can deadlock the delta workers. Raising it to match max_delta_rpm_size.'))
+ opts.max_concurrent_delta_rpm_size = opts.max_delta_rpm_size
if opts.sumtype == 'sha1':
errorprint(_('Warning: It is more compatible to use sha instead of sha1'))
diff --git a/modifyrepo.py b/modifyrepo.py
index 3c8a8bd..34b0902 100755
--- a/modifyrepo.py
+++ b/modifyrepo.py
@@ -37,6 +37,7 @@ from yum.misc import checksum, _available_checksums, AutoFileChecksums
from yum.repoMDObject import RepoMD, RepoMDError, RepoData
from xml.dom import minidom
from optparse import OptionParser
+from cStringIO import StringIO
class RepoMetadata:
@@ -46,6 +47,9 @@ class RepoMetadata:
self.repodir = os.path.abspath(repo)
self.repomdxml = os.path.join(self.repodir, 'repomd.xml')
self.compress_type = _available_compression[-1] # best available
+ self.compress = True
+ self.checksum_type = 'sha256'
+ self.unique_md_filenames = True
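+        # defaults for attributes that add() expects to find on the instance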
if not os.path.exists(self.repomdxml):
raise MDError, '%s not found' % self.repomdxml
@@ -103,6 +107,8 @@ class RepoMetadata:
if isinstance(metadata, minidom.Document):
md = metadata.toxml()
mdname = 'updateinfo.xml'
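+            # checksum the in-memory XML now so its open (pre-compression) digest
+            # and size can be recorded on the RepoData entry below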
+ oldmd = AutoFileChecksums(StringIO(md), [self.checksum_type])
+ oldmd.read()
elif isinstance(metadata, str):
if os.path.exists(metadata):
mdname = os.path.basename(metadata)
@@ -147,7 +153,7 @@ class RepoMetadata:
new_rd.checksum = (self.checksum_type, csum)
new_rd.size = str(os.stat(destmd).st_size)
if self.compress:
- new_rd.openchecksum = oldmd.checksums.hexdigests().popitem()
+ new_rd.openchecksum = (self.checksum_type, oldmd.checksums.hexdigests().popitem()[1])
new_rd.opensize = str(oldmd.checksums.length)
new_rd.timestamp = str(int(os.stat(destmd).st_mtime))
self.repoobj.repoData[new_rd.type] = new_rd
@@ -236,7 +242,7 @@ def main(args):
if opts.compress_type not in _available_compression:
print "Compression %s not available: Please choose from: %s" % (opts.compress_type, ', '.join(_available_compression))
return 1
- if opts.sumtype not in _available_checksums:
+ if opts.sumtype != 'sha' and opts.sumtype not in _available_checksums:
print "Checksum %s not available: Please choose from: %s" % (opts.sumtype, ', '.join(_available_checksums))
return 1
repomd.compress_type = opts.compress_type