Blame Data/Conduit/Binary.hs

Packit 4b2029
{-# LANGUAGE CPP, RankNTypes #-}
Packit 4b2029
{-# LANGUAGE DeriveDataTypeable #-}
Packit 4b2029
{-# LANGUAGE ScopedTypeVariables #-}
Packit 4b2029
-- | Functions for interacting with bytes.
Packit 4b2029
--
Packit 4b2029
-- For many purposes, it's recommended to use the conduit-combinators library,
Packit 4b2029
-- which provides a more complete set of functions.
Packit 4b2029
module Data.Conduit.Binary
Packit 4b2029
    ( -- * Files and @Handle@s
Packit 4b2029
Packit 4b2029
      -- | Note that most of these functions live in the @MonadResource@ monad
Packit 4b2029
      -- to ensure resource finalization even in the presence of exceptions. In
Packit 4b2029
      -- order to run such code, you will need to use @runResourceT@.
Packit 4b2029
Packit 4b2029
      -- ** Sources
Packit 4b2029
      sourceFile
Packit 4b2029
    , sourceHandle
Packit 4b2029
    , sourceHandleUnsafe
Packit 4b2029
    , sourceIOHandle
Packit 4b2029
    , sourceFileRange
Packit 4b2029
    , sourceHandleRange
Packit 4b2029
    , sourceHandleRangeWithBuffer
Packit 4b2029
    , withSourceFile
Packit 4b2029
      -- ** Sinks
Packit 4b2029
    , sinkFile
Packit 4b2029
    , sinkFileCautious
Packit 4b2029
    , sinkTempFile
Packit 4b2029
    , sinkSystemTempFile
Packit 4b2029
    , sinkHandle
Packit 4b2029
    , sinkIOHandle
Packit 4b2029
    , sinkHandleBuilder
Packit 4b2029
    , sinkHandleFlush
Packit 4b2029
    , withSinkFile
Packit 4b2029
    , withSinkFileBuilder
Packit 4b2029
    , withSinkFileCautious
Packit 4b2029
      -- ** Conduits
Packit 4b2029
    , conduitFile
Packit 4b2029
    , conduitHandle
Packit 4b2029
      -- * Utilities
Packit 4b2029
      -- ** Sources
Packit 4b2029
    , sourceLbs
Packit 4b2029
      -- ** Sinks
Packit 4b2029
    , head
Packit 4b2029
    , dropWhile
Packit 4b2029
    , take
Packit 4b2029
    , drop
Packit 4b2029
    , sinkCacheLength
Packit 4b2029
    , sinkLbs
Packit 4b2029
    , mapM_
Packit 4b2029
      -- *** Storable
Packit 4b2029
    , sinkStorable
Packit 4b2029
    , sinkStorableEx
Packit 4b2029
      -- ** Conduits
Packit 4b2029
    , isolate
Packit 4b2029
    , takeWhile
Packit 4b2029
    , Data.Conduit.Binary.lines
Packit 4b2029
    ) where
Packit 4b2029
Packit 4b2029
import qualified Data.ByteString.Builder as BB
Packit 4b2029
import qualified Data.Streaming.FileRead as FR
Packit 4b2029
import Prelude hiding (head, take, drop, takeWhile, dropWhile, mapM_)
Packit 4b2029
import qualified Data.ByteString as S
Packit 4b2029
import Data.ByteString.Unsafe (unsafeUseAsCString)
Packit 4b2029
import qualified Data.ByteString.Lazy as L
Packit 4b2029
import Data.Conduit
Packit 4b2029
import Data.Conduit.List (sourceList, consume)
Packit 4b2029
import qualified Data.Conduit.List as CL
Packit 4b2029
import Control.Exception (assert, finally, bracket)
Packit 4b2029
import Control.Monad (unless, when)
Packit 4b2029
import Control.Monad.IO.Class (liftIO, MonadIO)
Packit 4b2029
import Control.Monad.IO.Unlift
Packit 4b2029
import Control.Monad.Trans.Resource (allocate, release)
Packit 4b2029
import Control.Monad.Trans.Class (lift)
Packit 4b2029
import qualified System.IO as IO
Packit 4b2029
import Data.Word (Word8, Word64)
Packit 4b2029
#if (__GLASGOW_HASKELL__ < 710)
Packit 4b2029
import Control.Applicative ((<$>))
Packit 4b2029
#endif
Packit 4b2029
import System.Directory (getTemporaryDirectory, removeFile)
Packit 4b2029
import Data.ByteString.Lazy.Internal (defaultChunkSize)
Packit 4b2029
import Data.ByteString.Internal (ByteString (PS), inlinePerformIO)
Packit 4b2029
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
Packit 4b2029
import Foreign.ForeignPtr (touchForeignPtr)
Packit 4b2029
import Foreign.Ptr (plusPtr, castPtr)
Packit 4b2029
import Foreign.Storable (Storable, peek, sizeOf)
Packit 4b2029
import GHC.ForeignPtr           (mallocPlainForeignPtrBytes)
Packit 4b2029
import Control.Monad.Trans.Resource (MonadResource)
Packit 4b2029
import Control.Monad.Catch (MonadThrow (..))
Packit 4b2029
import Control.Exception (Exception)
Packit 4b2029
import Data.Typeable (Typeable)
Packit 4b2029
import Foreign.Ptr (Ptr)
Packit 4b2029
#ifndef ALLOW_UNALIGNED_ACCESS
Packit 4b2029
import Foreign.Marshal (alloca, copyBytes)
Packit 4b2029
#endif
Packit 4b2029
import System.Directory (renameFile)
Packit 4b2029
import System.FilePath (takeDirectory, takeFileName, (<.>))
Packit 4b2029
import System.IO (hClose, openBinaryTempFile)
Packit 4b2029
import Control.Exception (throwIO, catch)
Packit 4b2029
import System.IO.Error (isDoesNotExistError)
Packit 4b2029
import qualified Data.ByteString.Builder as BB
Packit 4b2029
Packit 4b2029
-- | Stream the contents of a file as binary data.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
sourceFile :: MonadResource m
Packit 4b2029
           => FilePath
Packit 4b2029
           -> Producer m S.ByteString
Packit 4b2029
sourceFile fp =
Packit 4b2029
    bracketP
Packit 4b2029
        (FR.openFile fp)
Packit 4b2029
         FR.closeFile
Packit 4b2029
         loop
Packit 4b2029
  where
Packit 4b2029
    loop h = do
Packit 4b2029
        bs <- liftIO $ FR.readChunk h
Packit 4b2029
        unless (S.null bs) $ do
Packit 4b2029
            yield bs
Packit 4b2029
            loop h
Packit 4b2029
Packit 4b2029
-- | Stream the contents of a 'IO.Handle' as binary data. Note that this
Packit 4b2029
-- function will /not/ automatically close the @Handle@ when processing
Packit 4b2029
-- completes, since it did not acquire the @Handle@ in the first place.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
sourceHandle :: MonadIO m
Packit 4b2029
             => IO.Handle
Packit 4b2029
             -> Producer m S.ByteString
Packit 4b2029
sourceHandle h =
Packit 4b2029
    loop
Packit 4b2029
  where
Packit 4b2029
    loop = do
Packit 4b2029
        bs <- liftIO (S.hGetSome h defaultChunkSize)
Packit 4b2029
        if S.null bs
Packit 4b2029
            then return ()
Packit 4b2029
            else yield bs >> loop
Packit 4b2029
Packit 4b2029
-- | Same as @sourceHandle@, but instead of allocating a new buffer for each
Packit 4b2029
-- incoming chunk of data, reuses the same buffer. Therefore, the @ByteString@s
Packit 4b2029
-- yielded by this function are not referentially transparent between two
Packit 4b2029
-- different @yield@s.
Packit 4b2029
--
Packit 4b2029
-- This function will be slightly more efficient than @sourceHandle@ by
Packit 4b2029
-- avoiding allocations and reducing garbage collections, but should only be
Packit 4b2029
-- used if you can guarantee that you do not reuse a @ByteString@ (or any slice
Packit 4b2029
-- thereof) between two calls to @await@.
Packit 4b2029
--
Packit 4b2029
-- Since 1.0.12
Packit 4b2029
sourceHandleUnsafe :: MonadIO m => IO.Handle -> Source m ByteString
Packit 4b2029
sourceHandleUnsafe handle = do
Packit 4b2029
    fptr <- liftIO $ mallocPlainForeignPtrBytes defaultChunkSize
Packit 4b2029
    let ptr = unsafeForeignPtrToPtr fptr
Packit 4b2029
        loop = do
Packit 4b2029
            count <- liftIO $ IO.hGetBuf handle ptr defaultChunkSize
Packit 4b2029
            when (count > 0) $ do
Packit 4b2029
                yield (PS fptr 0 count)
Packit 4b2029
                loop
Packit 4b2029
Packit 4b2029
    loop
Packit 4b2029
Packit 4b2029
    liftIO $ touchForeignPtr fptr
Packit 4b2029
Packit 4b2029
-- | An alternative to 'sourceHandle'.
Packit 4b2029
-- Instead of taking a pre-opened 'IO.Handle', it takes an action that opens
Packit 4b2029
-- a 'IO.Handle' (in read mode), so that it can open it only when needed
Packit 4b2029
-- and close it as soon as possible.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
sourceIOHandle :: MonadResource m
Packit 4b2029
               => IO IO.Handle
Packit 4b2029
               -> Producer m S.ByteString
Packit 4b2029
sourceIOHandle alloc = bracketP alloc IO.hClose sourceHandle
Packit 4b2029
Packit 4b2029
-- | Stream all incoming data to the given 'IO.Handle'. Note that this function
Packit 4b2029
-- does /not/ flush and will /not/ close the @Handle@ when processing completes.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
sinkHandle :: MonadIO m
Packit 4b2029
           => IO.Handle
Packit 4b2029
           -> Consumer S.ByteString m ()
Packit 4b2029
sinkHandle h = awaitForever (liftIO . S.hPut h)
Packit 4b2029
Packit 4b2029
-- | Stream incoming builders, executing them directly on the buffer of the
Packit 4b2029
-- given 'IO.Handle'. Note that this function does /not/ automatically close the
Packit 4b2029
-- @Handle@ when processing completes.
Packit 4b2029
-- Pass 'Data.ByteString.Builder.Extra.flush' to flush the buffer.
Packit 4b2029
--
Packit 4b2029
-- @since 1.2.2
Packit 4b2029
sinkHandleBuilder :: MonadIO m => IO.Handle -> ConduitM BB.Builder o m ()
Packit 4b2029
sinkHandleBuilder h = awaitForever (liftIO . BB.hPutBuilder h)
Packit 4b2029
Packit 4b2029
-- | Stream incoming @Flush@es, executing them on @IO.Handle@
Packit 4b2029
-- Note that this function does /not/ automatically close the @Handle@ when
Packit 4b2029
-- processing completes
Packit 4b2029
--
Packit 4b2029
-- @since 1.2.2
Packit 4b2029
sinkHandleFlush :: MonadIO m
Packit 4b2029
                => IO.Handle
Packit 4b2029
                -> ConduitM (Flush S.ByteString) o m ()
Packit 4b2029
sinkHandleFlush h =
Packit 4b2029
  awaitForever $ \mbs -> liftIO $
Packit 4b2029
  case mbs of
Packit 4b2029
    Chunk bs -> S.hPut h bs
Packit 4b2029
    Flush -> IO.hFlush h
Packit 4b2029
Packit 4b2029
-- | An alternative to 'sinkHandle'.
Packit 4b2029
-- Instead of taking a pre-opened 'IO.Handle', it takes an action that opens
Packit 4b2029
-- a 'IO.Handle' (in write mode), so that it can open it only when needed
Packit 4b2029
-- and close it as soon as possible.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
sinkIOHandle :: MonadResource m
Packit 4b2029
             => IO IO.Handle
Packit 4b2029
             -> Consumer S.ByteString m ()
Packit 4b2029
sinkIOHandle alloc = bracketP alloc IO.hClose sinkHandle
Packit 4b2029
Packit 4b2029
-- | Stream the contents of a file as binary data, starting from a certain
Packit 4b2029
-- offset and only consuming up to a certain number of bytes.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
sourceFileRange :: MonadResource m
Packit 4b2029
                => FilePath
Packit 4b2029
                -> Maybe Integer -- ^ Offset
Packit 4b2029
                -> Maybe Integer -- ^ Maximum count
Packit 4b2029
                -> Producer m S.ByteString
Packit 4b2029
sourceFileRange fp offset count = bracketP
Packit 4b2029
    (IO.openBinaryFile fp IO.ReadMode)
Packit 4b2029
    IO.hClose
Packit 4b2029
    (\h -> sourceHandleRange h offset count)
Packit 4b2029
Packit 4b2029
-- | Stream the contents of a handle as binary data, starting from a certain
Packit 4b2029
-- offset and only consuming up to a certain number of bytes.
Packit 4b2029
--
Packit 4b2029
-- Since 1.0.8
Packit 4b2029
sourceHandleRange :: MonadIO m
Packit 4b2029
                  => IO.Handle
Packit 4b2029
                  -> Maybe Integer -- ^ Offset
Packit 4b2029
                  -> Maybe Integer -- ^ Maximum count
Packit 4b2029
                  -> Producer m S.ByteString
Packit 4b2029
sourceHandleRange handle offset count =
Packit 4b2029
  sourceHandleRangeWithBuffer handle offset count defaultChunkSize
Packit 4b2029
Packit 4b2029
-- | Stream the contents of a handle as binary data, starting from a certain
Packit 4b2029
-- offset and only consuming up to a certain number of bytes. This function
Packit 4b2029
-- consumes chunks as specified by the buffer size.
Packit 4b2029
--
Packit 4b2029
-- Since 1.1.8
Packit 4b2029
sourceHandleRangeWithBuffer :: MonadIO m
Packit 4b2029
                  => IO.Handle
Packit 4b2029
                  -> Maybe Integer -- ^ Offset
Packit 4b2029
                  -> Maybe Integer -- ^ Maximum count
Packit 4b2029
                  -> Int -- ^ Buffer size
Packit 4b2029
                  -> Producer m S.ByteString
Packit 4b2029
sourceHandleRangeWithBuffer handle offset count buffer = do
Packit 4b2029
    case offset of
Packit 4b2029
        Nothing -> return ()
Packit 4b2029
        Just off -> liftIO $ IO.hSeek handle IO.AbsoluteSeek off
Packit 4b2029
    case count of
Packit 4b2029
        Nothing -> pullUnlimited
Packit 4b2029
        Just c -> pullLimited (fromInteger c)
Packit 4b2029
  where
Packit 4b2029
    pullUnlimited = do
Packit 4b2029
        bs <- liftIO $ S.hGetSome handle buffer
Packit 4b2029
        if S.null bs
Packit 4b2029
            then return ()
Packit 4b2029
            else do
Packit 4b2029
                yield bs
Packit 4b2029
                pullUnlimited
Packit 4b2029
Packit 4b2029
    pullLimited c = do
Packit 4b2029
        bs <- liftIO $ S.hGetSome handle (min c buffer)
Packit 4b2029
        let c' = c - S.length bs
Packit 4b2029
        assert (c' >= 0) $
Packit 4b2029
            if S.null bs
Packit 4b2029
                then return ()
Packit 4b2029
                else do
Packit 4b2029
                    yield bs
Packit 4b2029
                    pullLimited c'
Packit 4b2029
Packit 4b2029
-- | Stream all incoming data to the given file.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
sinkFile :: MonadResource m
Packit 4b2029
         => FilePath
Packit 4b2029
         -> Consumer S.ByteString m ()
Packit 4b2029
sinkFile fp = sinkIOHandle (IO.openBinaryFile fp IO.WriteMode)
Packit 4b2029
Packit 4b2029
-- | Cautious version of 'sinkFile'. The idea here is to stream the
Packit 4b2029
-- values to a temporary file in the same directory of the destination
Packit 4b2029
-- file, and only on successfully writing the entire file, moves it
Packit 4b2029
-- atomically to the destination path.
Packit 4b2029
--
Packit 4b2029
-- In the event of an exception occurring, the temporary file will be
Packit 4b2029
-- deleted and no move will be made. If the application shuts down
Packit 4b2029
-- without running exception handling (such as machine failure or a
Packit 4b2029
-- SIGKILL), the temporary file will remain and the destination file
Packit 4b2029
-- will be untouched.
Packit 4b2029
--
Packit 4b2029
-- @since 1.1.14
Packit 4b2029
sinkFileCautious
Packit 4b2029
  :: MonadResource m
Packit 4b2029
  => FilePath
Packit 4b2029
  -> ConduitM S.ByteString o m ()
Packit 4b2029
sinkFileCautious fp =
Packit 4b2029
    bracketP (cautiousAcquire fp) cautiousCleanup inner
Packit 4b2029
  where
Packit 4b2029
    inner (tmpFP, h) = do
Packit 4b2029
        sinkHandle h
Packit 4b2029
        liftIO $ do
Packit 4b2029
            hClose h
Packit 4b2029
            renameFile tmpFP fp
Packit 4b2029
Packit 4b2029
-- | Stream data into a temporary file in the given directory with the
Packit 4b2029
-- given filename pattern, and return the temporary filename. The
Packit 4b2029
-- temporary file will be automatically deleted when exiting the
Packit 4b2029
-- active 'ResourceT' block, if it still exists.
Packit 4b2029
--
Packit 4b2029
-- @since 1.1.15
Packit 4b2029
sinkTempFile :: MonadResource m
Packit 4b2029
             => FilePath -- ^ temp directory
Packit 4b2029
             -> String -- ^ filename pattern
Packit 4b2029
             -> ConduitM ByteString o m FilePath
Packit 4b2029
sinkTempFile tmpdir pattern = do
Packit 4b2029
    (_releaseKey, (fp, h)) <- allocate
Packit 4b2029
        (IO.openBinaryTempFile tmpdir pattern)
Packit 4b2029
        (\(fp, h) -> IO.hClose h `finally` (removeFile fp `Control.Exception.catch` \e ->
Packit 4b2029
            if isDoesNotExistError e
Packit 4b2029
                then return ()
Packit 4b2029
                else throwIO e))
Packit 4b2029
    sinkHandle h
Packit 4b2029
    liftIO $ IO.hClose h
Packit 4b2029
    return fp
Packit 4b2029
Packit 4b2029
-- | Same as 'sinkTempFile', but will use the default temp file
Packit 4b2029
-- directory for the system as the first argument.
Packit 4b2029
--
Packit 4b2029
-- @since 1.1.15
Packit 4b2029
sinkSystemTempFile
Packit 4b2029
    :: MonadResource m
Packit 4b2029
    => String -- ^ filename pattern
Packit 4b2029
    -> ConduitM ByteString o m FilePath
Packit 4b2029
sinkSystemTempFile pattern = do
Packit 4b2029
    dir <- liftIO getTemporaryDirectory
Packit 4b2029
    sinkTempFile dir pattern
Packit 4b2029
Packit 4b2029
-- | Stream the contents of the input to a file, and also send it along the
Packit 4b2029
-- pipeline. Similar in concept to the Unix command @tee@.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
conduitFile :: MonadResource m
Packit 4b2029
            => FilePath
Packit 4b2029
            -> Conduit S.ByteString m S.ByteString
Packit 4b2029
conduitFile fp = bracketP
Packit 4b2029
    (IO.openBinaryFile fp IO.WriteMode)
Packit 4b2029
    IO.hClose
Packit 4b2029
    conduitHandle
Packit 4b2029
Packit 4b2029
-- | Stream the contents of the input to a @Handle@, and also send it along the
Packit 4b2029
-- pipeline. Similar in concept to the Unix command @tee@. Like @sourceHandle@,
Packit 4b2029
-- does not close the handle on completion. Related to: @conduitFile@.
Packit 4b2029
--
Packit 4b2029
-- Since 1.0.9
Packit 4b2029
conduitHandle :: MonadIO m => IO.Handle -> Conduit S.ByteString m S.ByteString
Packit 4b2029
conduitHandle h = awaitForever $ \bs -> liftIO (S.hPut h bs) >> yield bs
Packit 4b2029
Packit 4b2029
-- | Ensure that only up to the given number of bytes are consumed by the inner
Packit 4b2029
-- sink. Note that this does /not/ ensure that all of those bytes are in fact
Packit 4b2029
-- consumed.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
isolate :: Monad m
Packit 4b2029
        => Int
Packit 4b2029
        -> Conduit S.ByteString m S.ByteString
Packit 4b2029
isolate =
Packit 4b2029
    loop
Packit 4b2029
  where
Packit 4b2029
    loop 0 = return ()
Packit 4b2029
    loop count = do
Packit 4b2029
        mbs <- await
Packit 4b2029
        case mbs of
Packit 4b2029
            Nothing -> return ()
Packit 4b2029
            Just bs -> do
Packit 4b2029
                let (a, b) = S.splitAt count bs
Packit 4b2029
                case count - S.length a of
Packit 4b2029
                    0 -> do
Packit 4b2029
                        unless (S.null b) $ leftover b
Packit 4b2029
                        yield a
Packit 4b2029
                    count' -> assert (S.null b) $ yield a >> loop count'
Packit 4b2029
Packit 4b2029
-- | Return the next byte from the stream, if available.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
head :: Monad m => Consumer S.ByteString m (Maybe Word8)
Packit 4b2029
head = do
Packit 4b2029
    mbs <- await
Packit 4b2029
    case mbs of
Packit 4b2029
        Nothing -> return Nothing
Packit 4b2029
        Just bs ->
Packit 4b2029
            case S.uncons bs of
Packit 4b2029
                Nothing -> head
Packit 4b2029
                Just (w, bs') -> leftover bs' >> return (Just w)
Packit 4b2029
Packit 4b2029
-- | Return all bytes while the predicate returns @True@.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
takeWhile :: Monad m => (Word8 -> Bool) -> Conduit S.ByteString m S.ByteString
Packit 4b2029
takeWhile p =
Packit 4b2029
    loop
Packit 4b2029
  where
Packit 4b2029
    loop = await >>= maybe (return ()) go
Packit 4b2029
Packit 4b2029
    go bs
Packit 4b2029
        | S.null x = next
Packit 4b2029
        | otherwise = yield x >> next
Packit 4b2029
      where
Packit 4b2029
        next = if S.null y then loop else leftover y
Packit 4b2029
        (x, y) = S.span p bs
Packit 4b2029
Packit 4b2029
-- | Ignore all bytes while the predicate returns @True@.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
dropWhile :: Monad m => (Word8 -> Bool) -> Consumer S.ByteString m ()
Packit 4b2029
dropWhile p =
Packit 4b2029
    loop
Packit 4b2029
  where
Packit 4b2029
    loop = do
Packit 4b2029
        mbs <- await
Packit 4b2029
        case S.dropWhile p <$> mbs of
Packit 4b2029
            Nothing -> return ()
Packit 4b2029
            Just bs
Packit 4b2029
                | S.null bs -> loop
Packit 4b2029
                | otherwise -> leftover bs
Packit 4b2029
Packit 4b2029
-- | Take the given number of bytes, if available.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
take :: Monad m => Int -> Consumer S.ByteString m L.ByteString
Packit 4b2029
take  0 = return L.empty
Packit 4b2029
take n0 = go n0 id
Packit 4b2029
  where
Packit 4b2029
    go n front =
Packit 4b2029
        await >>= maybe (return $ L.fromChunks $ front []) go'
Packit 4b2029
      where
Packit 4b2029
        go' bs =
Packit 4b2029
            case S.length bs `compare` n of
Packit 4b2029
                LT -> go (n - S.length bs) (front . (bs:))
Packit 4b2029
                EQ -> return $ L.fromChunks $ front [bs]
Packit 4b2029
                GT ->
Packit 4b2029
                    let (x, y) = S.splitAt n bs
Packit 4b2029
                     in assert (not $ S.null y) $ leftover y >> return (L.fromChunks $ front [x])
Packit 4b2029
Packit 4b2029
-- | Drop up to the given number of bytes.
Packit 4b2029
--
Packit 4b2029
-- Since 0.5.0
Packit 4b2029
drop :: Monad m => Int -> Consumer S.ByteString m ()
Packit 4b2029
drop  0 = return ()
Packit 4b2029
drop n0 = go n0
Packit 4b2029
  where
Packit 4b2029
    go n =
Packit 4b2029
        await >>= maybe (return ()) go'
Packit 4b2029
      where
Packit 4b2029
        go' bs =
Packit 4b2029
            case S.length bs `compare` n of
Packit 4b2029
                LT -> go (n - S.length bs)
Packit 4b2029
                EQ -> return ()
Packit 4b2029
                GT ->
Packit 4b2029
                    let y = S.drop n bs
Packit 4b2029
                     in assert (not $ S.null y) $ leftover y >> return ()
Packit 4b2029
Packit 4b2029
-- | Split the input bytes into lines. In other words, split on the LF byte
Packit 4b2029
-- (10), and strip it from the output.
Packit 4b2029
--
Packit 4b2029
-- Since 0.3.0
Packit 4b2029
lines :: Monad m => Conduit S.ByteString m S.ByteString
Packit 4b2029
lines =
Packit 4b2029
    loop []
Packit 4b2029
  where
Packit 4b2029
    loop acc = await >>= maybe (finish acc) (go acc)
Packit 4b2029
Packit 4b2029
    finish acc =
Packit 4b2029
        let final = S.concat $ reverse acc
Packit 4b2029
         in unless (S.null final) (yield final)
Packit 4b2029
Packit 4b2029
    go acc more =
Packit 4b2029
        case S.uncons second of
Packit 4b2029
            Just (_, second') -> yield (S.concat $ reverse $ first:acc) >> go [] second'
Packit 4b2029
            Nothing -> loop $ more:acc
Packit 4b2029
      where
Packit 4b2029
        (first, second) = S.break (== 10) more
Packit 4b2029
Packit 4b2029
-- | Stream the chunks from a lazy bytestring.
Packit 4b2029
--
Packit 4b2029
-- Since 0.5.0
Packit 4b2029
sourceLbs :: Monad m => L.ByteString -> Producer m S.ByteString
Packit 4b2029
sourceLbs = sourceList . L.toChunks
Packit 4b2029
Packit 4b2029
-- | Stream the input data into a temp file and count the number of bytes
Packit 4b2029
-- present. When complete, return a new @Source@ reading from the temp file
Packit 4b2029
-- together with the length of the input in bytes.
Packit 4b2029
--
Packit 4b2029
-- All resources will be cleaned up automatically.
Packit 4b2029
--
Packit 4b2029
-- Since 1.0.5
Packit 4b2029
sinkCacheLength :: (MonadResource m1, MonadResource m2)
Packit 4b2029
                => Sink S.ByteString m1 (Word64, Source m2 S.ByteString)
Packit 4b2029
sinkCacheLength = do
Packit 4b2029
    tmpdir <- liftIO getTemporaryDirectory
Packit 4b2029
    (releaseKey, (fp, h)) <- allocate
Packit 4b2029
        (IO.openBinaryTempFile tmpdir "conduit.cache")
Packit 4b2029
        (\(fp, h) -> IO.hClose h `finally` removeFile fp)
Packit 4b2029
    len <- sinkHandleLen h
Packit 4b2029
    liftIO $ IO.hClose h
Packit 4b2029
    return (len, sourceFile fp >> release releaseKey)
Packit 4b2029
  where
Packit 4b2029
    sinkHandleLen :: MonadResource m => IO.Handle -> Sink S.ByteString m Word64
Packit 4b2029
    sinkHandleLen h =
Packit 4b2029
        loop 0
Packit 4b2029
      where
Packit 4b2029
        loop x =
Packit 4b2029
            await >>= maybe (return x) go
Packit 4b2029
          where
Packit 4b2029
            go bs = do
Packit 4b2029
                liftIO $ S.hPut h bs
Packit 4b2029
                loop $ x + fromIntegral (S.length bs)
Packit 4b2029
Packit 4b2029
-- | Consume a stream of input into a lazy bytestring. Note that no lazy I\/O
Packit 4b2029
-- is performed, but rather all content is read into memory strictly.
Packit 4b2029
--
Packit 4b2029
-- Since 1.0.5
Packit 4b2029
sinkLbs :: Monad m => Sink S.ByteString m L.ByteString
Packit 4b2029
sinkLbs = fmap L.fromChunks consume
Packit 4b2029
Packit 4b2029
mapM_BS :: Monad m => (Word8 -> m ()) -> S.ByteString -> m ()
Packit 4b2029
mapM_BS f (PS fptr offset len) = do
Packit 4b2029
    let start = unsafeForeignPtrToPtr fptr `plusPtr` offset
Packit 4b2029
        end = start `plusPtr` len
Packit 4b2029
        loop ptr
Packit 4b2029
            | ptr >= end = inlinePerformIO (touchForeignPtr fptr) `seq` return ()
Packit 4b2029
            | otherwise = do
Packit 4b2029
                f (inlinePerformIO (peek ptr))
Packit 4b2029
                loop (ptr `plusPtr` 1)
Packit 4b2029
    loop start
Packit 4b2029
{-# INLINE mapM_BS #-}
Packit 4b2029
Packit 4b2029
-- | Perform a computation on each @Word8@ in a stream.
Packit 4b2029
--
Packit 4b2029
-- Since 1.0.10
Packit 4b2029
mapM_ :: Monad m => (Word8 -> m ()) -> Consumer S.ByteString m ()
Packit 4b2029
mapM_ f = awaitForever (lift . mapM_BS f)
Packit 4b2029
{-# INLINE mapM_ #-}
Packit 4b2029
Packit 4b2029
-- | Consume some instance of @Storable@ from the incoming byte stream. In the
Packit 4b2029
-- event of insufficient bytes in the stream, returns a @Nothing@ and returns
Packit 4b2029
-- all unused input as leftovers.
Packit 4b2029
--
Packit 4b2029
-- @since 1.1.13
Packit 4b2029
sinkStorable :: (Monad m, Storable a) => Consumer S.ByteString m (Maybe a)
Packit 4b2029
sinkStorable = sinkStorableHelper Just (return Nothing)
Packit 4b2029
Packit 4b2029
-- | Same as 'sinkStorable', but throws a 'SinkStorableInsufficientBytes'
Packit 4b2029
-- exception (via 'throwM') in the event of insufficient bytes. This can be
Packit 4b2029
-- more efficient to use than 'sinkStorable' as it avoids the need to
Packit 4b2029
-- construct/deconstruct a @Maybe@ wrapper in the success case.
Packit 4b2029
--
Packit 4b2029
-- @since 1.1.13
Packit 4b2029
sinkStorableEx :: (MonadThrow m, Storable a) => Consumer S.ByteString m a
Packit 4b2029
sinkStorableEx = sinkStorableHelper id (throwM SinkStorableInsufficientBytes)
Packit 4b2029
Packit 4b2029
sinkStorableHelper :: forall m a b. (Monad m, Storable a)
Packit 4b2029
                   => (a -> b)
Packit 4b2029
                   -> (Consumer S.ByteString m b)
Packit 4b2029
                   -> Consumer S.ByteString m b
Packit 4b2029
sinkStorableHelper wrap failure = do
Packit 4b2029
    start
Packit 4b2029
  where
Packit 4b2029
    size = sizeOf (undefined :: a)
Packit 4b2029
Packit 4b2029
    -- try the optimal case: next chunk has all the data we need
Packit 4b2029
    start = do
Packit 4b2029
        mbs <- await
Packit 4b2029
        case mbs of
Packit 4b2029
            Nothing -> failure
Packit 4b2029
            Just bs
Packit 4b2029
                | S.null bs -> start
Packit 4b2029
                | otherwise ->
Packit 4b2029
                    case compare (S.length bs) size of
Packit 4b2029
                        LT -> do
Packit 4b2029
                            -- looks like we're stuck concating
Packit 4b2029
                            leftover bs
Packit 4b2029
                            lbs <- take size
Packit 4b2029
                            let bs = S.concat $ L.toChunks lbs
Packit 4b2029
                            case compare (S.length bs) size of
Packit 4b2029
                                LT -> do
Packit 4b2029
                                    leftover bs
Packit 4b2029
                                    failure
Packit 4b2029
                                EQ -> process bs
Packit 4b2029
                                GT -> assert False (process bs)
Packit 4b2029
                        EQ -> process bs
Packit 4b2029
                        GT -> do
Packit 4b2029
                            let (x, y) = S.splitAt size bs
Packit 4b2029
                            leftover y
Packit 4b2029
                            process x
Packit 4b2029
Packit 4b2029
    -- Given a bytestring of exactly the correct size, grab the value
Packit 4b2029
    process bs = return $! wrap $! inlinePerformIO $!
Packit 4b2029
        unsafeUseAsCString bs (safePeek undefined . castPtr)
Packit 4b2029
Packit 4b2029
    safePeek :: a -> Ptr a -> IO a
Packit 4b2029
#ifdef ALLOW_UNALIGNED_ACCESS
Packit 4b2029
    safePeek _ = peek
Packit 4b2029
#else
Packit 4b2029
    safePeek val ptr = alloca (\t -> copyBytes t ptr (sizeOf val) >> peek t)
Packit 4b2029
#endif
Packit 4b2029
{-# INLINE sinkStorableHelper #-}
Packit 4b2029
Packit 4b2029
data SinkStorableException = SinkStorableInsufficientBytes
Packit 4b2029
    deriving (Show, Typeable)
Packit 4b2029
instance Exception SinkStorableException
Packit 4b2029
Packit 4b2029
-- | Like 'IO.withBinaryFile', but provides a source to read bytes from.
Packit 4b2029
--
Packit 4b2029
-- @since 1.2.1
Packit 4b2029
withSourceFile
Packit 4b2029
  :: (MonadUnliftIO m, MonadIO n)
Packit 4b2029
  => FilePath
Packit 4b2029
  -> (ConduitM i ByteString n () -> m a)
Packit 4b2029
  -> m a
Packit 4b2029
withSourceFile fp inner =
Packit 4b2029
  withRunInIO $ \run ->
Packit 4b2029
  IO.withBinaryFile fp IO.ReadMode $
Packit 4b2029
  run . inner . sourceHandle
Packit 4b2029
Packit 4b2029
-- | Like 'IO.withBinaryFile', but provides a sink to write bytes to.
Packit 4b2029
--
Packit 4b2029
-- @since 1.2.1
Packit 4b2029
withSinkFile
Packit 4b2029
  :: (MonadUnliftIO m, MonadIO n)
Packit 4b2029
  => FilePath
Packit 4b2029
  -> (ConduitM ByteString o n () -> m a)
Packit 4b2029
  -> m a
Packit 4b2029
withSinkFile fp inner =
Packit 4b2029
  withRunInIO $ \run ->
Packit 4b2029
  IO.withBinaryFile fp IO.ReadMode $
Packit 4b2029
  run . inner . sinkHandle
Packit 4b2029
Packit 4b2029
-- | Same as 'withSinkFile', but lets you use a 'BB.Builder'.
Packit 4b2029
--
Packit 4b2029
-- @since 1.2.1
Packit 4b2029
withSinkFileBuilder
Packit 4b2029
  :: (MonadUnliftIO m, MonadIO n)
Packit 4b2029
  => FilePath
Packit 4b2029
  -> (ConduitM BB.Builder o n () -> m a)
Packit 4b2029
  -> m a
Packit 4b2029
withSinkFileBuilder fp inner =
Packit 4b2029
  withRunInIO $ \run ->
Packit 4b2029
  IO.withBinaryFile fp IO.WriteMode $ \h ->
Packit 4b2029
  run $ inner $ CL.mapM_ (liftIO . BB.hPutBuilder h)
Packit 4b2029
Packit 4b2029
-- | Like 'sinkFileCautious', but uses the @with@ pattern instead of
Packit 4b2029
-- @MonadResource@.
Packit 4b2029
--
Packit 4b2029
-- @since 1.2.2
Packit 4b2029
withSinkFileCautious
Packit 4b2029
  :: (MonadUnliftIO m, MonadIO n)
Packit 4b2029
  => FilePath
Packit 4b2029
  -> (ConduitM S.ByteString o n () -> m a)
Packit 4b2029
  -> m a
Packit 4b2029
withSinkFileCautious fp inner =
Packit 4b2029
  withRunInIO $ \run -> bracket
Packit 4b2029
    (cautiousAcquire fp)
Packit 4b2029
    cautiousCleanup
Packit 4b2029
    (\(tmpFP, h) -> do
Packit 4b2029
        a <- run $ inner $ sinkHandle h
Packit 4b2029
        hClose h
Packit 4b2029
        renameFile tmpFP fp
Packit 4b2029
        return a)
Packit 4b2029
Packit 4b2029
-- | Helper function for Cautious functions above, do not export!
Packit 4b2029
cautiousAcquire :: FilePath -> IO (FilePath, IO.Handle)
Packit 4b2029
cautiousAcquire fp = openBinaryTempFile (takeDirectory fp) (takeFileName fp <.> "tmp")
Packit 4b2029
Packit 4b2029
-- | Helper function for Cautious functions above, do not export!
Packit 4b2029
cautiousCleanup :: (FilePath, IO.Handle) -> IO ()
Packit 4b2029
cautiousCleanup (tmpFP, h) = do
Packit 4b2029
  hClose h
Packit 4b2029
  removeFile tmpFP `Control.Exception.catch` \e ->
Packit 4b2029
    if isDoesNotExistError e
Packit 4b2029
      then return ()
Packit 4b2029
      else throwIO e