#!/usr/bin/env python
import os
import tarfile
import zipfile
import rpmfile
class ZFile (object):
def __init__(self, path, mode='r', ext=None):
super(ZFile, self).__init__()
if ext is not None:
_ext = ext
else:
_ext = os.path.splitext(path)[-1]
if _ext.startswith('.'):
_ext = _ext[1:]
if zipfile.is_zipfile(path) or _ext == 'zip':
self.f = zipfile.ZipFile(path, mode)
elif tarfile.is_tarfile(path) or _ext in ('tar', 'tgz', 'gz'):
self.f = tarfile.open(path, mode)
elif _ext == 'rpm':
self.f = rpmfile.open(path, mode + 'b')
else:
raise ValueError('Unsupported file extension: %s' % path)
def __enter__(self):
return self
def __exit__(self, *args):
if callable(getattr(self.f, 'close', None)):
self.f.close()
def getnames(self):
if isinstance(self.f, zipfile.ZipFile):
return self.f.namelist()
elif isinstance(self.f, tarfile.TarFile):
return self.f.getnames()
elif isinstance(self.f, rpmfile.RPMFile):
return [x.name for x in self.f.getmembers()]
else:
raise NotImplementedError
def headers(self):
if isinstance(self.f, rpmfile.RPMFile):
return self.f.headers
else:
return dict()
def extract_to(self, member, path):
""" Extract compress file's \p member to \p path
If \p path is a directory the member's basename will used as
filename, otherwise path is considered the full file path name. """
if not os.path.isdir(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
if os.path.isdir(path):
path = os.path.join(path, os.path.basename(member))
with open(path, 'wb') as of:
if isinstance(self.f, zipfile.ZipFile):
zf = self.f.open(member)
else:
zf = self.f.extractfile(member)
while True:
b = zf.read(1024*100)
if b:
of.write(b)
else:
break
zf.close()
@classmethod
def extract (cls, zpath, member, outpath):
"""
Extract file member (full internal path) to output from
archive zpath.
"""
with ZFile(zpath) as zf:
zf.extract_to(member, outpath)
@classmethod
def compress (cls, zpath, paths, stripcnt=0, ext=None):
"""
Create new compressed file \p zpath containing files in \p paths
"""
with ZFile(zpath, 'w', ext=ext) as zf:
for p in paths:
outp = os.path.sep.join(p.split(os.path.sep)[stripcnt:])
print('zip %s to %s (stripcnt %d)' % (p, outp, stripcnt))
zf.f.write(p, outp)