#!/usr/bin/env python

import os
import tarfile
import zipfile
import rpmfile

class ZFile (object):
    """Minimal uniform wrapper around zip, tar and rpm archives.

    The archive type is detected from the content of the file when it can
    be sniffed (zip, tar) and from the file extension otherwise.  The
    underlying archive object (``zipfile.ZipFile``, ``tarfile.TarFile`` or
    the object returned by ``rpmfile.open``) is exposed as ``self.f``.

    Instances are context managers; leaving the ``with`` block closes the
    underlying archive.
    """

    # Number of bytes read per iteration when copying a member to disk.
    _CHUNK_SIZE = 1024 * 100

    # Extensions (without leading dot) that are handled by tarfile.
    _TAR_EXTS = ('tar', 'tgz', 'gz')

    # Subset of _TAR_EXTS that implies gzip compression when writing.
    _GZIP_EXTS = ('tgz', 'gz')

    def __init__(self, path, mode='r', ext=None):
        """Open the archive at ``path``.

        path -- archive file name.
        mode -- open mode passed on to the underlying archive class
                ('r' by default, 'w' to create a new archive).
        ext  -- optional extension (with or without leading dot) overriding
                the one found in ``path``.

        Raises ValueError if the archive type can not be determined.
        """
        super(ZFile, self).__init__()

        if ext is not None:
            _ext = ext
        else:
            _ext = os.path.splitext(path)[-1]
        if _ext.startswith('.'):
            _ext = _ext[1:]

        if zipfile.is_zipfile(path) or _ext == 'zip':
            self.f = zipfile.ZipFile(path, mode)
        elif self._is_tarfile(path) or _ext in self._TAR_EXTS:
            # A plain 'w' would silently produce an uncompressed tar even
            # though the extension promises gzip compression.
            if mode == 'w' and _ext in self._GZIP_EXTS:
                mode = 'w:gz'
            self.f = tarfile.open(path, mode)
        elif _ext == 'rpm':
            self.f = rpmfile.open(path, mode + 'b')
        else:
            raise ValueError('Unsupported file extension: %s' % path)

    @staticmethod
    def _is_tarfile(path):
        """Return True if ``path`` is a readable tar archive.

        Unlike zipfile.is_zipfile(), tarfile.is_tarfile() lets I/O errors
        (most notably "no such file" when a new archive is about to be
        created) propagate; treat those as "not a tar file" instead.
        """
        try:
            return tarfile.is_tarfile(path)
        except (IOError, OSError):
            return False

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def close(self):
        """Close the underlying archive, if it supports closing."""
        if callable(getattr(self.f, 'close', None)):
            self.f.close()

    def getnames(self):
        """Return the list of member names contained in the archive."""
        if isinstance(self.f, zipfile.ZipFile):
            return self.f.namelist()
        elif isinstance(self.f, tarfile.TarFile):
            return self.f.getnames()
        elif isinstance(self.f, rpmfile.RPMFile):
            return [x.name for x in self.f.getmembers()]
        else:
            raise NotImplementedError

    def headers(self):
        """Return the rpm headers, or an empty dict for other archives."""
        if isinstance(self.f, rpmfile.RPMFile):
            return self.f.headers
        else:
            return dict()

    def extract_to(self, member, path):
        """Extract the archive's ``member`` to ``path``.

        If ``path`` is a directory the member's basename will be used as
        filename, otherwise ``path`` is considered the full file path name.
        Missing parent directories of ``path`` are created.

        Raises KeyError (from the underlying archive) if ``member`` does
        not exist, and ValueError if it is not a regular file.
        """

        # Look the member up before touching the file system so that a
        # failed lookup does not leave an empty output file behind.
        if isinstance(self.f, zipfile.ZipFile):
            zf = self.f.open(member)
        else:
            zf = self.f.extractfile(member)
        if zf is None:
            # tarfile returns None for directories and other special members.
            raise ValueError('Not a regular file in archive: %s' % member)

        try:
            # dirname is '' for a bare file name; makedirs('') would fail.
            parent = os.path.dirname(path)
            if parent and not os.path.isdir(parent):
                os.makedirs(parent)

            if os.path.isdir(path):
                path = os.path.join(path, os.path.basename(member))

            with open(path, 'wb') as of:
                while True:
                    b = zf.read(self._CHUNK_SIZE)
                    if not b:
                        break
                    of.write(b)
        finally:
            zf.close()

    def _add(self, path, arcname):
        """Add the file ``path`` to the archive under the name ``arcname``.

        Only zip and tar archives opened for writing are supported.
        """
        if isinstance(self.f, zipfile.ZipFile):
            self.f.write(path, arcname)
        elif isinstance(self.f, tarfile.TarFile):
            # recursive=False matches ZipFile.write(), which never descends
            # into directories.
            self.f.add(path, arcname=arcname, recursive=False)
        else:
            raise NotImplementedError

    @classmethod
    def extract(cls, zpath, member, outpath):
        """
        Extract file member (full internal path) to output from
        archive zpath.
        """

        with cls(zpath) as zf:
            zf.extract_to(member, outpath)

    @classmethod
    def compress(cls, zpath, paths, stripcnt=0, ext=None):
        """
        Create new compressed file ``zpath`` containing files in ``paths``.

        The first ``stripcnt`` path components of each entry in ``paths``
        are removed to form the name stored in the archive.  ``ext``
        optionally overrides the extension of ``zpath`` when selecting the
        archive type.
        """

        with cls(zpath, 'w', ext=ext) as zf:
            for p in paths:
                outp = os.path.sep.join(p.split(os.path.sep)[stripcnt:])
                print('zip %s to %s (stripcnt %d)' % (p, outp, stripcnt))
                zf._add(p, outp)