|
Packit |
4b2029 |
{-# LANGUAGE CPP, RankNTypes #-}
|
|
Packit |
4b2029 |
{-# LANGUAGE DeriveDataTypeable #-}
|
|
Packit |
4b2029 |
{-# LANGUAGE ScopedTypeVariables #-}
|
|
Packit |
4b2029 |
-- | Functions for interacting with bytes.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- For many purposes, it's recommended to use the conduit-combinators library,
|
|
Packit |
4b2029 |
-- which provides a more complete set of functions.
|
|
Packit |
4b2029 |
module Data.Conduit.Binary
|
|
Packit |
4b2029 |
( -- * Files and @Handle@s
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Note that most of these functions live in the @MonadResource@ monad
|
|
Packit |
4b2029 |
-- to ensure resource finalization even in the presence of exceptions. In
|
|
Packit |
4b2029 |
-- order to run such code, you will need to use @runResourceT@.
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- ** Sources
|
|
Packit |
4b2029 |
sourceFile
|
|
Packit |
4b2029 |
, sourceHandle
|
|
Packit |
4b2029 |
, sourceHandleUnsafe
|
|
Packit |
4b2029 |
, sourceIOHandle
|
|
Packit |
4b2029 |
, sourceFileRange
|
|
Packit |
4b2029 |
, sourceHandleRange
|
|
Packit |
4b2029 |
, sourceHandleRangeWithBuffer
|
|
Packit |
4b2029 |
, withSourceFile
|
|
Packit |
4b2029 |
-- ** Sinks
|
|
Packit |
4b2029 |
, sinkFile
|
|
Packit |
4b2029 |
, sinkFileCautious
|
|
Packit |
4b2029 |
, sinkTempFile
|
|
Packit |
4b2029 |
, sinkSystemTempFile
|
|
Packit |
4b2029 |
, sinkHandle
|
|
Packit |
4b2029 |
, sinkIOHandle
|
|
Packit |
4b2029 |
, sinkHandleBuilder
|
|
Packit |
4b2029 |
, sinkHandleFlush
|
|
Packit |
4b2029 |
, withSinkFile
|
|
Packit |
4b2029 |
, withSinkFileBuilder
|
|
Packit |
4b2029 |
, withSinkFileCautious
|
|
Packit |
4b2029 |
-- ** Conduits
|
|
Packit |
4b2029 |
, conduitFile
|
|
Packit |
4b2029 |
, conduitHandle
|
|
Packit |
4b2029 |
-- * Utilities
|
|
Packit |
4b2029 |
-- ** Sources
|
|
Packit |
4b2029 |
, sourceLbs
|
|
Packit |
4b2029 |
-- ** Sinks
|
|
Packit |
4b2029 |
, head
|
|
Packit |
4b2029 |
, dropWhile
|
|
Packit |
4b2029 |
, take
|
|
Packit |
4b2029 |
, drop
|
|
Packit |
4b2029 |
, sinkCacheLength
|
|
Packit |
4b2029 |
, sinkLbs
|
|
Packit |
4b2029 |
, mapM_
|
|
Packit |
4b2029 |
-- *** Storable
|
|
Packit |
4b2029 |
, sinkStorable
|
|
Packit |
4b2029 |
, sinkStorableEx
|
|
Packit |
4b2029 |
-- ** Conduits
|
|
Packit |
4b2029 |
, isolate
|
|
Packit |
4b2029 |
, takeWhile
|
|
Packit |
4b2029 |
, Data.Conduit.Binary.lines
|
|
Packit |
4b2029 |
) where
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
import qualified Data.ByteString.Builder as BB
|
|
Packit |
4b2029 |
import qualified Data.Streaming.FileRead as FR
|
|
Packit |
4b2029 |
import Prelude hiding (head, take, drop, takeWhile, dropWhile, mapM_)
|
|
Packit |
4b2029 |
import qualified Data.ByteString as S
|
|
Packit |
4b2029 |
import Data.ByteString.Unsafe (unsafeUseAsCString)
|
|
Packit |
4b2029 |
import qualified Data.ByteString.Lazy as L
|
|
Packit |
4b2029 |
import Data.Conduit
|
|
Packit |
4b2029 |
import Data.Conduit.List (sourceList, consume)
|
|
Packit |
4b2029 |
import qualified Data.Conduit.List as CL
|
|
Packit |
4b2029 |
import Control.Exception (assert, finally, bracket)
|
|
Packit |
4b2029 |
import Control.Monad (unless, when)
|
|
Packit |
4b2029 |
import Control.Monad.IO.Class (liftIO, MonadIO)
|
|
Packit |
4b2029 |
import Control.Monad.IO.Unlift
|
|
Packit |
4b2029 |
import Control.Monad.Trans.Resource (allocate, release)
|
|
Packit |
4b2029 |
import Control.Monad.Trans.Class (lift)
|
|
Packit |
4b2029 |
import qualified System.IO as IO
|
|
Packit |
4b2029 |
import Data.Word (Word8, Word64)
|
|
Packit |
4b2029 |
#if (__GLASGOW_HASKELL__ < 710)
|
|
Packit |
4b2029 |
import Control.Applicative ((<$>))
|
|
Packit |
4b2029 |
#endif
|
|
Packit |
4b2029 |
import System.Directory (getTemporaryDirectory, removeFile)
|
|
Packit |
4b2029 |
import Data.ByteString.Lazy.Internal (defaultChunkSize)
|
|
Packit |
4b2029 |
import Data.ByteString.Internal (ByteString (PS), inlinePerformIO)
|
|
Packit |
4b2029 |
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
|
|
Packit |
4b2029 |
import Foreign.ForeignPtr (touchForeignPtr)
|
|
Packit |
4b2029 |
import Foreign.Ptr (plusPtr, castPtr)
|
|
Packit |
4b2029 |
import Foreign.Storable (Storable, peek, sizeOf)
|
|
Packit |
4b2029 |
import GHC.ForeignPtr (mallocPlainForeignPtrBytes)
|
|
Packit |
4b2029 |
import Control.Monad.Trans.Resource (MonadResource)
|
|
Packit |
4b2029 |
import Control.Monad.Catch (MonadThrow (..))
|
|
Packit |
4b2029 |
import Control.Exception (Exception)
|
|
Packit |
4b2029 |
import Data.Typeable (Typeable)
|
|
Packit |
4b2029 |
import Foreign.Ptr (Ptr)
|
|
Packit |
4b2029 |
#ifndef ALLOW_UNALIGNED_ACCESS
|
|
Packit |
4b2029 |
import Foreign.Marshal (alloca, copyBytes)
|
|
Packit |
4b2029 |
#endif
|
|
Packit |
4b2029 |
import System.Directory (renameFile)
|
|
Packit |
4b2029 |
import System.FilePath (takeDirectory, takeFileName, (<.>))
|
|
Packit |
4b2029 |
import System.IO (hClose, openBinaryTempFile)
|
|
Packit |
4b2029 |
import Control.Exception (throwIO, catch)
|
|
Packit |
4b2029 |
import System.IO.Error (isDoesNotExistError)
|
|
Packit |
4b2029 |
import qualified Data.ByteString.Builder as BB
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Produce the contents of the file at the given path as a stream of
-- strict 'S.ByteString' chunks.
--
-- The file is opened with 'FR.openFile' and closed with 'FR.closeFile',
-- both registered through 'bracketP'. Chunks come from 'FR.readChunk';
-- the first empty chunk ends the stream.
--
-- Since 0.3.0
sourceFile :: MonadResource m
           => FilePath
           -> Producer m S.ByteString
sourceFile fp = bracketP (FR.openFile fp) FR.closeFile pump
  where
    -- Keep yielding chunks until the reader hands back an empty one.
    pump rh = do
        chunk <- liftIO (FR.readChunk rh)
        if S.null chunk
            then return ()
            else yield chunk >> pump rh
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Produce the bytes readable from an 'IO.Handle' as a stream of strict
-- chunks of at most 'defaultChunkSize' bytes each.
--
-- The @Handle@ is never closed here: the caller supplied it, so the
-- caller remains responsible for it. The stream ends at the first empty
-- read.
--
-- Since 0.3.0
sourceHandle :: MonadIO m
             => IO.Handle
             -> Producer m S.ByteString
sourceHandle h = go
  where
    go = do
        chunk <- liftIO $ S.hGetSome h defaultChunkSize
        unless (S.null chunk) $ do
            yield chunk
            go
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Variant of @sourceHandle@ that allocates a single buffer of
-- 'defaultChunkSize' bytes up front and fills it again for every read.
-- Every yielded @ByteString@ points into that one buffer, so a value
-- received from one @yield@ is overwritten by the next read: the
-- @ByteString@s are not referentially transparent across @yield@s.
--
-- This saves an allocation per chunk, but is only safe when the consumer
-- never holds on to a @ByteString@ (or any slice of one) across two calls
-- to @await@.
--
-- Since 1.0.12
sourceHandleUnsafe :: MonadIO m => IO.Handle -> Source m ByteString
sourceHandleUnsafe handle = do
    buf <- liftIO (mallocPlainForeignPtrBytes defaultChunkSize)
    let raw = unsafeForeignPtrToPtr buf
        -- Read straight into the shared buffer; stop on a read of 0 bytes.
        fill = do
            n <- liftIO (IO.hGetBuf handle raw defaultChunkSize)
            if n > 0
                then yield (PS buf 0 n) >> fill
                else return ()

    fill

    -- Touch the ForeignPtr after the last use of the raw pointer.
    liftIO (touchForeignPtr buf)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Like 'sourceHandle', but takes the action that opens the
-- 'IO.Handle' (in read mode) rather than an already opened one.
-- The action and the matching 'IO.hClose' are handed to 'bracketP', so
-- the handle is acquired and closed by the stream itself.
--
-- Since 0.3.0
sourceIOHandle :: MonadResource m
               => IO IO.Handle
               -> Producer m S.ByteString
sourceIOHandle alloc = bracketP alloc IO.hClose (\h -> sourceHandle h)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Write every incoming chunk to the given 'IO.Handle' with 'S.hPut'.
-- The handle is neither flushed nor closed by this function.
--
-- Since 0.3.0
sinkHandle :: MonadIO m
           => IO.Handle
           -> Consumer S.ByteString m ()
sinkHandle h = awaitForever $ \chunk -> liftIO (S.hPut h chunk)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Run every incoming 'BB.Builder' against the given 'IO.Handle' using
-- 'BB.hPutBuilder'. The @Handle@ is /not/ closed when the stream ends.
-- Send 'Data.ByteString.Builder.Extra.flush' downstream to flush the
-- buffer.
--
-- @since 1.2.2
sinkHandleBuilder :: MonadIO m => IO.Handle -> ConduitM BB.Builder o m ()
sinkHandleBuilder h = awaitForever $ \builder -> liftIO (BB.hPutBuilder h builder)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Consume a stream of @Flush@ values against an @IO.Handle@: a
-- @Chunk@ is written with 'S.hPut', a @Flush@ triggers 'IO.hFlush'.
-- The @Handle@ is /not/ closed when processing completes.
--
-- @since 1.2.2
sinkHandleFlush :: MonadIO m
                => IO.Handle
                -> ConduitM (Flush S.ByteString) o m ()
sinkHandleFlush h = awaitForever (liftIO . step)
  where
    step (Chunk bytes) = S.hPut h bytes
    step Flush         = IO.hFlush h
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Like 'sinkHandle', but takes the action that opens the 'IO.Handle'
-- (in write mode) rather than an already opened one. The action and the
-- matching 'IO.hClose' are handed to 'bracketP', so the handle is
-- acquired and closed by the sink itself.
--
-- Since 0.3.0
sinkIOHandle :: MonadResource m
             => IO IO.Handle
             -> Consumer S.ByteString m ()
sinkIOHandle alloc = bracketP alloc IO.hClose (\h -> sinkHandle h)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Produce part of a file as binary data: optionally seek to an offset
-- first, and optionally stop after a maximum number of bytes. The file is
-- opened in binary read mode and closed via 'bracketP'; the range logic
-- is delegated to 'sourceHandleRange'.
--
-- Since 0.3.0
sourceFileRange :: MonadResource m
                => FilePath
                -> Maybe Integer -- ^ Offset
                -> Maybe Integer -- ^ Maximum count
                -> Producer m S.ByteString
sourceFileRange fp offset count = bracketP open IO.hClose stream
  where
    open = IO.openBinaryFile fp IO.ReadMode
    stream h = sourceHandleRange h offset count
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Produce part of a handle's contents as binary data: optionally seek
-- to an offset first, and optionally stop after a maximum number of
-- bytes. This is 'sourceHandleRangeWithBuffer' with the buffer size fixed
-- to 'defaultChunkSize'.
--
-- Since 1.0.8
sourceHandleRange :: MonadIO m
                  => IO.Handle
                  -> Maybe Integer -- ^ Offset
                  -> Maybe Integer -- ^ Maximum count
                  -> Producer m S.ByteString
sourceHandleRange h mOffset mCount =
    sourceHandleRangeWithBuffer h mOffset mCount defaultChunkSize
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Stream the contents of a handle as binary data, starting from a certain
-- offset and only consuming up to a certain number of bytes. Each read
-- requests at most the given buffer size.
--
-- When an offset is given, the handle is repositioned with an absolute
-- seek before reading. When a maximum count is given, the remaining byte
-- budget is tracked as an 'Integer', so limits larger than @maxBound ::
-- Int@ are honoured instead of being truncated. Streaming stops at the
-- first empty read, which is also what happens once the budget reaches
-- zero.
--
-- Since 1.1.8
sourceHandleRangeWithBuffer :: MonadIO m
                  => IO.Handle
                  -> Maybe Integer -- ^ Offset
                  -> Maybe Integer -- ^ Maximum count
                  -> Int -- ^ Buffer size
                  -> Producer m S.ByteString
sourceHandleRangeWithBuffer handle offset count buffer = do
    case offset of
        Nothing  -> return ()
        Just off -> liftIO $ IO.hSeek handle IO.AbsoluteSeek off
    maybe pullUnlimited pullLimited count
  where
    -- No limit: read until the handle reports end of input.
    pullUnlimited = do
        bs <- liftIO $ S.hGetSome handle buffer
        unless (S.null bs) $ do
            yield bs
            pullUnlimited

    -- Limited: @remaining@ is the number of bytes still allowed. The
    -- request size is computed in 'Integer' and only narrowed to 'Int'
    -- after it has been capped by @buffer@, so the narrowing cannot
    -- overflow for large limits.
    pullLimited remaining = do
        let request = fromInteger (min remaining (toInteger buffer))
        bs <- liftIO $ S.hGetSome handle request
        let remaining' = remaining - toInteger (S.length bs)
        assert (remaining' >= 0) $
            unless (S.null bs) $ do
                yield bs
                pullLimited remaining'
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Write all incoming data to the file at the given path. The file is
-- opened in binary write mode and handed to 'sinkIOHandle'.
--
-- Since 0.3.0
sinkFile :: MonadResource m
         => FilePath
         -> Consumer S.ByteString m ()
sinkFile fp = sinkIOHandle open
  where
    open = IO.openBinaryFile fp IO.WriteMode
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Cautious version of 'sinkFile'. Data is first written to a temporary
-- file created next to the destination (see 'cautiousAcquire'). Only
-- after the whole input has been written is the temporary file closed
-- and renamed onto the destination path with 'renameFile'.
--
-- The temporary file is registered with 'bracketP' together with
-- 'cautiousCleanup', so if an exception interrupts the write the rename
-- never happens and the cleanup action removes the temporary file. If
-- the application dies without running exception handlers (machine
-- failure, SIGKILL), the temporary file remains and the destination is
-- untouched.
--
-- @since 1.1.14
sinkFileCautious
  :: MonadResource m
  => FilePath
  -> ConduitM S.ByteString o m ()
sinkFileCautious fp =
    bracketP (cautiousAcquire fp) cautiousCleanup $ \(tmpPath, tmpHandle) -> do
        sinkHandle tmpHandle
        -- Close before renaming so all buffered data is in the file.
        liftIO (hClose tmpHandle >> renameFile tmpPath fp)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Write the incoming data to a fresh temporary file created in the
-- given directory from the given filename pattern, and return the path
-- of that file.
--
-- The file is registered with 'allocate'; its release action closes the
-- handle and removes the file, ignoring the case where the file no
-- longer exists. The file is therefore deleted when the enclosing
-- 'ResourceT' block exits, if it is still there.
--
-- @since 1.1.15
sinkTempFile :: MonadResource m
             => FilePath -- ^ temp directory
             -> String -- ^ filename pattern
             -> ConduitM ByteString o m FilePath
sinkTempFile tmpdir pattern = do
    (_key, (path, handle)) <-
        allocate (IO.openBinaryTempFile tmpdir pattern) cleanup
    sinkHandle handle
    liftIO (IO.hClose handle)
    return path
  where
    -- Always attempt the removal, even when closing the handle fails.
    cleanup (path, handle) = IO.hClose handle `finally` removeIfPresent path

    -- A file that is already gone is fine; any other error is rethrown.
    removeIfPresent path =
        removeFile path `Control.Exception.catch` \err ->
            unless (isDoesNotExistError err) (throwIO err)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | 'sinkTempFile' with the directory argument filled in by
-- 'getTemporaryDirectory', i.e. the system's default temp directory.
--
-- @since 1.1.15
sinkSystemTempFile
    :: MonadResource m
    => String -- ^ filename pattern
    -> ConduitM ByteString o m FilePath
sinkSystemTempFile pattern =
    liftIO getTemporaryDirectory >>= \tmpdir -> sinkTempFile tmpdir pattern
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Copy everything that passes through into a file while also passing
-- it on downstream, in the spirit of the Unix command @tee@. The file is
-- opened in binary write mode and closed via 'bracketP'; the copying is
-- done by 'conduitHandle'.
--
-- Since 0.3.0
conduitFile :: MonadResource m
            => FilePath
            -> Conduit S.ByteString m S.ByteString
conduitFile fp = bracketP open IO.hClose conduitHandle
  where
    open = IO.openBinaryFile fp IO.WriteMode
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Copy everything that passes through to a @Handle@ while also passing
-- it on downstream, in the spirit of the Unix command @tee@. Each chunk
-- is written to the handle before it is yielded. Like @sourceHandle@,
-- this does not close the handle. Related to: @conduitFile@.
--
-- Since 1.0.9
conduitHandle :: MonadIO m => IO.Handle -> Conduit S.ByteString m S.ByteString
conduitHandle h = awaitForever tee
  where
    tee chunk = do
        liftIO $ S.hPut h chunk
        yield chunk
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Pass at most the given number of bytes downstream. A chunk that
-- crosses the limit is split: the part within the limit is yielded and
-- the rest is returned upstream as a leftover. This caps what the inner
-- sink can consume; it does /not/ force the inner sink to consume all of
-- those bytes.
--
-- Since 0.3.0
isolate :: Monad m
        => Int
        -> Conduit S.ByteString m S.ByteString
isolate = go
  where
    go 0 = return ()
    go remaining = await >>= maybe (return ()) (step remaining)

    step remaining chunk
        -- This chunk completes the quota: hand back whatever is beyond it.
        | stillNeeded == 0 = do
            unless (S.null rest) (leftover rest)
            yield taken
        -- Quota not reached yet, so the whole chunk must have been taken.
        | otherwise = assert (S.null rest) $ yield taken >> go stillNeeded
      where
        (taken, rest) = S.splitAt remaining chunk
        stillNeeded = remaining - S.length taken
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Take one byte off the front of the stream. Empty chunks are
-- skipped; the remainder of the chunk the byte came from is pushed back
-- as a leftover. Yields @Nothing@ when the input ends first.
--
-- Since 0.3.0
head :: Monad m => Consumer S.ByteString m (Maybe Word8)
head = await >>= maybe (return Nothing) firstByte
  where
    firstByte chunk =
        case S.uncons chunk of
            Just (byte, rest) -> do
                leftover rest
                return (Just byte)
            Nothing -> head
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Pass bytes downstream for as long as the predicate holds. The first
-- byte that fails the predicate, together with the rest of its chunk, is
-- returned upstream as a leftover and the conduit stops.
--
-- Since 0.3.0
takeWhile :: Monad m => (Word8 -> Bool) -> Conduit S.ByteString m S.ByteString
takeWhile p = loop
  where
    loop = do
        mchunk <- await
        case mchunk of
            Nothing    -> return ()
            Just chunk -> do
                let (matching, rest) = S.span p chunk
                unless (S.null matching) $ yield matching
                -- An exhausted chunk means the predicate has not failed yet.
                if S.null rest
                    then loop
                    else leftover rest
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Discard bytes for as long as the predicate holds. The first byte
-- that fails the predicate, together with the rest of its chunk, is
-- returned upstream as a leftover.
--
-- Since 0.3.0
dropWhile :: Monad m => (Word8 -> Bool) -> Consumer S.ByteString m ()
dropWhile p = loop
  where
    loop = await >>= maybe (return ()) skip

    skip chunk
        | S.null kept = loop
        | otherwise   = leftover kept
      where
        kept = S.dropWhile p chunk
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Collect the given number of bytes into a lazy 'L.ByteString', or
-- fewer if the input ends first. Bytes of the final chunk that are beyond
-- the requested count are returned upstream as a leftover.
--
-- Since 0.3.0
take :: Monad m => Int -> Consumer S.ByteString m L.ByteString
take 0 = return L.empty
take n0 = collect n0 id
  where
    -- @front@ is a difference list of the chunks gathered so far.
    finish front lastChunks = return (L.fromChunks (front lastChunks))

    collect needed front = do
        mchunk <- await
        case mchunk of
            Nothing -> finish front []
            Just chunk
                | have < needed ->
                    collect (needed - have) (front . (chunk :))
                | have == needed -> finish front [chunk]
                | otherwise ->
                    let (wanted, extra) = S.splitAt needed chunk
                    in assert (not (S.null extra)) $ do
                        leftover extra
                        finish front [wanted]
              where
                have = S.length chunk
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Discard the given number of bytes, or fewer if the input ends first.
-- Bytes of the final chunk that are beyond the requested count are
-- returned upstream as a leftover.
--
-- Since 0.5.0
drop :: Monad m => Int -> Consumer S.ByteString m ()
drop 0 = return ()
drop n0 = skip n0
  where
    skip pending = do
        mchunk <- await
        case mchunk of
            Nothing -> return ()
            Just chunk
                | have < pending  -> skip (pending - have)
                | have == pending -> return ()
                | otherwise ->
                    let extra = S.drop pending chunk
                    in assert (not (S.null extra)) (leftover extra)
              where
                have = S.length chunk
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Re-chunk the input so that each output value is one line. Lines are
-- delimited by the LF byte (10), which is removed from the output. A line
-- may span several input chunks. Non-empty data after the last LF is
-- emitted as a final line when the input ends.
--
-- Since 0.3.0
lines :: Monad m => Conduit S.ByteString m S.ByteString
lines = awaitMore []
  where
    -- Pieces of the current line are accumulated newest-first.
    joined pieces = S.concat (reverse pieces)

    awaitMore pending = do
        mchunk <- await
        case mchunk of
            Nothing -> do
                let trailing = joined pending
                unless (S.null trailing) $ yield trailing
            Just chunk -> split pending chunk

    split pending chunk =
        case S.uncons afterLine of
            -- No LF in this chunk: keep it and wait for more input.
            Nothing -> awaitMore (chunk : pending)
            -- Found an LF: emit the finished line, continue after it.
            Just (_newline, remainder) -> do
                yield (joined (line : pending))
                split [] remainder
      where
        (line, afterLine) = S.break (== 10) chunk
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Produce the strict chunks that make up a lazy bytestring, in order.
--
-- Since 0.5.0
sourceLbs :: Monad m => L.ByteString -> Producer m S.ByteString
sourceLbs lbs = sourceList (L.toChunks lbs)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Spool the input into a temp file while counting its bytes. The
-- result is the byte count paired with a new @Source@ that reads the
-- temp file back and then releases it.
--
-- The temp file is created in the system temporary directory and
-- registered with 'allocate'; its release action closes the handle and
-- removes the file, so all resources are cleaned up automatically.
--
-- Since 1.0.5
sinkCacheLength :: (MonadResource m1, MonadResource m2)
                => Sink S.ByteString m1 (Word64, Source m2 S.ByteString)
sinkCacheLength = do
    tmpdir <- liftIO getTemporaryDirectory
    (key, (path, handle)) <-
        allocate (IO.openBinaryTempFile tmpdir "conduit.cache") cleanup
    total <- writeCounting handle 0
    -- Close the write handle before the file is read back.
    liftIO (IO.hClose handle)
    return (total, sourceFile path >> release key)
  where
    cleanup (path, handle) = IO.hClose handle `finally` removeFile path

    -- Write each chunk to the handle, summing the chunk lengths.
    writeCounting :: MonadResource m
                  => IO.Handle -> Word64 -> Sink S.ByteString m Word64
    writeCounting handle written = do
        mchunk <- await
        case mchunk of
            Nothing -> return written
            Just chunk -> do
                liftIO (S.hPut handle chunk)
                writeCounting handle (written + fromIntegral (S.length chunk))
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Gather the whole input into a lazy bytestring. No lazy I\/O is
-- involved: every chunk is consumed into memory before the result is
-- returned.
--
-- Since 1.0.5
sinkLbs :: Monad m => Sink S.ByteString m L.ByteString
sinkLbs = do
    chunks <- consume
    return (L.fromChunks chunks)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- Run a monadic action on every byte of a strict bytestring, in order,
-- by walking a raw pointer over the bytestring's payload. The ForeignPtr
-- is touched once the walk reaches the end.
mapM_BS :: Monad m => (Word8 -> m ()) -> S.ByteString -> m ()
mapM_BS f (PS fptr offset len) = go first
  where
    first = unsafeForeignPtrToPtr fptr `plusPtr` offset
    stop  = first `plusPtr` len

    go cursor
        | cursor < stop =
            f (inlinePerformIO (peek cursor)) >> go (cursor `plusPtr` 1)
        | otherwise =
            inlinePerformIO (touchForeignPtr fptr) `seq` return ()
{-# INLINE mapM_BS #-}
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Run a monadic action on every @Word8@ of the incoming stream, chunk
-- by chunk, in the base monad.
--
-- Since 1.0.10
mapM_ :: Monad m => (Word8 -> m ()) -> Consumer S.ByteString m ()
mapM_ f = awaitForever $ \chunk -> lift (mapM_BS f chunk)
{-# INLINE mapM_ #-}
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Read one value of a @Storable@ type from the front of the byte
-- stream. When the stream does not hold enough bytes, the result is
-- @Nothing@ and the unused input is returned as leftovers.
--
-- @since 1.1.13
sinkStorable :: (Monad m, Storable a) => Consumer S.ByteString m (Maybe a)
sinkStorable = sinkStorableHelper Just noValue
  where
    -- Explicit signature: the helper needs a fully polymorphic consumer.
    noValue :: Monad n => Consumer S.ByteString n (Maybe c)
    noValue = return Nothing
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Variant of 'sinkStorable' that returns the value unwrapped and
-- signals a short stream by throwing 'SinkStorableInsufficientBytes'
-- with 'throwM'. Skipping the @Maybe@ wrapper in the success case can
-- make this cheaper than 'sinkStorable'.
--
-- @since 1.1.13
sinkStorableEx :: (MonadThrow m, Storable a) => Consumer S.ByteString m a
sinkStorableEx = sinkStorableHelper id insufficient
  where
    -- Explicit signature: the helper needs a fully polymorphic consumer.
    insufficient :: MonadThrow n => Consumer S.ByteString n c
    insufficient = throwM SinkStorableInsufficientBytes
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- Shared implementation of 'sinkStorable' and 'sinkStorableEx'.
--
-- @wrap@ is applied to the decoded value; @failure@ is run when the
-- stream ends before @sizeOf@ bytes are available (after any partial
-- input has been returned as leftovers).
sinkStorableHelper :: forall m a b. (Monad m, Storable a)
                   => (a -> b)
                   -> (Consumer S.ByteString m b)
                   -> Consumer S.ByteString m b
sinkStorableHelper wrap failure = begin
  where
    needed = sizeOf (undefined :: a)

    -- Fast path: the next non-empty chunk may already hold the value.
    begin = await >>= maybe failure inspect

    inspect chunk
        | S.null chunk = begin
        | otherwise =
            case compare (S.length chunk) needed of
                EQ -> decode chunk
                GT -> do
                    let (value, extra) = S.splitAt needed chunk
                    leftover extra
                    decode value
                LT -> do
                    -- Slow path: the value spans chunks, so put this one
                    -- back and gather exactly @needed@ bytes with 'take'.
                    leftover chunk
                    gathered <- take needed
                    let strict = S.concat (L.toChunks gathered)
                    case compare (S.length strict) needed of
                        LT -> leftover strict >> failure
                        EQ -> decode strict
                        GT -> assert False (decode strict)

    -- Given a bytestring of exactly the correct size, grab the value
    decode bytes = return $! wrap $! inlinePerformIO $!
        unsafeUseAsCString bytes (safePeek undefined . castPtr)

    safePeek :: a -> Ptr a -> IO a
#ifdef ALLOW_UNALIGNED_ACCESS
    safePeek _ = peek
#else
    safePeek val ptr = alloca (\t -> copyBytes t ptr (sizeOf val) >> peek t)
#endif
{-# INLINE sinkStorableHelper #-}
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Exception thrown by 'sinkStorableEx' when the stream ends before
-- enough bytes for the requested value have been received.
data SinkStorableException
    = SinkStorableInsufficientBytes
    deriving (Typeable, Show)

instance Exception SinkStorableException
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Like 'IO.withBinaryFile', but provides a source to read bytes from.
-- The file is opened in 'IO.ReadMode' and the callback receives
-- 'sourceHandle' applied to the open handle.
--
-- @since 1.2.1
withSourceFile
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM i ByteString n () -> m a)
  -> m a
withSourceFile fp inner =
    withRunInIO $ \runInIO ->
        IO.withBinaryFile fp IO.ReadMode $ \h ->
            runInIO (inner (sourceHandle h))
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Like 'IO.withBinaryFile', but provides a sink to write bytes to.
-- The file is opened in 'IO.WriteMode' and the callback receives
-- 'sinkHandle' applied to the open handle.
--
-- @since 1.2.1
withSinkFile
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM ByteString o n () -> m a)
  -> m a
withSinkFile fp inner =
    withRunInIO $ \run ->
        -- The handle must be writable: the sink writes every chunk to it,
        -- which fails on a handle opened in 'IO.ReadMode'.
        IO.withBinaryFile fp IO.WriteMode $ \h ->
            run (inner (sinkHandle h))
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Same as 'withSinkFile', but the sink accepts 'BB.Builder' values,
-- each of which is written to the handle with 'BB.hPutBuilder'.
--
-- @since 1.2.1
withSinkFileBuilder
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM BB.Builder o n () -> m a)
  -> m a
withSinkFileBuilder fp inner =
    withRunInIO $ \runInIO ->
        IO.withBinaryFile fp IO.WriteMode $ \h ->
            runInIO (inner (CL.mapM_ (\builder -> liftIO (BB.hPutBuilder h builder))))
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Like 'sinkFileCautious', but uses the @with@ pattern instead of
-- @MonadResource@: the temporary file is managed with 'bracket'. After
-- the callback returns, the temporary file is closed and renamed onto
-- the destination path, and the callback's result is returned.
--
-- @since 1.2.2
withSinkFileCautious
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM S.ByteString o n () -> m a)
  -> m a
withSinkFileCautious fp inner =
    withRunInIO $ \runInIO ->
        bracket (cautiousAcquire fp) cautiousCleanup $ \(tmpPath, tmpHandle) -> do
            result <- runInIO (inner (sinkHandle tmpHandle))
            -- Close before renaming so all buffered data is in the file.
            hClose tmpHandle
            renameFile tmpPath fp
            return result
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Helper function for Cautious functions above, do not export!
--
-- Opens a binary temp file in the destination's directory, using the
-- destination's file name with a @tmp@ extension as the name template.
cautiousAcquire :: FilePath -> IO (FilePath, IO.Handle)
cautiousAcquire fp = openBinaryTempFile dir template
  where
    dir      = takeDirectory fp
    template = takeFileName fp <.> "tmp"
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Helper function for Cautious functions above, do not export!
--
-- Closes the temp file's handle and removes the temp file. The removal
-- is attempted even when closing the handle throws, so a failed close
-- does not leave the temp file behind. A temp file that no longer exists
-- (for example because it was already renamed to its destination) is not
-- an error; any other removal failure is rethrown.
cautiousCleanup :: (FilePath, IO.Handle) -> IO ()
cautiousCleanup (tmpFP, h) =
    hClose h `finally` (removeFile tmpFP `Control.Exception.catch` ignoreMissing)
  where
    ignoreMissing e
        | isDoesNotExistError e = return ()
        | otherwise             = throwIO e
|