diff --git a/ChangeLog.md b/ChangeLog.md new file mode 100644 index 0000000..d6db7d5 --- /dev/null +++ b/ChangeLog.md @@ -0,0 +1,120 @@ +## 1.2.3.2 + +* Fix withSinkFileBuilder [#344](https://github.com/snoyberg/conduit/pull/344) + +## 1.2.3.1 + +* Fix typo in implementation of `withProcess_` + +## 1.2.3 + +* Added `withLoggedProcess_` + +## 1.2.2.1 + +* Add missing `hClose` to `withSinkFileCautious` + +## 1.2.2 + +* `sinkHandleBuilder`, `sinkHandleFlush`, `BuilderInput`, and `FlushInput` + [#336](https://github.com/snoyberg/conduit/pull/336) +* `withSinkFileCautious` + +## 1.2.1 + +* `Data.Conduit.Process.Typed` +* `withSourceFile`, `withSinkFile`, and `withSinkFileBuilder` + +## 1.2.0 + +* Added the `posOffset` field to the + `Data.Conduit.Attoparsec.Position` data type + [#331](https://github.com/snoyberg/conduit/issues/331). + +## 1.1.17 + +* Speed up `sinkHandle` by not flushing after every output operation. + [#322](https://github.com/snoyberg/conduit/issues/322) + +## 1.1.16 + +* Add `Data.Conduit.Foldl` adapter module for the `foldl` + package. 
[#312](https://github.com/snoyberg/conduit/pull/312) + +## 1.1.15 + +* `sinkTempFile` and `sinkSystemTempFile` + +## 1.1.14 + +* `sinkFileCautious` + +## 1.1.13.3 + +* `withCheckedProcessCleanup` properly closes opened `Handle`s + [#280](https://github.com/snoyberg/conduit/issues/280) + +## 1.1.13.2 + +* Fix alignment issues on non-X86 archs + +## 1.1.13.1 + +* Fix an incorrect comment + +## 1.1.13 + +* Add `sinkStorable` and `sinkStorableEx` + +## 1.1.12.1 + +* Fix build for GHC `<= 7.8` [#260](https://github.com/snoyberg/conduit/issues/260) +* Fix accidentally breaking change in `sourceProcessWithConsumer` type signature + +## 1.1.12 + +* Add sourceProcessWithStreams [#258](https://github.com/snoyberg/conduit/pull/258) + +## 1.1.11 + +* `withCheckedProcessCleanup` + +## 1.1.10.1 + +* Fix a leftovers bug in helperDecompress #254 + +## 1.1.10 + +* `multiple` combinator for `Data.Conduit.Zlib` [#254](https://github.com/snoyberg/conduit/issues/254) + +## 1.1.9.3 + +* Some typo fixes in docs + +## 1.1.9 + +* detectUtf [#217](https://github.com/snoyberg/conduit/pull/217) + +## 1.1.8 + +* Adding buffer size to sourceHandleRange [#213](https://github.com/snoyberg/conduit/pull/213) + +## 1.1.7.3 + +* Make Binary.lines O(n) instead of O(n^2) [#209](https://github.com/snoyberg/conduit/pull/209) + +## 1.1.7.2 + +* Fix for: Decompressing a specific amount of zlib data "eats" following data [#20](https://github.com/fpco/streaming-commons/issues/20) + +## 1.1.7 + +Add `Data.Conduit.ByteString.Builder` + +## 1.1.6 + +Generalized return type in `runGeneralTCPServer`. 
+ +## 1.1.5 + +Added `sinkParserEither` ([pull request #189](https://github.com/snoyberg/conduit/pull/189)) diff --git a/Data/Conduit/Attoparsec.hs b/Data/Conduit/Attoparsec.hs new file mode 100644 index 0000000..69b02d8 --- /dev/null +++ b/Data/Conduit/Attoparsec.hs @@ -0,0 +1,246 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE RankNTypes #-} + +-- | +-- Copyright: 2011 Michael Snoyman, 2010 John Millikin +-- License: MIT +-- +-- Consume attoparsec parsers via conduit. +-- +-- This code was taken from attoparsec-enumerator and adapted for conduits. +module Data.Conduit.Attoparsec + ( -- * Sink + sinkParser + , sinkParserEither + -- * Conduit + , conduitParser + , conduitParserEither + + -- * Types + , ParseError (..) + , Position (..) + , PositionRange (..) + -- * Classes + , AttoparsecInput + ) where + +import Control.Exception (Exception) +import Control.Monad (unless) +import qualified Data.ByteString as B +import qualified Data.Text as T +import qualified Data.Text.Internal as TI +import Data.Typeable (Typeable) +import Prelude hiding (lines) + +import qualified Data.Attoparsec.ByteString +import qualified Data.Attoparsec.Text +import qualified Data.Attoparsec.Types as A +import Data.Conduit +import Control.Monad.Trans.Resource (MonadThrow, monadThrow) + +-- | The context and message from a 'A.Fail' value. 
data ParseError
    = ParseError
        { errorContexts :: [String]
        , errorMessage :: String
        , errorPosition :: Position
        }
    -- ^ The parser reported a failure; carries the contexts and message
    -- supplied by attoparsec plus the position reached in the stream.
    | DivergentParser
    -- ^ The parser still asked for more input after the stream had ended.
    deriving (Show, Typeable)

instance Exception ParseError

-- | A location in the input stream. The parsers in this module start
-- counting at @Position 1 1 0@.
data Position = Position
    { posLine :: {-# UNPACK #-} !Int
    , posCol :: {-# UNPACK #-} !Int
    , posOffset :: {-# UNPACK #-} !Int
    -- ^ @since 1.2.0
    }
    deriving (Eq, Ord)

-- | Rendered as @line:col (offset)@.
instance Show Position where
    show (Position ln col off) =
        concat [show ln, ":", show col, " (", show off, ")"]

-- | The start and end 'Position' of a parsed value.
data PositionRange = PositionRange
    { posRangeStart :: {-# UNPACK #-} !Position
    , posRangeEnd :: {-# UNPACK #-} !Position
    }
    deriving (Eq, Ord)

-- | Rendered as @start-end@.
instance Show PositionRange where
    show (PositionRange from to) = show from ++ "-" ++ show to

-- | A class of types which may be consumed by an Attoparsec parser.
class AttoparsecInput a where
    -- | Start running a parser on a chunk of input.
    parseA :: A.Parser a b -> a -> A.IResult a b
    -- | Feed a further chunk of input to a parse result.
    feedA :: A.IResult a b -> a -> A.IResult a b
    -- | The empty chunk.
    empty :: a
    -- | Is the chunk empty?
    isNull :: a -> Bool
    -- | Discard empty chunks from a list of chunks.
    notEmpty :: [a] -> [a]
    -- | Count the newlines, the columns after the last newline, and the
    -- total number of elements in a chunk, starting from @Position 0 0 0@.
    getLinesCols :: a -> Position

    -- | Return the beginning of the first input with the length of
    -- the second input removed. Assumes the second string is shorter
    -- than the first.
    stripFromEnd :: a -> a -> a

instance AttoparsecInput B.ByteString where
    parseA = Data.Attoparsec.ByteString.parse
    feedA = Data.Attoparsec.ByteString.feed
    empty = B.empty
    isNull = B.null
    notEmpty chunks = [c | c <- chunks, not (B.null c)]
    getLinesCols = B.foldl' step (Position 0 0 0)
      where
        -- byte 10 is LF: start a new line and reset the column
        step (Position ln col off) w
            | w == 10 = Position (ln + 1) 0 off'
            | otherwise = Position ln (col + 1) off'
          where
            off' = off + 1
    stripFromEnd whole suffix = B.take keep whole
      where
        keep = B.length whole - B.length suffix

instance AttoparsecInput T.Text where
    parseA = Data.Attoparsec.Text.parse
    feedA = Data.Attoparsec.Text.feed
    empty = T.empty
    isNull = T.null
    notEmpty chunks = [c | c <- chunks, not (T.null c)]
    getLinesCols = T.foldl' step (Position 0 0 0)
      where
        step (Position ln col off) ch
            | ch == '\n' = Position (ln + 1) 0 off'
            | otherwise = Position ln (col + 1) off'
          where
            off' = off + 1
    -- works directly on the internal representation of 'T.Text'
    stripFromEnd (TI.Text arr1 off1 len1) (TI.Text _ _ len2) =
        TI.text arr1 off1 (len1 - len2)

-- | Convert an Attoparsec 'A.Parser' into a 'Sink'. The parser will
-- be streamed bytes until it returns 'A.Done' or 'A.Fail'.
--
-- If parsing fails, a 'ParseError' will be thrown with 'monadThrow'.
--
-- Since 0.5.0
sinkParser :: (AttoparsecInput a, MonadThrow m) => A.Parser a b -> Consumer a m b
sinkParser parser = fmap snd (sinkParserPosErr (Position 1 1 0) parser)

-- | Same as 'sinkParser', but we return an 'Either' type instead
-- of raising an exception.
--
-- Since 1.1.5
sinkParserEither :: (AttoparsecInput a, Monad m) => A.Parser a b -> Consumer a m (Either ParseError b)
sinkParserEither parser = fmap (fmap snd) (sinkParserPos (Position 1 1 0) parser)


-- | Consume a stream of parsed tokens, returning both the token and
-- the position it appears at. This function will raise a 'ParseError'
-- on bad input.
--
-- Since 0.5.0
conduitParser :: (AttoparsecInput a, MonadThrow m) => A.Parser a b -> Conduit a m (PositionRange, b)
conduitParser parser = loop (Position 1 1 0)
  where
    loop !start = do
        mchunk <- await
        case mchunk of
            Nothing -> return ()
            Just chunk -> do
                -- we only peeked to detect end of input; hand the chunk back
                leftover chunk
                (!end, !val) <- sinkParserPosErr start parser
                yield (PositionRange start end, val)
                loop end
{-# SPECIALIZE conduitParser
                   :: MonadThrow m
                   => A.Parser T.Text b
                   -> Conduit T.Text m (PositionRange, b) #-}
{-# SPECIALIZE conduitParser
                   :: MonadThrow m
                   => A.Parser B.ByteString b
                   -> Conduit B.ByteString m (PositionRange, b) #-}



-- | Same as 'conduitParser', but we return an 'Either' type instead
-- of raising an exception.
+conduitParserEither + :: (Monad m, AttoparsecInput a) + => A.Parser a b + -> Conduit a m (Either ParseError (PositionRange, b)) +conduitParserEither parser = + conduit $ Position 1 1 0 + where + conduit !pos = await >>= maybe (return ()) go + where + go x = do + leftover x + eres <- sinkParserPos pos parser + case eres of + Left e -> yield $ Left e + Right (!pos', !res) -> do + yield $! Right (PositionRange pos pos', res) + conduit pos' +{-# SPECIALIZE conduitParserEither + :: Monad m + => A.Parser T.Text b + -> Conduit T.Text m (Either ParseError (PositionRange, b)) #-} +{-# SPECIALIZE conduitParserEither + :: Monad m + => A.Parser B.ByteString b + -> Conduit B.ByteString m (Either ParseError (PositionRange, b)) #-} + + + + +sinkParserPosErr + :: (AttoparsecInput a, MonadThrow m) + => Position + -> A.Parser a b + -> Consumer a m (Position, b) +sinkParserPosErr pos0 p = sinkParserPos pos0 p >>= f + where + f (Left e) = monadThrow e + f (Right a) = return a +{-# INLINE sinkParserPosErr #-} + + +sinkParserPos + :: (AttoparsecInput a, Monad m) + => Position + -> A.Parser a b + -> Consumer a m (Either ParseError (Position, b)) +sinkParserPos pos0 p = sink empty pos0 (parseA p) + where + sink prev pos parser = await >>= maybe close push + where + push c + | isNull c = sink prev pos parser + | otherwise = go False c $ parser c + + close = go True prev (feedA (parser empty) empty) + + go end c (A.Done lo x) = do + let pos' + | end = pos + | otherwise = addLinesCols prev pos + y = stripFromEnd c lo + pos'' = addLinesCols y pos' + unless (isNull lo) $ leftover lo + pos'' `seq` return $! Right (pos'', x) + go end c (A.Fail rest contexts msg) = + let x = stripFromEnd c rest + pos' + | end = pos + | otherwise = addLinesCols prev pos + pos'' = addLinesCols x pos' + in pos'' `seq` return $! Left (ParseError contexts msg pos'') + go end c (A.Partial parser') + | end = return $! 
Left DivergentParser + | otherwise = + pos' `seq` sink c pos' parser' + where + pos' = addLinesCols prev pos + + addLinesCols :: AttoparsecInput a => a -> Position -> Position + addLinesCols x (Position lines cols off) = + lines' `seq` cols' `seq` off' `seq` Position lines' cols' off' + where + Position dlines dcols doff = getLinesCols x + lines' = lines + dlines + cols' = (if dlines > 0 then 1 else cols) + dcols + off' = off + doff +{-# INLINE sinkParserPos #-} diff --git a/Data/Conduit/Binary.hs b/Data/Conduit/Binary.hs new file mode 100644 index 0000000..76310a6 --- /dev/null +++ b/Data/Conduit/Binary.hs @@ -0,0 +1,688 @@ +{-# LANGUAGE CPP, RankNTypes #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE ScopedTypeVariables #-} +-- | Functions for interacting with bytes. +-- +-- For many purposes, it's recommended to use the conduit-combinators library, +-- which provides a more complete set of functions. +module Data.Conduit.Binary + ( -- * Files and @Handle@s + + -- | Note that most of these functions live in the @MonadResource@ monad + -- to ensure resource finalization even in the presence of exceptions. In + -- order to run such code, you will need to use @runResourceT@. 
+ + -- ** Sources + sourceFile + , sourceHandle + , sourceHandleUnsafe + , sourceIOHandle + , sourceFileRange + , sourceHandleRange + , sourceHandleRangeWithBuffer + , withSourceFile + -- ** Sinks + , sinkFile + , sinkFileCautious + , sinkTempFile + , sinkSystemTempFile + , sinkHandle + , sinkIOHandle + , sinkHandleBuilder + , sinkHandleFlush + , withSinkFile + , withSinkFileBuilder + , withSinkFileCautious + -- ** Conduits + , conduitFile + , conduitHandle + -- * Utilities + -- ** Sources + , sourceLbs + -- ** Sinks + , head + , dropWhile + , take + , drop + , sinkCacheLength + , sinkLbs + , mapM_ + -- *** Storable + , sinkStorable + , sinkStorableEx + -- ** Conduits + , isolate + , takeWhile + , Data.Conduit.Binary.lines + ) where + +import qualified Data.ByteString.Builder as BB +import qualified Data.Streaming.FileRead as FR +import Prelude hiding (head, take, drop, takeWhile, dropWhile, mapM_) +import qualified Data.ByteString as S +import Data.ByteString.Unsafe (unsafeUseAsCString) +import qualified Data.ByteString.Lazy as L +import Data.Conduit +import Data.Conduit.List (sourceList, consume) +import qualified Data.Conduit.List as CL +import Control.Exception (assert, finally, bracket) +import Control.Monad (unless, when) +import Control.Monad.IO.Class (liftIO, MonadIO) +import Control.Monad.IO.Unlift +import Control.Monad.Trans.Resource (allocate, release) +import Control.Monad.Trans.Class (lift) +import qualified System.IO as IO +import Data.Word (Word8, Word64) +#if (__GLASGOW_HASKELL__ < 710) +import Control.Applicative ((<$>)) +#endif +import System.Directory (getTemporaryDirectory, removeFile) +import Data.ByteString.Lazy.Internal (defaultChunkSize) +import Data.ByteString.Internal (ByteString (PS), inlinePerformIO) +import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr) +import Foreign.ForeignPtr (touchForeignPtr) +import Foreign.Ptr (plusPtr, castPtr) +import Foreign.Storable (Storable, peek, sizeOf) +import GHC.ForeignPtr 
(mallocPlainForeignPtrBytes) +import Control.Monad.Trans.Resource (MonadResource) +import Control.Monad.Catch (MonadThrow (..)) +import Control.Exception (Exception) +import Data.Typeable (Typeable) +import Foreign.Ptr (Ptr) +#ifndef ALLOW_UNALIGNED_ACCESS +import Foreign.Marshal (alloca, copyBytes) +#endif +import System.Directory (renameFile) +import System.FilePath (takeDirectory, takeFileName, (<.>)) +import System.IO (hClose, openBinaryTempFile) +import Control.Exception (throwIO, catch) +import System.IO.Error (isDoesNotExistError) +import qualified Data.ByteString.Builder as BB + +-- | Stream the contents of a file as binary data. +-- +-- Since 0.3.0 +sourceFile :: MonadResource m + => FilePath + -> Producer m S.ByteString +sourceFile fp = + bracketP + (FR.openFile fp) + FR.closeFile + loop + where + loop h = do + bs <- liftIO $ FR.readChunk h + unless (S.null bs) $ do + yield bs + loop h + +-- | Stream the contents of a 'IO.Handle' as binary data. Note that this +-- function will /not/ automatically close the @Handle@ when processing +-- completes, since it did not acquire the @Handle@ in the first place. +-- +-- Since 0.3.0 +sourceHandle :: MonadIO m + => IO.Handle + -> Producer m S.ByteString +sourceHandle h = + loop + where + loop = do + bs <- liftIO (S.hGetSome h defaultChunkSize) + if S.null bs + then return () + else yield bs >> loop + +-- | Same as @sourceHandle@, but instead of allocating a new buffer for each +-- incoming chunk of data, reuses the same buffer. Therefore, the @ByteString@s +-- yielded by this function are not referentially transparent between two +-- different @yield@s. +-- +-- This function will be slightly more efficient than @sourceHandle@ by +-- avoiding allocations and reducing garbage collections, but should only be +-- used if you can guarantee that you do not reuse a @ByteString@ (or any slice +-- thereof) between two calls to @await@. 
--
-- Since 1.0.12
sourceHandleUnsafe :: MonadIO m => IO.Handle -> Source m ByteString
sourceHandleUnsafe handle = do
    fptr <- liftIO $ mallocPlainForeignPtrBytes defaultChunkSize
    pump fptr (unsafeForeignPtrToPtr fptr)
    -- touch the buffer only after the read loop has finished
    liftIO $ touchForeignPtr fptr
  where
    -- Every yielded ByteString wraps the same buffer, which is overwritten
    -- by the next read.
    pump fptr ptr = go
      where
        go = do
            count <- liftIO $ IO.hGetBuf handle ptr defaultChunkSize
            unless (count <= 0) $ do
                yield (PS fptr 0 count)
                go

-- | A variant of 'sourceHandle' that receives an action opening the
-- 'IO.Handle' (in read mode) rather than an already opened one, so the
-- handle is opened only when needed and closed as soon as possible.
--
-- Since 0.3.0
sourceIOHandle :: MonadResource m
               => IO IO.Handle
               -> Producer m S.ByteString
sourceIOHandle alloc = bracketP alloc IO.hClose (\h -> sourceHandle h)

-- | Write every incoming chunk to the given 'IO.Handle'. This function
-- does /not/ flush and will /not/ close the @Handle@ when processing
-- completes.
--
-- Since 0.3.0
sinkHandle :: MonadIO m
           => IO.Handle
           -> Consumer S.ByteString m ()
sinkHandle h = awaitForever $ \bs -> liftIO (S.hPut h bs)

-- | Run every incoming builder directly on the buffer of the given
-- 'IO.Handle'. The @Handle@ is /not/ closed automatically when processing
-- completes.
-- Pass 'Data.ByteString.Builder.Extra.flush' to flush the buffer.
--
-- @since 1.2.2
sinkHandleBuilder :: MonadIO m => IO.Handle -> ConduitM BB.Builder o m ()
sinkHandleBuilder h = awaitForever $ \builder -> liftIO (BB.hPutBuilder h builder)

-- | Execute incoming @Flush@ values on an @IO.Handle@: a 'Chunk' is
-- written, a 'Flush' flushes the handle. The @Handle@ is /not/ closed
-- automatically when processing completes.
--
-- @since 1.2.2
sinkHandleFlush :: MonadIO m
                => IO.Handle
                -> ConduitM (Flush S.ByteString) o m ()
sinkHandleFlush h = awaitForever (liftIO . perform)
  where
    perform (Chunk bs) = S.hPut h bs
    perform Flush = IO.hFlush h

-- | An alternative to 'sinkHandle'.
-- Instead of taking a pre-opened 'IO.Handle', it takes an action that opens
-- a 'IO.Handle' (in write mode), so that it can open it only when needed
-- and close it as soon as possible.
--
-- Since 0.3.0
sinkIOHandle :: MonadResource m
             => IO IO.Handle
             -> Consumer S.ByteString m ()
sinkIOHandle alloc = bracketP alloc IO.hClose (\h -> sinkHandle h)

-- | Stream part of a file as binary data: optionally seek to an offset
-- first, and optionally stop after a maximum number of bytes.
--
-- Since 0.3.0
sourceFileRange :: MonadResource m
                => FilePath
                -> Maybe Integer -- ^ Offset
                -> Maybe Integer -- ^ Maximum count
                -> Producer m S.ByteString
sourceFileRange fp offset count = bracketP open IO.hClose stream
  where
    open = IO.openBinaryFile fp IO.ReadMode
    stream h = sourceHandleRange h offset count

-- | Stream part of a handle as binary data: optionally seek to an offset
-- first, and optionally stop after a maximum number of bytes. Reads use
-- 'defaultChunkSize' as the buffer size.
--
-- Since 1.0.8
sourceHandleRange :: MonadIO m
                  => IO.Handle
                  -> Maybe Integer -- ^ Offset
                  -> Maybe Integer -- ^ Maximum count
                  -> Producer m S.ByteString
sourceHandleRange handle offset count =
    sourceHandleRangeWithBuffer handle offset count bufferSize
  where
    bufferSize = defaultChunkSize

-- | Stream the contents of a handle as binary data, starting from a certain
-- offset and only consuming up to a certain number of bytes. This function
-- consumes chunks as specified by the buffer size.
--
-- Since 1.1.8
sourceHandleRangeWithBuffer :: MonadIO m
                  => IO.Handle
                  -> Maybe Integer -- ^ Offset
                  -> Maybe Integer -- ^ Maximum count
                  -> Int -- ^ Buffer size
                  -> Producer m S.ByteString
sourceHandleRangeWithBuffer handle offset count buffer = do
    -- seek only when an offset was requested
    maybe (return ()) (liftIO . IO.hSeek handle IO.AbsoluteSeek) offset
    maybe pullUnlimited (pullLimited . fromInteger) count
  where
    -- read until the handle reports end of input (an empty chunk)
    pullUnlimited = do
        bs <- liftIO $ S.hGetSome handle buffer
        unless (S.null bs) $ do
            yield bs
            pullUnlimited

    -- read at most @c@ further bytes, never asking for more than remain
    pullLimited c = do
        bs <- liftIO $ S.hGetSome handle (min c buffer)
        let c' = c - S.length bs
        assert (c' >= 0) $
            unless (S.null bs) $ do
                yield bs
                pullLimited c'

-- | Write all incoming data to the given file, which is opened in
-- 'IO.WriteMode'.
--
-- Since 0.3.0
sinkFile :: MonadResource m
         => FilePath
         -> Consumer S.ByteString m ()
sinkFile fp = sinkIOHandle openForWrite
  where
    openForWrite = IO.openBinaryFile fp IO.WriteMode

-- | Cautious version of 'sinkFile'. The values are streamed to a temporary
-- file in the same directory as the destination file, and only after the
-- entire file has been written successfully is it moved atomically to the
-- destination path.
--
-- In the event of an exception occurring, the temporary file will be
-- deleted and no move will be made. If the application shuts down
-- without running exception handling (such as machine failure or a
-- SIGKILL), the temporary file will remain and the destination file
-- will be untouched.
--
-- @since 1.1.14
sinkFileCautious
  :: MonadResource m
  => FilePath
  -> ConduitM S.ByteString o m ()
sinkFileCautious fp =
    bracketP (cautiousAcquire fp) cautiousCleanup $ \(tmpFP, h) -> do
        sinkHandle h
        -- close before renaming so the temporary file is complete
        liftIO (hClose h >> renameFile tmpFP fp)

-- | Stream data into a temporary file in the given directory with the
-- given filename pattern, and return the temporary filename.
-- The temporary file will be automatically deleted when exiting the
-- active 'ResourceT' block, if it still exists.
--
-- @since 1.1.15
sinkTempFile :: MonadResource m
             => FilePath -- ^ temp directory
             -> String -- ^ filename pattern
             -> ConduitM ByteString o m FilePath
sinkTempFile tmpdir pattern = do
    (_releaseKey, (fp, h)) <-
        allocate (IO.openBinaryTempFile tmpdir pattern) cleanup
    sinkHandle h
    liftIO $ IO.hClose h
    return fp
  where
    -- close the handle, then delete the file even if closing failed
    cleanup (path, hdl) = IO.hClose hdl `finally` removeIfPresent path

    -- a file that is already gone is not an error; anything else is rethrown
    removeIfPresent path =
        removeFile path `Control.Exception.catch` \e ->
            unless (isDoesNotExistError e) (throwIO e)

-- | Same as 'sinkTempFile', but uses the directory returned by
-- 'getTemporaryDirectory' as the first argument.
--
-- @since 1.1.15
sinkSystemTempFile
    :: MonadResource m
    => String -- ^ filename pattern
    -> ConduitM ByteString o m FilePath
sinkSystemTempFile pattern =
    liftIO getTemporaryDirectory >>= \dir -> sinkTempFile dir pattern

-- | Write the input to a file and also pass it along the pipeline,
-- similar in concept to the Unix command @tee@.
--
-- Since 0.3.0
conduitFile :: MonadResource m
            => FilePath
            -> Conduit S.ByteString m S.ByteString
conduitFile fp = bracketP open IO.hClose conduitHandle
  where
    open = IO.openBinaryFile fp IO.WriteMode

-- | Write the input to a @Handle@ and also pass it along the pipeline,
-- similar in concept to the Unix command @tee@. Like @sourceHandle@,
-- does not close the handle on completion. Related to: @conduitFile@.
--
-- Since 1.0.9
conduitHandle :: MonadIO m => IO.Handle -> Conduit S.ByteString m S.ByteString
conduitHandle h = awaitForever $ \bs -> do
    liftIO (S.hPut h bs)
    yield bs

-- | Ensure that only up to the given number of bytes are consumed by the inner
-- sink. Note that this does /not/ ensure that all of those bytes are in fact
-- consumed.
--
-- Since 0.3.0
isolate :: Monad m
        => Int
        -> Conduit S.ByteString m S.ByteString
isolate = go
  where
    go 0 = return ()
    go remaining = await >>= maybe (return ()) (step remaining)

    -- pass on at most @remaining@ bytes of this chunk
    step remaining bs
        | remaining' == 0 = do
            -- quota reached: return the surplus upstream, then stop
            unless (S.null rest) $ leftover rest
            yield taken
        | otherwise = assert (S.null rest) $ yield taken >> go remaining'
      where
        (taken, rest) = S.splitAt remaining bs
        remaining' = remaining - S.length taken

-- | Return the next byte from the stream, if available. Empty chunks are
-- skipped and the rest of the chunk is put back as a leftover.
--
-- Since 0.3.0
head :: Monad m => Consumer S.ByteString m (Maybe Word8)
head = await >>= maybe (return Nothing) firstByte
  where
    firstByte bs =
        case S.uncons bs of
            Just (w, rest) -> leftover rest >> return (Just w)
            Nothing -> head

-- | Pass on bytes for as long as the predicate returns @True@; the first
-- non-matching byte and everything after it in that chunk become a leftover.
--
-- Since 0.3.0
takeWhile :: Monad m => (Word8 -> Bool) -> Conduit S.ByteString m S.ByteString
takeWhile p = loop
  where
    loop = do
        mbs <- await
        case mbs of
            Nothing -> return ()
            Just bs -> do
                let (matching, rest) = S.span p bs
                unless (S.null matching) $ yield matching
                -- a non-empty rest starts with a byte that failed the predicate
                if S.null rest then loop else leftover rest

-- | Discard bytes for as long as the predicate returns @True@; the
-- remainder of the chunk is put back as a leftover.
--
-- Since 0.3.0
dropWhile :: Monad m => (Word8 -> Bool) -> Consumer S.ByteString m ()
dropWhile p = loop
  where
    loop = await >>= maybe (return ()) (keep . S.dropWhile p)

    keep rest
        | S.null rest = loop
        | otherwise = leftover rest

-- | Take the given number of bytes, or fewer if the stream ends first.
--
-- Since 0.3.0
take :: Monad m => Int -> Consumer S.ByteString m L.ByteString
take 0 = return L.empty
take n0 = collect n0 id
  where
    -- @front@ is a difference list of the chunks gathered so far
    finish front chunks = return (L.fromChunks (front chunks))

    collect needed front = do
        mbs <- await
        case mbs of
            Nothing -> finish front []
            Just bs
                | len < needed -> collect (needed - len) (front . (bs :))
                | len == needed -> finish front [bs]
                | otherwise -> do
                    let (x, y) = S.splitAt needed bs
                    assert (not $ S.null y) $ leftover y >> finish front [x]
              where
                len = S.length bs

-- | Drop up to the given number of bytes.
--
-- Since 0.5.0
drop :: Monad m => Int -> Consumer S.ByteString m ()
drop 0 = return ()
drop n0 = skip n0
  where
    skip n = do
        mbs <- await
        case mbs of
            Nothing -> return ()
            Just bs ->
                case compare (S.length bs) n of
                    LT -> skip (n - S.length bs)
                    EQ -> return ()
                    GT -> do
                        -- part of this chunk survives: hand it back
                        let rest = S.drop n bs
                        assert (not $ S.null rest) $ leftover rest >> return ()

-- | Split the input bytes into lines: split on the LF byte (10) and strip
-- it from the output. A final line without a trailing LF is emitted if it
-- is non-empty.
--
-- Since 0.3.0
lines :: Monad m => Conduit S.ByteString m S.ByteString
lines = awaitChunk []
  where
    -- @pending@ holds the pieces of the current unfinished line, newest first
    awaitChunk pending = do
        mbs <- await
        case mbs of
            Nothing -> emitLast pending
            Just bs -> split pending bs

    emitLast pending =
        let final = S.concat (reverse pending)
         in unless (S.null final) (yield final)

    split pending bs =
        case S.uncons rest of
            -- no LF in this chunk: keep accumulating
            Nothing -> awaitChunk (bs : pending)
            -- drop the LF itself and continue with what follows it
            Just (_, rest') -> do
                yield (S.concat (reverse (line : pending)))
                split [] rest'
      where
        (line, rest) = S.break (== 10) bs

-- | Stream the chunks from a lazy bytestring.
--
-- Since 0.5.0
sourceLbs :: Monad m => L.ByteString -> Producer m S.ByteString
sourceLbs lbs = sourceList (L.toChunks lbs)

-- | Stream the input data into a temp file and count the number of bytes
-- present. When complete, return a new @Source@ reading from the temp file
-- together with the length of the input in bytes.
--
-- All resources will be cleaned up automatically.
--
-- Since 1.0.5
sinkCacheLength :: (MonadResource m1, MonadResource m2)
                => Sink S.ByteString m1 (Word64, Source m2 S.ByteString)
sinkCacheLength = do
    tmpdir <- liftIO getTemporaryDirectory
    -- Register the temp file with the resource machinery: the release action
    -- closes the handle and then removes the file.
    (releaseKey, (fp, h)) <- allocate
        (IO.openBinaryTempFile tmpdir "conduit.cache")
        (\(fp, h) -> IO.hClose h `finally` removeFile fp)
    len <- sinkHandleLen h
    -- Close the write handle now; the returned source reopens the file by path.
    liftIO $ IO.hClose h
    -- Once the returned source has been fully read, release the temp file.
    return (len, sourceFile fp >> release releaseKey)
  where
    -- Write every chunk to the handle while summing the chunk lengths.
    sinkHandleLen :: MonadResource m => IO.Handle -> Sink S.ByteString m Word64
    sinkHandleLen h =
        loop 0
      where
        loop x =
            await >>= maybe (return x) go
          where
            go bs = do
                liftIO $ S.hPut h bs
                loop $ x + fromIntegral (S.length bs)

-- | Consume a stream of input into a lazy bytestring. Note that no lazy I\/O
-- is performed, but rather all content is read into memory strictly.
--
-- Since 1.0.5
sinkLbs :: Monad m => Sink S.ByteString m L.ByteString
sinkLbs = fmap L.fromChunks consume

-- | Run an action on every byte of a strict bytestring by walking a raw
-- pointer from the start of the payload to its end.
mapM_BS :: Monad m => (Word8 -> m ()) -> S.ByteString -> m ()
mapM_BS f (PS fptr offset len) = do
    let start = unsafeForeignPtrToPtr fptr `plusPtr` offset
        end = start `plusPtr` len
        loop ptr
            -- NOTE(review): the touch at the end is presumably what keeps the
            -- buffer alive while the raw pointer is in use -- confirm.
            | ptr >= end = inlinePerformIO (touchForeignPtr fptr) `seq` return ()
            | otherwise = do
                f (inlinePerformIO (peek ptr))
                loop (ptr `plusPtr` 1)
    loop start
{-# INLINE mapM_BS #-}

-- | Perform a computation on each @Word8@ in a stream.
--
-- Since 1.0.10
mapM_ :: Monad m => (Word8 -> m ()) -> Consumer S.ByteString m ()
mapM_ f = awaitForever (lift . mapM_BS f)
{-# INLINE mapM_ #-}

-- | Consume some instance of @Storable@ from the incoming byte stream. In the
-- event of insufficient bytes in the stream, returns a @Nothing@ and returns
-- all unused input as leftovers.
--
-- @since 1.1.13
sinkStorable :: (Monad m, Storable a) => Consumer S.ByteString m (Maybe a)
sinkStorable = sinkStorableHelper Just (return Nothing)

-- | Same as 'sinkStorable', but throws a 'SinkStorableInsufficientBytes'
-- exception (via 'throwM') in the event of insufficient bytes. This can be
-- more efficient to use than 'sinkStorable' as it avoids the need to
-- construct/deconstruct a @Maybe@ wrapper in the success case.
--
-- @since 1.1.13
sinkStorableEx :: (MonadThrow m, Storable a) => Consumer S.ByteString m a
sinkStorableEx = sinkStorableHelper id (throwM SinkStorableInsufficientBytes)

-- | Shared implementation of 'sinkStorable' and 'sinkStorableEx'.
--
-- The first argument wraps a successfully decoded value; the second is the
-- action to run when the stream ends before @sizeOf@ bytes are available.
sinkStorableHelper :: forall m a b. (Monad m, Storable a)
                   => (a -> b)
                   -> (Consumer S.ByteString m b)
                   -> Consumer S.ByteString m b
sinkStorableHelper wrap failure = do
    start
  where
    -- number of bytes one value of type @a@ occupies
    size = sizeOf (undefined :: a)

    -- try the optimal case: next chunk has all the data we need
    start = do
        mbs <- await
        case mbs of
            Nothing -> failure
            Just bs
                -- skip empty chunks
                | S.null bs -> start
                | otherwise ->
                    case compare (S.length bs) size of
                        LT -> do
                            -- looks like we're stuck concating
                            leftover bs
                            lbs <- take size
                            let bs = S.concat $ L.toChunks lbs
                            case compare (S.length bs) size of
                                LT -> do
                                    -- stream ended early: give back what we read
                                    leftover bs
                                    failure
                                EQ -> process bs
                                -- 'take' never returns more than requested
                                GT -> assert False (process bs)
                        EQ -> process bs
                        GT -> do
                            -- use the first @size@ bytes, return the rest
                            let (x, y) = S.splitAt size bs
                            leftover y
                            process x

    -- Given a bytestring of exactly the correct size, grab the value
    process bs = return $! wrap $! inlinePerformIO $!
        unsafeUseAsCString bs (safePeek undefined . castPtr)

    -- Read the value from the pointer. Unless ALLOW_UNALIGNED_ACCESS is
    -- defined, the bytes are first copied into an 'alloca' buffer and read
    -- from there (NOTE(review): presumably to obtain an aligned address).
    safePeek :: a -> Ptr a -> IO a
#ifdef ALLOW_UNALIGNED_ACCESS
    safePeek _ = peek
#else
    safePeek val ptr = alloca (\t -> copyBytes t ptr (sizeOf val) >> peek t)
#endif
{-# INLINE sinkStorableHelper #-}

-- | Thrown by 'sinkStorableEx' when the stream does not contain enough
-- bytes for a complete value.
data SinkStorableException = SinkStorableInsufficientBytes
    deriving (Show, Typeable)
instance Exception SinkStorableException

-- | Like 'IO.withBinaryFile', but provides a source to read bytes from.
--
-- @since 1.2.1
withSourceFile
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM i ByteString n () -> m a)
  -> m a
withSourceFile fp inner =
  withRunInIO $ \run ->
  IO.withBinaryFile fp IO.ReadMode $
  run . inner . sourceHandle

-- | Like 'IO.withBinaryFile', but provides a sink to write bytes to.
--
-- The file is opened in 'IO.WriteMode' for the duration of the inner action
-- and closed by 'IO.withBinaryFile' when that action finishes.
--
-- @since 1.2.1
withSinkFile
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM ByteString o n () -> m a)
  -> m a
withSinkFile fp inner =
  withRunInIO $ \run ->
  -- Must be WriteMode: a handle opened in ReadMode rejects every write made
  -- by 'sinkHandle'.
  IO.withBinaryFile fp IO.WriteMode $
  run . inner . sinkHandle

-- | Same as 'withSinkFile', but lets you use a 'BB.Builder'.
--
-- Each incoming builder is executed directly on the handle's buffer via
-- 'BB.hPutBuilder'.
--
-- @since 1.2.1
withSinkFileBuilder
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM BB.Builder o n () -> m a)
  -> m a
withSinkFileBuilder fp inner =
  withRunInIO $ \run ->
  IO.withBinaryFile fp IO.WriteMode $ \h ->
  run $ inner $ CL.mapM_ (liftIO . BB.hPutBuilder h)

-- | Like 'sinkFileCautious', but uses the @with@ pattern instead of
-- @MonadResource@.
--
-- The sink writes to a temporary file next to the destination; only when the
-- inner action completes is the temporary file closed and renamed over the
-- destination. The 'bracket' cleanup then finds no temporary file left to
-- delete, while on an exception it removes it.
--
-- @since 1.2.2
withSinkFileCautious
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM S.ByteString o n () -> m a)
  -> m a
withSinkFileCautious fp inner =
  withRunInIO $ \run -> bracket
    (cautiousAcquire fp)
    cautiousCleanup
    (\(tmpFP, h) -> do
        a <- run $ inner $ sinkHandle h
        -- close before renaming so the temporary file is complete
        hClose h
        renameFile tmpFP fp
        return a)

-- | Helper function for Cautious functions above, do not export!
--
-- Opens a binary temporary file in the destination's directory, using the
-- destination's file name with a @tmp@ extension as the name template.
cautiousAcquire :: FilePath -> IO (FilePath, IO.Handle)
cautiousAcquire fp = openBinaryTempFile (takeDirectory fp) (takeFileName fp <.> "tmp")

-- | Helper function for Cautious functions above, do not export!
+cautiousCleanup :: (FilePath, IO.Handle) -> IO () +cautiousCleanup (tmpFP, h) = do + hClose h + removeFile tmpFP `Control.Exception.catch` \e -> + if isDoesNotExistError e + then return () + else throwIO e diff --git a/Data/Conduit/Blaze.hs b/Data/Conduit/Blaze.hs new file mode 100644 index 0000000..ca5d06f --- /dev/null +++ b/Data/Conduit/Blaze.hs @@ -0,0 +1,103 @@ +-- | Convert a stream of blaze-builder @Builder@s into a stream of @ByteString@s. +-- +-- Adapted from blaze-builder-enumerator, written by myself and Simon Meier. +-- +-- Note that the functions here can work in any monad built on top of @IO@ or +-- @ST@. +-- +-- Since 1.1.7.0, the functions here call their counterparts in +-- "Data.Conduit.ByteString.Builder", which work with both +-- 'Data.ByteString.Builder.Builder' and blaze-builder 0.3's +-- 'Blaze.ByteString.Builder.Builder'. +module Data.Conduit.Blaze + ( + + -- * Conduits from builders to bytestrings + builderToByteString + , unsafeBuilderToByteString + , builderToByteStringWith + + -- ** Flush + , builderToByteStringFlush + , builderToByteStringWithFlush + + -- * Buffers + , B.Buffer + + -- ** Status information + , B.freeSize + , B.sliceSize + , B.bufferSize + + -- ** Creation and modification + , B.allocBuffer + , B.reuseBuffer + , B.nextSlice + + -- ** Conversion to bytestings + , B.unsafeFreezeBuffer + , B.unsafeFreezeNonEmptyBuffer + + -- * Buffer allocation strategies + , B.BufferAllocStrategy + , B.allNewBuffersStrategy + , B.reuseBufferStrategy + ) where + +import Data.Conduit + +import qualified Data.ByteString as S + +import Blaze.ByteString.Builder (Builder) +import Control.Monad.Primitive (PrimMonad) +import Control.Monad.Base (MonadBase) +import Data.Streaming.Blaze + +import qualified Data.Conduit.ByteString.Builder as B + +-- | Incrementally execute builders and pass on the filled chunks as +-- bytestrings. 
+builderToByteString :: (MonadBase base m, PrimMonad base) => Conduit Builder m S.ByteString +builderToByteString = B.builderToByteString +{-# INLINE builderToByteString #-} + +-- | +-- +-- Since 0.0.2 +builderToByteStringFlush :: (MonadBase base m, PrimMonad base) => Conduit (Flush Builder) m (Flush S.ByteString) +builderToByteStringFlush = B.builderToByteStringFlush +{-# INLINE builderToByteStringFlush #-} + +-- | Incrementally execute builders on the given buffer and pass on the filled +-- chunks as bytestrings. Note that, if the given buffer is too small for the +-- execution of a build step, a larger one will be allocated. +-- +-- WARNING: This conduit yields bytestrings that are NOT +-- referentially transparent. Their content will be overwritten as soon +-- as control is returned from the inner sink! +unsafeBuilderToByteString :: (MonadBase base m, PrimMonad base) + => IO Buffer -- action yielding the inital buffer. + -> Conduit Builder m S.ByteString +unsafeBuilderToByteString = B.unsafeBuilderToByteString +{-# INLINE unsafeBuilderToByteString #-} + + +-- | A conduit that incrementally executes builders and passes on the +-- filled chunks as bytestrings to an inner sink. +-- +-- INV: All bytestrings passed to the inner sink are non-empty. 
+builderToByteStringWith :: (MonadBase base m, PrimMonad base) + => BufferAllocStrategy + -> Conduit Builder m S.ByteString +builderToByteStringWith = B.builderToByteStringWith +{-# INLINE builderToByteStringWith #-} + +-- | +-- +-- Since 0.0.2 +builderToByteStringWithFlush + :: (MonadBase base m, PrimMonad base) + => BufferAllocStrategy + -> Conduit (Flush Builder) m (Flush S.ByteString) +builderToByteStringWithFlush = B.builderToByteStringWithFlush +{-# INLINE builderToByteStringWithFlush #-} diff --git a/Data/Conduit/ByteString/Builder.hs b/Data/Conduit/ByteString/Builder.hs new file mode 100644 index 0000000..4ff7709 --- /dev/null +++ b/Data/Conduit/ByteString/Builder.hs @@ -0,0 +1,147 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE RankNTypes #-} +-- | Convert a stream of blaze-builder @Builder@s into a stream of @ByteString@s. +-- +-- Works with both blaze-builder < 0.4's @Builder@s and +-- 'Data.ByteString.Builder.Builder'. +-- +-- Adapted from blaze-builder-enumerator, written by myself and Simon Meier. +-- +-- Note that the functions here can work in any monad built on top of @IO@ or +-- @ST@. 
+-- +-- Since 1.1.7.0 +-- +module Data.Conduit.ByteString.Builder + ( + + -- * Conduits from builders to bytestrings + builderToByteString + , unsafeBuilderToByteString + , builderToByteStringWith + + -- ** Flush + , builderToByteStringFlush + , builderToByteStringWithFlush + + -- * Buffers + , Buffer + + -- ** Status information + , freeSize + , sliceSize + , bufferSize + + -- ** Creation and modification + , allocBuffer + , reuseBuffer + , nextSlice + + -- ** Conversion to bytestings + , unsafeFreezeBuffer + , unsafeFreezeNonEmptyBuffer + + -- * Buffer allocation strategies + , BufferAllocStrategy + , allNewBuffersStrategy + , reuseBufferStrategy + ) where + +import Data.Conduit +import Control.Monad (unless, liftM) +import Control.Monad.Trans.Class (lift, MonadTrans) + +import qualified Data.ByteString as S + +import Control.Monad.Primitive (PrimMonad, unsafePrimToPrim) +import Control.Monad.Base (MonadBase, liftBase) +import Data.Streaming.ByteString.Builder.Class + +unsafeLiftIO :: (MonadBase base m, PrimMonad base) => IO a -> m a +unsafeLiftIO = liftBase . unsafePrimToPrim + +-- | Incrementally execute builders and pass on the filled chunks as +-- bytestrings. +builderToByteString :: (MonadBase base m, PrimMonad base, StreamingBuilder b) + => Conduit b m S.ByteString +builderToByteString = + builderToByteStringWith defaultStrategy +{-# INLINE builderToByteString #-} + +-- | +-- +-- Since 0.0.2 +builderToByteStringFlush :: (MonadBase base m, PrimMonad base, StreamingBuilder b) + => Conduit (Flush b) m (Flush S.ByteString) +builderToByteStringFlush = + builderToByteStringWithFlush defaultStrategy +{-# INLINE builderToByteStringFlush #-} + +-- | Incrementally execute builders on the given buffer and pass on the filled +-- chunks as bytestrings. Note that, if the given buffer is too small for the +-- execution of a build step, a larger one will be allocated. +-- +-- WARNING: This conduit yields bytestrings that are NOT +-- referentially transparent. 
Their content will be overwritten as soon +-- as control is returned from the inner sink! +unsafeBuilderToByteString :: (MonadBase base m, PrimMonad base, StreamingBuilder b) + => IO Buffer -- action yielding the inital buffer. + -> Conduit b m S.ByteString +unsafeBuilderToByteString = builderToByteStringWith . reuseBufferStrategy +{-# INLINE unsafeBuilderToByteString #-} + + +-- | A conduit that incrementally executes builders and passes on the +-- filled chunks as bytestrings to an inner sink. +-- +-- INV: All bytestrings passed to the inner sink are non-empty. +builderToByteStringWith :: (MonadBase base m, PrimMonad base, StreamingBuilder b) + => BufferAllocStrategy + -> Conduit b m S.ByteString +builderToByteStringWith = + helper (liftM (fmap Chunk) await) yield' + where + yield' Flush = return () + yield' (Chunk bs) = yield bs +{-# INLINE builderToByteStringWith #-} + +-- | +-- +-- Since 0.0.2 +builderToByteStringWithFlush + :: (MonadBase base m, PrimMonad base, StreamingBuilder b) + => BufferAllocStrategy + -> Conduit (Flush b) m (Flush S.ByteString) +builderToByteStringWithFlush = helper await yield +{-# INLINE builderToByteStringWithFlush #-} + +helper :: (MonadBase base m, PrimMonad base, Monad (t m), MonadTrans t, StreamingBuilder b) + => t m (Maybe (Flush b)) + -> (Flush S.ByteString -> t m ()) + -> BufferAllocStrategy + -> t m () +helper await' yield' strat = do + (recv, finish) <- lift $ unsafeLiftIO $ newBuilderRecv strat + let loop = await' >>= maybe finish' cont + finish' = do + mbs <- lift $ unsafeLiftIO finish + maybe (return ()) (yield' . 
Chunk) mbs + cont fbuilder = do + let builder = + case fbuilder of + Flush -> builderFlush + Chunk b -> b + popper <- lift $ unsafeLiftIO $ recv builder + let cont' = do + bs <- lift $ unsafeLiftIO popper + unless (S.null bs) $ do + yield' (Chunk bs) + cont' + cont' + case fbuilder of + Flush -> yield' Flush + Chunk _ -> return () + loop + loop +{-# INLINE helper #-} diff --git a/Data/Conduit/Filesystem.hs b/Data/Conduit/Filesystem.hs new file mode 100644 index 0000000..b023576 --- /dev/null +++ b/Data/Conduit/Filesystem.hs @@ -0,0 +1,66 @@
{-# LANGUAGE RankNTypes #-}
module Data.Conduit.Filesystem
    ( sourceDirectory
    , sourceDirectoryDeep
    ) where

import Data.Conduit
import Control.Monad.Trans.Resource (MonadResource)
import Control.Monad.IO.Class (liftIO)
import System.FilePath ((</>))
import qualified Data.Streaming.Filesystem as F

-- | Stream the contents of the given directory, without traversing deeply.
--
-- This function will return /all/ of the contents of the directory, whether
-- they be files, directories, etc.
--
-- Note that the generated filepaths will be the complete path, not just the
-- filename. In other words, if you have a directory @foo@ containing files
-- @bar@ and @baz@, and you use @sourceDirectory@ on @foo@, the results will be
-- @foo/bar@ and @foo/baz@.
--
-- The directory stream is opened and closed via 'bracketP'.
--
-- Since 1.1.0
sourceDirectory :: MonadResource m => FilePath -> Producer m FilePath
sourceDirectory dir =
    bracketP (F.openDirStream dir) F.closeDirStream go
  where
    go ds =
        loop
      where
        loop = do
            mfp <- liftIO $ F.readDirStream ds
            case mfp of
                -- 'Nothing' ends the stream.
                Nothing -> return ()
                Just fp -> do
                    -- Prefix the entry with the directory so the yielded
                    -- path is complete, as documented above.
                    yield $ dir </> fp
                    loop

-- | Deeply stream the contents of the given directory.
--
-- This works the same as @sourceDirectory@, but will not return directories at
-- all. This function also takes an extra parameter to indicate whether
-- symlinks will be followed.
+-- +-- Since 1.1.0 +sourceDirectoryDeep :: MonadResource m + => Bool -- ^ Follow directory symlinks + -> FilePath -- ^ Root directory + -> Producer m FilePath +sourceDirectoryDeep followSymlinks = + start + where + start :: MonadResource m => FilePath -> Producer m FilePath + start dir = sourceDirectory dir =$= awaitForever go + + go :: MonadResource m => FilePath -> Producer m FilePath + go fp = do + ft <- liftIO $ F.getFileType fp + case ft of + F.FTFile -> yield fp + F.FTFileSym -> yield fp + F.FTDirectory -> start fp + F.FTDirectorySym + | followSymlinks -> start fp + | otherwise -> return () + F.FTOther -> return () diff --git a/Data/Conduit/Foldl.hs b/Data/Conduit/Foldl.hs new file mode 100644 index 0000000..300e4ab --- /dev/null +++ b/Data/Conduit/Foldl.hs @@ -0,0 +1,26 @@ +{-# LANGUAGE RankNTypes #-} +-- | Adapter module to work with the package. +-- +-- @since 1.1.16 +module Data.Conduit.Foldl where + +import Data.Conduit +import Control.Monad.Trans.Class (lift) +import qualified Data.Conduit.List as CL + +-- | Convert a left fold into a 'Consumer'. This function is intended +-- to be used with @purely@ from the +-- package. +-- +-- @since 1.1.16 +sinkFold :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Consumer a m b +sinkFold combine seed extract = fmap extract (CL.fold combine seed) + +-- | Convert a monadic left fold into a 'Consumer'. This function is +-- intended to be used with @impurely@ from the +-- package. +-- +-- @since 1.1.16 +sinkFoldM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Consumer a m b +sinkFoldM combine seed extract = + lift . 
extract =<< CL.foldM combine =<< lift seed diff --git a/Data/Conduit/Lazy.hs b/Data/Conduit/Lazy.hs new file mode 100644 index 0000000..d246e25 --- /dev/null +++ b/Data/Conduit/Lazy.hs @@ -0,0 +1,118 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE Trustworthy #-} + +{-# OPTIONS_GHC -fno-warn-deprecations #-} -- Suppress warnings around Control.Monad.Trans.Error +-- | Use lazy I\/O for consuming the contents of a source. Warning: All normal +-- warnings of lazy I\/O apply. In particular, if you are using this with a +-- @ResourceT@ transformer, you must force the list to be evaluated before +-- exiting the @ResourceT@. +module Data.Conduit.Lazy + ( lazyConsume + , MonadActive (..) + ) where + +import Data.Conduit +import Data.Conduit.Internal (Pipe (..), unConduitM) +import System.IO.Unsafe (unsafeInterleaveIO) + +import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp_) +import Control.Monad.Trans.Class (lift) +import Control.Monad.IO.Class (MonadIO, liftIO) + +import Control.Monad.Trans.Identity ( IdentityT) +import Control.Monad.Trans.List ( ListT ) +import Control.Monad.Trans.Maybe ( MaybeT ) +import Control.Monad.Trans.Error ( ErrorT, Error) +import Control.Monad.Trans.Reader ( ReaderT ) +import Control.Monad.Trans.State ( StateT ) +import Control.Monad.Trans.Writer ( WriterT ) +import Control.Monad.Trans.RWS ( RWST ) + +import qualified Control.Monad.Trans.RWS.Strict as Strict ( RWST ) +import qualified Control.Monad.Trans.State.Strict as Strict ( StateT ) +import qualified Control.Monad.Trans.Writer.Strict as Strict ( WriterT ) + +#if (__GLASGOW_HASKELL__ < 710) +import Data.Monoid (Monoid) +#endif +import Control.Monad.ST (ST) +import qualified Control.Monad.ST.Lazy as Lazy +import Data.Functor.Identity (Identity) +import Control.Monad.Trans.Resource.Internal (ResourceT (ResourceT), ReleaseMap (ReleaseMapClosed)) +import qualified Data.IORef as I + +-- | Use lazy I\/O to consume all elements from a @Source@. 
+-- +-- This function relies on 'monadActive' to determine if the underlying monadic +-- state has been closed. +-- +-- Since 0.3.0 +lazyConsume :: (MonadBaseControl IO m, MonadActive m) => Source m a -> m [a] +lazyConsume = +#if MIN_VERSION_conduit(1, 2, 0) + go . flip unConduitM Done +#else + go . unConduitM +#endif + where + go (Done _) = return [] + go (HaveOutput src _ x) = do + xs <- liftBaseOp_ unsafeInterleaveIO $ go src + return $ x : xs + go (PipeM msrc) = liftBaseOp_ unsafeInterleaveIO $ do + a <- monadActive + if a + then msrc >>= go + else return [] + go (NeedInput _ c) = go (c ()) + go (Leftover p _) = go p + +-- | Determine if some monad is still active. This is intended to prevent usage +-- of a monadic state after it has been closed. This is necessary for such +-- cases as lazy I\/O, where an unevaluated thunk may still refer to a +-- closed @ResourceT@. +-- +-- Since 0.3.0 +class Monad m => MonadActive m where + monadActive :: m Bool + +instance (MonadIO m, MonadActive m) => MonadActive (ResourceT m) where + monadActive = ResourceT $ \rmMap -> do + rm <- liftIO $ I.readIORef rmMap + case rm of + ReleaseMapClosed -> return False + _ -> monadActive -- recurse + +instance MonadActive Identity where + monadActive = return True + +instance MonadActive IO where + monadActive = return True + +instance MonadActive (ST s) where + monadActive = return True + +instance MonadActive (Lazy.ST s) where + monadActive = return True + +#define GO(T) instance MonadActive m => MonadActive (T m) where monadActive = lift monadActive +#define GOX(X, T) instance (X, MonadActive m) => MonadActive (T m) where monadActive = lift monadActive +GO(IdentityT) +GO(ListT) +GO(MaybeT) +GOX(Error e, ErrorT e) +GO(ReaderT r) +GO(StateT s) +GOX(Monoid w, WriterT w) +GOX(Monoid w, RWST r w s) +GOX(Monoid w, Strict.RWST r w s) +GO(Strict.StateT s) +GOX(Monoid w, Strict.WriterT w) +#undef GO +#undef GOX + +instance MonadActive m => MonadActive (Pipe l i o u m) where + monadActive = lift 
monadActive +instance MonadActive m => MonadActive (ConduitM i o m) where + monadActive = lift monadActive diff --git a/Data/Conduit/Network.hs b/Data/Conduit/Network.hs new file mode 100644 index 0000000..20b2037 --- /dev/null +++ b/Data/Conduit/Network.hs @@ -0,0 +1,157 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE CPP #-} +module Data.Conduit.Network + ( -- * Basic utilities + sourceSocket + , sinkSocket + -- * Simple TCP server/client interface. + , SN.AppData + , appSource + , appSink + , SN.appSockAddr + , SN.appLocalAddr + -- ** Server + , SN.ServerSettings + , serverSettings + , SN.runTCPServer + , SN.runTCPServerWithHandle + , forkTCPServer + , runGeneralTCPServer + -- ** Client + , SN.ClientSettings + , clientSettings + , SN.runTCPClient + , runGeneralTCPClient + -- ** Getters + , SN.getPort + , SN.getHost + , SN.getAfterBind + , SN.getNeedLocalAddr + -- ** Setters + , SN.setPort + , SN.setHost + , SN.setAfterBind + , SN.setNeedLocalAddr + -- * Types + , SN.HostPreference + ) where + +import Prelude +import Data.Conduit +import Network.Socket (Socket) +import Network.Socket.ByteString (sendAll) +import Data.ByteString (ByteString) +import qualified GHC.Conc as Conc (yield) +import qualified Data.ByteString as S +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad (unless, void) +import Control.Monad.Trans.Control (MonadBaseControl, control, liftBaseWith) +import Control.Monad.Trans.Class (lift) +import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar, MVar, ThreadId) +import qualified Data.Streaming.Network as SN + +-- | Stream data from the socket. +-- +-- This function does /not/ automatically close the socket. 
+-- +-- Since 0.0.0 +sourceSocket :: MonadIO m => Socket -> Producer m ByteString +sourceSocket socket = + loop + where + loop = do + bs <- lift $ liftIO $ SN.safeRecv socket 4096 + if S.null bs + then return () + else yield bs >> loop + +-- | Stream data to the socket. +-- +-- This function does /not/ automatically close the socket. +-- +-- Since 0.0.0 +sinkSocket :: MonadIO m => Socket -> Consumer ByteString m () +sinkSocket socket = + loop + where + loop = await >>= maybe (return ()) (\bs -> lift (liftIO $ sendAll socket bs) >> loop) + +serverSettings :: Int -> SN.HostPreference -> SN.ServerSettings +serverSettings = SN.serverSettingsTCP + +clientSettings :: Int -> ByteString -> SN.ClientSettings +clientSettings = SN.clientSettingsTCP + +appSource :: (SN.HasReadWrite ad, MonadIO m) => ad -> Producer m ByteString +appSource ad = + loop + where + read' = SN.appRead ad + loop = do + bs <- liftIO read' + unless (S.null bs) $ do + yield bs + loop + +appSink :: (SN.HasReadWrite ad, MonadIO m) => ad -> Consumer ByteString m () +appSink ad = awaitForever $ \d -> liftIO $ SN.appWrite ad d >> Conc.yield + +addBoundSignal::MVar ()-> SN.ServerSettings -> SN.ServerSettings +addBoundSignal isBound set = SN.setAfterBind ( \socket -> originalAfterBind socket >> signalBound socket) set + where originalAfterBind :: Socket -> IO () + originalAfterBind = SN.getAfterBind set + signalBound :: Socket -> IO () + signalBound _socket = putMVar isBound () + +-- | Fork a TCP Server +-- +-- Will fork the runGeneralTCPServer function but will only return from +-- this call when the server is bound to the port and accepting incoming +-- connections. Will return the thread id of the server +-- +-- Since 1.1.4 +forkTCPServer :: MonadBaseControl IO m + => SN.ServerSettings + -> (SN.AppData -> m ()) + -> m ThreadId +forkTCPServer set f = + liftBaseWith $ \run -> do + isBound <- newEmptyMVar + let setWithWaitForBind = addBoundSignal isBound set + threadId <- forkIO . void . 
run $ runGeneralTCPServer setWithWaitForBind f + takeMVar isBound + return threadId + + + +-- | Run a general TCP server +-- +-- Same as 'SN.runTCPServer', except monad can be any instance of +-- 'MonadBaseControl' 'IO'. +-- +-- Note that any changes to the monadic state performed by individual +-- client handlers will be discarded. If you have mutable state you want +-- to share among multiple handlers, you need to use some kind of mutable +-- variables. +-- +-- Since 1.1.3 +runGeneralTCPServer :: MonadBaseControl IO m + => SN.ServerSettings + -> (SN.AppData -> m ()) + -> m a +runGeneralTCPServer set f = liftBaseWith $ \run -> + SN.runTCPServer set $ void . run . f + +-- | Run a general TCP client +-- +-- Same as 'SN.runTCPClient', except monad can be any instance of 'MonadBaseControl' 'IO'. +-- +-- Since 1.1.3 +runGeneralTCPClient :: MonadBaseControl IO m + => SN.ClientSettings + -> (SN.AppData -> m a) + -> m a +runGeneralTCPClient set f = control $ \run -> + SN.runTCPClient set $ run . f diff --git a/Data/Conduit/Network/UDP.hs b/Data/Conduit/Network/UDP.hs new file mode 100644 index 0000000..61576ba --- /dev/null +++ b/Data/Conduit/Network/UDP.hs @@ -0,0 +1,82 @@ +{-# LANGUAGE RankNTypes #-} +module Data.Conduit.Network.UDP + ( -- * UDP message representation + SN.Message (..) + -- * Basic utilities + , sourceSocket + , sinkSocket + , sinkAllSocket + , sinkToSocket + , sinkAllToSocket + -- * Helper Utilities + , SN.HostPreference + ) where + +import Data.Conduit +import Network.Socket (Socket) +import Network.Socket.ByteString (recvFrom, send, sendAll, sendTo, sendAllTo) +import Data.ByteString (ByteString) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad (void) +import Control.Monad.Trans.Class (lift) +import qualified Data.Streaming.Network as SN + +-- | Stream messages from the socket. +-- +-- The given @len@ defines the maximum packet size. Every produced item +-- contains the message payload and the origin address. 
+-- +-- This function does /not/ automatically close the socket. +sourceSocket :: MonadIO m => Socket -> Int -> Producer m SN.Message +sourceSocket socket len = loop + where + loop = do + (bs, addr) <- lift $ liftIO $ recvFrom socket len + yield (SN.Message bs addr) >> loop + +-- | Stream messages to the connected socket. +-- +-- The payload is sent using @send@, so some of it might be lost. +-- +-- This function does /not/ automatically close the socket. +sinkSocket :: MonadIO m => Socket -> Consumer ByteString m () +sinkSocket = sinkSocketHelper (\sock bs -> void $ send sock bs) + +-- | Stream messages to the connected socket. +-- +-- The payload is sent using @sendAll@, so it might end up in multiple packets. +-- +-- This function does /not/ automatically close the socket. +sinkAllSocket :: MonadIO m => Socket -> Consumer ByteString m () +sinkAllSocket = sinkSocketHelper sendAll + +-- | Stream messages to the socket. +-- +-- Every handled item contains the message payload and the destination +-- address. The payload is sent using @sendTo@, so some of it might be +-- lost. +-- +-- This function does /not/ automatically close the socket. +sinkToSocket :: MonadIO m => Socket -> Consumer SN.Message m () +sinkToSocket = sinkSocketHelper (\sock (SN.Message bs addr) -> void $ sendTo sock bs addr) + +-- | Stream messages to the socket. +-- +-- Every handled item contains the message payload and the destination +-- address. The payload is sent using @sendAllTo@, so it might end up in +-- multiple packets. +-- +-- This function does /not/ automatically close the socket. 
+sinkAllToSocket :: MonadIO m => Socket -> Consumer SN.Message m () +sinkAllToSocket = sinkSocketHelper (\sock (SN.Message bs addr) -> sendAllTo sock bs addr) + +-- Internal +sinkSocketHelper :: MonadIO m => (Socket -> a -> IO ()) + -> Socket + -> Consumer a m () +sinkSocketHelper act socket = loop + where + loop = await >>= maybe + (return ()) + (\a -> lift (liftIO $ act socket a) >> loop) +{-# INLINE sinkSocketHelper #-} diff --git a/Data/Conduit/Network/Unix.hs b/Data/Conduit/Network/Unix.hs new file mode 100644 index 0000000..9c0c235 --- /dev/null +++ b/Data/Conduit/Network/Unix.hs @@ -0,0 +1,36 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE PolyKinds #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +module Data.Conduit.Network.Unix + ( -- * Basic utilities + sourceSocket + , sinkSocket + -- * Simple server/client interface + , SN.AppDataUnix + , appSource + , appSink + -- ** Server + , SN.ServerSettingsUnix + , serverSettings + , SN.runUnixServer + -- ** Client + , SN.ClientSettingsUnix + , clientSettings + , SN.runUnixClient + -- ** Getters + , SN.getPath + , SN.getAfterBind + -- ** Setters + , SN.setPath + , SN.setAfterBind + ) where + +import Data.Conduit.Network (appSource, appSink, sourceSocket, sinkSocket) +import qualified Data.Streaming.Network as SN + +clientSettings :: FilePath -> SN.ClientSettingsUnix +clientSettings = SN.clientSettingsUnix + +serverSettings :: FilePath -> SN.ServerSettingsUnix +serverSettings = SN.serverSettingsUnix diff --git a/Data/Conduit/Process.hs b/Data/Conduit/Process.hs new file mode 100644 index 0000000..4293f00 --- /dev/null +++ b/Data/Conduit/Process.hs @@ -0,0 +1,178 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE CPP #-} +{-# OPTIONS_GHC -fno-warn-orphans #-} +-- | A full tutorial for this module is available at: +-- . +-- +-- Note that this is a very thin layer around the @Data.Streaming.Process@ module. 
In particular, it: +-- +-- * Provides orphan instances for conduit +-- +-- * Provides some useful helper functions +module Data.Conduit.Process + ( -- * Functions + sourceCmdWithConsumer + , sourceProcessWithConsumer + , sourceCmdWithStreams + , sourceProcessWithStreams + , withCheckedProcessCleanup + -- * Reexport + , module Data.Streaming.Process + ) where + +import Data.Streaming.Process +import Data.Streaming.Process.Internal +import System.Exit (ExitCode (..)) +import Control.Monad.IO.Class (MonadIO, liftIO) +import System.IO (hClose) +import Data.Conduit +import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush) +import Data.ByteString (ByteString) +import Data.ByteString.Builder (Builder) +import Control.Concurrent.Async (runConcurrently, Concurrently(..)) +import Control.Monad.Catch (MonadMask, onException, throwM, finally, bracket) +#if (__GLASGOW_HASKELL__ < 710) +import Control.Applicative ((<$>), (<*>)) +#endif + +instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where + isStdStream = (\(Just h) -> return $ sinkHandle h, Just CreatePipe) +instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where + isStdStream = (\(Just h) -> return (sinkHandle h, liftIO $ hClose h), Just CreatePipe) + +-- | Wrapper for input source which accepts 'Data.ByteString.Builder.Builder's. +-- You can pass 'Data.ByteString.Builder.Extra.flush' to flush the input. Note +-- that the pipe will /not/ automatically close when the processing completes. +-- +-- @since 1.2.2 +newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r) + +-- | Wrapper for input source which accepts @Flush@es. Note that the pipe +-- will /not/ automatically close then processing completes. 
+-- +-- @since 1.2.2 +newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r) + +instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where + isStdStream = (\(Just h) -> return $ BuilderInput $ sinkHandleBuilder h, Just CreatePipe) +instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where + isStdStream = (\(Just h) -> return (BuilderInput $ sinkHandleBuilder h, liftIO $ hClose h), Just CreatePipe) +instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where + isStdStream = (\(Just h) -> return $ FlushInput $ sinkHandleFlush h, Just CreatePipe) +instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where + isStdStream = (\(Just h) -> return (FlushInput $ sinkHandleFlush h, liftIO $ hClose h), Just CreatePipe) + +instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where + osStdStream = (\(Just h) -> return $ sourceHandle h, Just CreatePipe) +instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where + osStdStream = (\(Just h) -> return (sourceHandle h, liftIO $ hClose h), Just CreatePipe) + +-- | Given a @CreateProcess@, run the process, with its output being used as a +-- @Source@ to feed the provided @Consumer@. Once the process has completed, +-- return a tuple of the @ExitCode@ from the process and the output collected +-- from the @Consumer@. +-- +-- Note that, if an exception is raised by the consumer, the process is /not/ +-- terminated. This behavior is different from 'sourceProcessWithStreams' due +-- to historical reasons. 
--
-- Since 1.1.2
sourceProcessWithConsumer :: MonadIO m
                          => CreateProcess
                          -> Consumer ByteString m a -- ^ stdout
                          -> m (ExitCode, a)
sourceProcessWithConsumer cp consumer =
    -- stdin and stderr are requested as 'ClosedStream'; only stdout is piped.
    streamingProcess cp >>= \(ClosedStream, (stdoutSrc, closeStdout), ClosedStream, sph) -> do
        collected <- stdoutSrc $$ consumer
        closeStdout
        -- Wait for the exit code only after the output has been consumed.
        waitForStreamingProcess sph >>= \exitCode ->
            return (exitCode, collected)

-- | Variant of @sourceProcessWithConsumer@ that takes the command as a
-- @String@, which is turned into a @CreateProcess@ with @shell@.
--
-- Since 1.1.2
sourceCmdWithConsumer :: MonadIO m
                      => String                  -- ^command
                      -> Consumer ByteString m a -- ^stdout
                      -> m (ExitCode, a)
sourceCmdWithConsumer cmd consumer =
    sourceProcessWithConsumer (shell cmd) consumer


-- | Given a @CreateProcess@, run the process
-- and feed the provided @Producer@
-- to the stdin @Sink@ of the process.
-- Use the process outputs (stdout, stderr) as @Source@s
-- and feed it to the provided @Consumer@s.
-- Once the process has completed,
-- return a tuple of the @ExitCode@ from the process
-- and the results collected from the @Consumer@s.
--
-- If an exception is raised by any of the streams,
-- the process is terminated.
--
-- IO is required because the streams are run concurrently
-- using the @async@ package
--
-- @since 1.1.12
sourceProcessWithStreams :: CreateProcess
                         -> Producer IO ByteString   -- ^stdin
                         -> Consumer ByteString IO a -- ^stdout
                         -> Consumer ByteString IO b -- ^stderr
                         -> IO (ExitCode, a, b)
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr = do
    (  (sinkStdin, closeStdin)
     , (sourceStdout, closeStdout)
     , (sourceStderr, closeStderr)
     , sph) <- streamingProcess cp
    (_, resStdout, resStderr) <-
      runConcurrently (
        (,,)
        -- stdin is closed as soon as the producer is done (or fails).
        <$> Concurrently ((producerStdin $$ sinkStdin) `finally` closeStdin)
        <*> Concurrently (sourceStdout $$ consumerStdout)
        <*> Concurrently (sourceStderr $$ consumerStderr))
      -- Close both output handles; 'finally' ensures that stderr is still
      -- closed when closing stdout throws.
      `finally` (closeStdout `finally` closeStderr)
      -- Any exception from the streams or the cleanup kills the child.
      `onException` terminateStreamingProcess sph
    ec <- waitForStreamingProcess sph
    return (ec, resStdout, resStderr)

-- | Like @sourceProcessWithStreams@ but providing the command to be run as
-- a @String@.
--
-- The command is turned into a @CreateProcess@ with @shell@.
--
-- @since 1.1.12
sourceCmdWithStreams :: String                   -- ^command
                     -> Producer IO ByteString   -- ^stdin
                     -> Consumer ByteString IO a -- ^stdout
                     -> Consumer ByteString IO b -- ^stderr
                     -> IO (ExitCode, a, b)
sourceCmdWithStreams cmd = sourceProcessWithStreams (shell cmd)

-- | Same as 'withCheckedProcess', but kills the child process in the case of
-- an exception being thrown by the provided callback function.
+-- +-- @since 1.1.11 +withCheckedProcessCleanup + :: ( InputSource stdin + , OutputSink stderr + , OutputSink stdout + , MonadIO m + , MonadMask m + ) + => CreateProcess + -> (stdin -> stdout -> stderr -> m b) + -> m b +withCheckedProcessCleanup cp f = bracket + (streamingProcess cp) + (\(_, _, _, sph) -> closeStreamingProcessHandle sph) + $ \(x, y, z, sph) -> do + res <- f x y z `onException` liftIO (terminateStreamingProcess sph) + ec <- waitForStreamingProcess sph + if ec == ExitSuccess + then return res + else throwM $ ProcessExitedUnsuccessfully cp ec + + +terminateStreamingProcess :: StreamingProcessHandle -> IO () +terminateStreamingProcess = terminateProcess . streamingProcessHandleRaw diff --git a/Data/Conduit/Process/Typed.hs b/Data/Conduit/Process/Typed.hs new file mode 100644 index 0000000..1acbbc4 --- /dev/null +++ b/Data/Conduit/Process/Typed.hs @@ -0,0 +1,113 @@ +{-# LANGUAGE DataKinds #-} + +-- | The "System.Process.Typed" module from @typed-process@, but with +-- added conduit helpers. +module Data.Conduit.Process.Typed + ( -- * Conduit specific stuff + createSink + , createSource + -- * Generalized functions + , withProcess + , withProcess_ + , withLoggedProcess_ + -- * Reexports + , module System.Process.Typed + ) where + +import System.Process.Typed hiding (withProcess, withProcess_) +import qualified System.Process.Typed as P +import Data.Conduit (ConduitM, (.|), runConduit) +import qualified Data.Conduit as C +import qualified Data.Conduit.Binary as CB +import Control.Monad.IO.Unlift +import qualified Data.ByteString as S +import System.IO (hClose) +import qualified Data.Conduit.List as CL +import qualified Data.ByteString.Lazy as BL +import Data.IORef (IORef, newIORef, readIORef, modifyIORef) +import Control.Exception (throwIO, catch) +import Control.Concurrent.Async (concurrently) + +-- | Provide input to a process by writing to a conduit. 
--
-- @since 1.2.1
createSink :: MonadIO m => StreamSpec 'STInput (ConduitM S.ByteString o m ())
createSink = fmap sinkAndClose createPipe
  where
    -- The handle is closed by the cleanup action attached to the sink.
    sinkAndClose hdl =
        C.addCleanup (const (liftIO (hClose hdl))) (CB.sinkHandle hdl)

-- | Read output from a process by reading from a conduit.
--
-- @since 1.2.1
createSource :: MonadIO m => StreamSpec 'STOutput (ConduitM i S.ByteString m ())
createSource = fmap sourceAndClose createPipe
  where
    -- The handle is closed by the cleanup action attached to the source.
    sourceAndClose hdl =
        C.addCleanup (const (liftIO (hClose hdl))) (CB.sourceHandle hdl)

-- | Internal function: a variant of 'createSource' that also records
-- every chunk it yields in the given 'IORef'.
createSourceLogged
  :: MonadIO m
  => IORef ([S.ByteString] -> [S.ByteString])
  -> StreamSpec 'STOutput (ConduitM i S.ByteString m ())
createSourceLogged ref = fmap loggedSource createPipe
  where
    -- We do not add a cleanup action to close the handle, since in
    -- withLoggedProcess_ we attempt to read from the handle twice
    loggedSource hdl = CB.sourceHandle hdl .| CL.iterM record
    -- Append the chunk to the difference list held in the 'IORef'.
    record chunk = liftIO (modifyIORef ref (\acc -> acc . (chunk:)))

-- | 'P.withProcess' lifted to any 'MonadUnliftIO' monad.
--
-- @since 1.2.1
withProcess
  :: MonadUnliftIO m
  => ProcessConfig stdin stdout stderr
  -> (Process stdin stdout stderr -> m a)
  -> m a
withProcess pc f =
    withRunInIO (\runInIO -> P.withProcess pc (\p -> runInIO (f p)))

-- | 'P.withProcess_' lifted to any 'MonadUnliftIO' monad.
--
-- @since 1.2.1
withProcess_
  :: MonadUnliftIO m
  => ProcessConfig stdin stdout stderr
  -> (Process stdin stdout stderr -> m a)
  -> m a
withProcess_ pc f =
    withRunInIO (\runInIO -> P.withProcess_ pc (\p -> runInIO (f p)))

-- | Run a process, throwing an exception on a failure exit code. This
-- will store all output from stdout and stderr in memory for better
-- error messages. Note that this will require unbounded memory usage,
-- so caveat emptor.
--
-- This will ignore any previous settings for the stdout and stderr
-- streams, and instead force them to use 'createSource'.
+--
+-- @since 1.2.3
+withLoggedProcess_
+    :: MonadUnliftIO m
+    => ProcessConfig stdin stdoutIgnored stderrIgnored
+    -> (Process stdin (ConduitM () S.ByteString m ()) (ConduitM () S.ByteString m ()) -> m a)
+    -> m a
+withLoggedProcess_ pc inner = withUnliftIO $ \u -> do
+    -- Difference lists collecting every chunk read from each stream
+    stdoutBuffer <- newIORef id
+    stderrBuffer <- newIORef id
+    -- Override whatever stdout/stderr settings the caller supplied
+    let pc' = setStdout (createSourceLogged stdoutBuffer)
+            $ setStderr (createSourceLogged stderrBuffer) pc
+    P.withProcess pc' $ \p -> do
+        a <- unliftIO u $ inner p
+        -- Read whatever @inner@ left unconsumed, so that the buffers hold
+        -- the full output before the exit code is inspected. Both streams
+        -- are drained concurrently.
+        let drain src = unliftIO u (runConduit (src .| CL.sinkNull))
+        ((), ()) <- drain (getStdout p) `concurrently`
+                    drain (getStderr p)
+        -- On a failing exit code, rethrow the exception with the captured
+        -- output attached
+        checkExitCode p `catch` \ece -> do
+            stdout <- readIORef stdoutBuffer
+            stderr <- readIORef stderrBuffer
+            throwIO ece
+                { eceStdout = BL.fromChunks $ stdout []
+                , eceStderr = BL.fromChunks $ stderr []
+                }
+        return a
diff --git a/Data/Conduit/Text.hs b/Data/Conduit/Text.hs
new file mode 100644
index 0000000..9e0291c
--- /dev/null
+++ b/Data/Conduit/Text.hs
@@ -0,0 +1,492 @@
+{-# LANGUAGE DeriveDataTypeable, RankNTypes #-}
+-- |
+-- Copyright: 2011 Michael Snoyman, 2010-2011 John Millikin
+-- License: MIT
+--
+-- Handle streams of text.
+--
+-- Parts of this code were taken from enumerator and adapted for conduits.
+--
+-- For many purposes, it's recommended to use the conduit-combinators library,
+-- which provides a more complete set of functions.
+module Data.Conduit.Text
+    (
+
+    -- * Text codecs
+      Codec
+    , encode
+    , decode
+    , utf8
+    , utf16_le
+    , utf16_be
+    , utf32_le
+    , utf32_be
+    , ascii
+    , iso8859_1
+    , lines
+    , linesBounded
+    , TextException (..)
+    , takeWhile
+    , dropWhile
+    , take
+    , drop
+    , foldLines
+    , withLine
+    , Data.Conduit.Text.decodeUtf8
+    , decodeUtf8Lenient
+    , encodeUtf8
+    , detectUtf
+    ) where
+
+import Prelude hiding (head, drop, takeWhile, lines, zip, zip3, zipWith, zipWith3, take, dropWhile)
+
+import qualified Control.Exception as Exc
+import qualified Data.ByteString as B
+import qualified Data.ByteString.Char8 as B8
+import Data.Char (ord)
+import qualified Data.Text as T
+import qualified Data.Text.Encoding as TE
+import Data.Word (Word8)
+import Data.Typeable (Typeable)
+
+import Data.Conduit
+import qualified Data.Conduit.List as CL
+import Control.Monad.Trans.Class (lift)
+import Control.Monad.Trans.Resource (MonadThrow, monadThrow)
+import Control.Monad (unless)
+import Data.Streaming.Text
+
+-- | A specific character encoding.
+--
+-- The 'Codec' constructor is the legacy representation (chunk-at-a-time
+-- encode/decode functions reporting errors in their result); 'NewCodec'
+-- wraps a total encoder and an incremental decoder producing a
+-- 'DecodeResult'.
+--
+-- Since 0.3.0
+data Codec = Codec
+    { _codecName :: T.Text
+    , codecEncode
+        :: T.Text
+        -> (B.ByteString, Maybe (TextException, T.Text))
+    , codecDecode
+        :: B.ByteString
+        -> (T.Text, Either
+            (TextException, B.ByteString)
+            B.ByteString)
+    }
+    | NewCodec T.Text (T.Text -> B.ByteString) (B.ByteString -> DecodeResult)
+
+-- | Shows only the constructor and the codec name; the functions inside a
+-- codec cannot be shown.
+instance Show Codec where
+    showsPrec d c =
+        let (cnst, name) = case c of
+                Codec t _ _ -> ("Codec ", t)
+                NewCodec t _ _ -> ("NewCodec ", t)
+         in showParen (d > 10) $ showString cnst . shows name
+
+
+
+-- | Emit each line separately.
+--
+-- Lines are split on @'\\n'@ only (a preceding @'\\r'@ is left in place) and
+-- the newline itself is not part of the output. A final line without a
+-- terminating newline is emitted unless it is empty.
+--
+-- Since 0.4.1
+lines :: Monad m => Conduit T.Text m T.Text
+lines =
+    awaitText id
+  where
+    -- @front@ is a difference list of the chunks making up the current,
+    -- not yet terminated line. None of them contains a newline, so each
+    -- incoming chunk is scanned exactly once and the pieces are joined
+    -- exactly once, when the line is complete. (Appending every chunk to a
+    -- single buffer and re-scanning it is quadratic for long lines.)
+    awaitText front = await >>= maybe (finish front) (process front)
+
+    finish front = unless (T.null buf) (yield buf)
+      where
+        buf = T.concat (front [])
+
+    process front text =
+        case T.uncons rest of
+            -- Found a newline: emit the completed line and continue with
+            -- whatever follows it in this chunk
+            Just (_, rest') -> do
+                yield (T.concat (front [piece]))
+                process id rest'
+            -- No newline in this chunk: keep collecting
+            Nothing -> awaitText (front . (piece :))
+      where
+        (piece, rest) = T.break (== '\n') text
+
+
+
+-- | Variant of the lines function with an integer parameter.
+-- The text length of any emitted line
+-- never exceeds the value of the parameter.
+-- Whenever
+-- this is about to happen a LengthExceeded exception
+-- is thrown. This function should be used instead
+-- of the lines function whenever we are dealing with
+-- user input (e.g. a file upload) because we can't be sure that
+-- user input won't have extraordinarily large lines which would
+-- require large amounts of memory if consumed.
+linesBounded :: MonadThrow m => Int -> Conduit T.Text m T.Text
+linesBounded maxLineLen =
+    collect 0 T.empty
+  where
+    -- Wait for the next chunk; at end of input emit a non-empty partial line
+    collect seen pending = await >>= maybe (flush pending) (step seen pending)
+
+    flush pending = unless (T.null pending) (yield pending)
+
+    -- @seen@ is the length of @pending@, the part of the current line that
+    -- arrived in earlier chunks
+    step seen pending chunk
+        | seen' > maxLineLen = lift $ monadThrow (LengthExceeded maxLineLen)
+        | otherwise =
+            case T.uncons after of
+                Just (_, remaining) -> do
+                    yield (pending `T.append` piece)
+                    step 0 T.empty remaining
+                Nothing ->
+                    collect seen' (pending `T.append` chunk)
+      where
+        (piece, after) = T.break (== '\n') chunk
+        seen' = seen + T.length piece
+
+
+
+-- | Convert text into bytes, using the provided codec. If the codec is
+-- not capable of representing an input character, an exception will be thrown.
+--
+-- Since 0.3.0
+encode :: MonadThrow m => Codec -> Conduit T.Text m B.ByteString
+encode (NewCodec _ enc _) = CL.map enc
+encode codec = CL.mapM encodeChunk
+  where
+    -- Legacy codecs report failure in their result; turn it into a throw
+    encodeChunk txt =
+        case codecEncode codec txt of
+            (bytes, Nothing) -> return bytes
+            (_, Just (exc, _)) -> monadThrow exc
+
+-- | Shared driver for 'DecodeResult'-based decoders.
+--
+-- Arguments: the failure handler (given the number of bytes consumed before
+-- the current chunk, the current chunk, the text decoded so far and the
+-- undecoded remainder), an unused codec name, the initial byte count and the
+-- initial decoding function.
+decodeNew
+    :: Monad m
+    => (Int -> B.ByteString -> T.Text -> B.ByteString -> Conduit B.ByteString m T.Text)
+    -> t
+    -> Int
+    -> (B.ByteString -> DecodeResult)
+    -> Conduit B.ByteString m T.Text
+decodeNew onFailure _name =
+    loop
+  where
+    loop consumed dec =
+        await >>= maybe atEnd feed
+      where
+        -- End of input: run the decoder on an empty chunk so that it can
+        -- report a failure for input it is still holding on to
+        -- (NOTE(review): presumably a truncated trailing sequence; confirm
+        -- against streaming-commons)
+        atEnd =
+            case dec B.empty of
+                DecodeResultSuccess _ _ -> return ()
+                DecodeResultFailure t rest -> onFailure consumed B.empty t rest
+        {-# INLINE atEnd #-}
+
+        feed bs
+            | B.null bs = loop consumed dec
+            | otherwise =
+                case dec bs of
+                    DecodeResultFailure t rest -> onFailure consumed bs t rest
+                    DecodeResultSuccess t dec' ->
+                        -- Force the running byte count before continuing
+                        let consumed' = consumed + B.length bs
+                         in consumed' `seq`
+                                (unless (T.null t) (yield t) >> loop consumed' dec')
+
+-- | Decode a stream of UTF8 data, and replace invalid bytes with the Unicode
+-- replacement character.
+--
+-- Since 1.1.1
+decodeUtf8Lenient :: Monad m => Conduit B.ByteString m T.Text
+decodeUtf8Lenient =
+    decodeNew recover "UTF8-lenient" 0 Data.Streaming.Text.decodeUtf8
+  where
+    -- Emit the valid prefix, skip one offending byte, emit U+FFFD in its
+    -- place and restart decoding on the remaining input
+    recover _consumed _bs good bad = do
+        unless (T.null good) (yield good)
+        case B.uncons bad of
+            Nothing -> return ()
+            Just (_, afterBad) -> do
+                unless (B.null afterBad) (leftover afterBad)
+                yield $ T.singleton '\xFFFD'
+                decodeUtf8Lenient
+
+-- | Convert bytes into text, using the provided codec. If the codec is
+-- not capable of decoding an input byte sequence, an exception will be thrown.
+--
+-- Since 0.3.0
+decode :: MonadThrow m => Codec -> Conduit B.ByteString m T.Text
+decode (NewCodec name _ start) =
+    decodeNew onFailure name 0 start
+  where
+    -- Emit what was decoded, push the undecodable input back upstream and
+    -- throw, reporting the offset at which @rest@ starts
+    onFailure consumed bs t rest = do
+        unless (T.null t) (yield t)
+        leftover rest -- rest will never be null, no need to check
+        let consumed' = consumed + B.length bs - B.length rest
+        monadThrow $ NewDecodeException name consumed' (B.take 4 rest)
+    {-# INLINE onFailure #-}
+decode codec =
+    loop id
+  where
+    -- @front@ prepends the bytes the codec left undecoded on the last chunk
+    loop front = await >>= maybe (finish front) (go front)
+
+    -- End of input: bytes that are still pending could not be decoded
+    finish front =
+        case B.uncons $ front B.empty of
+            Nothing -> return ()
+            Just (w, _) -> lift $ monadThrow $ DecodeException codec w
+
+    go front bs' =
+        case extra of
+            Left (exc, _) -> lift $ monadThrow exc
+            Right bs'' -> yield text >> loop (B.append bs'')
+      where
+        (text, extra) = codecDecode codec bs
+        bs = front bs'
+
+-- | Exception type used by the conduits in this module.
+--
+-- Since 0.3.0
+data TextException = DecodeException Codec Word8
+                   | EncodeException Codec Char
+                   | LengthExceeded Int
+                   | TextException Exc.SomeException
+                   | NewDecodeException !T.Text !Int !B.ByteString
+    deriving Typeable
+instance Show TextException where
+    show (DecodeException codec w) = concat
+        [ "Error decoding legacy Data.Conduit.Text codec "
+        , show codec
+        , " when parsing byte: "
+        , show w
+        ]
+    show (EncodeException codec c) = concat
+        [ "Error encoding legacy Data.Conduit.Text codec "
+        , show codec
+        , " when parsing char: "
+        , show c
+        ]
+    show (LengthExceeded i) = "Data.Conduit.Text.linesBounded: line too long: " ++ show i
+    show (TextException se) = "Data.Conduit.Text.TextException: " ++ show se
+    show (NewDecodeException codec consumed next) = concat
+        [ "Data.Conduit.Text.decode: Error decoding stream of "
+        , T.unpack codec
+        , " bytes. Error encountered in stream at offset "
+        , show consumed
+        , ". Encountered at byte sequence "
+        , show next
+        ]
+instance Exc.Exception TextException
+
+-- | UTF-8 codec.
+--
+-- Since 0.3.0
+utf8 :: Codec
+utf8 = NewCodec (T.pack "UTF-8") TE.encodeUtf8 Data.Streaming.Text.decodeUtf8
+
+-- | UTF-16 little-endian codec.
+--
+-- Since 0.3.0
+utf16_le :: Codec
+utf16_le = NewCodec (T.pack "UTF-16-LE") TE.encodeUtf16LE decodeUtf16LE
+
+-- | UTF-16 big-endian codec.
+--
+-- Since 0.3.0
+utf16_be :: Codec
+utf16_be = NewCodec (T.pack "UTF-16-BE") TE.encodeUtf16BE decodeUtf16BE
+
+-- | UTF-32 little-endian codec.
+--
+-- Since 0.3.0
+utf32_le :: Codec
+utf32_le = NewCodec (T.pack "UTF-32-LE") TE.encodeUtf32LE decodeUtf32LE
+
+-- | UTF-32 big-endian codec.
+--
+-- Since 0.3.0
+utf32_be :: Codec
+utf32_be = NewCodec (T.pack "UTF-32-BE") TE.encodeUtf32BE decodeUtf32BE
+
+-- | Legacy ASCII codec: characters and bytes above @0x7F@ are rejected.
+--
+-- Since 0.3.0
+ascii :: Codec
+ascii = Codec name enc dec where
+    name = T.pack "ASCII"
+    enc text = (bytes, extra) where
+        (safe, unsafe) = T.span (\c -> ord c <= 0x7F) text
+        bytes = B8.pack (T.unpack safe)
+        -- Report the first character that cannot be represented, if any
+        extra = case T.uncons unsafe of
+            Nothing -> Nothing
+            Just (c, _) -> Just (EncodeException ascii c, unsafe)
+
+    dec bytes = (text, extra) where
+        (safe, unsafe) = B.span (<= 0x7F) bytes
+        text = T.pack (B8.unpack safe)
+        -- Report the first byte that is not ASCII, if any
+        extra = case B.uncons unsafe of
+            Nothing -> Right B.empty
+            Just (w, _) -> Left (DecodeException ascii w, unsafe)
+
+-- | Legacy ISO-8859-1 codec: characters above @0xFF@ are rejected when
+-- encoding; decoding accepts every byte.
+--
+-- Since 0.3.0
+iso8859_1 :: Codec
+iso8859_1 = Codec name enc dec where
+    name = T.pack "ISO-8859-1"
+    enc text = (bytes, extra) where
+        (safe, unsafe) = T.span (\c -> ord c <= 0xFF) text
+        bytes = B8.pack (T.unpack safe)
+        -- Report the first character that cannot be represented, if any
+        extra = case T.uncons unsafe of
+            Nothing -> Nothing
+            Just (c, _) -> Just (EncodeException iso8859_1 c, unsafe)
+
+    dec bytes = (T.pack (B8.unpack bytes), Right B.empty)
+
+-- | Pass text downstream for as long as every character satisfies the
+-- predicate. The chunk containing the first failing character is split:
+-- its matching prefix is yielded and the rest is returned as leftover.
+--
+-- Since 1.0.8
+takeWhile :: Monad m
+          => (Char -> Bool)
+          -> Conduit T.Text m T.Text
+takeWhile p =
+    loop
+  where
+    loop = await >>= maybe (return ()) go
+    go t =
+        case T.span p t of
+            (x, y)
+                | T.null y -> yield x >> loop
+                | otherwise -> yield x >> leftover y
+
+-- | Discard leading characters for as long as they satisfy the predicate;
+-- the text from the first failing character on is returned as leftover.
+--
+-- Since 1.0.8
+dropWhile :: Monad m
+          => (Char -> Bool)
+          -> Consumer T.Text m ()
+dropWhile p =
+    loop
+  where
+    loop = await >>= maybe (return ()) go
+    go t
+        | T.null x = loop
+        | otherwise = leftover x
+      where
+        x = T.dropWhile p t
+
+-- | Pass at most the given number of characters downstream; anything
+-- beyond that in the last chunk read is returned as leftover. A
+-- non-positive count yields nothing and does not touch upstream.
+--
+-- Since 1.0.8
+take :: Monad m => Int -> Conduit T.Text m T.Text
+take n
+    -- Nothing to take: do not wait on upstream and do not emit an empty
+    -- chunk
+    | n <= 0 = return ()
+    | otherwise = loop n
+  where
+    loop i = await >>= maybe (return ()) (go i)
+    go i t
+        | diff == 0 = yield t
+        | diff < 0 =
+            let (x, y) = T.splitAt i t
+             in yield x >> leftover y
+        | otherwise = yield t >> loop diff
+      where
+        -- Characters still wanted after this chunk
+        diff = i - T.length t
+
+-- | Discard the given number of characters; the unconsumed rest of the
+-- last chunk read is returned as leftover. A non-positive count is a
+-- no-op and does not touch upstream.
+--
+-- Since 1.0.8
+drop :: Monad m => Int -> Consumer T.Text m ()
+drop n
+    | n <= 0 = return ()
+    | otherwise = loop n
+  where
+    loop i = await >>= maybe (return ()) (go i)
+    go i t
+        | diff == 0 = return ()
+        | diff < 0 = leftover $ T.drop i t
+        | otherwise = loop diff
+      where
+        -- Characters still to be dropped after this chunk
+        diff = i - T.length t
+
+-- | Fold over the input one line at a time. While input remains, the
+-- consumer built from the current accumulator is run on the next line (the
+-- text up to the next @'\\n'@, with every @'\\r'@ removed); whatever it
+-- leaves of that line is discarded, the newline is dropped, and the result
+-- becomes the next accumulator.
+--
+-- Since 1.0.8
+foldLines :: Monad m
+          => (a -> ConduitM T.Text o m a)
+          -> a
+          -> ConduitM T.Text o m a
+foldLines f =
+    start
+  where
+    start a = CL.peek >>= maybe (return a) (const $ loop $ f a)
+
+    loop consumer = do
+        a <- takeWhile (/= '\n') =$= do
+            a <- CL.map (T.filter (/= '\r')) =$= consumer
+            CL.sinkNull
+            return a
+        drop 1
+        start a
+
+-- | Run the consumer on the next line of input (the text up to the next
+-- @'\\n'@, with every @'\\r'@ removed), discarding whatever it leaves of
+-- that line and dropping the newline. Returns 'Nothing' if the input is
+-- already exhausted.
+--
+-- Since 1.0.8
+withLine :: Monad m
+         => Sink T.Text m a
+         -> Consumer T.Text m (Maybe a)
+withLine consumer = toConsumer $ do
+    mx <- CL.peek
+    case mx of
+        Nothing -> return Nothing
+        Just _ -> do
+            x <- takeWhile (/= '\n') =$ do
+                x <- CL.map (T.filter (/= '\r')) =$ consumer
+                CL.sinkNull
+                return x
+            drop 1
+            return $ Just x
+
+-- | Decode a stream of UTF8-encoded bytes into a stream of text, throwing an
+-- exception on invalid input.
+--
+-- Since 1.0.15
+decodeUtf8 :: MonadThrow m => Conduit B.ByteString m T.Text
+decodeUtf8 = decode utf8
+    -- NOTE(review): the block below is a disabled, hand-written variant of
+    -- the same decoder. It refers to @CI.*@ names (presumably
+    -- "Data.Conduit.Internal", which this module does not import) and is
+    -- kept for reference only.
+    {- no meaningful performance advantage
+    CI.ConduitM (loop 0 decodeUtf8)
+  where
+    loop consumed dec =
+        CI.NeedInput go finish
+      where
+        finish () =
+            case dec B.empty of
+                DecodeResultSuccess _ _ -> return ()
+                DecodeResultFailure t rest -> onFailure B.empty t rest
+        {-# INLINE finish #-}
+
+        go bs | B.null bs = CI.NeedInput go finish
+        go bs =
+            case dec bs of
+                DecodeResultSuccess t dec' -> do
+                    let consumed' = consumed + B.length bs
+                        next' = loop consumed' dec'
+                        next
+                            | T.null t = next'
+                            | otherwise = CI.HaveOutput next' (return ()) t
+                     in consumed' `seq` next
+                DecodeResultFailure t rest -> onFailure bs t rest
+
+        onFailure bs t rest = do
+            unless (T.null t) (CI.yield t)
+            unless (B.null rest) (CI.leftover rest)
+            let consumed' = consumed + B.length bs - B.length rest
+            monadThrow $ NewDecodeException (T.pack "UTF-8") consumed' (B.take 4 rest)
+        {-# INLINE onFailure #-}
+    -}
+{-# INLINE decodeUtf8 #-}
+
+-- | Encode a stream of text into a stream of bytes.
+--
+-- Each incoming chunk is encoded on its own with 'TE.encodeUtf8'.
+--
+-- Since 1.0.15
+encodeUtf8 :: Monad m => Conduit T.Text m B.ByteString
+encodeUtf8 = CL.map TE.encodeUtf8
+{-# INLINE encodeUtf8 #-}
+
+-- | Automatically determine which UTF variant is being used. This function
+-- checks for BOMs, removing them as necessary. It defaults to assuming UTF-8.
+--
+-- Since 1.1.9
+detectUtf :: MonadThrow m => Conduit B.ByteString m T.Text
+detectUtf =
+    go id
+  where
+    -- @front@ prepends the bytes collected so far; keep reading until at
+    -- least four bytes are available (the longest BOM checked below) or the
+    -- input ends
+    go front = await >>= maybe (close front) (push front)
+
+    push front bs'
+        | B.length bs < 4 = go $ B.append bs
+        | otherwise = leftDecode bs
+      where bs = front bs'
+
+    close front = leftDecode $ front B.empty
+
+    -- Push the collected bytes, minus any BOM, back upstream and hand over
+    -- to the decoder for the detected codec
+    leftDecode bs = leftover bsOut >> decode codec
+      where
+        bsOut = B.append (B.drop toDrop x) y
+        (x, y) = B.splitAt 4 bs
+        -- The four-byte UTF-32 patterns are matched before the two-byte
+        -- UTF-16 ones: the UTF-32-LE BOM starts with the UTF-16-LE BOM
+        (toDrop, codec) =
+            case B.unpack x of
+                [0x00, 0x00, 0xFE, 0xFF] -> (4, utf32_be)
+                [0xFF, 0xFE, 0x00, 0x00] -> (4, utf32_le)
+                0xFE : 0xFF: _ -> (2, utf16_be)
+                0xFF : 0xFE: _ -> (2, utf16_le)
+                0xEF : 0xBB: 0xBF : _ -> (3, utf8)
+                _ -> (0, utf8) -- Assuming UTF-8
+{-# INLINE detectUtf #-}
diff --git a/Data/Conduit/Zlib.hs b/Data/Conduit/Zlib.hs
new file mode 100644
index 0000000..b5a168b
--- /dev/null
+++ b/Data/Conduit/Zlib.hs
@@ -0,0 +1,238 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE RankNTypes #-}
+-- | Streaming compression and decompression using conduits.
+--
+-- Parts of this code were taken from zlib-enum and adapted for conduits.
+module Data.Conduit.Zlib (
+    -- * Conduits
+    compress, decompress, gzip, ungzip,
+    -- * Flushing
+    compressFlush, decompressFlush,
+    -- * Decompression combinators
+    multiple,
+    -- * Re-exported from zlib-bindings
+    WindowBits (..), defaultWindowBits
+) where
+
+import Data.Streaming.Zlib
+import Data.Conduit
+import Data.ByteString (ByteString)
+import qualified Data.ByteString as S
+import Control.Monad (unless, liftM)
+import Control.Monad.Trans.Class (lift, MonadTrans)
+import Control.Monad.Primitive (PrimMonad, unsafePrimToPrim)
+import Control.Monad.Base (MonadBase, liftBase)
+import Control.Monad.Trans.Resource (MonadThrow, monadThrow)
+import Data.Function (fix)
+
+-- | Gzip compression with default parameters.
+--
+-- NOTE(review): the implementation passes compression level @1@ and
+-- @WindowBits 31@ to 'compress'; level 1 is not zlib's own default level,
+-- so "default" here means this library's choice.
+gzip :: (MonadThrow m, MonadBase base m, PrimMonad base) => Conduit ByteString m ByteString
+gzip = compress 1 (WindowBits 31)
+
+-- | Gzip decompression with default parameters.
+--
+-- This is 'decompress' with @WindowBits 31@.
+ungzip :: (MonadBase base m, PrimMonad base, MonadThrow m) => Conduit ByteString m ByteString
+ungzip = decompress (WindowBits 31)
+
+-- | Run an 'IO' action in @m@ by converting it to the base monad with
+-- 'unsafePrimToPrim' and lifting it with 'liftBase'. Used in this module
+-- for the calls into the streaming zlib API.
+unsafeLiftIO :: (MonadBase base m, PrimMonad base, MonadThrow m) => IO a -> m a
+unsafeLiftIO = liftBase . unsafePrimToPrim
+
+-- |
+-- Decompress (inflate) a stream of 'ByteString's. For example:
+--
+-- > sourceFile "test.z" $= decompress defaultWindowBits $$ sinkFile "test"
+
+decompress
+    :: (MonadBase base m, PrimMonad base, MonadThrow m)
+    => WindowBits -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library)
+    -> Conduit ByteString m ByteString
+decompress =
+    helperDecompress (liftM (fmap Chunk) await) yield' leftover
+  where
+    -- The helper works on 'Flush' values; plain streams have no flush
+    -- marker to forward, so it is dropped here
+    yield' Flush = return ()
+    yield' (Chunk bs) = yield bs
+
+-- | Same as 'decompress', but allows you to explicitly flush the stream.
+decompressFlush
+    :: (MonadBase base m, PrimMonad base, MonadThrow m)
+    => WindowBits -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library)
+    -> Conduit (Flush ByteString) m (Flush ByteString)
+decompressFlush = helperDecompress await yield (leftover . Chunk)
+
+-- | Shared implementation of 'decompress' and 'decompressFlush'.
+--
+-- The first three arguments abstract over the stream element type: how to
+-- await the next input (as a 'Flush' value), how to yield output, and how
+-- to push unconsumed compressed bytes back upstream.
+helperDecompress :: (Monad (t m), MonadBase base m, PrimMonad base, MonadThrow m, MonadTrans t)
+                 => t m (Maybe (Flush ByteString))
+                 -> (Flush ByteString -> t m ())
+                 -> (ByteString -> t m ())
+                 -> WindowBits
+                 -> t m ()
+helperDecompress await' yield' leftover' config = do
+    -- Initialize the stateful inflater, which will be used below
+    -- This inflater is never exposed outside of this function
+    inf <- lift $ unsafeLiftIO $ initInflate config
+
+    -- Some helper functions used by the main feeder loop below
+
+    let -- Flush any remaining inflated bytes downstream
+        flush = do
+            chunk <- lift $ unsafeLiftIO $ flushInflate inf
+            unless (S.null chunk) $ yield' $ Chunk chunk
+
+        -- Get any input which is unused by the inflater
+        getUnused = lift $ unsafeLiftIO $ getUnusedInflate inf
+
+        -- If there is any unused data, return it as leftovers to the stream
+        unused = do
+            rem' <- getUnused
+            unless (S.null rem') $ leftover' rem'
+
+    -- Main loop: feed data from upstream into the inflater
+    fix $ \feeder -> do
+        mnext <- await'
+        case mnext of
+            -- No more data is available from upstream
+            Nothing -> do
+                -- Flush any remaining uncompressed data
+                flush
+                -- Return the rest of the unconsumed data as leftovers
+                unused
+            -- Another chunk of compressed data arrived
+            Just (Chunk x) -> do
+                -- Feed the compressed data into the inflater, returning a
+                -- "popper" which will return chunks of decompressed data
+                popper <- lift $ unsafeLiftIO $ feedInflate inf x
+
+                -- Loop over the popper grabbing decompressed chunks and
+                -- yielding them downstream
+                fix $ \pop -> do
+                    mbs <- lift $ unsafeLiftIO popper
+                    case mbs of
+                        -- No more data from this popper
+                        PRDone -> do
+                            rem' <- getUnused
+                            if S.null rem'
+                                -- No data was unused by the inflater, so let's
+                                -- fill it up again and get more data out of it
+                                then feeder
+                                -- In this case, there is some unconsumed data,
+                                -- meaning the compressed stream is complete.
+                                -- At this point, we need to stop feeding,
+                                -- return the unconsumed data as leftovers, and
+                                -- flush any remaining content (which should be
+                                -- nothing)
+                                else do
+                                    flush
+                                    leftover' rem'
+                        -- Another chunk available, yield it downstream and
+                        -- loop again
+                        PRNext bs -> do
+                            yield' (Chunk bs)
+                            pop
+                        -- An error occurred inside zlib, throw it
+                        PRError e -> lift $ monadThrow e
+            -- We've been asked to flush the stream
+            Just Flush -> do
+                -- Get any uncompressed data waiting for us
+                flush
+                -- Put a Flush in the stream
+                yield' Flush
+                -- Feed in more data
+                feeder
+
+-- |
+-- Compress (deflate) a stream of 'ByteString's. The 'WindowBits' also control
+-- the format (zlib vs. gzip).
+
+compress
+    :: (MonadBase base m, PrimMonad base, MonadThrow m)
+    => Int         -- ^ Compression level
+    -> WindowBits  -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library)
+    -> Conduit ByteString m ByteString
+compress =
+    helperCompress (liftM (fmap Chunk) await) yield'
+  where
+    -- Plain streams have no flush marker to forward, so it is dropped here
+    yield' Flush = return ()
+    yield' (Chunk bs) = yield bs
+
+-- | Same as 'compress', but allows you to explicitly flush the stream.
+compressFlush
+    :: (MonadBase base m, PrimMonad base, MonadThrow m)
+    => Int         -- ^ Compression level
+    -> WindowBits  -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library)
+    -> Conduit (Flush ByteString) m (Flush ByteString)
+compressFlush = helperCompress await yield
+
+-- | Shared implementation of 'compress' and 'compressFlush'.
+--
+-- The first two arguments abstract over the stream element type: how to
+-- await the next input (as a 'Flush' value) and how to yield output. The
+-- deflater is only created once the first input arrives, so an empty input
+-- stream produces no output at all.
+helperCompress :: (Monad (t m), MonadBase base m, PrimMonad base, MonadThrow m, MonadTrans t)
+               => t m (Maybe (Flush ByteString))
+               -> (Flush ByteString -> t m ())
+               -> Int
+               -> WindowBits
+               -> t m ()
+helperCompress await' yield' level config =
+    await' >>= maybe (return ()) start
+  where
+    start input = do
+        def <- lift $ unsafeLiftIO $ initDeflate level config
+        push def input
+
+    -- Wait for more input; finish the stream once upstream is exhausted
+    continue def = await' >>= maybe (close def) (push def)
+
+    -- Run a popper until it reports that it is done, yielding every chunk
+    goPopper popper = do
+        mbs <- lift $ unsafeLiftIO popper
+        case mbs of
+            PRDone -> return ()
+            PRNext bs -> yield' (Chunk bs) >> goPopper popper
+            PRError e -> lift $ monadThrow e
+
+    push def (Chunk x) = do
+        popper <- lift $ unsafeLiftIO $ feedDeflate def x
+        goPopper popper
+        continue def
+
+    push def Flush = do
+        -- Drain the flush popper completely before forwarding the marker.
+        -- Running it only once would emit at most one output buffer and
+        -- could leave flushed data inside zlib while downstream has already
+        -- been told that the stream is flushed.
+        goPopper (flushDeflate def)
+        yield' Flush
+        continue def
+
+    close def = do
+        mchunk <- lift $ unsafeLiftIO $ finishDeflate def
+        case mchunk of
+            PRDone -> return ()
+            PRNext chunk -> yield' (Chunk chunk) >> close def
+            PRError e -> lift $ monadThrow e
+
+-- | The standard 'decompress' and 'ungzip' functions will only decompress a
+-- single compressed entity from the stream. This combinator will exhaust the
+-- stream completely of all individual compressed entities. This is useful for
+-- cases where you have a concatenated archive, e.g. @cat file1.gz file2.gz >
+-- combined.gz@.
+--
+-- Usage:
+--
+-- > sourceFile "combined.gz" $$ multiple ungzip =$ consume
+--
+-- This combinator will not fail on an empty stream.
+-- If you want to ensure that
+-- at least one compressed entity in the stream exists, consider a usage such
+-- as:
+--
+-- > sourceFile "combined.gz" $$ (ungzip >> multiple ungzip) =$ consume
+--
+-- @since 1.1.10
+multiple :: Monad m
+         => Conduit ByteString m a
+         -> Conduit ByteString m a
+multiple inner =
+    go
+  where
+    -- Stop at end of input, otherwise look at the next chunk
+    go = await >>= maybe (return ()) next
+
+    -- Empty chunks are skipped; a non-empty chunk is pushed back so that
+    -- @inner@ sees it, and once @inner@ finishes we check for more input
+    next chunk
+        | S.null chunk = go
+        | otherwise = leftover chunk >> inner >> go
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d9f0417
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2012 Michael Snoyman, http://www.yesodweb.com/
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..aadb600
--- /dev/null
+++ b/README.md
@@ -0,0 +1,5 @@
+## conduit-extra
+
+For more information about conduit in general, and how this package in
+particular fits into the ecosystem, see [the conduit
+homepage](https://github.com/snoyberg/conduit#readme).
diff --git a/Setup.lhs b/Setup.lhs
new file mode 100755
index 0000000..06e2708
--- /dev/null
+++ b/Setup.lhs
@@ -0,0 +1,7 @@
+#!/usr/bin/env runhaskell
+
+> module Main where
+> import Distribution.Simple
+
+> main :: IO ()
+> main = defaultMain
diff --git a/bench/blaze.hs b/bench/blaze.hs
new file mode 100644
index 0000000..d7ce8f7
--- /dev/null
+++ b/bench/blaze.hs
@@ -0,0 +1,76 @@
+{-# LANGUAGE OverloadedStrings #-}
+import Data.Conduit
+import qualified Data.Conduit.List as CL
+import Data.Conduit.Blaze
+import Criterion.Main
+import Blaze.ByteString.Builder
+import Data.Monoid
+import qualified Data.ByteString.Builder as BS
+import Data.Functor.Identity (runIdentity)
+import Control.Monad.ST (runST)
+import Data.ByteString.Lazy.Internal (defaultChunkSize)
+
+count :: Int
+count = 100000
+
+single :: Builder
+single = copyByteString "Hello World!\n"
+
+-- | 'count' copies of 'single', nested to the left: @((mempty <> s) <> s) <> ...@
+oneBuilderLeft :: Builder
+oneBuilderLeft =
+    loop count mempty
+  where
+    loop 0 b = b
+    loop i b = loop (i - 1) (b <> single)
+
+-- | 'count' copies of 'single', nested to the right: @s <> (s <> (... <> mempty))@
+oneBuilderRight :: Builder
+oneBuilderRight =
+    loop count mempty
+  where
+    loop 0 b = b
+    loop i b = loop (i - 1) (single <> b)
+
+builderSource :: Monad m => Source m Builder
+builderSource = CL.replicate count single
+
+singleBS :: BS.Builder
+singleBS = BS.shortByteString "Hello World!\n"
+
+-- | 'count' copies of 'singleBS', nested to the left
+oneBSBuilderLeft :: BS.Builder
+oneBSBuilderLeft =
+    loop count mempty
+  where
+    loop 0 b = b
+    loop i b = loop (i - 1) (b <> singleBS)
+
+-- | 'count' copies of 'singleBS', nested to the right
+oneBSBuilderRight :: BS.Builder
+oneBSBuilderRight =
+    loop count mempty
+  where
+    loop 0 b = b
+    loop i b = loop (i - 1) (singleBS <> b)
+
+builderBSSource :: Monad m => Source m BS.Builder
+builderBSSource = CL.replicate count singleBS
+ +main :: IO () +main = defaultMain + [ bench "conduit, strict, safe" $ whnfIO $ + builderSource $$ builderToByteString =$ CL.sinkNull + , bench "conduit, strict, unsafe" $ whnfIO $ + builderSource $$ unsafeBuilderToByteString (allocBuffer defaultChunkSize) =$ CL.sinkNull + + , bench "one builder, left" $ nf toLazyByteString oneBuilderLeft + , bench "one builder, right" $ nf toLazyByteString oneBuilderRight + , bench "conduit, lazy" $ flip nf builderSource $ \src -> + toLazyByteString $ runIdentity $ src $$ CL.fold (<>) mempty + + , bench "one bs builder, left" $ nf BS.toLazyByteString oneBSBuilderLeft + , bench "one bs builder, right" $ nf BS.toLazyByteString oneBSBuilderRight + , bench "conduit BS, lazy" $ flip nf builderBSSource $ \src -> + BS.toLazyByteString $ runIdentity $ src $$ CL.fold (<>) mempty + ] diff --git a/conduit-extra.cabal b/conduit-extra.cabal new file mode 100644 index 0000000..15f4f06 --- /dev/null +++ b/conduit-extra.cabal @@ -0,0 +1,127 @@ +Name: conduit-extra +Version: 1.2.3.2 +Synopsis: Batteries included conduit: adapters for common libraries. +Description: + The conduit package itself maintains relative small dependencies. The purpose of this package is to collect commonly used utility functions wrapping other library dependencies, without depending on heavier-weight dependencies. The basic idea is that this package should only depend on haskell-platform packages and conduit. 
+License: MIT +License-file: LICENSE +Author: Michael Snoyman +Maintainer: michael@snoyman.com +Category: Data, Conduit +Build-type: Simple +Cabal-version: >=1.8 +Homepage: http://github.com/snoyberg/conduit +extra-source-files: + test/random + test/filesystem/*.txt + test/filesystem/bin/*.txt + ChangeLog.md + README.md + +Library + Exposed-modules: Data.Conduit.Attoparsec + Data.Conduit.Binary + Data.Conduit.Blaze + Data.Conduit.ByteString.Builder + Data.Conduit.Filesystem + Data.Conduit.Foldl + Data.Conduit.Lazy + Data.Conduit.Network + Data.Conduit.Network.UDP + Data.Conduit.Process + Data.Conduit.Text + Data.Conduit.Zlib + if !os(windows) + Exposed-modules: Data.Conduit.Network.Unix + if impl(ghc >= 7.8) + Exposed-modules: Data.Conduit.Process.Typed + + if arch(x86_64) || arch(i386) + -- These architectures are able to perform unaligned memory accesses + cpp-options: -DALLOW_UNALIGNED_ACCESS + + Build-depends: base >= 4.5 && < 5 + , conduit >= 1.2.8 && < 1.3 + + , bytestring >= 0.10.2 + , exceptions + , monad-control + , text + , transformers + , transformers-base + + , async + , attoparsec >= 0.10 + , blaze-builder >= 0.3 + , directory + , filepath + , network >= 2.3 + , primitive >= 0.5 + , process + , resourcet >= 1.1 + , stm + , streaming-commons >= 0.1.16 + , unliftio-core + if impl(ghc >= 7.8) + build-depends: typed-process >= 0.2 + + ghc-options: -Wall + +test-suite test + hs-source-dirs: test + main-is: Spec.hs + type: exitcode-stdio-1.0 + ghc-options: -threaded + cpp-options: -DTEST + build-depends: conduit + , conduit-extra + , base + , hspec >= 1.3 + + , async + , attoparsec + , blaze-builder + , bytestring-builder + , bytestring + , exceptions + , process + , resourcet + , QuickCheck + , stm + , streaming-commons + , text + , transformers + , transformers-base + , directory + ghc-options: -Wall + if os(windows) + cpp-options: -DWINDOWS + other-modules: Data.Conduit.AttoparsecSpec + Data.Conduit.BinarySpec + Data.Conduit.ByteString.BuilderSpec + 
Data.Conduit.ExtraSpec + Data.Conduit.FilesystemSpec + Data.Conduit.LazySpec + Data.Conduit.NetworkSpec + Data.Conduit.ProcessSpec + Data.Conduit.Process.TypedSpec + Data.Conduit.TextSpec + Data.Conduit.ZlibSpec + +benchmark blaze + type: exitcode-stdio-1.0 + hs-source-dirs: bench + build-depends: base + , blaze-builder + , conduit + , conduit-extra + , criterion + , bytestring + , bytestring-builder + , transformers + main-is: blaze.hs + ghc-options: -Wall -O2 -rtsopts + +source-repository head + type: git + location: git://github.com/snoyberg/conduit.git diff --git a/test/Data/Conduit/AttoparsecSpec.hs b/test/Data/Conduit/AttoparsecSpec.hs new file mode 100644 index 0000000..39c30b6 --- /dev/null +++ b/test/Data/Conduit/AttoparsecSpec.hs @@ -0,0 +1,176 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TupleSections #-} +{-# OPTIONS_GHC -fno-warn-incomplete-patterns #-} +module Data.Conduit.AttoparsecSpec (spec) where +import Control.Exception (fromException) +import Test.Hspec + +import Control.Applicative ((<*), (<|>)) +import Control.Monad +import Control.Monad.Trans.Resource (runExceptionT) +import qualified Data.Attoparsec.ByteString.Char8 +import qualified Data.Attoparsec.Text +import Data.Conduit +import Data.Conduit.Attoparsec +import qualified Data.Conduit.List as CL + +spec :: Spec +spec = describe "Data.Conduit.AttoparsecSpec" $ do + describe "error position" $ do + it "works for text" $ do + let input = ["aaa\na", "aaa\n\n", "aaa", "aab\n\naaaa"] + badLine = 4 + badCol = 6 + badOff = 15 + parser = Data.Attoparsec.Text.endOfInput <|> (Data.Attoparsec.Text.notChar 'b' >> parser) + sink = sinkParser parser + sink' = sinkParserEither parser + ea <- runExceptionT $ CL.sourceList input $$ sink + case ea of + Left e -> + case fromException e of + Just pe -> do + errorPosition pe `shouldBe` Position badLine badCol badOff + ea' <- CL.sourceList input $$ sink' + case ea' of + Left pe -> + errorPosition pe `shouldBe` Position badLine 
badCol badOff + it "works for bytestring" $ do + let input = ["aaa\na", "aaa\n\n", "aaa", "aab\n\naaaa"] + badLine = 4 + badCol = 6 + badOff = 15 + parser = Data.Attoparsec.ByteString.Char8.endOfInput <|> (Data.Attoparsec.ByteString.Char8.notChar 'b' >> parser) + sink = sinkParser parser + sink' = sinkParserEither parser + ea <- runExceptionT $ CL.sourceList input $$ sink + case ea of + Left e -> + case fromException e of + Just pe -> do + errorPosition pe `shouldBe` Position badLine badCol badOff + ea' <- CL.sourceList input $$ sink' + case ea' of + Left pe -> + errorPosition pe `shouldBe` Position badLine badCol badOff + it "works in last chunk" $ do + let input = ["aaa\na", "aaa\n\n", "aaa", "aab\n\naaaa"] + badLine = 6 + badCol = 5 + badOff = 22 + parser = Data.Attoparsec.Text.char 'c' <|> (Data.Attoparsec.Text.anyChar >> parser) + sink = sinkParser parser + sink' = sinkParserEither parser + ea <- runExceptionT $ CL.sourceList input $$ sink + case ea of + Left e -> + case fromException e of + Just pe -> do + errorPosition pe `shouldBe` Position badLine badCol badOff + ea' <- CL.sourceList input $$ sink' + case ea' of + Left pe -> + errorPosition pe `shouldBe` Position badLine badCol badOff + it "works in last chunk" $ do + let input = ["aaa\na", "aaa\n\n", "aaa", "aa\n\naaaab"] + badLine = 6 + badCol = 6 + badOff = 22 + parser = Data.Attoparsec.Text.string "bc" <|> (Data.Attoparsec.Text.anyChar >> parser) + sink = sinkParser parser + sink' = sinkParserEither parser + ea <- runExceptionT $ CL.sourceList input $$ sink + case ea of + Left e -> + case fromException e of + Just pe -> do + errorPosition pe `shouldBe` Position badLine badCol badOff + ea' <- CL.sourceList input $$ sink' + case ea' of + Left pe -> + errorPosition pe `shouldBe` Position badLine badCol badOff + it "works after new line in text" $ do + let input = ["aaa\n", "aaa\n\n", "aaa", "aa\nb\naaaa"] + badLine = 5 + badCol = 1 + badOff = 15 + parser = Data.Attoparsec.Text.endOfInput <|> 
(Data.Attoparsec.Text.notChar 'b' >> parser) + sink = sinkParser parser + sink' = sinkParserEither parser + ea <- runExceptionT $ CL.sourceList input $$ sink + case ea of + Left e -> + case fromException e of + Just pe -> do + errorPosition pe `shouldBe` Position badLine badCol badOff + ea' <- CL.sourceList input $$ sink' + case ea' of + Left pe -> + errorPosition pe `shouldBe` Position badLine badCol badOff + it "works after new line in bytestring" $ do + let input = ["aaa\n", "aaa\n\n", "aaa", "aa\nb\naaaa"] + badLine = 5 + badCol = 1 + badOff = 15 + parser = Data.Attoparsec.ByteString.Char8.endOfInput <|> (Data.Attoparsec.ByteString.Char8.notChar 'b' >> parser) + sink = sinkParser parser + sink' = sinkParserEither parser + ea <- runExceptionT $ CL.sourceList input $$ sink + case ea of + Left e -> + case fromException e of + Just pe -> do + errorPosition pe `shouldBe` Position badLine badCol badOff + ea' <- CL.sourceList input $$ sink' + case ea' of + Left pe -> + errorPosition pe `shouldBe` Position badLine badCol badOff + it "works for first line" $ do + let input = ["aab\na", "aaa\n\n", "aaa", "aab\n\naaaa"] + badLine = 1 + badCol = 3 + badOff = 2 + parser = Data.Attoparsec.Text.endOfInput <|> (Data.Attoparsec.Text.notChar 'b' >> parser) + sink = sinkParser parser + sink' = sinkParserEither parser + ea <- runExceptionT $ CL.sourceList input $$ sink + case ea of + Left e -> + case fromException e of + Just pe -> do + errorPosition pe `shouldBe` Position badLine badCol badOff + ea' <- CL.sourceList input $$ sink' + case ea' of + Left pe -> + errorPosition pe `shouldBe` Position badLine badCol badOff + + describe "conduitParser" $ do + it "parses a repeated stream" $ do + let input = ["aaa\n", "aaa\naaa\n", "aaa\n"] + parser = Data.Attoparsec.Text.string "aaa" <* Data.Attoparsec.Text.endOfLine + sink = conduitParserEither parser =$= CL.consume + (Right ea) <- runExceptionT $ CL.sourceList input $$ sink + let chk a = case a of + Left{} -> False + Right (_, xs) -> 
xs == "aaa" + chkp l = PositionRange (Position l 1 ((l - 1) * 4)) (Position (l+1) 1 (l * 4)) + forM_ ea $ \ a -> a `shouldSatisfy` chk :: Expectation + forM_ (zip ea [1..]) $ \ (Right (pos, _), l) -> pos `shouldBe` chkp l + length ea `shouldBe` 4 + + it "positions on first line" $ do + results <- yield "hihihi\nhihi" + $$ conduitParser (Data.Attoparsec.Text.string "\n" <|> Data.Attoparsec.Text.string "hi") + =$ CL.consume + let f (a, b, c, d, e, f, g) = (PositionRange (Position a b c) (Position d e f), g) + results `shouldBe` map f + [ (1, 1, 0, 1, 3, 2, "hi") + , (1, 3, 2, 1, 5, 4, "hi") + , (1, 5, 4, 1, 7, 6, "hi") + + , (1, 7, 6, 2, 1, 7, "\n") + + , (2, 1, 7, 2, 3, 9, "hi") + , (2, 3, 9, 2, 5, 11, "hi") + ] diff --git a/test/Data/Conduit/BinarySpec.hs b/test/Data/Conduit/BinarySpec.hs new file mode 100644 index 0000000..0314e65 --- /dev/null +++ b/test/Data/Conduit/BinarySpec.hs @@ -0,0 +1,333 @@ +{-# LANGUAGE GADTs #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE OverloadedStrings #-} +module Data.Conduit.BinarySpec (spec) where + +import qualified Data.Conduit.Binary as CB +import qualified Data.Conduit as C +import qualified Data.Conduit.List as CL +import Control.Monad.Trans.Resource +import Control.Monad.IO.Class +import Control.Exception (IOException) +import qualified Data.ByteString.Lazy as L +import qualified Blaze.ByteString.Builder.ByteString as Blaze +import Test.Hspec +import Test.Hspec.QuickCheck +import qualified Data.IORef as I +import Control.Monad.Trans.Writer.Strict +import qualified Data.ByteString as S +import qualified Data.ByteString.Char8 as S8 +import Data.Functor.Identity +import Test.QuickCheck.Arbitrary (Arbitrary, arbitrary) +import Test.QuickCheck.Gen (Gen, oneof) +import Data.Word (Word8) +import Foreign.Storable (Storable, sizeOf, pokeByteOff, alignment) +import Data.Typeable (Typeable) +import Data.ByteString.Internal (createAndTrim') +import Foreign.Ptr (alignPtr, minusPtr) +import System.Directory (doesFileExist) +import 
System.IO.Unsafe (unsafePerformIO) +import Control.Applicative ((<$>), (<*>)) + +spec :: Spec +spec = describe "Data.Conduit.Binary" $ do + + describe "file access" $ do + it "read" $ do + bs <- S.readFile "conduit-extra.cabal" + bss <- runResourceT $ CB.sourceFile "conduit-extra.cabal" C.$$ CL.consume + S.concat bss `shouldBe` bs + + it "read range" $ do + S.writeFile "tmp" "0123456789" + bss <- runResourceT $ CB.sourceFileRange "tmp" (Just 2) (Just 3) C.$$ CL.consume + S.concat bss `shouldBe` "234" + + it "write" $ do + runResourceT $ CB.sourceFile "conduit-extra.cabal" C.$$ CB.sinkFile "tmp" + bs1 <- S.readFile "conduit-extra.cabal" + bs2 <- S.readFile "tmp" + bs2 `shouldBe` bs1 + + + it "write builder (withSinkFileBuilder)" $ do + runResourceT $ CB.withSinkFileBuilder "tmp" $ \sink -> + CB.sourceFile "conduit-extra.cabal" C.=$= CL.map Blaze.fromByteString C.$$ sink + bs1 <- S.readFile "conduit-extra.cabal" + bs2 <- S.readFile "tmp" + bs2 `shouldBe` bs1 + + it "conduit" $ do + runResourceT $ CB.sourceFile "conduit-extra.cabal" + C.$= CB.conduitFile "tmp" + C.$$ CB.sinkFile "tmp2" + bs1 <- S.readFile "conduit-extra.cabal" + bs2 <- S.readFile "tmp" + bs3 <- S.readFile "tmp2" + bs2 `shouldBe` bs1 + bs3 `shouldBe` bs1 + describe "binary isolate" $ do + it "works" $ do + bss <- runResourceT $ CL.sourceList (replicate 1000 "X") + C.$= CB.isolate 6 + C.$$ CL.consume + S.concat bss `shouldBe` "XXXXXX" + + describe "properly using binary file reading" $ do + it "sourceFile" $ do + x <- runResourceT $ CB.sourceFile "test/random" C.$$ CL.consume + lbs <- L.readFile "test/random" + L.fromChunks x `shouldBe` lbs + + describe "binary head" $ do + let go lbs = do + x <- CB.head + case (x, L.uncons lbs) of + (Nothing, Nothing) -> return True + (Just y, Just (z, lbs')) + | y == z -> go lbs' + _ -> return False + + prop "works" $ \bss' -> + let bss = map S.pack bss' + in runIdentity $ + CL.sourceList bss C.$$ go (L.fromChunks bss) + describe "binary takeWhile" $ do + prop "works" 
$ \bss' -> + let bss = map S.pack bss' + in runIdentity $ do + bss2 <- CL.sourceList bss C.$$ CB.takeWhile (>= 5) C.=$ CL.consume + return $ L.fromChunks bss2 == L.takeWhile (>= 5) (L.fromChunks bss) + prop "leftovers present" $ \bss' -> + let bss = map S.pack bss' + in runIdentity $ do + result <- CL.sourceList bss C.$$ do + x <- CB.takeWhile (>= 5) C.=$ CL.consume + y <- CL.consume + return (S.concat x, S.concat y) + let expected = S.span (>= 5) $ S.concat bss + if result == expected + then return True + else error $ show (S.concat bss, result, expected) + + describe "binary dropWhile" $ do + prop "works" $ \bss' -> + let bss = map S.pack bss' + in runIdentity $ do + bss2 <- CL.sourceList bss C.$$ do + CB.dropWhile (< 5) + CL.consume + return $ L.fromChunks bss2 == L.dropWhile (< 5) (L.fromChunks bss) + + describe "binary take" $ do + let go n l = CL.sourceList l C.$$ do + a <- CB.take n + b <- CL.consume + return (a, b) + + -- Taking nothing should result in an empty Bytestring + it "nothing" $ do + (a, b) <- runResourceT $ go 0 ["abc", "defg"] + a `shouldBe` L.empty + L.fromChunks b `shouldBe` "abcdefg" + + it "normal" $ do + (a, b) <- runResourceT $ go 4 ["abc", "defg"] + a `shouldBe` "abcd" + L.fromChunks b `shouldBe` "efg" + + -- Taking exactly the data that is available should result in no + -- leftover. + it "all" $ do + (a, b) <- runResourceT $ go 7 ["abc", "defg"] + a `shouldBe` "abcdefg" + b `shouldBe` [] + + -- Take as much as possible. 
+ it "more" $ do + (a, b) <- runResourceT $ go 10 ["abc", "defg"] + a `shouldBe` "abcdefg" + b `shouldBe` [] + + describe "binary" $ do + prop "lines" $ \bss' -> runIdentity $ do + let bss = map S.pack bss' + bs = S.concat bss + src = CL.sourceList bss + res <- src C.$$ CB.lines C.=$ CL.consume + return $ S8.lines bs == res + + describe "sinkCacheLength" $ do + it' "works" $ runResourceT $ do + lbs <- liftIO $ L.readFile "test/Data/Conduit/BinarySpec.hs" + (len, src) <- CB.sourceLbs lbs C.$$ CB.sinkCacheLength + lbs' <- src C.$$ CB.sinkLbs + liftIO $ do + fromIntegral len `shouldBe` L.length lbs + lbs' `shouldBe` lbs + fromIntegral len `shouldBe` L.length lbs' + + describe "sinkFileCautious" $ do + it' "success" $ do + runResourceT $ CB.sourceFile "conduit-extra.cabal" C.$$ CB.sinkFileCautious "tmp" + bs1 <- S.readFile "conduit-extra.cabal" + bs2 <- S.readFile "tmp" + bs2 `shouldBe` bs1 + it' "failure" $ do + let bs1 = "This is the original content" + S.writeFile "tmp" bs1 + runResourceT + ( (CB.sourceFile "conduit-extra.cabal" >> error "FIXME") + C.$$ CB.sinkFileCautious "tmp") + `shouldThrow` anyException + bs2 <- S.readFile "tmp" + bs2 `shouldBe` bs1 + + it "sinkSystemTempFile" $ do + let bs = "Hello World!" + fp <- runResourceT $ do + fp <- C.yield bs C.$$ CB.sinkSystemTempFile "temp-file-test" + actual <- liftIO $ S.readFile fp + liftIO $ actual `shouldBe` bs + return fp + exists <- doesFileExist fp + exists `shouldBe` False + + describe "Data.Conduit.Binary.mapM_" $ do + prop "telling works" $ \bytes -> + let lbs = L.pack bytes + src = CB.sourceLbs lbs + sink = CB.mapM_ (tell . return . 
S.singleton) + bss = execWriter $ src C.$$ sink + in L.fromChunks bss == lbs + + describe "exception handling" $ do + it "catchC" $ do + ref <- I.newIORef 0 + let src = do + C.catchC (CB.sourceFile "some-file-that-does-not-exist") onErr + C.handleC onErr $ CB.sourceFile "conduit-extra.cabal" + onErr :: MonadIO m => IOException -> m () + onErr _ = liftIO $ I.modifyIORef ref (+ 1) + contents <- L.readFile "conduit-extra.cabal" + res <- runResourceT $ src C.$$ CB.sinkLbs + res `shouldBe` contents + errCount <- I.readIORef ref + errCount `shouldBe` (1 :: Int) + it "tryC" $ do + ref <- I.newIORef undefined + let src = do + res1 <- C.tryC $ CB.sourceFile "some-file-that-does-not-exist" + res2 <- C.tryC $ CB.sourceFile "conduit-extra.cabal" + liftIO $ I.writeIORef ref (res1, res2) + contents <- L.readFile "conduit-extra.cabal" + res <- runResourceT $ src C.$$ CB.sinkLbs + res `shouldBe` contents + exc <- I.readIORef ref + case exc :: (Either IOException (), Either IOException ()) of + (Left _, Right ()) -> + return () + _ -> error $ show exc + + describe "normalFuseLeft" $ do + it "does not double close conduit" $ do + x <- runResourceT $ do + let src = CL.sourceList ["foobarbazbin"] + src C.$= CB.isolate 10 C.$$ CL.head + x `shouldBe` Just "foobarbazb" + + describe "Storable" $ do + let test name func = describe name $ do + let test' size = + prop ("chunk size " ++ show size) $ \stores0 -> do + let src = + loop (someStorables stores0) + where + loop bs + | S.null bs = return () + | otherwise = do + let (x, y) = S.splitAt size bs + C.yield x + loop y + + sink :: [SomeStorable] + -> C.Sink S.ByteString IO () + sink [] = do + mw <- CB.head + case mw of + Nothing -> return () + Just _ -> error "trailing bytes" + sink (next:rest) = do + withSomeStorable next checkOne + sink rest + + checkOne :: (Storable a, Eq a, Show a) + => a + -> C.Sink S.ByteString IO () + checkOne expected = do + mactual <- + if func + then CB.sinkStorable + else fmap Just CB.sinkStorableEx + actual <- + 
case mactual of + Nothing -> error "got Nothing" + Just actual -> return actual + liftIO $ actual `shouldBe` expected + + src C.$$ sink stores0 :: IO () + mapM_ test' [1, 5, 10, 100] + + test "sink Maybe" True + test "sink exception" False + + it' "insufficient bytes are leftovers, one chunk" $ do + let src = C.yield $ S.singleton 1 + src C.$$ do + mactual <- CB.sinkStorable + liftIO $ mactual `shouldBe` (Nothing :: Maybe Int) + lbs <- CB.sinkLbs + liftIO $ lbs `shouldBe` L.singleton 1 + + it' "insufficient bytes are leftovers, multiple chunks" $ do + let src = do + C.yield $ S.singleton 1 + C.yield $ S.singleton 2 + src C.$$ do + mactual <- CB.sinkStorable + liftIO $ mactual `shouldBe` (Nothing :: Maybe Int) + lbs <- CB.sinkLbs + liftIO $ lbs `shouldBe` L.pack [1, 2] + +data SomeStorable where + SomeStorable :: (Storable a, Eq a, Show a, Typeable a) => a -> SomeStorable +instance Show SomeStorable where + show (SomeStorable x) = show x +instance Arbitrary SomeStorable where + arbitrary = oneof + [ SomeStorable <$> (arbitrary :: Gen Int) + , SomeStorable <$> (arbitrary :: Gen Word8) + , SomeStorable <$> (arbitrary :: Gen Double) + ] + +withSomeStorable :: SomeStorable + -> (forall a. (Storable a, Eq a, Show a) => a -> b) + -> b +withSomeStorable (SomeStorable x) f = f x + +someStorable :: SomeStorable -> S.ByteString +someStorable store = + fst $ unsafePerformIO $ createAndTrim' (size + align) start + where + size = withSomeStorable store sizeOf + align = withSomeStorable store alignment + start ptr = do + let off = minusPtr ptr (alignPtr ptr align) + withSomeStorable store (pokeByteOff ptr off) + return (off, size, ()) + +someStorables :: [SomeStorable] -> S.ByteString +someStorables = S.concat . 
map someStorable + +it' :: String -> IO () -> Spec +it' = it diff --git a/test/Data/Conduit/ByteString/BuilderSpec.hs b/test/Data/Conduit/ByteString/BuilderSpec.hs new file mode 100644 index 0000000..48e8c8b --- /dev/null +++ b/test/Data/Conduit/ByteString/BuilderSpec.hs @@ -0,0 +1,64 @@ +{-# LANGUAGE OverloadedStrings #-} +module Data.Conduit.ByteString.BuilderSpec (spec) where + +import Test.Hspec +import Test.Hspec.QuickCheck (prop) + +import qualified Data.Conduit as C +import qualified Data.Conduit.List as CL +import Data.ByteString.Char8 () +import Data.Conduit.ByteString.Builder (builderToByteString, builderToByteStringFlush) +import Control.Monad.ST (runST) +import Data.Monoid +import qualified Data.ByteString as S +import Data.ByteString.Builder (byteString, toLazyByteString) +import Data.ByteString.Builder.Internal (lazyByteStringInsert, flush) +import qualified Data.ByteString.Lazy as L +import Data.ByteString.Lazy.Char8 () + +spec :: Spec +spec = + describe "Data.Conduit.ByteString.Builder" $ do + prop "idempotent to toLazyByteString" $ \bss' -> runST $ do + let bss = map S.pack bss' + let builders = map byteString bss + let lbs = toLazyByteString $ mconcat builders + let src = mconcat $ map (CL.sourceList . return) builders + outBss <- src C.$= builderToByteString C.$$ CL.consume + return $ lbs == L.fromChunks outBss + + it "works for large input" $ do + let builders = replicate 10000 (byteString "hello world!") + let lbs = toLazyByteString $ mconcat builders + let src = mconcat $ map (CL.sourceList . return) builders + outBss <- src C.$= builderToByteString C.$$ CL.consume + lbs `shouldBe` L.fromChunks outBss + + it "works for lazy bytestring insertion" $ do + let builders = replicate 10000 (lazyByteStringInsert "hello world!") + let lbs = toLazyByteString $ mconcat builders + let src = mconcat $ map (CL.sourceList . 
return) builders + outBss <- src C.$= builderToByteString C.$$ CL.consume + lbs `shouldBe` L.fromChunks outBss + + it "flush shouldn't bring in empty strings." $ do + let dat = ["hello", "world"] + src = CL.sourceList dat C.$= CL.map ((`mappend` flush) . byteString) + out <- src C.$= builderToByteString C.$$ CL.consume + dat `shouldBe` out + + prop "flushing" $ \bss' -> runST $ do + let bss = concatMap (\bs -> [C.Chunk $ S.pack bs, C.Flush]) $ filter (not . null) bss' + let chunks = map (fmap byteString) bss + let src = CL.sourceList chunks + outBss <- src C.$= builderToByteStringFlush C.$$ CL.consume + if bss == outBss then return () else error (show (bss, outBss)) + return $ bss == outBss + it "large flush input" $ do + let lbs = L.pack $ concat $ replicate 100000 [0..255] + let chunks = map (C.Chunk . byteString) (L.toChunks lbs) + let src = CL.sourceList chunks + bss <- src C.$$ builderToByteStringFlush C.=$ CL.consume + let unFlush (C.Chunk x) = [x] + unFlush C.Flush = [] + L.fromChunks (concatMap unFlush bss) `shouldBe` lbs diff --git a/test/Data/Conduit/ExtraSpec.hs b/test/Data/Conduit/ExtraSpec.hs new file mode 100644 index 0000000..114d7b2 --- /dev/null +++ b/test/Data/Conduit/ExtraSpec.hs @@ -0,0 +1,71 @@ +module Data.Conduit.ExtraSpec where + +import Data.Conduit +import Test.Hspec +import Test.Hspec.QuickCheck +import Data.Conduit.List (isolate, peek, consume) +import qualified Data.Conduit.List as CL +import qualified Data.Text as T +import qualified Data.Text.Encoding as T +import qualified Data.ByteString as S +import qualified Data.Conduit.Text as CT + +spec :: Spec +spec = describe "Data.Conduit.Extra" $ do + it "basic test" $ do + let sink2 :: Sink a IO (Maybe a, Maybe a) + sink2 = do + ma1 <- fuseLeftovers id (isolate 10) peek + ma2 <- peek + return (ma1, ma2) + + source = yield 1 >> yield 2 + res <- source $$ sink2 + res `shouldBe` (Just 1, Just 1) + + it "get leftovers" $ do + let sink2 :: Sink a IO ([a], [a], [a]) + sink2 = do + (x, y) <- 
fuseReturnLeftovers (isolate 2) peek3 + z <- CL.consume + return (x, y, z) + + peek3 = do + x <- CL.take 3 + mapM_ leftover $ reverse x + return x + + source = mapM_ yield [1..5] + res <- source $$ sink2 + res `shouldBe` ([1..2], [1..2], [3..5]) + + it "multiple values" $ do + let sink2 :: Sink a IO ([a], Maybe a) + sink2 = do + ma1 <- fuseLeftovers id (isolate 10) peek3 + ma2 <- peek + return (ma1, ma2) + + peek3 = do + x <- CL.take 3 + mapM_ leftover $ reverse x + return x + + source = mapM_ yield [1..5] + res <- source $$ sink2 + res `shouldBe` ([1..3], Just 1) + + prop "more complex" $ \ss cnt -> do + let ts = map T.pack ss + src = mapM_ (yield . T.encodeUtf8) ts + conduit = CL.map T.decodeUtf8 + sink = CT.take cnt =$ consume + undo = return . T.encodeUtf8 . T.concat + res <- src $$ do + x <- fuseLeftovers undo conduit sink + y <- consume + return (T.concat x, T.decodeUtf8 $ S.concat y) + res `shouldBe` T.splitAt cnt (T.concat ts) + +main :: IO () +main = hspec spec diff --git a/test/Data/Conduit/FilesystemSpec.hs b/test/Data/Conduit/FilesystemSpec.hs new file mode 100644 index 0000000..1160505 --- /dev/null +++ b/test/Data/Conduit/FilesystemSpec.hs @@ -0,0 +1,38 @@ +module Data.Conduit.FilesystemSpec (spec) where + +import Test.Hspec +import Data.Conduit +import qualified Data.Conduit.List as CL +import Data.Conduit.Filesystem +import Data.List (sort, isSuffixOf) +import Control.Monad.Trans.Resource (runResourceT) + +spec :: Spec +spec = describe "Data.Conduit.Filesystem" $ do + it "sourceDirectory" $ do + res <- runResourceT + $ sourceDirectory "test/filesystem" + $$ CL.filter (not . (".swp" `isSuffixOf`)) + =$ CL.consume + sort res `shouldBe` + [ "test/filesystem/bar.txt" + , "test/filesystem/baz.txt" + , "test/filesystem/bin" + , "test/filesystem/foo.txt" + ] + it "sourceDirectoryDeep" $ do + res1 <- runResourceT + $ sourceDirectoryDeep False "test/filesystem" + $$ CL.filter (not . 
(".swp" `isSuffixOf`)) + =$ CL.consume + res2 <- runResourceT + $ sourceDirectoryDeep True "test/filesystem" + $$ CL.filter (not . (".swp" `isSuffixOf`)) + =$ CL.consume + sort res1 `shouldBe` + [ "test/filesystem/bar.txt" + , "test/filesystem/baz.txt" + , "test/filesystem/bin/bin.txt" + , "test/filesystem/foo.txt" + ] + sort res1 `shouldBe` sort res2 diff --git a/test/Data/Conduit/LazySpec.hs b/test/Data/Conduit/LazySpec.hs new file mode 100644 index 0000000..a71be24 --- /dev/null +++ b/test/Data/Conduit/LazySpec.hs @@ -0,0 +1,44 @@ +module Data.Conduit.LazySpec (spec) where + +import qualified Data.Conduit.Lazy as CLazy +import Test.Hspec +import Control.Monad.IO.Class +import qualified Data.Conduit as C +import qualified Data.Conduit.Binary as CB +import Control.Monad.Trans.Resource +import Data.Monoid +import qualified Data.IORef as I +import Control.Monad (forever) + +spec :: Spec +spec = describe "Data.Conduit.Lazy" $ do + + describe "lazy" $ do + it' "works inside a ResourceT" $ runResourceT $ do + counter <- liftIO $ I.newIORef 0 + let incr i = do + istate <- liftIO $ I.newIORef $ Just (i :: Int) + let loop = do + res <- liftIO $ I.atomicModifyIORef istate ((,) Nothing) + case res of + Nothing -> return () + Just x -> do + count <- liftIO $ I.atomicModifyIORef counter + (\j -> (j + 1, j + 1)) + liftIO $ count `shouldBe` i + C.yield x + loop + loop + nums <- CLazy.lazyConsume $ mconcat $ map incr [1..10] + liftIO $ nums `shouldBe` [1..10] + + it' "returns nothing outside ResourceT" $ do + bss <- runResourceT $ CLazy.lazyConsume $ CB.sourceFile "test/main.hs" + bss `shouldBe` [] + + it' "works with pure sources" $ do + nums <- CLazy.lazyConsume $ forever $ C.yield 1 + take 100 nums `shouldBe` replicate 100 (1 :: Int) + +it' :: String -> IO () -> Spec +it' = it diff --git a/test/Data/Conduit/NetworkSpec.hs b/test/Data/Conduit/NetworkSpec.hs new file mode 100644 index 0000000..b97e703 --- /dev/null +++ b/test/Data/Conduit/NetworkSpec.hs @@ -0,0 +1,48 @@ +{-# 
LANGUAGE OverloadedStrings #-} +module Data.Conduit.NetworkSpec (spec) where + +import Data.Conduit +import Data.Conduit.Network +import Control.Concurrent (forkIO, threadDelay, putMVar, newEmptyMVar, takeMVar, killThread) +import Control.Monad (replicateM_) +import Test.Hspec + +spec :: Spec +spec = describe "Data.Conduit.Network" $ do + describe "run general server" $ do + it "running tcp server" $ do + _ <- forkIO $ runTCPServer (serverSettings 4009 "*4") echo + threadDelay 1000000 + replicateM_ 100 + $ runTCPClient (clientSettings 4009 "127.0.0.1") doNothing + describe "fork server" $ do + it "can connect to server" $ do + let set = serverSettings 4010 "*4" + threadId <- forkTCPServer set echo + replicateM_ 100 + $ runTCPClient (clientSettings 4010 "127.0.0.1") doNothing + killThread threadId + + it "fork server also executes custom afterBind" $ do + assertMVar <- newEmptyMVar + let set = serverSettings 4010 "*4" + setWithAfterBind = setAfterBind (\_ -> putMVar assertMVar ()) set + threadId <- forkTCPServer setWithAfterBind echo + takeMVar assertMVar + killThread threadId + + it "fork server really waits for server to be finalized before returning" $ do + let set = serverSettings 4010 "*4" + setWithAfterBind = setAfterBind (\_ -> threadDelay 1000000) set + threadId <- forkTCPServer setWithAfterBind echo + replicateM_ 100 + $ runTCPClient (clientSettings 4010 "127.0.0.1") doNothing + killThread threadId + + + +echo :: AppData -> IO () +echo ad = appSource ad $$ appSink ad + +doNothing :: AppData -> IO () +doNothing _ = return () diff --git a/test/Data/Conduit/Process/TypedSpec.hs b/test/Data/Conduit/Process/TypedSpec.hs new file mode 100644 index 0000000..9d47333 --- /dev/null +++ b/test/Data/Conduit/Process/TypedSpec.hs @@ -0,0 +1,29 @@ +module Data.Conduit.Process.TypedSpec (spec) where + +import Test.Hspec +import Data.Conduit +import Data.Conduit.Process.Typed +import qualified Data.Conduit.List as CL +import qualified Data.ByteString as B + +spec :: Spec 
+spec = do + it "cat works" $ do + let fp = "ChangeLog.md" + pc = setStdout createSource $ proc "cat" [fp] + bs <- B.readFile fp + bss <- withProcess_ pc $ \p -> runConduit $ getStdout p .| CL.consume + B.concat bss `shouldBe` bs + it "cat works with withLoggedProcess_" $ do + let fp = "ChangeLog.md" + pc = proc "cat" [fp] + bs <- B.readFile fp + bss <- withLoggedProcess_ pc $ \p -> runConduit $ getStdout p .| CL.consume + B.concat bss `shouldBe` bs + it "failing process throws" $ do + (withLoggedProcess_ (proc "cat" ["does not exist"]) $ \p -> do + runConduit $ getStdout p .| CL.mapM_ (error "shouldn't have data")) + `shouldThrow` anyException + it "failing process throws" $ do + (withProcess_ (proc "cat" ["does not exist"]) $ const $ return ()) + `shouldThrow` anyException diff --git a/test/Data/Conduit/ProcessSpec.hs b/test/Data/Conduit/ProcessSpec.hs new file mode 100644 index 0000000..2b79f87 --- /dev/null +++ b/test/Data/Conduit/ProcessSpec.hs @@ -0,0 +1,102 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE OverloadedStrings #-} +module Data.Conduit.ProcessSpec (spec, main) where + +import Test.Hspec +import Test.Hspec.QuickCheck (prop) +import Data.Conduit +import qualified Data.Conduit.List as CL +import Data.Conduit.Process +import Control.Concurrent.Async (concurrently) +import qualified Data.ByteString.Lazy as L +import qualified Data.ByteString as S +import qualified Data.ByteString.Char8 as S8 +import System.Exit +import Control.Concurrent (threadDelay) + +main :: IO () +main = hspec spec + +spec :: Spec +spec = describe "Data.Conduit.Process" $ do +#ifndef WINDOWS + prop "cat" $ \wss -> do + let lbs = L.fromChunks $ map S.pack wss + ((sink, closeStdin), source, Inherited, cph) <- streamingProcess (shell "cat") + ((), bss) <- concurrently + (do + mapM_ yield (L.toChunks lbs) $$ sink + closeStdin) + (source $$ CL.consume) + L.fromChunks bss `shouldBe` lbs + ec <- waitForStreamingProcess cph + ec `shouldBe` ExitSuccess + + it "closed stream" $ do + (ClosedStream, 
source, Inherited, cph) <- streamingProcess (shell "cat") + bss <- source $$ CL.consume + bss `shouldBe` [] + + ec <- waitForStreamingProcess cph + ec `shouldBe` ExitSuccess + + it "handles sub-process exit code" $ do + (sourceCmdWithConsumer "exit 0" CL.sinkNull) + `shouldReturn` (ExitSuccess, ()) + (sourceCmdWithConsumer "exit 11" CL.sinkNull) + `shouldReturn` (ExitFailure 11, ()) + (sourceCmdWithConsumer "exit 12" CL.sinkNull) + `shouldReturn` (ExitFailure 12, ()) + (sourceCmdWithStreams "exit 0" CL.sourceNull CL.sinkNull CL.sinkNull) + `shouldReturn` (ExitSuccess, (), ()) + (sourceCmdWithStreams "exit 11" CL.sourceNull CL.sinkNull CL.sinkNull) + `shouldReturn` (ExitFailure 11, (), ()) + (sourceCmdWithStreams "exit 12" CL.sourceNull CL.sinkNull CL.sinkNull) + `shouldReturn` (ExitFailure 12, (), ()) + + it "consumes stdout" $ do + let mystr = "this is a test string" :: String + sourceCmdWithStreams ("bash -c \"echo -n " ++ mystr ++ "\"") + CL.sourceNull + CL.consume -- stdout + CL.consume -- stderr + `shouldReturn` (ExitSuccess, [S8.pack mystr], []) + + it "consumes stderr" $ do + let mystr = "this is a test string" :: String + sourceCmdWithStreams ("bash -c \">&2 echo -n " ++ mystr ++ "\"") + CL.sourceNull + CL.consume -- stdout + CL.consume -- stderr + `shouldReturn` (ExitSuccess, [], [S8.pack mystr]) + + it "feeds stdin" $ do + let mystr = "this is a test string" :: S.ByteString + sourceCmdWithStreams "cat" + (yield mystr) + CL.consume -- stdout + CL.consume -- stderr + `shouldReturn` (ExitSuccess, [mystr], []) +#endif + it "blocking vs non-blocking" $ do + (ClosedStream, ClosedStream, ClosedStream, cph) <- streamingProcess (shell "sleep 1") + + mec1 <- getStreamingProcessExitCode cph + mec1 `shouldBe` Nothing + + threadDelay 1500000 + + -- For slow systems where sleep may take longer than 1.5 seconds, do + -- this in a loop. 
+ let loop 0 = error "Took too long for sleep to exit, your system is acting funny" + loop i = do + mec2 <- getStreamingProcessExitCode cph + case mec2 of + Nothing -> do + threadDelay 500000 + loop (pred i) + Just _ -> mec2 `shouldBe` Just ExitSuccess + loop (5 :: Int) + + ec <- waitForStreamingProcess cph + ec `shouldBe` ExitSuccess diff --git a/test/Data/Conduit/TextSpec.hs b/test/Data/Conduit/TextSpec.hs new file mode 100644 index 0000000..b86309e --- /dev/null +++ b/test/Data/Conduit/TextSpec.hs @@ -0,0 +1,229 @@ +{-# LANGUAGE FlexibleContexts, OverloadedStrings #-} +module Data.Conduit.TextSpec (spec) where + +import qualified Data.Conduit.Text as CT +import qualified Data.Conduit as C +import qualified Data.Conduit.Lift as C +import qualified Data.Conduit.List as CL +import Test.Hspec +import Test.Hspec.QuickCheck +import Data.Monoid +import Control.Monad.ST +import qualified Data.Text as T +import qualified Data.Text.Encoding as TE +import qualified Data.Text.Encoding.Error as TEE +import qualified Data.Text.Lazy.Encoding as TLE +import Data.Functor.Identity +import Control.Arrow +import Control.Applicative +import Control.Monad.Trans.Resource +import qualified Data.ByteString as S +import qualified Data.Text.Lazy as TL +import qualified Data.ByteString.Lazy as L +import Control.Monad.Trans.Resource (runExceptionT_) + +spec :: Spec +spec = describe "Data.Conduit.Text" $ do + describe "text" $ do + let go enc tenc tdec cenc = describe enc $ do + prop "single chunk" $ \chars -> runST $ runExceptionT_ $ do + let tl = TL.pack chars + lbs = tenc tl + src = CL.sourceList $ L.toChunks lbs + ts <- src C.$= CT.decode cenc C.$$ CL.consume + return $ TL.fromChunks ts == tl + prop "many chunks" $ \chars -> runIdentity $ runExceptionT_ $ do + let tl = TL.pack chars + lbs = tenc tl + src = mconcat $ map (CL.sourceList . return . 
S.singleton) $ L.unpack lbs + + ts <- src C.$= CT.decode cenc C.$$ CL.consume + return $ TL.fromChunks ts == tl + + -- Check whether raw bytes are decoded correctly, in + -- particular that Text decoding produces an error if + -- and only if Conduit does. + prop "raw bytes" $ \bytes -> + let lbs = L.pack bytes + src = CL.sourceList $ L.toChunks lbs + etl = runException $ src C.$= CT.decode cenc C.$$ CL.consume + tl' = tdec lbs + in case etl of + (Left _) -> (return $! TL.toStrict tl') `shouldThrow` anyException + (Right tl) -> TL.fromChunks tl `shouldBe` tl' + prop "encoding" $ \chars -> runIdentity $ runExceptionT_ $ do + let tss = map T.pack chars + lbs = tenc $ TL.fromChunks tss + src = mconcat $ map (CL.sourceList . return) tss + bss <- src C.$= CT.encode cenc C.$$ CL.consume + return $ L.fromChunks bss == lbs + prop "valid then invalid" $ \x y chars -> runIdentity $ runExceptionT_ $ do + let tss = map T.pack ([x, y]:chars) + ts = T.concat tss + lbs = tenc (TL.fromChunks tss) `L.append` "\0\0\0\0\0\0\0" + src = mapM_ C.yield $ L.toChunks lbs + Just x' <- src C.$$ CT.decode cenc C.=$ C.await + return $ x' `T.isPrefixOf` ts + go "utf8" TLE.encodeUtf8 TLE.decodeUtf8 CT.utf8 + go "utf16_le" TLE.encodeUtf16LE TLE.decodeUtf16LE CT.utf16_le + go "utf16_be" TLE.encodeUtf16BE TLE.decodeUtf16BE CT.utf16_be + go "utf32_le" TLE.encodeUtf32LE TLE.decodeUtf32LE CT.utf32_le + go "utf32_be" TLE.encodeUtf32BE TLE.decodeUtf32BE CT.utf32_be + it "mixed utf16 and utf8" $ do + let bs = "8\NUL:\NULu\NUL\215\216\217\218" + src = C.yield bs C.$= CT.decode CT.utf16_le + text <- src C.$$ C.await + text `shouldBe` Just "8:u" + (src C.$$ CL.sinkNull) `shouldThrow` anyException + it "invalid utf8" $ do + let bs = S.pack [0..255] + src = C.yield bs C.$= CT.decode CT.utf8 + text <- src C.$$ C.await + text `shouldBe` Just (T.pack $ map toEnum [0..127]) + (src C.$$ CL.sinkNull) `shouldThrow` anyException + it "catch UTF8 exceptions" $ do + let badBS = "this is good\128\128\0that was bad" + + 
grabExceptions inner = C.catchC + (inner C.=$= CL.map Right) + (\e -> C.yield (Left (e :: CT.TextException))) + + res <- C.yield badBS C.$$ (,) + <$> (grabExceptions (CT.decode CT.utf8) C.=$ CL.consume) + <*> CL.consume + + first (map (either (Left . show) Right)) res `shouldBe` + ( [ Right "this is good" + , Left $ show $ CT.NewDecodeException "UTF-8" 12 "\128\128\0t" + ] + , ["\128\128\0that was bad"] + ) + it "catch UTF8 exceptions, pure" $ do + let badBS = "this is good\128\128\0that was bad" + + grabExceptions inner = do + res <- C.runCatchC $ inner C.=$= CL.map Right + case res of + Left e -> C.yield $ Left e + Right () -> return () + + let res = runIdentity $ C.yield badBS C.$$ (,) + <$> (grabExceptions (CT.decode CT.utf8) C.=$ CL.consume) + <*> CL.consume + + first (map (either (Left . show) Right)) res `shouldBe` + ( [ Right "this is good" + , Left $ show $ CT.NewDecodeException "UTF-8" 12 "\128\128\0t" + ] + , ["\128\128\0that was bad"] + ) + it "catch UTF8 exceptions, catchExceptionC" $ do + let badBS = "this is good\128\128\0that was bad" + + grabExceptions inner = C.catchCatchC + (inner C.=$= CL.map Right) + (\e -> C.yield $ Left e) + + let res = runException_ $ C.yield badBS C.$$ (,) + <$> (grabExceptions (CT.decode CT.utf8) C.=$ CL.consume) + <*> CL.consume + + first (map (either (Left . show) Right)) res `shouldBe` + ( [ Right "this is good" + , Left $ show $ CT.NewDecodeException "UTF-8" 12 "\128\128\0t" + ] + , ["\128\128\0that was bad"] + ) + it "catch UTF8 exceptions, catchExceptionC, decodeUtf8" $ do + let badBS = "this is good\128\128\0that was bad" + + grabExceptions inner = C.catchCatchC + (inner C.=$= CL.map Right) + (\e -> C.yield $ Left e) + + let res = runException_ $ C.yield badBS C.$$ (,) + <$> (grabExceptions CT.decodeUtf8 C.=$ CL.consume) + <*> CL.consume + + first (map (either (Left . 
show) Right)) res `shouldBe` + ( [ Right "this is good" + , Left $ show $ CT.NewDecodeException "UTF-8" 12 "\128\128\0t" + ] + , ["\128\128\0that was bad"] + ) + prop "lenient UTF8 decoding" $ \good1 good2 -> do + let bss = [TE.encodeUtf8 $ T.pack good1, "\128\129\130", TE.encodeUtf8 $ T.pack good2] + bs = S.concat bss + expected = TE.decodeUtf8With TEE.lenientDecode bs + actual = runIdentity $ mapM_ C.yield bss C.$$ CT.decodeUtf8Lenient C.=$ CL.consume + T.concat actual `shouldBe` expected + + describe "text lines" $ do + it "yields nothing given nothing" $ + (CL.sourceList [] C.$= CT.lines C.$$ CL.consume) == + [[]] + it "yields nothing given only empty text" $ + (CL.sourceList [""] C.$= CT.lines C.$$ CL.consume) == + [[]] + it "works across split lines" $ + (CL.sourceList ["abc", "d\nef"] C.$= CT.lines C.$$ CL.consume) == + [["abcd", "ef"]] + it "works with multiple lines in an item" $ + (CL.sourceList ["ab\ncd\ne"] C.$= CT.lines C.$$ CL.consume) == + [["ab", "cd", "e"]] + it "works with ending on a newline" $ + (CL.sourceList ["ab\n"] C.$= CT.lines C.$$ CL.consume) == + [["ab"]] + it "works with ending a middle item on a newline" $ + (CL.sourceList ["ab\n", "cd\ne"] C.$= CT.lines C.$$ CL.consume) == + [["ab", "cd", "e"]] + it "works with empty text" $ + (CL.sourceList ["ab", "", "cd"] C.$= CT.lines C.$$ CL.consume) == + [["abcd"]] + it "works with empty lines" $ + (CL.sourceList ["\n\n"] C.$= CT.lines C.$$ CL.consume) == + [["", ""]] + + describe "text lines bounded" $ do + it "yields nothing given nothing" $ + (CL.sourceList [] C.$= CT.linesBounded 80 C.$$ CL.consume) == + [[]] + it "yields nothing given only empty text" $ + (CL.sourceList [""] C.$= CT.linesBounded 80 C.$$ CL.consume) == + [[]] + it "works across split lines" $ + (CL.sourceList ["abc", "d\nef"] C.$= CT.linesBounded 80 C.$$ CL.consume) == + [["abcd", "ef"]] + it "works with multiple lines in an item" $ + (CL.sourceList ["ab\ncd\ne"] C.$= CT.linesBounded 80 C.$$ CL.consume) == + [["ab", "cd", 
"e"]] + it "works with ending on a newline" $ + (CL.sourceList ["ab\n"] C.$= CT.linesBounded 80 C.$$ CL.consume) == + [["ab"]] + it "works with ending a middle item on a newline" $ + (CL.sourceList ["ab\n", "cd\ne"] C.$= CT.linesBounded 80 C.$$ CL.consume) == + [["ab", "cd", "e"]] + it "works with empty text" $ + (CL.sourceList ["ab", "", "cd"] C.$= CT.linesBounded 80 C.$$ CL.consume) == + [["abcd"]] + it "works with empty lines" $ + (CL.sourceList ["\n\n"] C.$= CT.linesBounded 80 C.$$ CL.consume) == + [["", ""]] + it "throws an exception when lines are too long" $ do + x <- runExceptionT $ CL.sourceList ["hello\nworld"] C.$$ CT.linesBounded 4 C.=$ CL.consume + show x `shouldBe` show (Left $ CT.LengthExceeded 4 :: Either CT.TextException ()) + it "works with infinite input" $ do + x <- runExceptionT $ CL.sourceList (cycle ["hello"]) C.$$ CT.linesBounded 256 C.=$ CL.consume + show x `shouldBe` show (Left $ CT.LengthExceeded 256 :: Either CT.TextException ()) + describe "text decode" $ do + it' "doesn't throw runtime exceptions" $ do + let x = runIdentity $ runExceptionT $ C.yield "\x89\x243" C.$$ CT.decode CT.utf8 C.=$ CL.consume + case x of + Left _ -> return () + Right t -> error $ "This should have failed: " ++ show t + it "is not too eager" $ do + x <- CL.sourceList ["foobarbaz", error "ignore me"] C.$$ CT.decode CT.utf8 C.=$ CL.head + x `shouldBe` Just "foobarbaz" + +it' :: String -> IO () -> Spec +it' = it diff --git a/test/Data/Conduit/ZlibSpec.hs b/test/Data/Conduit/ZlibSpec.hs new file mode 100644 index 0000000..91fcfd9 --- /dev/null +++ b/test/Data/Conduit/ZlibSpec.hs @@ -0,0 +1,98 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE OverloadedStrings #-} +module Data.Conduit.ZlibSpec (spec) where + +import Test.Hspec +import Test.Hspec.QuickCheck (prop) + +import qualified Data.Conduit as C +import qualified Data.Conduit.List as CL +import qualified Data.Conduit.Zlib as CZ 
+import Control.Monad.ST (runST) +import Data.Monoid +import qualified Data.ByteString as S +import qualified Data.ByteString.Lazy as L +import Data.ByteString.Char8 () +import Data.ByteString.Lazy.Char8 () +import Control.Monad.Trans.Resource (runExceptionT_) +import Control.Monad.Trans.Class +import Control.Monad.Catch.Pure +import Control.Monad.Base +import Control.Monad (replicateM_) + +instance MonadBase base m => MonadBase base (CatchT m) where + liftBase = lift . liftBase + +spec :: Spec +spec = describe "Data.Conduit.Zlib" $ do + prop "idempotent" $ \bss' -> runST $ do + let bss = map S.pack bss' + lbs = L.fromChunks bss + src = mconcat $ map (CL.sourceList . return) bss + outBss <- runExceptionT_ $ src C.$= CZ.gzip C.$= CZ.ungzip C.$$ CL.consume + return $ lbs == L.fromChunks outBss + prop "flush" $ \bss' -> do + let bss = map S.pack $ filter (not . null) bss' + bssC = concatMap (\bs -> [C.Chunk bs, C.Flush]) bss + src = mconcat $ map (CL.sourceList . return) bssC + outBssC <- src C.$= CZ.compressFlush 5 (CZ.WindowBits 31) + C.$= CZ.decompressFlush (CZ.WindowBits 31) + C.$$ CL.consume + outBssC `shouldBe` bssC + it "compressFlush large data" $ do + let content = L.pack $ map (fromIntegral . fromEnum) $ concat $ ["BEGIN"] ++ map show [1..100000 :: Int] ++ ["END"] + src = CL.sourceList $ map C.Chunk $ L.toChunks content + bssC <- src C.$$ CZ.compressFlush 5 (CZ.WindowBits 31) C.=$ CL.consume + let unChunk (C.Chunk x) = [x] + unChunk C.Flush = [] + bss <- CL.sourceList bssC C.$$ CL.concatMap unChunk C.=$ CZ.ungzip C.=$ CL.consume + L.fromChunks bss `shouldBe` content + + it "uncompressed after compressed" $ do + let c = "This data is stored compressed." + u = "This data isn't." 
+ let src1 = do + C.yield c C.$= CZ.gzip + C.yield u + encoded <- src1 C.$$ CL.consume + let src2 = mapM_ C.yield encoded + (c', u') <- src2 C.$$ do + c' <- CZ.ungzip C.=$ CL.consume + u' <- CL.consume + return (S.concat c', S.concat u') + c' `shouldBe` c + u' `shouldBe` u + + it "multiple compressed values" $ do + let s1 = "hello" + s2 = "world" + src = do + C.yield s1 C.$= CZ.gzip + C.yield s2 C.$= CZ.gzip + actual <- src C.$$ CZ.multiple CZ.ungzip C.=$ CL.consume + S.concat actual `shouldBe` S.concat [s1, s2] + + it "single compressed, multiple uncompressed chunks" $ do + let s1 = "hello" + s2 = "there" + s3 = "world" + s1Z <- fmap S.concat $ C.yield s1 C.$= CZ.gzip C.$$ CL.consume + let src = do + C.yield $ S.append s1Z s2 + C.yield s3 + actual <- src C.$$ do + x <- fmap S.concat $ CZ.ungzip C.=$ CL.consume + y <- CL.consume + return (x, y) + actual `shouldBe` (s1, [s2, s3]) + + it "multiple, over 32k" $ do + let str = "One line" + cnt = 30000 + src = replicateM_ cnt $ C.yield str C.$= CZ.gzip + actual <- fmap S.concat $ src C.$$ CZ.multiple CZ.ungzip C.=$ CL.consume + let expected = S.concat (replicate cnt str) + S.length actual `shouldBe` S.length expected + actual `shouldBe` expected diff --git a/test/Spec.hs b/test/Spec.hs new file mode 100644 index 0000000..a824f8c --- /dev/null +++ b/test/Spec.hs @@ -0,0 +1 @@ +{-# OPTIONS_GHC -F -pgmF hspec-discover #-} diff --git a/test/filesystem/bar.txt b/test/filesystem/bar.txt new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/filesystem/bar.txt diff --git a/test/filesystem/baz.txt b/test/filesystem/baz.txt new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/filesystem/baz.txt diff --git a/test/filesystem/bin/bin.txt b/test/filesystem/bin/bin.txt new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/filesystem/bin/bin.txt diff --git a/test/filesystem/foo.txt b/test/filesystem/foo.txt new file mode 100644 index 0000000..e69de29 --- /dev/null +++ 
b/test/filesystem/foo.txt diff --git a/test/random b/test/random new file mode 100644 index 0000000..2f6a5db Binary files /dev/null and b/test/random differ