{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
-- | Streaming compression and decompression using conduits.
--
-- Parts of this code were taken from zlib-enum and adapted for conduits.
module Data.Conduit.Zlib (
-- * Conduits
compress, decompress, gzip, ungzip,
-- * Flushing
compressFlush, decompressFlush,
-- * Decompression combinators
multiple,
-- * Re-exported from Data.Streaming.Zlib (streaming-commons)
WindowBits (..), defaultWindowBits
) where
import Data.Streaming.Zlib
import Data.Conduit
import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import Control.Monad (unless, liftM)
import Control.Monad.Trans.Class (lift, MonadTrans)
import Control.Monad.Primitive (PrimMonad, unsafePrimToPrim)
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Trans.Resource (MonadThrow, monadThrow)
import Data.Function (fix)
-- | Gzip compression with default parameters.
--
-- Uses zlib's default compression level (@-1@, i.e. @Z_DEFAULT_COMPRESSION@)
-- rather than a hard-coded speed/size trade-off. 'WindowBits' 31 (15 + 16)
-- selects the gzip container format instead of a bare zlib stream.
gzip :: (MonadThrow m, MonadBase base m, PrimMonad base) => Conduit ByteString m ByteString
gzip = compress (-1) (WindowBits 31)
-- | Gzip decompression with default parameters.
--
-- Decodes a single gzip entity ('WindowBits' 31 selects the gzip format).
-- To read concatenated gzip files, wrap it with 'multiple'.
ungzip :: (MonadBase base m, PrimMonad base, MonadThrow m) => Conduit ByteString m ByteString
ungzip = decompress gzipFormat
  where
    gzipFormat = WindowBits 31
-- | Run an 'IO' action from the zlib bindings in any monad whose base monad
-- is a 'PrimMonad'.
--
-- 'unsafePrimToPrim' reinterprets the 'IO' action in the base monad (which may
-- be e.g. 'ST'), so this is not a general-purpose lift. It is only used here
-- to drive inflater/deflater objects that are created and owned by this
-- module. NOTE(review): keep it that way; do not use it for arbitrary 'IO'.
--
-- Only 'MonadBase' and 'PrimMonad' are required. The former 'MonadThrow'
-- constraint was never used and has been dropped, which is strictly more
-- general for callers.
unsafeLiftIO :: (MonadBase base m, PrimMonad base) => IO a -> m a
unsafeLiftIO = liftBase . unsafePrimToPrim
-- | Decompress (inflate) a stream of 'ByteString's. For example:
--
-- > sourceFile "test.z" $= decompress defaultWindowBits $$ sinkFile "test"
--
-- Only the first compressed entity is decoded. Any input bytes following it
-- are pushed back upstream as leftovers (see 'multiple').
decompress
    :: (MonadBase base m, PrimMonad base, MonadThrow m)
    => WindowBits -- ^ Zlib parameter (see 'Data.Streaming.Zlib' from streaming-commons as well as the zlib C library)
    -> Conduit ByteString m ByteString
decompress config =
    helperDecompress awaitChunk emitChunk leftover config
  where
    -- Present each plain input as a 'Chunk' so the shared helper can use it.
    awaitChunk = fmap Chunk `liftM` await
    -- This variant has no way to express a flush downstream, so drop markers.
    emitChunk (Chunk bs) = yield bs
    emitChunk Flush = return ()
-- | Like 'decompress', but the stream carries explicit 'Flush' markers.
--
-- Each incoming 'Flush' emits any buffered decompressed bytes, followed by a
-- 'Flush' downstream. Unconsumed compressed bytes are pushed back upstream
-- wrapped in a 'Chunk'.
decompressFlush
    :: (MonadBase base m, PrimMonad base, MonadThrow m)
    => WindowBits -- ^ Zlib parameter (see 'Data.Streaming.Zlib' from streaming-commons as well as the zlib C library)
    -> Conduit (Flush ByteString) m (Flush ByteString)
decompressFlush config =
    helperDecompress await yield pushBack config
  where
    pushBack bs = leftover (Chunk bs)
-- | Shared inflate loop behind 'decompress' and 'decompressFlush'.
--
-- The callbacks abstract over the stream representation:
--
-- * @await'@ pulls the next input; 'Nothing' means upstream is exhausted.
-- * @yield'@ emits decompressed data (and 'Flush' markers) downstream.
-- * @leftover'@ hands unconsumed compressed bytes back to upstream.
--
-- Decoding stops after the first complete compressed entity. Once the
-- inflater reports unused input, that input goes to @leftover'@ and no more
-- input is awaited. zlib errors ('PRError') are thrown with 'monadThrow'.
--
-- NOTE(review): when an entity ends exactly on a chunk boundary, the unused
-- input is empty, so the loop awaits one more chunk before it can notice the
-- end. That chunk is presumably then reported as unused and pushed back.
-- Confirm against streaming-commons' inflate semantics.
helperDecompress :: (Monad (t m), MonadBase base m, PrimMonad base, MonadThrow m, MonadTrans t)
                 => t m (Maybe (Flush ByteString))
                 -> (Flush ByteString -> t m ())
                 -> (ByteString -> t m ())
                 -> WindowBits
                 -> t m ()
helperDecompress await' yield' leftover' config = do
    -- Initialize the stateful inflater, which will be used below
    -- This inflater is never exposed outside of this function
    inf <- lift $ unsafeLiftIO $ initInflate config
    -- Some helper functions used by the main feeder loop below
    let -- Flush any remaining inflated bytes downstream
        flush = do
            chunk <- lift $ unsafeLiftIO $ flushInflate inf
            unless (S.null chunk) $ yield' $ Chunk chunk
        -- Get any input which is unused by the inflater
        getUnused = lift $ unsafeLiftIO $ getUnusedInflate inf
        -- If there is any unused data, return it as leftovers to the stream
        unused = do
            rem' <- getUnused
            unless (S.null rem') $ leftover' rem'
    -- Main loop: feed data from upstream into the inflater
    fix $ \feeder -> do
        mnext <- await'
        case mnext of
            -- No more data is available from upstream
            Nothing -> do
                -- Flush any remaining uncompressed data
                flush
                -- Return the rest of the unconsumed data as leftovers
                unused
            -- Another chunk of compressed data arrived
            Just (Chunk x) -> do
                -- Feed the compressed data into the inflater, returning a
                -- "popper" which will return chunks of decompressed data
                popper <- lift $ unsafeLiftIO $ feedInflate inf x
                -- Loop over the popper grabbing decompressed chunks and
                -- yielding them downstream
                fix $ \pop -> do
                    mbs <- lift $ unsafeLiftIO popper
                    case mbs of
                        -- No more data from this popper
                        PRDone -> do
                            rem' <- getUnused
                            if S.null rem'
                                -- No data was unused by the inflater, so let's
                                -- fill it up again and get more data out of it
                                then feeder
                                -- In this case, there is some unconsumed data,
                                -- meaning the compressed stream is complete.
                                -- At this point, we need to stop feeding,
                                -- return the unconsumed data as leftovers, and
                                -- flush any remaining content (which should be
                                -- nothing)
                                else do
                                    flush
                                    leftover' rem'
                        -- Another chunk available, yield it downstream and
                        -- loop again
                        PRNext bs -> do
                            yield' (Chunk bs)
                            pop
                        -- An error occurred inside zlib, throw it
                        PRError e -> lift $ monadThrow e
            -- We've been asked to flush the stream
            Just Flush -> do
                -- Get any uncompressed data waiting for us
                flush
                -- Put a Flush in the stream
                yield' Flush
                -- Feed in more data
                feeder
-- | Compress (deflate) a stream of 'ByteString's.
--
-- The 'WindowBits' value also selects the output format (zlib vs. gzip).
-- An empty input stream produces no output at all.
compress
    :: (MonadBase base m, PrimMonad base, MonadThrow m)
    => Int        -- ^ Compression level
    -> WindowBits -- ^ Zlib parameter (see 'Data.Streaming.Zlib' from streaming-commons as well as the zlib C library)
    -> Conduit ByteString m ByteString
compress level config =
    helperCompress awaitChunk emitChunk level config
  where
    -- Present each plain input as a 'Chunk' so the shared helper can use it.
    awaitChunk = fmap Chunk `liftM` await
    -- Plain output has no representation for flushes; drop them.
    emitChunk (Chunk bs) = yield bs
    emitChunk Flush = return ()
-- | Like 'compress', but the stream carries explicit 'Flush' markers.
--
-- On each incoming 'Flush' the deflater is flushed, its output is emitted, and
-- a 'Flush' is passed downstream.
compressFlush
    :: (MonadBase base m, PrimMonad base, MonadThrow m)
    => Int        -- ^ Compression level
    -> WindowBits -- ^ Zlib parameter (see 'Data.Streaming.Zlib' from streaming-commons as well as the zlib C library)
    -> Conduit (Flush ByteString) m (Flush ByteString)
compressFlush level config =
    helperCompress await yield level config
-- | Shared deflate loop behind 'compress' and 'compressFlush'.
--
-- @await'@ supplies input ('Nothing' at end of stream). @yield'@ emits
-- compressed chunks and 'Flush' markers downstream.
--
-- The deflater is created only when the first input arrives, so an empty
-- input stream emits nothing. At end of input the deflater is finished and
-- all remaining output is drained. zlib errors ('PRError') are thrown with
-- 'monadThrow'.
--
-- NOTE(review): on 'Flush' the flush popper is run exactly once, so at most one
-- output chunk precedes the 'Flush' marker. Confirm that zlib can never have
-- more pending output than fits in one chunk at that point.
helperCompress :: (Monad (t m), MonadBase base m, PrimMonad base, MonadThrow m, MonadTrans t)
               => t m (Maybe (Flush ByteString))
               -> (Flush ByteString -> t m ())
               -> Int
               -> WindowBits
               -> t m ()
helperCompress await' yield' level config = do
    mfirst <- await'
    case mfirst of
        -- No input at all: never initialise a deflater, emit nothing.
        Nothing -> return ()
        Just first -> do
            deflater <- io (initDeflate level config)
            consume deflater first
  where
    -- Run a zlib IO action inside the transformer stack.
    io action = lift (unsafeLiftIO action)

    -- Rethrow a zlib failure in the underlying monad.
    raise err = lift (monadThrow err)

    -- Pull the next input; end of stream finalises the deflater.
    loop deflater = do
        minput <- await'
        case minput of
            Nothing -> finish deflater
            Just input -> consume deflater input

    -- Emit every chunk a popper produces until it reports completion.
    drainAll popper = do
        res <- io popper
        case res of
            PRNext bs -> yield' (Chunk bs) >> drainAll popper
            PRError err -> raise err
            PRDone -> return ()

    consume deflater (Chunk bs) = do
        io (feedDeflate deflater bs) >>= drainAll
        loop deflater
    consume deflater Flush = do
        res <- io (flushDeflate deflater)
        case res of
            PRNext bs -> yield' (Chunk bs)
            PRError err -> raise err
            PRDone -> return ()
        yield' Flush
        loop deflater

    -- Drain the final compressed bytes (trailer included) after input ends.
    finish deflater = do
        res <- io (finishDeflate deflater)
        case res of
            PRNext bs -> yield' (Chunk bs) >> finish deflater
            PRError err -> raise err
            PRDone -> return ()
-- | Run a decompressor repeatedly until the input stream is exhausted.
--
-- 'decompress' and 'ungzip' on their own stop after a single compressed
-- entity. This combinator re-runs the given conduit for every further entity,
-- which is what concatenated archives need (e.g.
-- @cat file1.gz file2.gz > combined.gz@).
--
-- Usage:
--
-- > sourceFile "combined.gz" $$ multiple ungzip =$ consume
--
-- Empty input chunks are skipped. An empty stream is accepted without error.
-- To require at least one compressed entity, sequence a single decoder first:
--
-- > sourceFile "combined.gz" $$ (ungzip >> multiple ungzip) =$ consume
--
-- @since 1.1.10
multiple :: Monad m
         => Conduit ByteString m a
         -> Conduit ByteString m a
multiple inner = go
  where
    go = await >>= maybe (return ()) step

    step bs
        | S.null bs = go
        -- Put the peeked bytes back so the inner conduit sees them, run it
        -- over one entity, then look for another.
        | otherwise = leftover bs >> inner >> go