diff --git a/ChangeLog.md b/ChangeLog.md new file mode 100644 index 0000000..b407422 --- /dev/null +++ b/ChangeLog.md @@ -0,0 +1,98 @@ +## 1.2.12.1 + +* Fix `pass` in `ConduitM` `MonadWriter` instance + +## 1.2.12 + +* Add `exceptC`, `runExceptC` and `catchExceptC` to `Data.Conduit.Lift` + +## 1.2.11 + +* Add `unfoldEither` and `unfoldEitherM` to `Data.Conduit.List` + +## 1.2.10 + +* Add `PrimMonad` instances for `ConduitM` and `Pipe` + [#306](https://github.com/snoyberg/conduit/pull/306) + +## 1.2.9.1 + +* Ensure downstream and inner sink receive same inputs in + `passthroughSink` + [#304](https://github.com/snoyberg/conduit/issues/304) + +## 1.2.9 + +* `chunksOf` [#296](https://github.com/snoyberg/conduit/pull/296) + +## 1.2.8 + +* Implement + [the reskinning idea](http://www.snoyman.com/blog/2016/09/proposed-conduit-reskin): + * `.|` + * `runConduitPure` + * `runConduitRes` + +## 1.2.7 + +* Expose yieldM for ConduitM [#270](https://github.com/snoyberg/conduit/pull/270) + +## 1.2.6.6 + +* Fix test suite compilation on older GHCs + +## 1.2.6.5 + +* In zipConduitApp, left bias not respected mixing monadic and non-monadic conduits [#263](https://github.com/snoyberg/conduit/pull/263) + +## 1.2.6.4 + +* Fix benchmark by adding a type signature + +## 1.2.6.3 + +* Doc updates + +## 1.2.6.2 + +* resourcet cannot be built with GHC 8 [#242](https://github.com/snoyberg/conduit/issues/242) +* Remove upper bound on transformers [#253](https://github.com/snoyberg/conduit/issues/253) + +## 1.2.6 + +* `sourceToList` +* Canonicalise Monad instances [#237](https://github.com/snoyberg/conduit/pull/237) + +## 1.2.5 + +* mapAccum and mapAccumM should be strict in their state [#218](https://github.com/snoyberg/conduit/issues/218) + +## 1.2.4.1 + +* Some documentation improvements + +## 1.2.4 + +* [fuseBothMaybe](https://github.com/snoyberg/conduit/issues/199) + +__1.2.3__ Expose `connect` and `fuse` as synonyms for `$$` and `=$=`, respectively. + +__1.2.2__ Lots more stream fusion. 
+ +__1.2__ Two performance optimizations added. (1) A stream fusion framework. This is a non-breaking change. (2) Codensity transform applied to the `ConduitM` datatype. This only affects users importing the `.Internal` module. Both changes are thoroughly described in the following two blog posts: [Speeding up conduit](https://www.fpcomplete.com/blog/2014/08/iap-speeding-up-conduit), and [conduit stream fusion](https://www.fpcomplete.com/blog/2014/08/conduit-stream-fusion). + +__1.1__ Refactoring into conduit and conduit-extra packages. Core functionality is now in conduit, whereas most common helper modules (including Text, Binary, Zlib, etc) are in conduit-extra. To upgrade to this version, there should only be import list and conduit file changes necessary. + +__1.0__ Simplified the user-facing interface back to the Source, Sink, and Conduit types, with Producer and Consumer for generic code. Error messages have been simplified, and optional leftovers and upstream terminators have been removed from the external API. Some long-deprecated functions were finally removed. + +__0.5__ The internals of the package are now separated to the .Internal module, leaving only the higher-level interface in the advertised API. Internally, switched to a `Leftover` constructor and slightly tweaked the finalization semantics. + +__0.4__ Inspired by the design of the pipes package: we now have a single unified type underlying `Source`, `Sink`, and `Conduit`. This type is named `Pipe`. There are type synonyms provided for the other three types. Additionally, `BufferedSource` is no longer provided. Instead, the connect-and-resume operator, `$$+`, can be used for the same purpose. + +__0.3__ ResourceT has been greatly simplified, specialized for IO, and moved into a separate package. Instead of hard-coding ResourceT into the conduit datatypes, they can now live around any monad. The Conduit datatype has been enhanced to better allow generation of streaming output. 
The SourceResult, SinkResult, and ConduitResult datatypes have been removed entirely. + +__0.2__ Instead of storing state in mutable variables, we now use CPS. A `Source` returns the next `Source`, and likewise for `Sink`s and `Conduit`s. Not only does this take better advantage of GHC's optimizations (about a 20% speedup), but it allows some operations to have a reduction in algorithmic complexity from exponential to linear. This also allowed us to remove the `Prepared` set of types. Also, the `State` functions (e.g., `sinkState`) use better constructors for return types, avoiding the need for a dummy state on completion. + +__0.1__ `BufferedSource` is now an abstract type, and has a much more efficient internal representation. The result was a 41% speedup on microbenchmarks (note: do not expect speedups anywhere near that in real usage). In general, we are moving towards `BufferedSource` being a specific tool used internally as needed, but using `Source` for all external APIs. + +__0.0__ Initial release. diff --git a/Data/Conduit.hs b/Data/Conduit.hs new file mode 100644 index 0000000..5088ab2 --- /dev/null +++ b/Data/Conduit.hs @@ -0,0 +1,155 @@ +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE Safe #-} +-- | If this is your first time with conduit, you should probably start with +-- the tutorial: +-- . 
+module Data.Conduit + ( -- * Core interface + -- ** Types + Source + , Conduit + , Sink + , ConduitM + -- ** Connect/fuse operators + , (.|) + , ($$) + , ($=) + , (=$) + , (=$=) + , connect + , fuse + + -- *** Fuse with upstream results + , fuseBoth + , fuseBothMaybe + , fuseUpstream + + -- ** Primitives + , await + , yield + , yieldM + , leftover + , runConduit + , runConduitPure + , runConduitRes + + -- ** Finalization + , bracketP + , addCleanup + , yieldOr + + -- ** Exception handling + , catchC + , handleC + , tryC + + -- * Generalized conduit types + , Producer + , Consumer + , toProducer + , toConsumer + + -- * Utility functions + , awaitForever + , transPipe + , mapOutput + , mapOutputMaybe + , mapInput + , mergeSource + , passthroughSink + , sourceToList + + -- * Connect-and-resume + , ResumableSource + , newResumableSource + , ($$+) + , ($$++) + , ($$+-) + , ($=+) + , unwrapResumable + , closeResumableSource + + -- ** For @Conduit@s + , ResumableConduit + , newResumableConduit + , (=$$+) + , (=$$++) + , (=$$+-) + , unwrapResumableConduit + + -- * Fusion with leftovers + , fuseLeftovers + , fuseReturnLeftovers + + -- * Flushing + , Flush (..) + + -- * Newtype wrappers + -- ** ZipSource + , ZipSource (..) + , sequenceSources + + -- ** ZipSink + , ZipSink (..) + , sequenceSinks + + -- ** ZipConduit + , ZipConduit (..) + , sequenceConduits + ) where + +import Data.Conduit.Internal.Conduit +import Data.Void (Void) +import Data.Functor.Identity (Identity, runIdentity) +import Control.Monad.Trans.Resource (ResourceT, runResourceT) +import Control.Monad.Trans.Control (MonadBaseControl) + +-- | Named function synonym for '$$'. +-- +-- Since 1.2.3 +connect :: Monad m => Source m a -> Sink a m b -> m b +connect = ($$) + +-- | Named function synonym for '=$='. +-- +-- Since 1.2.3 +fuse :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r +fuse = (=$=) + +infixr 2 .| +-- | Combine two @Conduit@s together into a new @Conduit@ (aka 'fuse'). 
+-- +-- Output from the upstream (left) conduit will be fed into the +-- downstream (right) conduit. Processing will terminate when +-- downstream (right) returns. Leftover data returned from the right +-- @Conduit@ will be discarded. +-- +-- @since 1.2.8 +(.|) :: Monad m + => ConduitM a b m () -- ^ upstream + -> ConduitM b c m r -- ^ downstream + -> ConduitM a c m r +(.|) = fuse +{-# INLINE (.|) #-} + +-- | Run a pure pipeline until processing completes, i.e. a pipeline +-- with @Identity@ as the base monad. This is equivalent to +-- @runIdentity . runConduit@. +-- +-- @since 1.2.8 +runConduitPure :: ConduitM () Void Identity r -> r +runConduitPure = runIdentity . runConduit +{-# INLINE runConduitPure #-} + +-- | Run a pipeline which acquires resources with @ResourceT@, and +-- then run the @ResourceT@ transformer. This is equivalent to +-- @runResourceT . runConduit@. +-- +-- @since 1.2.8 +runConduitRes :: MonadBaseControl IO m + => ConduitM () Void (ResourceT m) r + -> m r +runConduitRes = runResourceT . runConduit +{-# INLINE runConduitRes #-} diff --git a/Data/Conduit/Internal.hs b/Data/Conduit/Internal.hs new file mode 100644 index 0000000..002ef44 --- /dev/null +++ b/Data/Conduit/Internal.hs @@ -0,0 +1,18 @@ +{-# LANGUAGE Safe #-} +{-# OPTIONS_HADDOCK not-home #-} +module Data.Conduit.Internal + ( -- * Pipe + module Data.Conduit.Internal.Pipe + -- * Conduit + , module Data.Conduit.Internal.Conduit + -- * Fusion (highly experimental!!!) 
+ , module Data.Conduit.Internal.Fusion + ) where + +import Data.Conduit.Internal.Conduit hiding (addCleanup, await, + awaitForever, bracketP, + leftover, mapInput, mapOutput, + mapOutputMaybe, transPipe, + yield, yieldM, yieldOr) +import Data.Conduit.Internal.Pipe +import Data.Conduit.Internal.Fusion diff --git a/Data/Conduit/Internal/Conduit.hs b/Data/Conduit/Internal/Conduit.hs new file mode 100644 index 0000000..c41dd62 --- /dev/null +++ b/Data/Conduit/Internal/Conduit.hs @@ -0,0 +1,1315 @@ +{-# OPTIONS_HADDOCK not-home #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE Trustworthy #-} +{-# LANGUAGE TypeFamilies #-} +module Data.Conduit.Internal.Conduit + ( -- ** Types + ConduitM (..) + , Source + , Producer + , Sink + , Consumer + , Conduit + , ResumableSource (..) + , ResumableConduit (..) + , Flush (..) + -- *** Newtype wrappers + , ZipSource (..) + , ZipSink (..) + , ZipConduit (..) 
+ -- ** Primitives + , await + , awaitForever + , yield + , yieldM + , yieldOr + , leftover + , runConduit + -- ** Composition + , connectResume + , connectResumeConduit + , fuseLeftovers + , fuseReturnLeftovers + , ($$+) + , ($$++) + , ($$+-) + , ($=+) + , (=$$+) + , (=$$++) + , (=$$+-) + , ($$) + , ($=) + , (=$) + , (=$=) + -- ** Generalizing + , sourceToPipe + , sinkToPipe + , conduitToPipe + , toProducer + , toConsumer + -- ** Cleanup + , bracketP + , addCleanup + -- ** Exceptions + , catchC + , handleC + , tryC + -- ** Utilities + , Data.Conduit.Internal.Conduit.transPipe + , Data.Conduit.Internal.Conduit.mapOutput + , Data.Conduit.Internal.Conduit.mapOutputMaybe + , Data.Conduit.Internal.Conduit.mapInput + , Data.Conduit.Internal.Conduit.closeResumableSource + , unwrapResumable + , unwrapResumableConduit + , newResumableSource + , newResumableConduit + , zipSinks + , zipSources + , zipSourcesApp + , zipConduitApp + , mergeSource + , passthroughSink + , sourceToList + , fuseBoth + , fuseBothMaybe + , fuseUpstream + , sequenceSources + , sequenceSinks + , sequenceConduits + ) where + +import Prelude hiding (catch) +import Control.Applicative (Applicative (..)) +import Control.Exception.Lifted as E (Exception) +import qualified Control.Exception.Lifted as E (catch) +import Control.Monad (liftM, when, liftM2, ap) +import Control.Monad.Error.Class(MonadError(..)) +import Control.Monad.Reader.Class(MonadReader(..)) +import Control.Monad.RWS.Class(MonadRWS()) +import Control.Monad.Writer.Class(MonadWriter(..), censor) +import Control.Monad.State.Class(MonadState(..)) +import Control.Monad.Trans.Class (MonadTrans (lift)) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Base (MonadBase (liftBase)) +import Control.Monad.Primitive (PrimMonad, PrimState, primitive) +import Data.Void (Void, absurd) +import Data.Monoid (Monoid (mappend, mempty)) +import Control.Monad.Trans.Resource +import qualified Data.IORef as I +import Control.Monad.Morph 
(MFunctor (..)) +import Data.Conduit.Internal.Pipe hiding (yield, mapOutput, leftover, yieldM, yieldOr, await, awaitForever, addCleanup, bracketP) +import qualified Data.Conduit.Internal.Pipe as CI +import Control.Monad (forever) +import Data.Traversable (Traversable (..)) +import Control.Monad.Catch (MonadCatch, catch) + +-- | Core datatype of the conduit package. This type represents a general +-- component which can consume a stream of input values @i@, produce a stream +-- of output values @o@, perform actions in the @m@ monad, and produce a final +-- result @r@. The type synonyms provided here are simply wrappers around this +-- type. +-- +-- Since 1.0.0 +newtype ConduitM i o m r = ConduitM + { unConduitM :: forall b. + (r -> Pipe i i o () m b) -> Pipe i i o () m b + } + +instance Functor (ConduitM i o m) where + fmap f (ConduitM c) = ConduitM $ \rest -> c (rest . f) + +instance Applicative (ConduitM i o m) where + pure x = ConduitM ($ x) + {-# INLINE pure #-} + (<*>) = ap + {-# INLINE (<*>) #-} + +instance Monad (ConduitM i o m) where + return = pure + ConduitM f >>= g = ConduitM $ \h -> f $ \a -> unConduitM (g a) h + +instance MonadThrow m => MonadThrow (ConduitM i o m) where + throwM = lift . throwM + +instance MFunctor (ConduitM i o) where + hoist f (ConduitM c0) = ConduitM $ \rest -> let + go (HaveOutput p c o) = HaveOutput (go p) (f c) o + go (NeedInput p c) = NeedInput (go . p) (go . c) + go (Done r) = rest r + go (PipeM mp) = + PipeM (f $ liftM go $ collapse mp) + where + -- Combine a series of monadic actions into a single action. Since we + -- throw away side effects between different actions, an arbitrary break + -- between actions will lead to a violation of the monad transformer laws. 
+ -- Example available at: + -- + -- http://hpaste.org/75520 + collapse mpipe = do + pipe' <- mpipe + case pipe' of + PipeM mpipe' -> collapse mpipe' + _ -> return pipe' + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) + +instance MonadCatch m => MonadCatch (ConduitM i o m) where + catch (ConduitM p0) onErr = ConduitM $ \rest -> let + go (Done r) = rest r + go (PipeM mp) = PipeM $ catch (liftM go mp) (return . flip unConduitM rest . onErr) + go (Leftover p i) = Leftover (go p) i + go (NeedInput x y) = NeedInput (go . x) (go . y) + go (HaveOutput p c o) = HaveOutput (go p) c o + in go (p0 Done) + +instance MonadIO m => MonadIO (ConduitM i o m) where + liftIO = lift . liftIO + {-# INLINE liftIO #-} + +instance MonadReader r m => MonadReader r (ConduitM i o m) where + ask = lift ask + {-# INLINE ask #-} + + local f (ConduitM c0) = ConduitM $ \rest -> + let go (HaveOutput p c o) = HaveOutput (go p) c o + go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u)) + go (Done x) = rest x + go (PipeM mp) = PipeM (liftM go $ local f mp) + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) + +#ifndef MIN_VERSION_mtl +#define MIN_VERSION_mtl(x, y, z) 0 +#endif + +instance MonadWriter w m => MonadWriter w (ConduitM i o m) where +#if MIN_VERSION_mtl(2, 1, 0) + writer = lift . writer +#endif + tell = lift . 
tell + + listen (ConduitM c0) = ConduitM $ \rest -> + let go front (HaveOutput p c o) = HaveOutput (go front p) c o + go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u)) + go front (Done x) = rest (x, front) + go front (PipeM mp) = PipeM $ do + (p,w) <- listen mp + return $ go (front `mappend` w) p + go front (Leftover p i) = Leftover (go front p) i + in go mempty (c0 Done) + + pass (ConduitM c0) = ConduitM $ \rest -> + let go front (HaveOutput p c o) = HaveOutput (go front p) c o + go front (NeedInput p c) = NeedInput (\i -> go front (p i)) (\u -> go front (c u)) + go front (PipeM mp) = PipeM $ do + (p,w) <- censor (const mempty) (listen mp) + return $ go (front `mappend` w) p + go front (Done (x,f)) = PipeM $ do + tell (f front) + return $ rest x + go front (Leftover p i) = Leftover (go front p) i + in go mempty (c0 Done) + +instance MonadState s m => MonadState s (ConduitM i o m) where + get = lift get + put = lift . put +#if MIN_VERSION_mtl(2, 1, 0) + state = lift . state +#endif + +instance MonadRWS r w s m => MonadRWS r w s (ConduitM i o m) + +instance MonadError e m => MonadError e (ConduitM i o m) where + throwError = lift . throwError + catchError (ConduitM c0) f = ConduitM $ \rest -> + let go (HaveOutput p c o) = HaveOutput (go p) c o + go (NeedInput p c) = NeedInput (\i -> go (p i)) (\u -> go (c u)) + go (Done x) = rest x + go (PipeM mp) = + PipeM $ catchError (liftM go mp) $ \e -> do + return $ unConduitM (f e) rest + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) + +instance MonadBase base m => MonadBase base (ConduitM i o m) where + liftBase = lift . liftBase + {-# INLINE liftBase #-} + +instance MonadTrans (ConduitM i o) where + lift mr = ConduitM $ \rest -> PipeM (liftM rest mr) + {-# INLINE [1] lift #-} + +instance MonadResource m => MonadResource (ConduitM i o m) where + liftResourceT = lift . 
liftResourceT + {-# INLINE liftResourceT #-} + +instance Monad m => Monoid (ConduitM i o m ()) where + mempty = return () + {-# INLINE mempty #-} + mappend = (>>) + {-# INLINE mappend #-} + +instance PrimMonad m => PrimMonad (ConduitM i o m) where + type PrimState (ConduitM i o m) = PrimState m + primitive = lift . primitive + +-- | Provides a stream of output values, without consuming any input or +-- producing a final result. +-- +-- Since 0.5.0 +type Source m o = ConduitM () o m () + +-- | A component which produces a stream of output values, regardless of the +-- input stream. A @Producer@ is a generalization of a @Source@, and can be +-- used as either a @Source@ or a @Conduit@. +-- +-- Since 1.0.0 +type Producer m o = forall i. ConduitM i o m () + +-- | Consumes a stream of input values and produces a final result, without +-- producing any output. +-- +-- > type Sink i m r = ConduitM i Void m r +-- +-- Since 0.5.0 +type Sink i = ConduitM i Void + +-- | A component which consumes a stream of input values and produces a final +-- result, regardless of the output stream. A @Consumer@ is a generalization of +-- a @Sink@, and can be used as either a @Sink@ or a @Conduit@. +-- +-- Since 1.0.0 +type Consumer i m r = forall o. ConduitM i o m r + +-- | Consumes a stream of input values and produces a stream of output values, +-- without producing a final result. +-- +-- Since 0.5.0 +type Conduit i m o = ConduitM i o m () + +-- | A @Source@ which has been started, but has not yet completed. +-- +-- This type contains both the current state of the @Source@, and the finalizer +-- to be run to close it. +-- +-- Since 0.5.0 +data ResumableSource m o = ResumableSource (Pipe () () o () m ()) (m ()) + +-- | Since 1.0.13 +instance MFunctor ResumableSource where + hoist nat (ResumableSource src m) = ResumableSource (hoist nat src) (nat m) + +-- | Connect a @Source@ to a @Sink@ until the latter closes. 
Returns both the +-- most recent state of the @Source@ and the result of the @Sink@. +-- +-- We use a @ResumableSource@ to keep track of the most recent finalizer +-- provided by the @Source@. +-- +-- Since 0.5.0 +connectResume :: Monad m + => ResumableSource m o + -> Sink o m r + -> m (ResumableSource m o, r) +connectResume (ResumableSource left0 leftFinal0) (ConduitM right0) = + goRight leftFinal0 left0 (right0 Done) + where + goRight leftFinal left right = + case right of + HaveOutput _ _ o -> absurd o + NeedInput rp rc -> goLeft rp rc leftFinal left + Done r2 -> return (ResumableSource left leftFinal, r2) + PipeM mp -> mp >>= goRight leftFinal left + Leftover p i -> goRight leftFinal (HaveOutput left leftFinal i) p + + goLeft rp rc leftFinal left = + case left of + HaveOutput left' leftFinal' o -> goRight leftFinal' left' (rp o) + NeedInput _ lc -> recurse (lc ()) + Done () -> goRight (return ()) (Done ()) (rc ()) + PipeM mp -> mp >>= recurse + Leftover p () -> recurse p + where + recurse = goLeft rp rc leftFinal + +sourceToPipe :: Monad m => Source m o -> Pipe l i o u m () +sourceToPipe = + go . flip unConduitM Done + where + go (HaveOutput p c o) = HaveOutput (go p) c o + go (NeedInput _ c) = go $ c () + go (Done ()) = Done () + go (PipeM mp) = PipeM (liftM go mp) + go (Leftover p ()) = go p + +sinkToPipe :: Monad m => Sink i m r -> Pipe l i o u m r +sinkToPipe = + go . injectLeftovers . flip unConduitM Done + where + go (HaveOutput _ _ o) = absurd o + go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ()) + go (Done r) = Done r + go (PipeM mp) = PipeM (liftM go mp) + go (Leftover _ l) = absurd l + +conduitToPipe :: Monad m => Conduit i m o -> Pipe l i o u m () +conduitToPipe = + go . injectLeftovers . flip unConduitM Done + where + go (HaveOutput p c o) = HaveOutput (go p) c o + go (NeedInput p c) = NeedInput (go . 
p) (const $ go $ c ()) + go (Done ()) = Done () + go (PipeM mp) = PipeM (liftM go mp) + go (Leftover _ l) = absurd l + +-- | Unwraps a @ResumableSource@ into a @Source@ and a finalizer. +-- +-- A @ResumableSource@ represents a @Source@ which has already been run, and +-- therefore has a finalizer registered. As a result, if we want to turn it +-- into a regular @Source@, we need to ensure that the finalizer will be run +-- appropriately. By appropriately, I mean: +-- +-- * If a new finalizer is registered, the old one should not be called. +-- +-- * If the old one is called, it should not be called again. +-- +-- This function returns both a @Source@ and a finalizer which ensures that the +-- above two conditions hold. Once you call that finalizer, the @Source@ is +-- invalidated and cannot be used. +-- +-- Since 0.5.2 +unwrapResumable :: MonadIO m => ResumableSource m o -> m (Source m o, m ()) +unwrapResumable (ResumableSource src final) = do + ref <- liftIO $ I.newIORef True + let final' = do + x <- liftIO $ I.readIORef ref + when x final + return (liftIO (I.writeIORef ref False) >> (ConduitM (src >>=)), final') + +-- | Turn a @Source@ into a @ResumableSource@ with no attached finalizer. +-- +-- Since 1.1.4 +newResumableSource :: Monad m => Source m o -> ResumableSource m o +newResumableSource (ConduitM s) = ResumableSource (s Done) (return ()) + +-- | Generalize a 'Source' to a 'Producer'. +-- +-- Since 1.0.0 +toProducer :: Monad m => Source m a -> Producer m a +toProducer (ConduitM c0) = ConduitM $ \rest -> let + go (HaveOutput p c o) = HaveOutput (go p) c o + go (NeedInput _ c) = go (c ()) + go (Done r) = rest r + go (PipeM mp) = PipeM (liftM go mp) + go (Leftover p ()) = go p + in go (c0 Done) + +-- | Generalize a 'Sink' to a 'Consumer'. +-- +-- Since 1.0.0 +toConsumer :: Monad m => Sink a m b -> Consumer a m b +toConsumer (ConduitM c0) = ConduitM $ \rest -> let + go (HaveOutput _ _ o) = absurd o + go (NeedInput p c) = NeedInput (go . p) (go . 
c) + go (Done r) = rest r + go (PipeM mp) = PipeM (liftM go mp) + go (Leftover p l) = Leftover (go p) l + in go (c0 Done) + +-- | Catch all exceptions thrown by the current component of the pipeline. +-- +-- Note: this will /not/ catch exceptions thrown by other components! For +-- example, if an exception is thrown in a @Source@ feeding to a @Sink@, and +-- the @Sink@ uses @catchC@, the exception will /not/ be caught. +-- +-- Due to this behavior (as well as lack of async exception safety), you +-- should not try to implement combinators such as @onException@ in terms of this +-- primitive function. +-- +-- Note also that the exception handling will /not/ be applied to any +-- finalizers generated by this conduit. +-- +-- Since 1.0.11 +catchC :: (MonadBaseControl IO m, Exception e) + => ConduitM i o m r + -> (e -> ConduitM i o m r) + -> ConduitM i o m r +catchC (ConduitM p0) onErr = ConduitM $ \rest -> let + go (Done r) = rest r + go (PipeM mp) = PipeM $ E.catch (liftM go mp) + (return . flip unConduitM rest . onErr) + go (Leftover p i) = Leftover (go p) i + go (NeedInput x y) = NeedInput (go . x) (go . y) + go (HaveOutput p c o) = HaveOutput (go p) c o + in go (p0 Done) +{-# INLINE catchC #-} + +-- | The same as @flip catchC@. +-- +-- Since 1.0.11 +handleC :: (MonadBaseControl IO m, Exception e) + => (e -> ConduitM i o m r) + -> ConduitM i o m r + -> ConduitM i o m r +handleC = flip catchC +{-# INLINE handleC #-} + +-- | A version of @try@ for use within a pipeline. See the comments in @catchC@ +-- for more details. +-- +-- Since 1.0.11 +tryC :: (MonadBaseControl IO m, Exception e) + => ConduitM i o m r + -> ConduitM i o m (Either e r) +tryC (ConduitM c0) = ConduitM $ \rest -> let + go (Done r) = rest (Right r) + go (PipeM mp) = PipeM $ E.catch (liftM go mp) (return . rest . Left) + go (Leftover p i) = Leftover (go p) i + go (NeedInput x y) = NeedInput (go . x) (go . 
y) + go (HaveOutput p c o) = HaveOutput (go p) c o + in go (c0 Done) +{-# INLINE tryC #-} + +-- | Combines two sinks. The new sink will complete when both input sinks have +-- completed. +-- +-- Any leftovers are discarded. +-- +-- Since 0.4.1 +zipSinks :: Monad m => Sink i m r -> Sink i m r' -> Sink i m (r, r') +zipSinks (ConduitM x0) (ConduitM y0) = ConduitM $ \rest -> let + Leftover _ i >< _ = absurd i + _ >< Leftover _ i = absurd i + HaveOutput _ _ o >< _ = absurd o + _ >< HaveOutput _ _ o = absurd o + + PipeM mx >< y = PipeM (liftM (>< y) mx) + x >< PipeM my = PipeM (liftM (x ><) my) + Done x >< Done y = rest (x, y) + NeedInput px cx >< NeedInput py cy = NeedInput (\i -> px i >< py i) (\() -> cx () >< cy ()) + NeedInput px cx >< y@Done{} = NeedInput (\i -> px i >< y) (\u -> cx u >< y) + x@Done{} >< NeedInput py cy = NeedInput (\i -> x >< py i) (\u -> x >< cy u) + in injectLeftovers (x0 Done) >< injectLeftovers (y0 Done) + +-- | Combines two sources. The new source will stop producing once either +-- source has been exhausted. +-- +-- Since 1.0.13 +zipSources :: Monad m => Source m a -> Source m b -> Source m (a, b) +zipSources (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let + go (Leftover left ()) right = go left right + go left (Leftover right ()) = go left right + go (Done ()) (Done ()) = rest () + go (Done ()) (HaveOutput _ close _) = PipeM (close >> return (rest ())) + go (HaveOutput _ close _) (Done ()) = PipeM (close >> return (rest ())) + go (Done ()) (PipeM _) = rest () + go (PipeM _) (Done ()) = rest () + go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my) + go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx) + go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my) + go (HaveOutput srcx closex x) (HaveOutput srcy closey y) = HaveOutput (go srcx srcy) (closex >> closey) (x, y) + go (NeedInput _ c) right = go (c ()) right + go left (NeedInput _ c) = go left (c ()) + in go (left0 Done) (right0 Done) + +-- | Combines two sources. 
The new source will stop producing once either +-- source has been exhausted. +-- +-- Since 1.0.13 +zipSourcesApp :: Monad m => Source m (a -> b) -> Source m a -> Source m b +zipSourcesApp (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let + go (Leftover left ()) right = go left right + go left (Leftover right ()) = go left right + go (Done ()) (Done ()) = rest () + go (Done ()) (HaveOutput _ close _) = PipeM (close >> return (rest ())) + go (HaveOutput _ close _) (Done ()) = PipeM (close >> return (rest ())) + go (Done ()) (PipeM _) = rest () + go (PipeM _) (Done ()) = rest () + go (PipeM mx) (PipeM my) = PipeM (liftM2 go mx my) + go (PipeM mx) y@HaveOutput{} = PipeM (liftM (\x -> go x y) mx) + go x@HaveOutput{} (PipeM my) = PipeM (liftM (go x) my) + go (HaveOutput srcx closex x) (HaveOutput srcy closey y) = HaveOutput (go srcx srcy) (closex >> closey) (x y) + go (NeedInput _ c) right = go (c ()) right + go left (NeedInput _ c) = go left (c ()) + in go (left0 Done) (right0 Done) + +-- | +-- +-- Since 1.0.17 +zipConduitApp + :: Monad m + => ConduitM i o m (x -> y) + -> ConduitM i o m x + -> ConduitM i o m y +zipConduitApp (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let + go _ _ (Done f) (Done x) = rest (f x) + go finalX finalY (PipeM mx) y = PipeM (flip (go finalX finalY) y `liftM` mx) + go finalX finalY x (PipeM my) = PipeM (go finalX finalY x `liftM` my) + go _ finalY (HaveOutput x finalX o) y = HaveOutput + (go finalX finalY x y) + (finalX >> finalY) + o + go finalX _ x (HaveOutput y finalY o) = HaveOutput + (go finalX finalY x y) + (finalX >> finalY) + o + go _ _ (Leftover _ i) _ = absurd i + go _ _ _ (Leftover _ i) = absurd i + go finalX finalY (NeedInput px cx) (NeedInput py cy) = NeedInput + (\i -> go finalX finalY (px i) (py i)) + (\u -> go finalX finalY (cx u) (cy u)) + go finalX finalY (NeedInput px cx) (Done y) = NeedInput + (\i -> go finalX finalY (px i) (Done y)) + (\u -> go finalX finalY (cx u) (Done y)) + go finalX finalY (Done 
x) (NeedInput py cy) = NeedInput + (\i -> go finalX finalY (Done x) (py i)) + (\u -> go finalX finalY (Done x) (cy u)) + in go (return ()) (return ()) (injectLeftovers $ left0 Done) (injectLeftovers $ right0 Done) + +-- | Same as normal fusion (e.g. @=$=@), except instead of discarding leftovers +-- from the downstream component, return them. +-- +-- Since 1.0.17 +fuseReturnLeftovers :: Monad m + => ConduitM a b m () + -> ConduitM b c m r + -> ConduitM a c m (r, [b]) +fuseReturnLeftovers (ConduitM left0) (ConduitM right0) = ConduitM $ \rest -> let + goRight final bs left right = + case right of + HaveOutput p c o -> HaveOutput (recurse p) (c >> final) o + NeedInput rp rc -> + case bs of + [] -> goLeft rp rc final left + b:bs' -> goRight final bs' left (rp b) + Done r2 -> PipeM (final >> return (rest (r2, bs))) + PipeM mp -> PipeM (liftM recurse mp) + Leftover p b -> goRight final (b:bs) left p + where + recurse = goRight final bs left + + goLeft rp rc final left = + case left of + HaveOutput left' final' o -> goRight final' [] left' (rp o) + NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) + Done r1 -> goRight (return ()) [] (Done r1) (rc r1) + PipeM mp -> PipeM (liftM recurse mp) + Leftover left' i -> Leftover (recurse left') i + where + recurse = goLeft rp rc final + in goRight (return ()) [] (left0 Done) (right0 Done) + +-- | Similar to @fuseReturnLeftovers@, but use the provided function to convert +-- downstream leftovers to upstream leftovers. +-- +-- Since 1.0.17 +fuseLeftovers + :: Monad m + => ([b] -> [a]) + -> ConduitM a b m () + -> ConduitM b c m r + -> ConduitM a c m r +fuseLeftovers f left right = do + (r, bs) <- fuseReturnLeftovers left right + mapM_ leftover $ reverse $ f bs + return r + +-- | A generalization of 'ResumableSource'. Allows resuming an arbitrary +-- conduit, keeping its state and using it later (or finalizing it). 
+-- +-- Since 1.0.17 +data ResumableConduit i m o = + ResumableConduit (Pipe i i o () m ()) (m ()) + +-- | Connect a 'ResumableConduit' to a sink and return the output of the sink +-- together with a new 'ResumableConduit'. +-- +-- Since 1.0.17 +connectResumeConduit + :: Monad m + => ResumableConduit i m o + -> Sink o m r + -> Sink i m (ResumableConduit i m o, r) +connectResumeConduit (ResumableConduit left0 leftFinal0) (ConduitM right0) = ConduitM $ \rest -> let + goRight leftFinal left right = + case right of + HaveOutput _ _ o -> absurd o + NeedInput rp rc -> goLeft rp rc leftFinal left + Done r2 -> rest (ResumableConduit left leftFinal, r2) + PipeM mp -> PipeM (liftM (goRight leftFinal left) mp) + Leftover p i -> goRight leftFinal (HaveOutput left leftFinal i) p + + goLeft rp rc leftFinal left = + case left of + HaveOutput left' leftFinal' o -> goRight leftFinal' left' (rp o) + NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) + Done () -> goRight (return ()) (Done ()) (rc ()) + PipeM mp -> PipeM (liftM recurse mp) + Leftover left' i -> Leftover (recurse left') i -- recurse p + where + recurse = goLeft rp rc leftFinal + in goRight leftFinal0 left0 (right0 Done) + +-- | Unwraps a @ResumableConduit@ into a @Conduit@ and a finalizer. +-- +-- See 'unwrapResumable' for more information. +-- +-- Since 1.0.17 +unwrapResumableConduit :: MonadIO m => ResumableConduit i m o -> m (Conduit i m o, m ()) +unwrapResumableConduit (ResumableConduit src final) = do + ref <- liftIO $ I.newIORef True + let final' = do + x <- liftIO $ I.readIORef ref + when x final + return (ConduitM ((liftIO (I.writeIORef ref False) >> src) >>=), final') + +-- | Turn a @Conduit@ into a @ResumableConduit@ with no attached finalizer. +-- +-- Since 1.1.4 +newResumableConduit :: Monad m => Conduit i m o -> ResumableConduit i m o +newResumableConduit (ConduitM c) = ResumableConduit (c Done) (return ()) + + +-- | Merge a @Source@ into a @Conduit@. 
+-- The new conduit will stop processing once either source or upstream has been exhausted. +mergeSource + :: Monad m + => Source m i + -> Conduit a m (i, a) +mergeSource = loop . newResumableSource + where + loop :: Monad m => ResumableSource m i -> Conduit a m (i, a) + loop src0 = await >>= maybe (lift $ closeResumableSource src0) go + where + go a = do + (src1, mi) <- lift $ src0 $$++ await + case mi of + Nothing -> lift $ closeResumableSource src1 + Just i -> yield (i, a) >> loop src1 + + +-- | Turn a @Sink@ into a @Conduit@ in the following way: +-- +-- * All input passed to the @Sink@ is yielded downstream. +-- +-- * When the @Sink@ finishes processing, the result is passed to the provided finalizer function. +-- +-- Note that the @Sink@ will stop receiving input as soon as the downstream it +-- is connected to shuts down. +-- +-- An example usage would be to write the result of a @Sink@ to some mutable +-- variable while allowing other processing to continue. +-- +-- Since 1.1.0 +passthroughSink :: Monad m + => Sink i m r + -> (r -> m ()) -- ^ finalizer + -> Conduit i m i +passthroughSink (ConduitM sink0) final = ConduitM $ \rest -> let + -- A bit of explanation is in order: this function is + -- non-obvious. The purpose of go is to keep track of the sink + -- we're passing values to, and then yield values downstream. The + -- third argument to go is the current state of that sink. That's + -- relatively straightforward. + -- + -- The second value is the leftover buffer. These are values that + -- the sink itself has called leftover on, and must be provided + -- back to the sink the next time it awaits. _However_, these + -- values should _not_ be reyielded downstream: we have already + -- yielded them downstream ourselves, and it is the responsibility + -- of the functions wrapping around passthroughSink to handle the + -- leftovers from downstream.
+ -- + -- The trickiest bit is the first argument, which is a solution to + -- bug https://github.com/snoyberg/conduit/issues/304. The issue + -- is that, once we get a value, we need to provide it to both the + -- inner sink _and_ yield it downstream. The obvious thing to do + -- is yield first and then recursively call go. Unfortunately, + -- this doesn't work in all cases: if the downstream component + -- never calls await again, our yield call will never return, and + -- our sink will not get the last value. This results in confusing + -- behavior where the sink and downstream component receive a + -- different number of values. + -- + -- Solution: keep a buffer of the next value to yield downstream, + -- and only yield it downstream in one of two cases: our sink is + -- asking for another value, or our sink is done. This way, we + -- ensure that, in all cases, we pass exactly the same number of + -- values to the inner sink as to downstream. + + go mbuf _ (Done r) = do + maybe (return ()) CI.yield mbuf + lift $ final r + unConduitM (awaitForever yield) rest + go mbuf is (Leftover sink i) = go mbuf (i:is) sink + go _ _ (HaveOutput _ _ o) = absurd o + go mbuf is (PipeM mx) = do + x <- lift mx + go mbuf is x + go mbuf (i:is) (NeedInput next _) = go mbuf is (next i) + go mbuf [] (NeedInput next done) = do + maybe (return ()) CI.yield mbuf + mx <- CI.await + case mx of + Nothing -> go Nothing [] (done ()) + Just x -> go (Just x) [] (next x) + in go Nothing [] (sink0 Done) + +-- | Convert a @Source@ into a list. The basic functionality can be explained as: +-- +-- > sourceToList src = src $$ Data.Conduit.List.consume +-- +-- However, @sourceToList@ is able to produce its results lazily, which cannot +-- be done when running a conduit pipeline in general. Unlike the +-- @Data.Conduit.Lazy@ module (in conduit-extra), this function performs no +-- unsafe I\/O operations, and therefore can only be as lazy as the +-- underlying monad.
+-- +-- Since 1.2.6 +sourceToList :: Monad m => Source m a -> m [a] +sourceToList = + go . flip unConduitM Done + where + go (Done _) = return [] + go (HaveOutput src _ x) = liftM (x:) (go src) + go (PipeM msrc) = msrc >>= go + go (NeedInput _ c) = go (c ()) + go (Leftover p _) = go p + +-- Define fixity of all our operators +infixr 0 $$ +infixl 1 $= +infixr 2 =$ +infixr 2 =$= +infixr 0 $$+ +infixr 0 $$++ +infixr 0 $$+- +infixl 1 $=+ + +-- | The connect operator, which pulls data from a source and pushes to a sink. +-- If you would like to keep the @Source@ open to be used for other +-- operations, use the connect-and-resume operator '$$+'. +-- +-- Since 0.4.0 +($$) :: Monad m => Source m a -> Sink a m b -> m b +src $$ sink = do + (rsrc, res) <- src $$+ sink + rsrc $$+- return () + return res +{-# INLINE [1] ($$) #-} + +-- | A synonym for '=$=' for backwards compatibility. +-- +-- Since 0.4.0 +($=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r +($=) = (=$=) +{-# INLINE [0] ($=) #-} +{-# RULES "conduit: $= is =$=" ($=) = (=$=) #-} + +-- | A synonym for '=$=' for backwards compatibility. +-- +-- Since 0.4.0 +(=$) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r +(=$) = (=$=) +{-# INLINE [0] (=$) #-} +{-# RULES "conduit: =$ is =$=" (=$) = (=$=) #-} + +-- | Fusion operator, combining two @Conduit@s together into a new @Conduit@. +-- +-- Both @Conduit@s will be closed when the newly-created @Conduit@ is closed. +-- +-- Leftover data returned from the right @Conduit@ will be discarded. 
+-- +-- Since 0.4.0 +(=$=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r +ConduitM left0 =$= ConduitM right0 = ConduitM $ \rest -> + let goRight final left right = + case right of + HaveOutput p c o -> HaveOutput (recurse p) (c >> final) o + NeedInput rp rc -> goLeft rp rc final left + Done r2 -> PipeM (final >> return (rest r2)) + PipeM mp -> PipeM (liftM recurse mp) + Leftover right' i -> goRight final (HaveOutput left final i) right' + where + recurse = goRight final left + + goLeft rp rc final left = + case left of + HaveOutput left' final' o -> goRight final' left' (rp o) + NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) + Done r1 -> goRight (return ()) (Done r1) (rc r1) + PipeM mp -> PipeM (liftM recurse mp) + Leftover left' i -> Leftover (recurse left') i + where + recurse = goLeft rp rc final + in goRight (return ()) (left0 Done) (right0 Done) + where +{-# INLINE [1] (=$=) #-} + +-- | Wait for a single input value from upstream. If no data is available, +-- returns @Nothing@. Once @await@ returns @Nothing@, subsequent calls will +-- also return @Nothing@. +-- +-- Since 0.5.0 +await :: Monad m => Consumer i m (Maybe i) +await = ConduitM $ \f -> NeedInput (f . Just) (const $ f Nothing) +{-# INLINE [0] await #-} + +await' :: Monad m + => ConduitM i o m r + -> (i -> ConduitM i o m r) + -> ConduitM i o m r +await' f g = ConduitM $ \rest -> NeedInput + (\i -> unConduitM (g i) rest) + (const $ unConduitM f rest) +{-# INLINE await' #-} +{-# RULES "conduit: await >>= maybe" forall x y. await >>= maybe x y = await' x y #-} + +-- | Send a value downstream to the next component to consume. If the +-- downstream component terminates, this call will never return control. If you +-- would like to register a cleanup function, please use 'yieldOr' instead. 
+-- +-- Since 0.5.0 +yield :: Monad m + => o -- ^ output value + -> ConduitM i o m () +yield o = yieldOr o (return ()) +{-# INLINE yield #-} + +-- | Send a monadic value downstream for the next component to consume. +-- +-- @since 1.2.7 +yieldM :: Monad m => m o -> ConduitM i o m () +yieldM mo = lift mo >>= yield +{-# INLINE yieldM #-} + + -- FIXME rule won't fire, see FIXME in .Pipe; "mapM_ yield" mapM_ yield = ConduitM . sourceList + +-- | Provide a single piece of leftover input to be consumed by the next +-- component in the current monadic binding. +-- +-- /Note/: it is highly encouraged to only return leftover values from input +-- already consumed from upstream. +-- +-- @since 0.5.0 +leftover :: i -> ConduitM i o m () +leftover i = ConduitM $ \rest -> Leftover (rest ()) i +{-# INLINE leftover #-} + +-- | Run a pipeline until processing completes. +-- +-- Since 1.2.1 +runConduit :: Monad m => ConduitM () Void m r -> m r +runConduit (ConduitM p) = runPipe $ injectLeftovers $ p Done +{-# INLINE [0] runConduit #-} + +-- | Bracket a conduit computation between allocation and release of a +-- resource. Two guarantees are given about resource finalization: +-- +-- 1. It will be /prompt/. The finalization will be run as early as possible. +-- +-- 2. It is exception safe. Due to usage of @resourcet@, the finalization will +-- be run in the event of any exceptions. +-- +-- Since 0.5.0 +bracketP :: MonadResource m + + => IO a + -- ^ computation to run first (\"acquire resource\") + -> (a -> IO ()) + -- ^ computation to run last (\"release resource\") + -> (a -> ConduitM i o m r) + -- ^ computation to run in-between + -> ConduitM i o m r + -- returns the value from the in-between computation +bracketP alloc free inside = ConduitM $ \rest -> PipeM $ do + (key, seed) <- allocate alloc free + return $ unConduitM (addCleanup (const $ release key) (inside seed)) rest + +-- | Add some code to be run when the given component cleans up. 
+-- +-- The supplied cleanup function will be given a @True@ if the component ran to +-- completion, or @False@ if it terminated early due to a downstream component +-- terminating. +-- +-- Note that this function is not exception safe. For that, please use +-- 'bracketP'. +-- +-- Since 0.4.1 +addCleanup :: Monad m + => (Bool -> m ()) + -> ConduitM i o m r + -> ConduitM i o m r +addCleanup cleanup (ConduitM c0) = ConduitM $ \rest -> let + go (Done r) = PipeM (cleanup True >> return (rest r)) + go (HaveOutput src close x) = HaveOutput + (go src) + (cleanup False >> close) + x + go (PipeM msrc) = PipeM (liftM (go) msrc) + go (NeedInput p c) = NeedInput + (go . p) + (go . c) + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) + +-- | Similar to 'yield', but additionally takes a finalizer to be run if the +-- downstream component terminates. +-- +-- Since 0.5.0 +yieldOr :: Monad m + => o + -> m () -- ^ finalizer + -> ConduitM i o m () +yieldOr o m = ConduitM $ \rest -> HaveOutput (rest ()) m o +{-# INLINE yieldOr #-} + +-- | Wait for input forever, calling the given inner component for each piece of +-- new input. +-- +-- This function is provided as a convenience for the common pattern of +-- @await@ing input, checking if it's @Just@ and then looping. +-- +-- Since 0.5.0 +awaitForever :: Monad m => (i -> ConduitM i o m r) -> ConduitM i o m () +awaitForever f = ConduitM $ \rest -> + let go = NeedInput (\i -> unConduitM (f i) (const go)) rest + in go + +-- | Transform the monad that a @ConduitM@ lives in. +-- +-- Note that the monad transforming function will be run multiple times, +-- resulting in unintuitive behavior in some cases. For a fuller treatment, +-- please see: +-- +-- +-- +-- This function is just a synonym for 'hoist'. +-- +-- Since 0.4.0 +transPipe :: Monad m => (forall a. m a -> n a) -> ConduitM i o m r -> ConduitM i o n r +transPipe = hoist + +-- | Apply a function to all the output values of a @ConduitM@. 
+-- +-- This mimics the behavior of `fmap` for a `Source` and `Conduit` in pre-0.4 +-- days. It can also be simulated by fusing with the @map@ conduit from +-- "Data.Conduit.List". +-- +-- Since 0.4.1 +mapOutput :: Monad m => (o1 -> o2) -> ConduitM i o1 m r -> ConduitM i o2 m r +mapOutput f (ConduitM c0) = ConduitM $ \rest -> let + go (HaveOutput p c o) = HaveOutput (go p) c (f o) + go (NeedInput p c) = NeedInput (go . p) (go . c) + go (Done r) = rest r + go (PipeM mp) = PipeM (liftM (go) mp) + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) + +-- | Same as 'mapOutput', but use a function that returns @Maybe@ values. +-- +-- Since 0.5.0 +mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> ConduitM i o1 m r -> ConduitM i o2 m r +mapOutputMaybe f (ConduitM c0) = ConduitM $ \rest -> let + go (HaveOutput p c o) = maybe id (\o' p' -> HaveOutput p' c o') (f o) (go p) + go (NeedInput p c) = NeedInput (go . p) (go . c) + go (Done r) = rest r + go (PipeM mp) = PipeM (liftM (go) mp) + go (Leftover p i) = Leftover (go p) i + in go (c0 Done) + +-- | Apply a function to all the input values of a @ConduitM@. +-- +-- Since 0.5.0 +mapInput :: Monad m + => (i1 -> i2) -- ^ map initial input to new input + -> (i2 -> Maybe i1) -- ^ map new leftovers to initial leftovers + -> ConduitM i2 o m r + -> ConduitM i1 o m r +mapInput f f' (ConduitM c0) = ConduitM $ \rest -> let + go (HaveOutput p c o) = HaveOutput (go p) c o + go (NeedInput p c) = NeedInput (go . p . f) (go . c) + go (Done r) = rest r + go (PipeM mp) = PipeM $ liftM go mp + go (Leftover p i) = maybe id (flip Leftover) (f' i) (go p) + in go (c0 Done) + +-- | The connect-and-resume operator. This does not close the @Source@, but +-- instead returns it to be used again. This allows a @Source@ to be used +-- incrementally in a large program, without forcing the entire program to live +-- in the @Sink@ monad. +-- +-- Mnemonic: connect + do more. 
+-- +-- Since 0.5.0 +($$+) :: Monad m => Source m a -> Sink a m b -> m (ResumableSource m a, b) +ConduitM src $$+ sink = + connectResume (ResumableSource (src Done) (return ())) sink +{-# INLINE ($$+) #-} + +-- | Continue processing after usage of @$$+@. +-- +-- Since 0.5.0 +($$++) :: Monad m => ResumableSource m a -> Sink a m b -> m (ResumableSource m a, b) +($$++) = connectResume +{-# INLINE ($$++) #-} + +-- | Complete processing of a @ResumableSource@. This will run the finalizer +-- associated with the @ResumableSource@. In order to guarantee proper resource +-- finalization, you /must/ use this operator after using @$$+@ and @$$++@. +-- +-- Since 0.5.0 +($$+-) :: Monad m => ResumableSource m a -> Sink a m b -> m b +rsrc $$+- sink = do + (ResumableSource _ final, res) <- connectResume rsrc sink + final + return res +{-# INLINE ($$+-) #-} + +-- | Left fusion for a resumable source. +-- +-- Since 1.0.16 +($=+) :: Monad m => ResumableSource m a -> Conduit a m b -> ResumableSource m b +ResumableSource src final $=+ ConduitM sink = + ResumableSource (src `pipeL` sink Done) final + +-- | Execute the finalizer associated with a @ResumableSource@, rendering the +-- @ResumableSource@ invalid for further use. +-- +-- This is just a more explicit version of @$$+- return ()@. +-- +-- Since 1.1.3 +closeResumableSource :: Monad m => ResumableSource m a -> m () +closeResumableSource = ($$+- return ()) + +-- | Provide for a stream of data that can be flushed. +-- +-- A number of @Conduit@s (e.g., zlib compression) need the ability to flush +-- the stream at some point. This provides a single wrapper datatype to be used +-- in all such circumstances. +-- +-- Since 0.3.0 +data Flush a = Chunk a | Flush + deriving (Show, Eq, Ord) +instance Functor Flush where + fmap _ Flush = Flush + fmap f (Chunk a) = Chunk (f a) + +-- | A wrapper for defining an 'Applicative' instance for 'Source's which allows +-- to combine sources together, generalizing 'zipSources'.
A combined source +-- will take input yielded from each of its @Source@s until any of them stop +-- producing output. +-- +-- Since 1.0.13 +newtype ZipSource m o = ZipSource { getZipSource :: Source m o } + +instance Monad m => Functor (ZipSource m) where + fmap f = ZipSource . mapOutput f . getZipSource +instance Monad m => Applicative (ZipSource m) where + pure = ZipSource . forever . yield + (ZipSource f) <*> (ZipSource x) = ZipSource $ zipSourcesApp f x + +-- | Coalesce all values yielded by all of the @Source@s. +-- +-- Implemented on top of @ZipSource@, see that data type for more details. +-- +-- Since 1.0.13 +sequenceSources :: (Traversable f, Monad m) => f (Source m o) -> Source m (f o) +sequenceSources = getZipSource . sequenceA . fmap ZipSource + +-- | A wrapper for defining an 'Applicative' instance for 'Sink's which allows +-- to combine sinks together, generalizing 'zipSinks'. A combined sink +-- distributes the input to all its participants and when all finish, produces +-- the result. This allows to define functions like +-- +-- @ +-- sequenceSinks :: (Monad m) +-- => [Sink i m r] -> Sink i m [r] +-- sequenceSinks = getZipSink . sequenceA . fmap ZipSink +-- @ +-- +-- Note that the standard 'Applicative' instance for conduits works +-- differently. It feeds one sink with input until it finishes, then switches +-- to another, etc., and at the end combines their results. +-- +-- This newtype is in fact a type constrained version of 'ZipConduit', and has +-- the same behavior. It's presented as a separate type since (1) it +-- historically predates @ZipConduit@, and (2) the type constraining can make +-- your code clearer (and thereby make your error messages more easily +-- understood). +-- +-- Since 1.0.13 +newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r } + +instance Monad m => Functor (ZipSink i m) where + fmap f (ZipSink x) = ZipSink (liftM f x) +instance Monad m => Applicative (ZipSink i m) where + pure = ZipSink . 
return + (ZipSink f) <*> (ZipSink x) = + ZipSink $ liftM (uncurry ($)) $ zipSinks f x + +-- | Send incoming values to all of the provided @Sink@s, and ultimately +-- coalesce together all return values. +-- +-- Implemented on top of @ZipSink@, see that data type for more details. +-- +-- Since 1.0.13 +sequenceSinks :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r) +sequenceSinks = getZipSink . sequenceA . fmap ZipSink + +-- | The connect-and-resume operator. This does not close the @Conduit@, but +-- instead returns it to be used again. This allows a @Conduit@ to be used +-- incrementally in a large program, without forcing the entire program to live +-- in the @Sink@ monad. +-- +-- Leftover data returned from the @Sink@ will be discarded. +-- +-- Mnemonic: connect + do more. +-- +-- Since 1.0.17 +(=$$+) :: Monad m => Conduit a m b -> Sink b m r -> Sink a m (ResumableConduit a m b, r) +(=$$+) (ConduitM conduit) = connectResumeConduit (ResumableConduit (conduit Done) (return ())) +{-# INLINE (=$$+) #-} + +-- | Continue processing after usage of '=$$+'. Connect a 'ResumableConduit' to +-- a sink and return the output of the sink together with a new +-- 'ResumableConduit'. +-- +-- Since 1.0.17 +(=$$++) :: Monad m => ResumableConduit i m o -> Sink o m r -> Sink i m (ResumableConduit i m o, r) +(=$$++) = connectResumeConduit +{-# INLINE (=$$++) #-} + +-- | Complete processing of a 'ResumableConduit'. This will run the finalizer +-- associated with the @ResumableConduit@. In order to guarantee proper +-- resource finalization, you /must/ use this operator after using '=$$+' and +-- '=$$++'. +-- +-- Since 1.0.17 +(=$$+-) :: Monad m => ResumableConduit i m o -> Sink o m r -> Sink i m r +rsrc =$$+- sink = do + (ResumableConduit _ final, res) <- connectResumeConduit rsrc sink + lift final + return res +{-# INLINE (=$$+-) #-} + + +infixr 0 =$$+ +infixr 0 =$$++ +infixr 0 =$$+- + +-- | Provides an alternative @Applicative@ instance for @ConduitM@.
In this instance, +-- every incoming value is provided to all @ConduitM@s, and output is coalesced together. +-- Leftovers from individual @ConduitM@s will be used within that component, and then discarded +-- at the end of their computation. Output and finalizers will both be handled in a left-biased manner. +-- +-- As an example, take the following program: +-- +-- @ +-- main :: IO () +-- main = do +-- let src = mapM_ yield [1..3 :: Int] +-- conduit1 = CL.map (+1) +-- conduit2 = CL.concatMap (replicate 2) +-- conduit = getZipConduit $ ZipConduit conduit1 <* ZipConduit conduit2 +-- sink = CL.mapM_ print +-- src $$ conduit =$ sink +-- @ +-- +-- It will produce the output: 2, 1, 1, 3, 2, 2, 4, 3, 3 +-- +-- Since 1.0.17 +newtype ZipConduit i o m r = ZipConduit { getZipConduit :: ConduitM i o m r } + deriving Functor +instance Monad m => Applicative (ZipConduit i o m) where + pure = ZipConduit . pure + ZipConduit left <*> ZipConduit right = ZipConduit (zipConduitApp left right) + +-- | Provide identical input to all of the @Conduit@s and combine their outputs +-- into a single stream. +-- +-- Implemented on top of @ZipConduit@, see that data type for more details. +-- +-- Since 1.0.17 +sequenceConduits :: (Traversable f, Monad m) => f (ConduitM i o m r) -> ConduitM i o m (f r) +sequenceConduits = getZipConduit . sequenceA . fmap ZipConduit + +-- | Fuse two @ConduitM@s together, and provide the return value of both. Note +-- that this will force the entire upstream @ConduitM@ to be run to produce the +-- result value, even if the downstream terminates early. +-- +-- Since 1.1.5 +fuseBoth :: Monad m => ConduitM a b m r1 -> ConduitM b c m r2 -> ConduitM a c m (r1, r2) +fuseBoth (ConduitM up) (ConduitM down) = + ConduitM (pipeL (up Done) (withUpstream $ generalizeUpstream $ down Done) >>=) +{-# INLINE fuseBoth #-} + +-- | Like 'fuseBoth', but does not force consumption of the @Producer@. 
+-- In the case that the @Producer@ terminates, the result value is +-- provided as a @Just@ value. If it does not terminate, then a +-- @Nothing@ value is returned. +-- +-- One thing to note here is that "termination" here only occurs if the +-- @Producer@ actually yields a @Nothing@ value. For example, with the +-- @Producer@ @mapM_ yield [1..5]@, if five values are requested, the +-- @Producer@ has not yet terminated. Termination only occurs when the +-- sixth value is awaited for and the @Producer@ signals termination. +-- +-- Since 1.2.4 +fuseBothMaybe + :: Monad m + => ConduitM a b m r1 + -> ConduitM b c m r2 + -> ConduitM a c m (Maybe r1, r2) +fuseBothMaybe (ConduitM up) (ConduitM down) = + ConduitM (pipeL (up Done) (go Nothing $ down Done) >>=) + where + go mup (Done r) = Done (mup, r) + go mup (PipeM mp) = PipeM $ liftM (go mup) mp + go mup (HaveOutput p c o) = HaveOutput (go mup p) c o + go _ (NeedInput p c) = NeedInput + (\i -> go Nothing (p i)) + (\u -> go (Just u) (c ())) + go mup (Leftover p i) = Leftover (go mup p) i +{-# INLINABLE fuseBothMaybe #-} + +-- | Same as @fuseBoth@, but ignore the return value from the downstream +-- @Conduit@. Same caveats of forced consumption apply. +-- +-- Since 1.1.5 +fuseUpstream :: Monad m => ConduitM a b m r -> Conduit b m c -> ConduitM a c m r +fuseUpstream up down = fmap fst (fuseBoth up down) +{-# INLINE fuseUpstream #-} + +-- Rewrite rules + +{- FIXME +{-# RULES "conduit: ConduitM: lift x >>= f" forall m f. lift m >>= f = ConduitM (PipeM (liftM (unConduitM . f) m)) #-} +{-# RULES "conduit: ConduitM: lift x >> f" forall m f. lift m >> f = ConduitM (PipeM (liftM (\_ -> unConduitM f) m)) #-} + +{-# RULES "conduit: ConduitM: liftIO x >>= f" forall m (f :: MonadIO m => a -> ConduitM i o m r). liftIO m >>= f = ConduitM (PipeM (liftM (unConduitM . f) (liftIO m))) #-} +{-# RULES "conduit: ConduitM: liftIO x >> f" forall m (f :: MonadIO m => ConduitM i o m r). 
liftIO m >> f = ConduitM (PipeM (liftM (\_ -> unConduitM f) (liftIO m))) #-} + +{-# RULES "conduit: ConduitM: liftBase x >>= f" forall m (f :: MonadBase b m => a -> ConduitM i o m r). liftBase m >>= f = ConduitM (PipeM (liftM (unConduitM . f) (liftBase m))) #-} +{-# RULES "conduit: ConduitM: liftBase x >> f" forall m (f :: MonadBase b m => ConduitM i o m r). liftBase m >> f = ConduitM (PipeM (liftM (\_ -> unConduitM f) (liftBase m))) #-} + +{-# RULES + "yield o >> p" forall o (p :: ConduitM i o m r). yield o >> p = ConduitM (HaveOutput (unConduitM p) (return ()) o) + ; "yieldOr o c >> p" forall o c (p :: ConduitM i o m r). yieldOr o c >> p = + ConduitM (HaveOutput (unConduitM p) c o) + ; "when yield next" forall b o p. when b (yield o) >> p = + if b then ConduitM (HaveOutput (unConduitM p) (return ()) o) else p + ; "unless yield next" forall b o p. unless b (yield o) >> p = + if b then p else ConduitM (HaveOutput (unConduitM p) (return ()) o) + ; "lift m >>= yield" forall m. lift m >>= yield = yieldM m + #-} +{-# RULES "conduit: leftover l >> p" forall l (p :: ConduitM i o m r). leftover l >> p = + ConduitM (Leftover (unConduitM p) l) #-} + -} diff --git a/Data/Conduit/Internal/Fusion.hs b/Data/Conduit/Internal/Fusion.hs new file mode 100644 index 0000000..cd2b344 --- /dev/null +++ b/Data/Conduit/Internal/Fusion.hs @@ -0,0 +1,213 @@ +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE Trustworthy #-} +module Data.Conduit.Internal.Fusion + ( -- ** Types + Step (..) + , Stream (..) 
+ , ConduitWithStream + , StreamConduitM + , StreamConduit + , StreamSource + , StreamProducer + , StreamSink + , StreamConsumer + -- ** Functions + , streamConduit + , streamSource + , streamSourcePure + , unstream + ) where + +import Data.Conduit.Internal.Conduit +import Data.Conduit.Internal.Pipe (Pipe (..)) +import Data.Functor.Identity (Identity (runIdentity)) +import Data.Void (Void, absurd) + +-- | This is the same as stream fusion\'s Step. Constructors are renamed to +-- avoid confusion with conduit names. +data Step s o r + = Emit s o + | Skip s + | Stop r + deriving Functor + +data Stream m o r = forall s. Stream + (s -> m (Step s o r)) + (m s) + +data ConduitWithStream i o m r = ConduitWithStream + (ConduitM i o m r) + (StreamConduitM i o m r) + +type StreamConduitM i o m r = Stream m i () -> Stream m o r + +type StreamConduit i m o = StreamConduitM i o m () + +type StreamSource m o = StreamConduitM () o m () + +type StreamProducer m o = forall i. StreamConduitM i o m () + +type StreamSink i m r = StreamConduitM i Void m r + +type StreamConsumer i m r = forall o. StreamConduitM i o m r + +unstream :: ConduitWithStream i o m r -> ConduitM i o m r +unstream (ConduitWithStream c _) = c +{-# INLINE [0] unstream #-} + +fuseStream :: Monad m + => ConduitWithStream a b m () + -> ConduitWithStream b c m r + -> ConduitWithStream a c m r +fuseStream (ConduitWithStream a x) (ConduitWithStream b y) = ConduitWithStream (a =$= b) (y . x) +{-# INLINE fuseStream #-} + +{-# RULES "conduit: fuseStream" forall left right. 
+ unstream left =$= unstream right = unstream (fuseStream left right) + #-} + +runStream :: Monad m + => ConduitWithStream () Void m r + -> m r +runStream (ConduitWithStream _ f) = + run $ f $ Stream emptyStep (return ()) + where + emptyStep _ = return $ Stop () + run (Stream step ms0) = + ms0 >>= loop + where + loop s = do + res <- step s + case res of + Stop r -> return r + Skip s' -> loop s' + Emit _ o -> absurd o +{-# INLINE runStream #-} + +{-# RULES "conduit: runStream" forall stream. + runConduit (unstream stream) = runStream stream + #-} + +connectStream :: Monad m + => ConduitWithStream () i m () + -> ConduitWithStream i Void m r + -> m r +connectStream (ConduitWithStream _ stream) (ConduitWithStream _ f) = + run $ f $ stream $ Stream emptyStep (return ()) + where + emptyStep _ = return $ Stop () + run (Stream step ms0) = + ms0 >>= loop + where + loop s = do + res <- step s + case res of + Stop r -> return r + Skip s' -> loop s' + Emit _ o -> absurd o +{-# INLINE connectStream #-} + +{-# RULES "conduit: connectStream" forall left right. + unstream left $$ unstream right = connectStream left right + #-} + +connectStream1 :: Monad m + => ConduitWithStream () i m () + -> ConduitM i Void m r + -> m r +connectStream1 (ConduitWithStream _ fstream) (ConduitM sink0) = + case fstream $ Stream (const $ return $ Stop ()) (return ()) of + Stream step ms0 -> + let loop _ (Done r) _ = return r + loop ls (PipeM mp) s = mp >>= flip (loop ls) s + loop ls (Leftover p l) s = loop (l:ls) p s + loop _ (HaveOutput _ _ o) _ = absurd o + loop (l:ls) (NeedInput p _) s = loop ls (p l) s + loop [] (NeedInput p c) s = do + res <- step s + case res of + Stop () -> loop [] (c ()) s + Skip s' -> loop [] (NeedInput p c) s' + Emit s' i -> loop [] (p i) s' + in ms0 >>= loop [] (sink0 Done) +{-# INLINE connectStream1 #-} + +{-# RULES "conduit: connectStream1" forall left right. 
+ unstream left $$ right = connectStream1 left right + #-} + +{- + +Not only will this rule not fire reliably, but due to finalizers, it can change +behavior unless implemented very carefully. Odds are that the careful +implementation won't be any faster, so leaving this commented out for now. + +connectStream2 :: Monad m + => ConduitM () i m () + -> ConduitWithStream i Void m r + -> m r +connectStream2 (ConduitM src0) (ConduitWithStream _ fstream) = + run $ fstream $ Stream step' $ return (return (), src0 Done) + where + step' (_, Done ()) = return $ Stop () + {-# INLINE step' #-} + + run (Stream step ms0) = + ms0 >>= loop + where + loop s = do + res <- step s + case res of + Stop r -> return r + Emit _ o -> absurd o + Skip s' -> loop s' +{-# INLINE connectStream2 #-} + +{-# RULES "conduit: connectStream2" forall left right. + left $$ unstream right = connectStream2 left right + #-} +-} + +streamConduit :: ConduitM i o m r + -> (Stream m i () -> Stream m o r) + -> ConduitWithStream i o m r +streamConduit = ConduitWithStream +{-# INLINE CONLIKE streamConduit #-} + +streamSource + :: Monad m + => Stream m o () + -> ConduitWithStream i o m () +streamSource str@(Stream step ms0) = + ConduitWithStream con (const str) + where + con = ConduitM $ \rest -> PipeM $ do + s0 <- ms0 + let loop s = do + res <- step s + case res of + Stop () -> return $ rest () + Emit s' o -> return $ HaveOutput (PipeM $ loop s') (return ()) o + Skip s' -> loop s' + loop s0 +{-# INLINE streamSource #-} + +streamSourcePure + :: Monad m + => Stream Identity o () + -> ConduitWithStream i o m () +streamSourcePure (Stream step ms0) = + ConduitWithStream con (const $ Stream (return . runIdentity . 
step) (return s0)) + where + s0 = runIdentity ms0 + con = ConduitM $ \rest -> + let loop s = + case runIdentity $ step s of + Stop () -> rest () + Emit s' o -> HaveOutput (loop s') (return ()) o + Skip s' -> loop s' + in loop s0 +{-# INLINE streamSourcePure #-} diff --git a/Data/Conduit/Internal/List/Stream.hs b/Data/Conduit/Internal/List/Stream.hs new file mode 100644 index 0000000..4cacfc1 --- /dev/null +++ b/Data/Conduit/Internal/List/Stream.hs @@ -0,0 +1,502 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE Trustworthy #-} +module Data.Conduit.Internal.List.Stream where + +import Control.Monad (liftM) +import Data.Conduit.Internal.Fusion +import qualified Data.Foldable as F + +--FIXME: Should streamSource / streamSourcePure be used for sources? + +unfoldS :: Monad m + => (b -> Maybe (a, b)) + -> b + -> StreamProducer m a +unfoldS f s0 _ = + Stream step (return s0) + where + step s = return $ + case f s of + Nothing -> Stop () + Just (x, s') -> Emit s' x +{-# INLINE unfoldS #-} + +unfoldEitherS :: Monad m + => (b -> Either r (a, b)) + -> b + -> StreamConduitM i a m r +unfoldEitherS f s0 _ = + Stream step (return s0) + where + step s = return $ + case f s of + Left r -> Stop r + Right (x, s') -> Emit s' x +{-# INLINE unfoldEitherS #-} + +unfoldMS :: Monad m + => (b -> m (Maybe (a, b))) + -> b + -> StreamProducer m a +unfoldMS f s0 _ = + Stream step (return s0) + where + step s = do + ms' <- f s + return $ case ms' of + Nothing -> Stop () + Just (x, s') -> Emit s' x +{-# INLINE unfoldMS #-} + +unfoldEitherMS :: Monad m + => (b -> m (Either r (a, b))) + -> b + -> StreamConduitM i a m r +unfoldEitherMS f s0 _ = + Stream step (return s0) + where + step s = do + ms' <- f s + return $ case ms' of + Left r -> Stop r + Right (x, s') -> Emit s' x +{-# INLINE unfoldEitherMS #-} +sourceListS :: Monad m => [a] -> StreamProducer m a +sourceListS xs0 _ = + Stream (return . 
step) (return xs0) + where + step [] = Stop () + step (x:xs) = Emit xs x +{-# INLINE sourceListS #-} + +enumFromToS :: (Enum a, Prelude.Ord a, Monad m) + => a + -> a + -> StreamProducer m a +enumFromToS x0 y _ = + Stream step (return x0) + where + step x = return $ if x Prelude.> y + then Stop () + else Emit (Prelude.succ x) x +{-# INLINE [0] enumFromToS #-} + +enumFromToS_int :: (Prelude.Integral a, Monad m) + => a + -> a + -> StreamProducer m a +enumFromToS_int x0 y _ = x0 `seq` y `seq` Stream step (return x0) + where + step x | x <= y = return $ Emit (x Prelude.+ 1) x + | otherwise = return $ Stop () +{-# INLINE enumFromToS_int #-} + +{-# RULES "conduit: enumFromTo" forall f t. + enumFromToS f t = enumFromToS_int f t :: Monad m => StreamProducer m Int + #-} + +iterateS :: Monad m => (a -> a) -> a -> StreamProducer m a +iterateS f x0 _ = + Stream (return . step) (return x0) + where + step x = Emit x' x + where + x' = f x +{-# INLINE iterateS #-} + +replicateS :: Monad m => Int -> a -> StreamProducer m a +replicateS cnt0 a _ = + Stream step (return cnt0) + where + step cnt + | cnt <= 0 = return $ Stop () + | otherwise = return $ Emit (cnt - 1) a +{-# INLINE replicateS #-} + +replicateMS :: Monad m => Int -> m a -> StreamProducer m a +replicateMS cnt0 ma _ = + Stream step (return cnt0) + where + step cnt + | cnt <= 0 = return $ Stop () + | otherwise = Emit (cnt - 1) `liftM` ma +{-# INLINE replicateMS #-} + +foldS :: Monad m => (b -> a -> b) -> b -> StreamConsumer a m b +foldS f b0 (Stream step ms0) = + Stream step' (liftM (b0, ) ms0) + where + step' (!b, s) = do + res <- step s + return $ case res of + Stop () -> Stop b + Skip s' -> Skip (b, s') + Emit s' a -> Skip (f b a, s') +{-# INLINE foldS #-} + +foldMS :: Monad m => (b -> a -> m b) -> b -> StreamConsumer a m b +foldMS f b0 (Stream step ms0) = + Stream step' (liftM (b0, ) ms0) + where + step' (!b, s) = do + res <- step s + case res of + Stop () -> return $ Stop b + Skip s' -> return $ Skip (b, s') + Emit s' a 
-> do + b' <- f b a + return $ Skip (b', s') +{-# INLINE foldMS #-} + +mapM_S :: Monad m + => (a -> m ()) + -> StreamConsumer a m () +mapM_S f (Stream step ms0) = + Stream step' ms0 + where + step' s = do + res <- step s + case res of + Stop () -> return $ Stop () + Skip s' -> return $ Skip s' + Emit s' x -> f x >> return (Skip s') +{-# INLINE [1] mapM_S #-} + +dropS :: Monad m + => Int + -> StreamConsumer a m () +dropS n0 (Stream step ms0) = + Stream step' (liftM (, n0) ms0) + where + step' (_, n) | n <= 0 = return $ Stop () + step' (s, n) = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip (s', n) + Emit s' _ -> Skip (s', n - 1) +{-# INLINE dropS #-} + +takeS :: Monad m + => Int + -> StreamConsumer a m [a] +takeS n0 (Stream step s0) = + Stream step' (liftM (id, n0,) s0) + where + step' (output, n, _) | n <= 0 = return $ Stop (output []) + step' (output, n, s) = do + res <- step s + return $ case res of + Stop () -> Stop (output []) + Skip s' -> Skip (output, n, s') + Emit s' x -> Skip (output . 
(x:), n - 1, s') +{-# INLINE takeS #-} + +headS :: Monad m => StreamConsumer a m (Maybe a) +headS (Stream step s0) = + Stream step' s0 + where + step' s = do + res <- step s + return $ case res of + Stop () -> Stop Nothing + Skip s' -> Skip s' + Emit _ x -> Stop (Just x) +{-# INLINE headS #-} + +mapS :: Monad m => (a -> b) -> StreamConduit a m b +mapS f (Stream step ms0) = + Stream step' ms0 + where + step' s = do + res <- step s + return $ case res of + Stop r -> Stop r + Emit s' a -> Emit s' (f a) + Skip s' -> Skip s' +{-# INLINE mapS #-} + +mapMS :: Monad m => (a -> m b) -> StreamConduit a m b +mapMS f (Stream step ms0) = + Stream step' ms0 + where + step' s = do + res <- step s + case res of + Stop r -> return $ Stop r + Emit s' a -> Emit s' `liftM` f a + Skip s' -> return $ Skip s' +{-# INLINE mapMS #-} + +iterMS :: Monad m => (a -> m ()) -> StreamConduit a m a +iterMS f (Stream step ms0) = + Stream step' ms0 + where + step' s = do + res <- step s + case res of + Stop () -> return $ Stop () + Skip s' -> return $ Skip s' + Emit s' x -> f x >> return (Emit s' x) +{-# INLINE iterMS #-} + +mapMaybeS :: Monad m => (a -> Maybe b) -> StreamConduit a m b +mapMaybeS f (Stream step ms0) = + Stream step' ms0 + where + step' s = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip s' + Emit s' x -> + case f x of + Just y -> Emit s' y + Nothing -> Skip s' +{-# INLINE mapMaybeS #-} + +mapMaybeMS :: Monad m => (a -> m (Maybe b)) -> StreamConduit a m b +mapMaybeMS f (Stream step ms0) = + Stream step' ms0 + where + step' s = do + res <- step s + case res of + Stop () -> return $ Stop () + Skip s' -> return $ Skip s' + Emit s' x -> do + my <- f x + case my of + Just y -> return $ Emit s' y + Nothing -> return $ Skip s' +{-# INLINE mapMaybeMS #-} + +catMaybesS :: Monad m => StreamConduit (Maybe a) m a +catMaybesS (Stream step ms0) = + Stream step' ms0 + where + step' s = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip 
s' + Emit s' Nothing -> Skip s' + Emit s' (Just x) -> Emit s' x +{-# INLINE catMaybesS #-} + +concatS :: (Monad m, F.Foldable f) => StreamConduit (f a) m a +concatS (Stream step ms0) = + Stream step' (liftM ([], ) ms0) + where + step' ([], s) = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip ([], s') + Emit s' x -> Skip (F.toList x, s') + step' ((x:xs), s) = return (Emit (xs, s) x) +{-# INLINE concatS #-} + +concatMapS :: Monad m => (a -> [b]) -> StreamConduit a m b +concatMapS f (Stream step ms0) = + Stream step' (liftM ([], ) ms0) + where + step' ([], s) = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip ([], s') + Emit s' x -> Skip (f x, s') + step' ((x:xs), s) = return (Emit (xs, s) x) +{-# INLINE concatMapS #-} + +concatMapMS :: Monad m => (a -> m [b]) -> StreamConduit a m b +concatMapMS f (Stream step ms0) = + Stream step' (liftM ([], ) ms0) + where + step' ([], s) = do + res <- step s + case res of + Stop () -> return $ Stop () + Skip s' -> return $ Skip ([], s') + Emit s' x -> do + xs <- f x + return $ Skip (xs, s') + step' ((x:xs), s) = return (Emit (xs, s) x) +{-# INLINE concatMapMS #-} + +concatMapAccumS :: Monad m => (a -> accum -> (accum, [b])) -> accum -> StreamConduit a m b +concatMapAccumS f initial (Stream step ms0) = + Stream step' (liftM (initial, [], ) ms0) + where + step' (accum, [], s) = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip (accum, [], s') + Emit s' x -> + let (accum', xs) = f x accum + in Skip (accum', xs, s') + step' (accum, (x:xs), s) = return (Emit (accum, xs, s) x) +{-# INLINE concatMapAccumS #-} + +mapAccumS :: Monad m => (a -> s -> (s, b)) -> s -> StreamConduitM a b m s +mapAccumS f initial (Stream step ms0) = + Stream step' (liftM (initial, ) ms0) + where + step' (accum, s) = do + res <- step s + return $ case res of + Stop () -> Stop accum + Skip s' -> Skip (accum, s') + Emit s' x -> + let (accum', r) = f x accum + in Emit 
(accum', s') r +{-# INLINE mapAccumS #-} + +mapAccumMS :: Monad m => (a -> s -> m (s, b)) -> s -> StreamConduitM a b m s +mapAccumMS f initial (Stream step ms0) = + Stream step' (liftM (initial, ) ms0) + where + step' (accum, s) = do + res <- step s + case res of + Stop () -> return $ Stop accum + Skip s' -> return $ Skip (accum, s') + Emit s' x -> do + (accum', r) <- f x accum + return $ Emit (accum', s') r +{-# INLINE mapAccumMS #-} + +concatMapAccumMS :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> StreamConduit a m b +concatMapAccumMS f initial (Stream step ms0) = + Stream step' (liftM (initial, [], ) ms0) + where + step' (accum, [], s) = do + res <- step s + case res of + Stop () -> return $ Stop () + Skip s' -> return $ Skip (accum, [], s') + Emit s' x -> do + (accum', xs) <- f x accum + return $ Skip (accum', xs, s') + step' (accum, (x:xs), s) = return (Emit (accum, xs, s) x) +{-# INLINE concatMapAccumMS #-} + +mapFoldableS :: (Monad m, F.Foldable f) => (a -> f b) -> StreamConduit a m b +mapFoldableS f (Stream step ms0) = + Stream step' (liftM ([], ) ms0) + where + step' ([], s) = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip ([], s') + Emit s' x -> Skip (F.toList (f x), s') + step' ((x:xs), s) = return (Emit (xs, s) x) +{-# INLINE mapFoldableS #-} + +mapFoldableMS :: (Monad m, F.Foldable f) => (a -> m (f b)) -> StreamConduit a m b +mapFoldableMS f (Stream step ms0) = + Stream step' (liftM ([], ) ms0) + where + step' ([], s) = do + res <- step s + case res of + Stop () -> return $ Stop () + Skip s' -> return $ Skip ([], s') + Emit s' x -> do + y <- f x + return $ Skip (F.toList y, s') + step' ((x:xs), s) = return (Emit (xs, s) x) +{-# INLINE mapFoldableMS #-} + +consumeS :: Monad m => StreamConsumer a m [a] +consumeS (Stream step ms0) = + Stream step' (liftM (id,) ms0) + where + step' (front, s) = do + res <- step s + return $ case res of + Stop () -> Stop (front []) + Skip s' -> Skip (front, s') + Emit s' a -> 
Skip (front . (a:), s') +{-# INLINE consumeS #-} + +groupByS :: Monad m => (a -> a -> Bool) -> StreamConduit a m [a] +groupByS f = mapS (Prelude.uncurry (:)) . groupBy1S id f +{-# INLINE groupByS #-} + +groupOn1S :: (Monad m, Eq b) => (a -> b) -> StreamConduit a m (a, [a]) +groupOn1S f = groupBy1S f (==) +{-# INLINE groupOn1S #-} + +data GroupByState a b s + = GBStart s + | GBLoop ([a] -> [a]) a b s + | GBDone + +groupBy1S :: Monad m => (a -> b) -> (b -> b -> Bool) -> StreamConduit a m (a, [a]) +groupBy1S f eq (Stream step ms0) = + Stream step' (liftM GBStart ms0) + where + step' (GBStart s) = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip (GBStart s') + Emit s' x0 -> Skip (GBLoop id x0 (f x0) s') + step' (GBLoop rest x0 fx0 s) = do + res <- step s + return $ case res of + Stop () -> Emit GBDone (x0, rest []) + Skip s' -> Skip (GBLoop rest x0 fx0 s') + Emit s' x + | fx0 `eq` f x -> Skip (GBLoop (rest . (x:)) x0 fx0 s') + | otherwise -> Emit (GBLoop id x (f x) s') (x0, rest []) + step' GBDone = return $ Stop () +{-# INLINE groupBy1S #-} + +isolateS :: Monad m => Int -> StreamConduit a m a +isolateS count (Stream step ms0) = + Stream step' (liftM (count,) ms0) + where + step' (n, _) | n <= 0 = return $ Stop () + step' (n, s) = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip (n, s') + Emit s' x -> Emit (n - 1, s') x +{-# INLINE isolateS #-} + +filterS :: Monad m => (a -> Bool) -> StreamConduit a m a +filterS f (Stream step ms0) = + Stream step' ms0 + where + step' s = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip s' + Emit s' x + | f x -> Emit s' x + | otherwise -> Skip s' + +sinkNullS :: Monad m => StreamConsumer a m () +sinkNullS (Stream step ms0) = + Stream step' ms0 + where + step' s = do + res <- step s + return $ case res of + Stop () -> Stop () + Skip s' -> Skip s' + Emit s' _ -> Skip s' +{-# INLINE sinkNullS #-} + +sourceNullS :: Monad m => StreamProducer m a 
+sourceNullS _ = Stream (\_ -> return (Stop ())) (return ()) +{-# INLINE sourceNullS #-} diff --git a/Data/Conduit/Internal/Pipe.hs b/Data/Conduit/Internal/Pipe.hs new file mode 100644 index 0000000..a5dd7d1 --- /dev/null +++ b/Data/Conduit/Internal/Pipe.hs @@ -0,0 +1,641 @@ +{-# OPTIONS_HADDOCK not-home #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE UndecidableInstances #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE Trustworthy #-} +{-# LANGUAGE TypeFamilies #-} +module Data.Conduit.Internal.Pipe + ( -- ** Types + Pipe (..) + -- ** Primitives + , await + , awaitE + , awaitForever + , yield + , yieldM + , yieldOr + , leftover + -- ** Finalization + , bracketP + , addCleanup + -- ** Composition + , idP + , pipe + , pipeL + , runPipe + , injectLeftovers + , (>+>) + , (<+<) + -- ** Exceptions + , catchP + , handleP + , tryP + -- ** Utilities + , transPipe + , mapOutput + , mapOutputMaybe + , mapInput + , sourceList + , withUpstream + , Data.Conduit.Internal.Pipe.enumFromTo + , generalizeUpstream + ) where + +import Control.Applicative (Applicative (..)) +import Control.Exception.Lifted as E (Exception, catch) +import Control.Monad ((>=>), liftM, ap) +import Control.Monad.Error.Class(MonadError(..)) +import Control.Monad.Reader.Class(MonadReader(..)) +import Control.Monad.RWS.Class(MonadRWS()) +import Control.Monad.Writer.Class(MonadWriter(..)) +import Control.Monad.State.Class(MonadState(..)) +import Control.Monad.Trans.Class (MonadTrans (lift)) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Base (MonadBase (liftBase)) +import Control.Monad.Primitive (PrimMonad, PrimState, primitive) +import Data.Void (Void, absurd) +import Data.Monoid (Monoid (mappend, mempty)) +import Control.Monad.Trans.Resource +import qualified GHC.Exts +import Control.Monad.Morph (MFunctor (..)) +import qualified Control.Monad.Catch as 
Catch + +-- | The underlying datatype for all the types in this package. It has six +-- type parameters: +-- +-- * /l/ is the type of values that may be left over from this @Pipe@. A @Pipe@ +-- with no leftovers would use @Void@ here, and one with leftovers would use +-- the same type as the /i/ parameter. Leftovers are automatically provided to +-- the next @Pipe@ in the monadic chain. +-- +-- * /i/ is the type of values for this @Pipe@'s input stream. +-- +-- * /o/ is the type of values for this @Pipe@'s output stream. +-- +-- * /u/ is the result type from the upstream @Pipe@. +-- +-- * /m/ is the underlying monad. +-- +-- * /r/ is the result type. +-- +-- A basic intuition is that every @Pipe@ produces a stream of output values +-- (/o/), and eventually indicates that this stream is terminated by sending a +-- result (/r/). On the receiving end of a @Pipe@, these become the /i/ and /u/ +-- parameters. +-- +-- Since 0.5.0 +data Pipe l i o u m r = + -- | Provide new output to be sent downstream. This constructor has three + -- fields: the next @Pipe@ to be used, a finalization function, and the + -- output value. + HaveOutput (Pipe l i o u m r) (m ()) o + -- | Request more input from upstream. The first field takes a new input + -- value and provides a new @Pipe@. The second takes an upstream result + -- value, which indicates that upstream is producing no more results. + | NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r) + -- | Processing with this @Pipe@ is complete, providing the final result. + | Done r + -- | Require running of a monadic action to get the next @Pipe@. + | PipeM (m (Pipe l i o u m r)) + -- | Return leftover input, which should be provided to future operations. 
+ | Leftover (Pipe l i o u m r) l + +instance Monad m => Functor (Pipe l i o u m) where + fmap = liftM + {-# INLINE fmap #-} + +instance Monad m => Applicative (Pipe l i o u m) where + pure = Done + {-# INLINE pure #-} + (<*>) = ap + {-# INLINE (<*>) #-} + +instance Monad m => Monad (Pipe l i o u m) where + return = pure + {-# INLINE return #-} + + HaveOutput p c o >>= fp = HaveOutput (p >>= fp) c o + NeedInput p c >>= fp = NeedInput (p >=> fp) (c >=> fp) + Done x >>= fp = fp x + PipeM mp >>= fp = PipeM ((>>= fp) `liftM` mp) + Leftover p i >>= fp = Leftover (p >>= fp) i + +instance MonadBase base m => MonadBase base (Pipe l i o u m) where + liftBase = lift . liftBase + {-# INLINE liftBase #-} + +instance MonadTrans (Pipe l i o u) where + lift mr = PipeM (Done `liftM` mr) + {-# INLINE [1] lift #-} + +instance MonadIO m => MonadIO (Pipe l i o u m) where + liftIO = lift . liftIO + {-# INLINE liftIO #-} + +instance MonadThrow m => MonadThrow (Pipe l i o u m) where + throwM = lift . throwM + {-# INLINE throwM #-} + +instance Catch.MonadCatch m => Catch.MonadCatch (Pipe l i o u m) where + catch p0 onErr = + go p0 + where + go (Done r) = Done r + go (PipeM mp) = PipeM $ Catch.catch (liftM go mp) (return . onErr) + go (Leftover p i) = Leftover (go p) i + go (NeedInput x y) = NeedInput (go . x) (go . y) + go (HaveOutput p c o) = HaveOutput (go p) c o + {-# INLINE catch #-} + +instance Monad m => Monoid (Pipe l i o u m ()) where + mempty = return () + {-# INLINE mempty #-} + mappend = (>>) + {-# INLINE mappend #-} + +instance PrimMonad m => PrimMonad (Pipe l i o u m) where + type PrimState (Pipe l i o u m) = PrimState m + primitive = lift . primitive + +instance MonadResource m => MonadResource (Pipe l i o u m) where + liftResourceT = lift . 
liftResourceT + {-# INLINE liftResourceT #-} + +instance MonadReader r m => MonadReader r (Pipe l i o u m) where + ask = lift ask + {-# INLINE ask #-} + local f (HaveOutput p c o) = HaveOutput (local f p) c o + local f (NeedInput p c) = NeedInput (\i -> local f (p i)) (\u -> local f (c u)) + local _ (Done x) = Done x + local f (PipeM mp) = PipeM (liftM (local f) $ local f mp) + local f (Leftover p i) = Leftover (local f p) i + +-- Provided for doctest +#ifndef MIN_VERSION_mtl +#define MIN_VERSION_mtl(x, y, z) 0 +#endif + +instance MonadWriter w m => MonadWriter w (Pipe l i o u m) where +#if MIN_VERSION_mtl(2, 1, 0) + writer = lift . writer +#endif + + tell = lift . tell + + listen (HaveOutput p c o) = HaveOutput (listen p) c o + listen (NeedInput p c) = NeedInput (\i -> listen (p i)) (\u -> listen (c u)) + listen (Done x) = Done (x,mempty) + listen (PipeM mp) = + PipeM $ + do (p,w) <- listen mp + return $ do (x,w') <- listen p + return (x, w `mappend` w') + listen (Leftover p i) = Leftover (listen p) i + + pass (HaveOutput p c o) = HaveOutput (pass p) c o + pass (NeedInput p c) = NeedInput (\i -> pass (p i)) (\u -> pass (c u)) + pass (PipeM mp) = PipeM $ mp >>= (return . pass) + pass (Done (x,_)) = Done x + pass (Leftover p i) = Leftover (pass p) i + +instance MonadState s m => MonadState s (Pipe l i o u m) where + get = lift get + put = lift . put +#if MIN_VERSION_mtl(2, 1, 0) + state = lift . state +#endif + +instance MonadRWS r w s m => MonadRWS r w s (Pipe l i o u m) + +instance MonadError e m => MonadError e (Pipe l i o u m) where + throwError = lift . 
throwError + catchError (HaveOutput p c o) f = HaveOutput (catchError p f) c o + catchError (NeedInput p c) f = NeedInput (\i -> catchError (p i) f) (\u -> catchError (c u) f) + catchError (Done x) _ = Done x + catchError (PipeM mp) f = + PipeM $ catchError (liftM (flip catchError f) mp) (\e -> return (f e)) + catchError (Leftover p i) f = Leftover (catchError p f) i + +-- | Wait for a single input value from upstream. +-- +-- Since 0.5.0 +await :: Pipe l i o u m (Maybe i) +await = NeedInput (Done . Just) (\_ -> Done Nothing) +{-# RULES "conduit: CI.await >>= maybe" forall x y. await >>= maybe x y = NeedInput y (const x) #-} +{-# INLINE [1] await #-} + +-- | This is similar to @await@, but will return the upstream result value as +-- @Left@ if available. +-- +-- Since 0.5.0 +awaitE :: Pipe l i o u m (Either u i) +awaitE = NeedInput (Done . Right) (Done . Left) +{-# RULES "conduit: awaitE >>= either" forall x y. awaitE >>= either x y = NeedInput y x #-} +{-# INLINE [1] awaitE #-} + +-- | Wait for input forever, calling the given inner @Pipe@ for each piece of +-- new input. Returns the upstream result type. +-- +-- Since 0.5.0 +awaitForever :: Monad m => (i -> Pipe l i o r m r') -> Pipe l i o r m r +awaitForever inner = + self + where + self = awaitE >>= either return (\i -> inner i >> self) +{-# INLINE [1] awaitForever #-} + +-- | Send a single output value downstream. If the downstream @Pipe@ +-- terminates, this @Pipe@ will terminate as well. +-- +-- Since 0.5.0 +yield :: Monad m + => o -- ^ output value + -> Pipe l i o u m () +yield = HaveOutput (Done ()) (return ()) +{-# INLINE [1] yield #-} + +yieldM :: Monad m => m o -> Pipe l i o u m () +yieldM = PipeM . liftM (HaveOutput (Done ()) (return ())) +{-# INLINE [1] yieldM #-} + +-- | Similar to @yield@, but additionally takes a finalizer to be run if the +-- downstream @Pipe@ terminates. 
+-- +-- Since 0.5.0 +yieldOr :: Monad m + => o + -> m () -- ^ finalizer + -> Pipe l i o u m () +yieldOr o f = HaveOutput (Done ()) f o +{-# INLINE [1] yieldOr #-} + +{-# RULES + "CI.yield o >> p" forall o (p :: Pipe l i o u m r). yield o >> p = HaveOutput p (return ()) o + ; "CI.yieldOr o c >> p" forall o c (p :: Pipe l i o u m r). yieldOr o c >> p = HaveOutput p c o + ; "lift m >>= CI.yield" forall m. lift m >>= yield = yieldM m + #-} + -- FIXME: Too much inlining on mapM_, can't enforce; "mapM_ CI.yield" mapM_ yield = sourceList + -- Maybe we can get a rewrite rule on foldr instead? Need a benchmark to back this up. + +-- | Provide a single piece of leftover input to be consumed by the next pipe +-- in the current monadic binding. +-- +-- /Note/: it is highly encouraged to only return leftover values from input +-- already consumed from upstream. +-- +-- Since 0.5.0 +leftover :: l -> Pipe l i o u m () +leftover = Leftover (Done ()) +{-# INLINE [1] leftover #-} +{-# RULES "conduit: leftover l >> p" forall l (p :: Pipe l i o u m r). leftover l >> p = Leftover p l #-} + +-- | Bracket a pipe computation between allocation and release of a +-- resource. Two guarantees are given about resource finalization: +-- +-- 1. It will be /prompt/. The finalization will be run as early as possible. +-- +-- 2. It is exception safe. Due to usage of @resourcet@, the finalization will +-- be run in the event of any exceptions. +-- +-- Since 0.5.0 +bracketP :: MonadResource m + => IO a + -- ^ computation to run first (\"acquire resource\") + -> (a -> IO ()) + -- ^ computation to run last (\"release resource\") + -> (a -> Pipe l i o u m r) + -- ^ computation to run in-between + -> Pipe l i o u m r + -- returns the value from the in-between computation +bracketP alloc free inside = + PipeM start + where + start = do + (key, seed) <- allocate alloc free + return $ addCleanup (const $ release key) (inside seed) + +-- | Add some code to be run when the given @Pipe@ cleans up. 
+-- +-- Since 0.4.1 +addCleanup :: Monad m + => (Bool -> m ()) -- ^ @True@ if @Pipe@ ran to completion, @False@ for early termination. + -> Pipe l i o u m r + -> Pipe l i o u m r +addCleanup cleanup (Done r) = PipeM (cleanup True >> return (Done r)) +addCleanup cleanup (HaveOutput src close x) = HaveOutput + (addCleanup cleanup src) + (cleanup False >> close) + x +addCleanup cleanup (PipeM msrc) = PipeM (liftM (addCleanup cleanup) msrc) +addCleanup cleanup (NeedInput p c) = NeedInput + (addCleanup cleanup . p) + (addCleanup cleanup . c) +addCleanup cleanup (Leftover p i) = Leftover (addCleanup cleanup p) i + +-- | The identity @Pipe@. +-- +-- Since 0.5.0 +idP :: Monad m => Pipe l a a r m r +idP = NeedInput (HaveOutput idP (return ())) Done + +-- | Compose a left and right pipe together into a complete pipe. The left pipe +-- will be automatically closed when the right pipe finishes. +-- +-- Since 0.5.0 +pipe :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2 +pipe = + goRight (return ()) + where + goRight final left right = + case right of + HaveOutput p c o -> HaveOutput (recurse p) (c >> final) o + NeedInput rp rc -> goLeft rp rc final left + Done r2 -> PipeM (final >> return (Done r2)) + PipeM mp -> PipeM (liftM recurse mp) + Leftover _ i -> absurd i + where + recurse = goRight final left + + goLeft rp rc final left = + case left of + HaveOutput left' final' o -> goRight final' left' (rp o) + NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) + Done r1 -> goRight (return ()) (Done r1) (rc r1) + PipeM mp -> PipeM (liftM recurse mp) + Leftover left' i -> Leftover (recurse left') i + where + recurse = goLeft rp rc final + +-- | Same as 'pipe', but automatically applies 'injectLeftovers' to the right @Pipe@. 
+-- +-- Since 0.5.0 +pipeL :: Monad m => Pipe l a b r0 m r1 -> Pipe b b c r1 m r2 -> Pipe l a c r0 m r2 +-- Note: The following should be equivalent to the simpler: +-- +-- pipeL l r = l `pipe` injectLeftovers r +-- +-- However, this version tested as being significantly more efficient. +pipeL = + goRight (return ()) + where + goRight final left right = + case right of + HaveOutput p c o -> HaveOutput (recurse p) (c >> final) o + NeedInput rp rc -> goLeft rp rc final left + Done r2 -> PipeM (final >> return (Done r2)) + PipeM mp -> PipeM (liftM recurse mp) + Leftover right' i -> goRight final (HaveOutput left final i) right' + where + recurse = goRight final left + + goLeft rp rc final left = + case left of + HaveOutput left' final' o -> goRight final' left' (rp o) + NeedInput left' lc -> NeedInput (recurse . left') (recurse . lc) + Done r1 -> goRight (return ()) (Done r1) (rc r1) + PipeM mp -> PipeM (liftM recurse mp) + Leftover left' i -> Leftover (recurse left') i + where + recurse = goLeft rp rc final + +-- | Run a pipeline until processing completes. +-- +-- Since 0.5.0 +runPipe :: Monad m => Pipe Void () Void () m r -> m r +runPipe (HaveOutput _ _ o) = absurd o +runPipe (NeedInput _ c) = runPipe (c ()) +runPipe (Done r) = return r +runPipe (PipeM mp) = mp >>= runPipe +runPipe (Leftover _ i) = absurd i + +-- | Transforms a @Pipe@ that provides leftovers to one which does not, +-- allowing it to be composed. +-- +-- This function will provide any leftover values within this @Pipe@ to any +-- calls to @await@. If there are more leftover values than are demanded, the +-- remainder are discarded. +-- +-- Since 0.5.0 +injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r +injectLeftovers = + go [] + where + go ls (HaveOutput p c o) = HaveOutput (go ls p) c o + go (l:ls) (NeedInput p _) = go ls $ p l + go [] (NeedInput p c) = NeedInput (go [] . p) (go [] . 
c) + go _ (Done r) = Done r + go ls (PipeM mp) = PipeM (liftM (go ls) mp) + go ls (Leftover p l) = go (l:ls) p + +-- | Transform the monad that a @Pipe@ lives in. +-- +-- Note that the monad transforming function will be run multiple times, +-- resulting in unintuitive behavior in some cases. For a fuller treatment, +-- please see: +-- +-- +-- +-- This function is just a synonym for 'hoist'. +-- +-- Since 0.4.0 +transPipe :: Monad m => (forall a. m a -> n a) -> Pipe l i o u m r -> Pipe l i o u n r +transPipe f (HaveOutput p c o) = HaveOutput (transPipe f p) (f c) o +transPipe f (NeedInput p c) = NeedInput (transPipe f . p) (transPipe f . c) +transPipe _ (Done r) = Done r +transPipe f (PipeM mp) = + PipeM (f $ liftM (transPipe f) $ collapse mp) + where + -- Combine a series of monadic actions into a single action. Since we + -- throw away side effects between different actions, an arbitrary break + -- between actions will lead to a violation of the monad transformer laws. + -- Example available at: + -- + -- http://hpaste.org/75520 + collapse mpipe = do + pipe' <- mpipe + case pipe' of + PipeM mpipe' -> collapse mpipe' + _ -> return pipe' +transPipe f (Leftover p i) = Leftover (transPipe f p) i + +-- | Apply a function to all the output values of a @Pipe@. +-- +-- This mimics the behavior of `fmap` for a `Source` and `Conduit` in pre-0.4 +-- days. +-- +-- Since 0.4.1 +mapOutput :: Monad m => (o1 -> o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r +mapOutput f = + go + where + go (HaveOutput p c o) = HaveOutput (go p) c (f o) + go (NeedInput p c) = NeedInput (go . p) (go . c) + go (Done r) = Done r + go (PipeM mp) = PipeM (liftM (go) mp) + go (Leftover p i) = Leftover (go p) i +{-# INLINE mapOutput #-} + +-- | Same as 'mapOutput', but use a function that returns @Maybe@ values. 
+-- +-- Since 0.5.0 +mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r +mapOutputMaybe f = + go + where + go (HaveOutput p c o) = maybe id (\o' p' -> HaveOutput p' c o') (f o) (go p) + go (NeedInput p c) = NeedInput (go . p) (go . c) + go (Done r) = Done r + go (PipeM mp) = PipeM (liftM (go) mp) + go (Leftover p i) = Leftover (go p) i +{-# INLINE mapOutputMaybe #-} + +-- | Apply a function to all the input values of a @Pipe@. +-- +-- Since 0.5.0 +mapInput :: Monad m + => (i1 -> i2) -- ^ map initial input to new input + -> (l2 -> Maybe l1) -- ^ map new leftovers to initial leftovers + -> Pipe l2 i2 o u m r + -> Pipe l1 i1 o u m r +mapInput f f' (HaveOutput p c o) = HaveOutput (mapInput f f' p) c o +mapInput f f' (NeedInput p c) = NeedInput (mapInput f f' . p . f) (mapInput f f' . c) +mapInput _ _ (Done r) = Done r +mapInput f f' (PipeM mp) = PipeM (liftM (mapInput f f') mp) +mapInput f f' (Leftover p i) = maybe id (flip Leftover) (f' i) $ mapInput f f' p + +enumFromTo :: (Enum o, Eq o, Monad m) + => o + -> o + -> Pipe l i o u m () +enumFromTo start stop = + loop start + where + loop i + | i == stop = HaveOutput (Done ()) (return ()) i + | otherwise = HaveOutput (loop (succ i)) (return ()) i +{-# INLINE enumFromTo #-} + +-- | Convert a list into a source. +-- +-- Since 0.3.0 +sourceList :: Monad m => [a] -> Pipe l i a u m () +sourceList = + go + where + go [] = Done () + go (o:os) = HaveOutput (go os) (return ()) o +{-# INLINE [1] sourceList #-} + +-- | The equivalent of @GHC.Exts.build@ for @Pipe@. +-- +-- Since 0.4.2 +build :: Monad m => (forall b. (o -> b -> b) -> b -> b) -> Pipe l i o u m () +build g = g (\o p -> HaveOutput p (return ()) o) (return ()) + +{-# RULES + "sourceList/build" forall (f :: (forall b. (a -> b -> b) -> b -> b)). sourceList (GHC.Exts.build f) = build f #-} + +-- | Returns a tuple of the upstream and downstream results. Note that this +-- will force consumption of the entire input stream. 
+-- +-- Since 0.5.0 +withUpstream :: Monad m + => Pipe l i o u m r + -> Pipe l i o u m (u, r) +withUpstream down = + down >>= go + where + go r = + loop + where + loop = awaitE >>= either (\u -> return (u, r)) (\_ -> loop) + +infixr 9 <+< +infixl 9 >+> + +-- | Fuse together two @Pipe@s, connecting the output from the left to the +-- input of the right. +-- +-- Notice that the /leftover/ parameter for the @Pipe@s must be @Void@. This +-- ensures that there is no accidental data loss of leftovers during fusion. If +-- you have a @Pipe@ with leftovers, you must first call 'injectLeftovers'. +-- +-- Since 0.5.0 +(>+>) :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2 +(>+>) = pipe +{-# INLINE (>+>) #-} + +-- | Same as '>+>', but reverse the order of the arguments. +-- +-- Since 0.5.0 +(<+<) :: Monad m => Pipe Void b c r1 m r2 -> Pipe l a b r0 m r1 -> Pipe l a c r0 m r2 +(<+<) = flip pipe +{-# INLINE (<+<) #-} + +-- | Since 1.0.4 +instance MFunctor (Pipe l i o u) where + hoist = transPipe + +-- | See 'catchC' for more details. +-- +-- Since 1.0.11 +catchP :: (MonadBaseControl IO m, Exception e) + => Pipe l i o u m r + -> (e -> Pipe l i o u m r) + -> Pipe l i o u m r +catchP p0 onErr = + go p0 + where + go (Done r) = Done r + go (PipeM mp) = PipeM $ E.catch (liftM go mp) (return . onErr) + go (Leftover p i) = Leftover (go p) i + go (NeedInput x y) = NeedInput (go . x) (go . y) + go (HaveOutput p c o) = HaveOutput (go p) c o +{-# INLINABLE catchP #-} + +-- | The same as @flip catchP@. +-- +-- Since 1.0.11 +handleP :: (MonadBaseControl IO m, Exception e) + => (e -> Pipe l i o u m r) + -> Pipe l i o u m r + -> Pipe l i o u m r +handleP = flip catchP +{-# INLINE handleP #-} + +-- | See 'tryC' for more details. +-- +-- Since 1.0.11 +tryP :: (MonadBaseControl IO m, Exception e) + => Pipe l i o u m r + -> Pipe l i o u m (Either e r) +tryP = + go + where + go (Done r) = Done (Right r) + go (PipeM mp) = PipeM $ E.catch (liftM go mp) (return . Done . 
Left) + go (Leftover p i) = Leftover (go p) i + go (NeedInput x y) = NeedInput (go . x) (go . y) + go (HaveOutput p c o) = HaveOutput (go p) c o +{-# INLINABLE tryP #-} + +-- | Generalize the upstream return value for a @Pipe@ from unit to any type. +-- +-- Since 1.1.5 +generalizeUpstream :: Monad m => Pipe l i o () m r -> Pipe l i o u m r +generalizeUpstream = + go + where + go (HaveOutput p f o) = HaveOutput (go p) f o + go (NeedInput x y) = NeedInput (go . x) (\_ -> go (y ())) + go (Done r) = Done r + go (PipeM mp) = PipeM (liftM go mp) + go (Leftover p l) = Leftover (go p) l +{-# INLINE generalizeUpstream #-} + +{-# RULES "conduit: Pipe: lift x >>= f" forall m f. lift m >>= f = PipeM (liftM f m) #-} +{-# RULES "conduit: Pipe: lift x >> f" forall m f. lift m >> f = PipeM (liftM (\_ -> f) m) #-} diff --git a/Data/Conduit/Lift.hs b/Data/Conduit/Lift.hs new file mode 100644 index 0000000..9733b5e --- /dev/null +++ b/Data/Conduit/Lift.hs @@ -0,0 +1,631 @@ +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE Safe #-} +-- | Allow monad transformers to be run\/eval\/exec in a section of conduit +-- rather than needing to run across the whole conduit. This circumvents many +-- of the problems with breaking the monad transformer laws. For more +-- information, see the announcement blog post: +-- +-- +-- This module was added in conduit 1.0.11. 
+module Data.Conduit.Lift ( + -- * ExceptT + exceptC, + runExceptC, + catchExceptC, + + -- * ErrorT + errorC, + runErrorC, + catchErrorC, +-- liftCatchError, + + -- * CatchT + runCatchC, + catchCatchC, + + -- * MaybeT + maybeC, + runMaybeC, + + -- * ReaderT + readerC, + runReaderC, + + -- * StateT, lazy + stateLC, + runStateLC, + evalStateLC, + execStateLC, + + -- ** Strict + stateC, + runStateC, + evalStateC, + execStateC, + + -- * WriterT, lazy + writerLC, + runWriterLC, + execWriterLC, + + -- ** Strict + writerC, + runWriterC, + execWriterC, + + -- * RWST, lazy + rwsLC, + runRWSLC, + evalRWSLC, + execRWSLC, + + -- ** Strict + rwsC, + runRWSC, + evalRWSC, + execRWSC, + + -- * Utilities + + distribute + ) where + +import Data.Conduit +import Data.Conduit.Internal (ConduitM (..), Pipe (..)) + +import Control.Monad.Morph (hoist, lift, MFunctor(..), ) +import Control.Monad.Trans.Class (MonadTrans(..)) +import Control.Exception (SomeException) + +import Data.Monoid (Monoid(..)) + + +import qualified Control.Monad.Trans.Except as Ex +import qualified Control.Monad.Trans.Error as E +import qualified Control.Monad.Trans.Maybe as M +import qualified Control.Monad.Trans.Reader as R + +import qualified Control.Monad.Trans.State.Strict as SS +import qualified Control.Monad.Trans.Writer.Strict as WS +import qualified Control.Monad.Trans.RWS.Strict as RWSS + +import qualified Control.Monad.Trans.State.Lazy as SL +import qualified Control.Monad.Trans.Writer.Lazy as WL +import qualified Control.Monad.Trans.RWS.Lazy as RWSL +import Control.Monad.Catch.Pure (CatchT (runCatchT)) + + +catAwaitLifted + :: (Monad (t (ConduitM o1 o m)), Monad m, MonadTrans t) => + ConduitM i o1 (t (ConduitM o1 o m)) () +catAwaitLifted = go + where + go = do + x <- lift . 
lift $ await + case x of + Nothing -> return () + Just x2 -> do + yield x2 + go + +catYieldLifted + :: (Monad (t (ConduitM i o1 m)), Monad m, MonadTrans t) => + ConduitM o1 o (t (ConduitM i o1 m)) () +catYieldLifted = go + where + go = do + x <- await + case x of + Nothing -> return () + Just x2 -> do + lift . lift $ yield x2 + go + + +distribute + :: (Monad (t (ConduitM b o m)), Monad m, Monad (t m), MonadTrans t, + MFunctor t) => + ConduitM b o (t m) () -> t (ConduitM b o m) () +distribute p = catAwaitLifted =$= hoist (hoist lift) p $$ catYieldLifted + +-- | Wrap the base monad in 'Ex.ExceptT' +-- +-- Since 1.2.12 +exceptC + :: (Monad m, Monad (t (Ex.ExceptT e m)), MonadTrans t, MFunctor t) => + t m (Either e b) -> t (Ex.ExceptT e m) b +exceptC p = do + x <- hoist lift p + lift $ Ex.ExceptT (return x) + +-- | Run 'Ex.ExceptT' in the base monad +-- +-- Since 1.2.12 +runExceptC + :: Monad m => + ConduitM i o (Ex.ExceptT e m) r -> ConduitM i o m (Either e r) +runExceptC (ConduitM c0) = + ConduitM $ \rest -> + let go (Done r) = rest (Right r) + go (PipeM mp) = PipeM $ do + eres <- Ex.runExceptT mp + return $ case eres of + Left e -> rest $ Left e + Right p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p f o) = HaveOutput (go p) (Ex.runExceptT f >> return ()) o + go (NeedInput x y) = NeedInput (go . x) (go . y) + in go (c0 Done) +{-# INLINABLE runExceptC #-} + +-- | Catch an error in the base monad +-- +-- Since 1.2.12 +catchExceptC + :: Monad m => + ConduitM i o (Ex.ExceptT e m) r + -> (e -> ConduitM i o (Ex.ExceptT e m) r) + -> ConduitM i o (Ex.ExceptT e m) r +catchExceptC c0 h = + ConduitM $ \rest -> + let go (Done r) = rest r + go (PipeM mp) = PipeM $ do + eres <- lift $ Ex.runExceptT mp + return $ case eres of + Left e -> unConduitM (h e) rest + Right p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p f o) = HaveOutput (go p) f o + go (NeedInput x y) = NeedInput (go . x) (go . 
y) + in go $ unConduitM c0 Done + where +{-# INLINABLE catchExceptC #-} + +-- | Wrap the base monad in 'E.ErrorT' +-- +-- Since 1.0.11 +errorC + :: (Monad m, Monad (t (E.ErrorT e m)), MonadTrans t, E.Error e, + MFunctor t) => + t m (Either e b) -> t (E.ErrorT e m) b +errorC p = do + x <- hoist lift p + lift $ E.ErrorT (return x) + +-- | Run 'E.ErrorT' in the base monad +-- +-- Since 1.0.11 +runErrorC + :: (Monad m, E.Error e) => + ConduitM i o (E.ErrorT e m) r -> ConduitM i o m (Either e r) +runErrorC (ConduitM c0) = + ConduitM $ \rest -> + let go (Done r) = rest (Right r) + go (PipeM mp) = PipeM $ do + eres <- E.runErrorT mp + return $ case eres of + Left e -> rest $ Left e + Right p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p f o) = HaveOutput (go p) (E.runErrorT f >> return ()) o + go (NeedInput x y) = NeedInput (go . x) (go . y) + in go (c0 Done) +{-# INLINABLE runErrorC #-} + +-- | Catch an error in the base monad +-- +-- Since 1.0.11 +catchErrorC + :: (Monad m, E.Error e) => + ConduitM i o (E.ErrorT e m) r + -> (e -> ConduitM i o (E.ErrorT e m) r) + -> ConduitM i o (E.ErrorT e m) r +catchErrorC c0 h = + ConduitM $ \rest -> + let go (Done r) = rest r + go (PipeM mp) = PipeM $ do + eres <- lift $ E.runErrorT mp + return $ case eres of + Left e -> unConduitM (h e) rest + Right p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p f o) = HaveOutput (go p) f o + go (NeedInput x y) = NeedInput (go . x) (go . 
y) + in go $ unConduitM c0 Done + where +{-# INLINABLE catchErrorC #-} + +-- | Run 'CatchT' in the base monad +-- +-- Since 1.1.0 +runCatchC + :: Monad m => + ConduitM i o (CatchT m) r -> ConduitM i o m (Either SomeException r) +runCatchC c0 = + ConduitM $ \rest -> + let go (Done r) = rest (Right r) + go (PipeM mp) = PipeM $ do + eres <- runCatchT mp + return $ case eres of + Left e -> rest $ Left e + Right p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p f o) = HaveOutput (go p) (runCatchT f >> return ()) o + go (NeedInput x y) = NeedInput (go . x) (go . y) + in go $ unConduitM c0 Done +{-# INLINABLE runCatchC #-} + +-- | Catch an exception in the base monad +-- +-- Since 1.1.0 +catchCatchC + :: Monad m => + ConduitM i o (CatchT m) r + -> (SomeException -> ConduitM i o (CatchT m) r) + -> ConduitM i o (CatchT m) r +catchCatchC (ConduitM c0) h = + ConduitM $ \rest -> + let go (Done r) = rest r + go (PipeM mp) = PipeM $ do + eres <- lift $ runCatchT mp + return $ case eres of + Left e -> unConduitM (h e) rest + Right p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p f o) = HaveOutput (go p) f o + go (NeedInput x y) = NeedInput (go . x) (go . 
y) + in go (c0 Done) +{-# INLINABLE catchCatchC #-} + +-- | Wrap the base monad in 'M.MaybeT' +-- +-- Since 1.0.11 +maybeC + :: (Monad m, Monad (t (M.MaybeT m)), + MonadTrans t, + MFunctor t) => + t m (Maybe b) -> t (M.MaybeT m) b +maybeC p = do + x <- hoist lift p + lift $ M.MaybeT (return x) +{-# INLINABLE maybeC #-} + +-- | Run 'M.MaybeT' in the base monad +-- +-- Since 1.0.11 +runMaybeC + :: Monad m => + ConduitM i o (M.MaybeT m) r -> ConduitM i o m (Maybe r) +runMaybeC (ConduitM c0) = + ConduitM $ \rest -> + let go (Done r) = rest (Just r) + go (PipeM mp) = PipeM $ do + mres <- M.runMaybeT mp + return $ case mres of + Nothing -> rest Nothing + Just p -> go p + go (Leftover p i) = Leftover (go p) i + go (HaveOutput p c o) = HaveOutput (go p) (M.runMaybeT c >> return ()) o + go (NeedInput x y) = NeedInput (go . x) (go . y) + in go (c0 Done) +{-# INLINABLE runMaybeC #-} + +-- | Wrap the base monad in 'R.ReaderT' +-- +-- Since 1.0.11 +readerC + :: (Monad m, Monad (t1 (R.ReaderT t m)), + MonadTrans t1, + MFunctor t1) => + (t -> t1 m b) -> t1 (R.ReaderT t m) b +readerC k = do + i <- lift R.ask + hoist lift (k i) +{-# INLINABLE readerC #-} + +-- | Run 'R.ReaderT' in the base monad +-- +-- Since 1.0.11 +runReaderC + :: Monad m => + r -> ConduitM i o (R.ReaderT r m) res -> ConduitM i o m res +runReaderC r = hoist (`R.runReaderT` r) +{-# INLINABLE runReaderC #-} + + +-- | Wrap the base monad in 'SL.StateT' +-- +-- Since 1.0.11 +stateLC + :: (Monad m, Monad (t1 (SL.StateT t m)), + MonadTrans t1, + MFunctor t1) => + (t -> t1 m (b, t)) -> t1 (SL.StateT t m) b +stateLC k = do + s <- lift SL.get + (r, s') <- hoist lift (k s) + lift (SL.put s') + return r +{-# INLINABLE stateLC #-} + +thread :: Monad m + => (r -> s -> res) + -> (forall a. 
t m a -> s -> m (a, s)) + -> s + -> ConduitM i o (t m) r + -> ConduitM i o m res +thread toRes runM s0 (ConduitM c0) = + ConduitM $ \rest -> + let go s (Done r) = rest (toRes r s) + go s (PipeM mp) = PipeM $ do + (p, s') <- runM mp s + return $ go s' p + go s (Leftover p i) = Leftover (go s p) i + go s (NeedInput x y) = NeedInput (go s . x) (go s . y) + go s (HaveOutput p f o) = HaveOutput (go s p) (runM f s >> return ()) o + in go s0 (c0 Done) +{-# INLINABLE thread #-} + +-- | Run 'SL.StateT' in the base monad +-- +-- Since 1.0.11 +runStateLC + :: Monad m => + s -> ConduitM i o (SL.StateT s m) r -> ConduitM i o m (r, s) +runStateLC = thread (,) SL.runStateT +{-# INLINABLE runStateLC #-} + +-- | Evaluate 'SL.StateT' in the base monad +-- +-- Since 1.0.11 +evalStateLC + :: Monad m => + s -> ConduitM i o (SL.StateT s m) r -> ConduitM i o m r +evalStateLC s p = fmap fst $ runStateLC s p +{-# INLINABLE evalStateLC #-} + +-- | Execute 'SL.StateT' in the base monad +-- +-- Since 1.0.11 +execStateLC + :: Monad m => + s -> ConduitM i o (SL.StateT s m) r -> ConduitM i o m s +execStateLC s p = fmap snd $ runStateLC s p +{-# INLINABLE execStateLC #-} + + +-- | Wrap the base monad in 'SS.StateT' +-- +-- Since 1.0.11 +stateC + :: (Monad m, Monad (t1 (SS.StateT t m)), + MonadTrans t1, + MFunctor t1) => + (t -> t1 m (b, t)) -> t1 (SS.StateT t m) b +stateC k = do + s <- lift SS.get + (r, s') <- hoist lift (k s) + lift (SS.put s') + return r +{-# INLINABLE stateC #-} + +-- | Run 'SS.StateT' in the base monad +-- +-- Since 1.0.11 +runStateC + :: Monad m => + s -> ConduitM i o (SS.StateT s m) r -> ConduitM i o m (r, s) +runStateC = thread (,) SS.runStateT +{-# INLINABLE runStateC #-} + +-- | Evaluate 'SS.StateT' in the base monad +-- +-- Since 1.0.11 +evalStateC + :: Monad m => + s -> ConduitM i o (SS.StateT s m) r -> ConduitM i o m r +evalStateC s p = fmap fst $ runStateC s p +{-# INLINABLE evalStateC #-} + +-- | Execute 'SS.StateT' in the base monad +-- +-- Since 1.0.11 +execStateC 
+ :: Monad m => + s -> ConduitM i o (SS.StateT s m) r -> ConduitM i o m s +execStateC s p = fmap snd $ runStateC s p +{-# INLINABLE execStateC #-} + + +-- | Wrap the base monad in 'WL.WriterT' +-- +-- Since 1.0.11 +writerLC + :: (Monad m, Monad (t (WL.WriterT w m)), MonadTrans t, Monoid w, + MFunctor t) => + t m (b, w) -> t (WL.WriterT w m) b +writerLC p = do + (r, w) <- hoist lift p + lift $ WL.tell w + return r +{-# INLINABLE writerLC #-} + +-- | Run 'WL.WriterT' in the base monad +-- +-- Since 1.0.11 +runWriterLC + :: (Monad m, Monoid w) => + ConduitM i o (WL.WriterT w m) r -> ConduitM i o m (r, w) +runWriterLC = thread (,) run mempty + where + run m w = do + (a, w') <- WL.runWriterT m + return (a, w `mappend` w') +{-# INLINABLE runWriterLC #-} + +-- | Execute 'WL.WriterT' in the base monad +-- +-- Since 1.0.11 +execWriterLC + :: (Monad m, Monoid w) => + ConduitM i o (WL.WriterT w m) r -> ConduitM i o m w +execWriterLC p = fmap snd $ runWriterLC p +{-# INLINABLE execWriterLC #-} + + +-- | Wrap the base monad in 'WS.WriterT' +-- +-- Since 1.0.11 +writerC + :: (Monad m, Monad (t (WS.WriterT w m)), MonadTrans t, Monoid w, + MFunctor t) => + t m (b, w) -> t (WS.WriterT w m) b +writerC p = do + (r, w) <- hoist lift p + lift $ WS.tell w + return r +{-# INLINABLE writerC #-} + +-- | Run 'WS.WriterT' in the base monad +-- +-- Since 1.0.11 +runWriterC + :: (Monad m, Monoid w) => + ConduitM i o (WS.WriterT w m) r -> ConduitM i o m (r, w) +runWriterC = thread (,) run mempty + where + run m w = do + (a, w') <- WS.runWriterT m + return (a, w `mappend` w') +{-# INLINABLE runWriterC #-} + +-- | Execute 'WS.WriterT' in the base monad +-- +-- Since 1.0.11 +execWriterC + :: (Monad m, Monoid w) => + ConduitM i o (WS.WriterT w m) r -> ConduitM i o m w +execWriterC p = fmap snd $ runWriterC p +{-# INLINABLE execWriterC #-} + + +-- | Wrap the base monad in 'RWSL.RWST' +-- +-- Since 1.0.11 +rwsLC + :: (Monad m, Monad (t1 (RWSL.RWST t w t2 m)), MonadTrans t1, + Monoid w, MFunctor t1) 
=> + (t -> t2 -> t1 m (b, t2, w)) -> t1 (RWSL.RWST t w t2 m) b +rwsLC k = do + i <- lift RWSL.ask + s <- lift RWSL.get + (r, s', w) <- hoist lift (k i s) + lift $ do + RWSL.put s' + RWSL.tell w + return r +{-# INLINABLE rwsLC #-} + +-- | Run 'RWSL.RWST' in the base monad +-- +-- Since 1.0.11 +runRWSLC + :: (Monad m, Monoid w) => + r + -> s + -> ConduitM i o (RWSL.RWST r w s m) res + -> ConduitM i o m (res, s, w) +runRWSLC r s0 = thread toRes run (s0, mempty) + where + toRes a (s, w) = (a, s, w) + run m (s, w) = do + (res, s', w') <- RWSL.runRWST m r s + return (res, (s', w `mappend` w')) +{-# INLINABLE runRWSLC #-} + +-- | Evaluate 'RWSL.RWST' in the base monad +-- +-- Since 1.0.11 +evalRWSLC + :: (Monad m, Monoid w) => + r + -> s + -> ConduitM i o (RWSL.RWST r w s m) res + -> ConduitM i o m (res, w) +evalRWSLC i s p = fmap f $ runRWSLC i s p + where f x = let (r, _, w) = x in (r, w) +{-# INLINABLE evalRWSLC #-} + +-- | Execute 'RWSL.RWST' in the base monad +-- +-- Since 1.0.11 +execRWSLC + :: (Monad m, Monoid w) => + r + -> s + -> ConduitM i o (RWSL.RWST r w s m) res + -> ConduitM i o m (s, w) +execRWSLC i s p = fmap f $ runRWSLC i s p + where f x = let (_, s2, w2) = x in (s2, w2) +{-# INLINABLE execRWSLC #-} + + +-- | Wrap the base monad in 'RWSS.RWST' +-- +-- Since 1.0.11 +rwsC + :: (Monad m, Monad (t1 (RWSS.RWST t w t2 m)), MonadTrans t1, + Monoid w, MFunctor t1) => + (t -> t2 -> t1 m (b, t2, w)) -> t1 (RWSS.RWST t w t2 m) b +rwsC k = do + i <- lift RWSS.ask + s <- lift RWSS.get + (r, s', w) <- hoist lift (k i s) + lift $ do + RWSS.put s' + RWSS.tell w + return r +{-# INLINABLE rwsC #-} + +-- | Run 'RWSS.RWST' in the base monad +-- +-- Since 1.0.11 +runRWSC + :: (Monad m, Monoid w) => + r + -> s + -> ConduitM i o (RWSS.RWST r w s m) res + -> ConduitM i o m (res, s, w) +runRWSC r s0 = thread toRes run (s0, mempty) + where + toRes a (s, w) = (a, s, w) + run m (s, w) = do + (res, s', w') <- RWSS.runRWST m r s + return (res, (s', w `mappend` w')) +{-# INLINABLE 
runRWSC #-} + +-- | Evaluate 'RWSS.RWST' in the base monad +-- +-- Since 1.0.11 +evalRWSC + :: (Monad m, Monoid w) => + r + -> s + -> ConduitM i o (RWSS.RWST r w s m) res + -> ConduitM i o m (res, w) +evalRWSC i s p = fmap f $ runRWSC i s p + where f x = let (r, _, w) = x in (r, w) +{-# INLINABLE evalRWSC #-} + +-- | Execute 'RWSS.RWST' in the base monad +-- +-- Since 1.0.11 +execRWSC + :: (Monad m, Monoid w) => + r + -> s + -> ConduitM i o (RWSS.RWST r w s m) res + -> ConduitM i o m (s, w) +execRWSC i s p = fmap f $ runRWSC i s p + where f x = let (_, s2, w2) = x in (s2, w2) +{-# INLINABLE execRWSC #-} diff --git a/Data/Conduit/List.hs b/Data/Conduit/List.hs new file mode 100644 index 0000000..cbcb0bb --- /dev/null +++ b/Data/Conduit/List.hs @@ -0,0 +1,837 @@ +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE Trustworthy #-} +-- | Higher-level functions to interact with the elements of a stream. Most of +-- these are based on list functions. +-- +-- For many purposes, it's recommended to use the conduit-combinators library, +-- which provides a more complete set of functions. +-- +-- Note that these functions all deal with individual elements of a stream as a +-- sort of \"black box\", where there is no introspection of the contained +-- elements. Values such as @ByteString@ and @Text@ will likely need to be +-- treated specially to deal with their contents properly (@Word8@ and @Char@, +-- respectively). See the "Data.Conduit.Binary" and "Data.Conduit.Text" +-- modules. 
+module Data.Conduit.List + ( -- * Sources + sourceList + , sourceNull + , unfold + , unfoldEither + , unfoldM + , unfoldEitherM + , enumFromTo + , iterate + , replicate + , replicateM + -- * Sinks + -- ** Pure + , fold + , foldMap + , take + , drop + , head + , peek + , consume + , sinkNull + -- ** Monadic + , foldMapM + , foldM + , mapM_ + -- * Conduits + -- ** Pure + , map + , mapMaybe + , mapFoldable + , catMaybes + , concat + , concatMap + , concatMapAccum + , scanl + , scan + , mapAccum + , chunksOf + , groupBy + , groupOn1 + , isolate + , filter + -- ** Monadic + , mapM + , iterM + , scanlM + , scanM + , mapAccumM + , mapMaybeM + , mapFoldableM + , concatMapM + , concatMapAccumM + -- * Misc + , sequence + ) where + +import qualified Prelude +import Prelude + ( ($), return, (==), (-), Int + , (.), id, Maybe (..), Monad + , Either (..) + , Bool (..) + , (>>) + , (>>=) + , seq + , otherwise + , Enum, Eq + , maybe + , (<=) + , (>) + ) +import Data.Monoid (Monoid, mempty, mappend) +import qualified Data.Foldable as F +import Data.Conduit +import Data.Conduit.Internal.Fusion +import Data.Conduit.Internal.List.Stream +import qualified Data.Conduit.Internal as CI +import Control.Monad (when, (<=<), liftM, void) +import Control.Monad.Trans.Class (lift) + +-- Defines INLINE_RULE0, INLINE_RULE, STREAMING0, and STREAMING. +#include "fusion-macros.h" + +-- | Generate a source from a seed value. +-- +-- Subject to fusion +-- +-- Since 0.4.2 +unfold, unfoldC :: Monad m + => (b -> Maybe (a, b)) + -> b + -> Producer m a +unfoldC f = + go + where + go seed = + case f seed of + Just (a, seed') -> yield a >> go seed' + Nothing -> return () +{-# INLINE unfoldC #-} +STREAMING(unfold, unfoldC, unfoldS, f x) + +-- | Generate a source from a seed value with a return value. 
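+--
+-- A minimal sketch of how the return value can be observed (illustrative
+-- only; @countdown@ and @step@ are hypothetical names, and the module is
+-- assumed to be imported qualified as @CL@):
+--
+-- > import Data.Conduit (fuseBoth, runConduitPure)
+-- > import qualified Data.Conduit.List as CL
+-- >
+-- > countdown :: (String, [Int])
+-- > countdown = runConduitPure $ CL.unfoldEither step 3 `fuseBoth` CL.consume
+-- >   where
+-- >     step :: Int -> Either String (Int, Int)
+-- >     step 0 = Left "done"
+-- >     step n = Right (n, n - 1)
+-- > -- countdown == ("done", [3,2,1])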
+-- +-- Subject to fusion +-- +-- @since 1.2.11 +unfoldEither, unfoldEitherC :: Monad m + => (b -> Either r (a, b)) + -> b + -> ConduitM i a m r +unfoldEitherC f = + go + where + go seed = + case f seed of + Right (a, seed') -> yield a >> go seed' + Left r -> return r +{-# INLINE unfoldEitherC #-} +STREAMING(unfoldEither, unfoldEitherC, unfoldEitherS, f x) + +-- | A monadic unfold. +-- +-- Subject to fusion +-- +-- Since 1.1.2 +unfoldM, unfoldMC :: Monad m + => (b -> m (Maybe (a, b))) + -> b + -> Producer m a +unfoldMC f = + go + where + go seed = do + mres <- lift $ f seed + case mres of + Just (a, seed') -> yield a >> go seed' + Nothing -> return () +STREAMING(unfoldM, unfoldMC, unfoldMS, f seed) + +-- | A monadic unfoldEither. +-- +-- Subject to fusion +-- +-- @since 1.2.11 +unfoldEitherM, unfoldEitherMC :: Monad m + => (b -> m (Either r (a, b))) + -> b + -> ConduitM i a m r +unfoldEitherMC f = + go + where + go seed = do + mres <- lift $ f seed + case mres of + Right (a, seed') -> yield a >> go seed' + Left r -> return r +STREAMING(unfoldEitherM, unfoldEitherMC, unfoldEitherMS, f seed) + +-- | Yield the values from the list. +-- +-- Subject to fusion +sourceList, sourceListC :: Monad m => [a] -> Producer m a +sourceListC = Prelude.mapM_ yield +{-# INLINE sourceListC #-} +STREAMING(sourceList, sourceListC, sourceListS, xs) + +-- | Enumerate from a value to a final value, inclusive, via 'succ'. +-- +-- This is generally more efficient than using @Prelude@\'s @enumFromTo@ and +-- combining with @sourceList@ since this avoids any intermediate data +-- structures. 
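+--
+-- For instance (a minimal sketch, assuming the module is imported qualified
+-- as @CL@; @total@ is a hypothetical name):
+--
+-- > import Data.Conduit (runConduitPure, (.|))
+-- > import qualified Data.Conduit.List as CL
+-- >
+-- > total :: Int
+-- > total = runConduitPure $ CL.enumFromTo 1 10 .| CL.fold (+) 0
+-- > -- total == 55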
+-- +-- Subject to fusion +-- +-- Since 0.4.2 +enumFromTo, enumFromToC :: (Enum a, Prelude.Ord a, Monad m) + => a + -> a + -> Producer m a +enumFromToC x0 y = + loop x0 + where + loop x + | x Prelude.> y = return () + | otherwise = yield x >> loop (Prelude.succ x) +{-# INLINE enumFromToC #-} +STREAMING(enumFromTo, enumFromToC, enumFromToS, x0 y) + +-- | Produces an infinite stream of repeated applications of f to x. +-- +-- Subject to fusion +-- +iterate, iterateC :: Monad m => (a -> a) -> a -> Producer m a +iterateC f = + go + where + go a = yield a >> go (f a) +{-# INLINE iterateC #-} +STREAMING(iterate, iterateC, iterateS, f a) + +-- | Replicate a single value the given number of times. +-- +-- Subject to fusion +-- +-- Since 1.2.0 +replicate, replicateC :: Monad m => Int -> a -> Producer m a +replicateC cnt0 a = + loop cnt0 + where + loop i + | i <= 0 = return () + | otherwise = yield a >> loop (i - 1) +{-# INLINE replicateC #-} +STREAMING(replicate, replicateC, replicateS, cnt0 a) + +-- | Replicate a monadic value the given number of times. +-- +-- Subject to fusion +-- +-- Since 1.2.0 +replicateM, replicateMC :: Monad m => Int -> m a -> Producer m a +replicateMC cnt0 ma = + loop cnt0 + where + loop i + | i <= 0 = return () + | otherwise = lift ma >>= yield >> loop (i - 1) +{-# INLINE replicateMC #-} +STREAMING(replicateM, replicateMC, replicateMS, cnt0 ma) + +-- | A strict left fold. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +fold, foldC :: Monad m + => (b -> a -> b) + -> b + -> Consumer a m b +foldC f = + loop + where + loop !accum = await >>= maybe (return accum) (loop . f accum) +{-# INLINE foldC #-} +STREAMING(fold, foldC, foldS, f accum) + +-- | A monadic strict left fold. 
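+--
+-- A minimal usage sketch (illustrative only; @sumLogged@ and @step@ are
+-- hypothetical names, and the module is assumed to be imported qualified
+-- as @CL@):
+--
+-- > import Data.Conduit (runConduit, (.|))
+-- > import qualified Data.Conduit.List as CL
+-- >
+-- > -- Prints each element as it is folded, then returns the total.
+-- > sumLogged :: IO Int
+-- > sumLogged = runConduit $ CL.sourceList [1, 2, 3] .| CL.foldM step 0
+-- >   where
+-- >     step acc x = print x >> return (acc + x)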
+-- +-- Subject to fusion +-- +-- Since 0.3.0 +foldM, foldMC :: Monad m + => (b -> a -> m b) + -> b + -> Consumer a m b +foldMC f = + loop + where + loop accum = do + await >>= maybe (return accum) go + where + go a = do + accum' <- lift $ f accum a + accum' `seq` loop accum' +{-# INLINE foldMC #-} +STREAMING(foldM, foldMC, foldMS, f accum) + +----------------------------------------------------------------- +-- These are for cases where- for whatever reason- stream fusion cannot be +-- applied. +connectFold :: Monad m => Source m a -> (b -> a -> b) -> b -> m b +connectFold (CI.ConduitM src0) f = + go (src0 CI.Done) + where + go (CI.Done ()) b = return b + go (CI.HaveOutput src _ a) b = go src Prelude.$! f b a + go (CI.NeedInput _ c) b = go (c ()) b + go (CI.Leftover src ()) b = go src b + go (CI.PipeM msrc) b = do + src <- msrc + go src b +{-# INLINE connectFold #-} +{-# RULES "conduit: $$ fold" forall src f b. src $$ fold f b = connectFold src f b #-} + +connectFoldM :: Monad m => Source m a -> (b -> a -> m b) -> b -> m b +connectFoldM (CI.ConduitM src0) f = + go (src0 CI.Done) + where + go (CI.Done ()) b = return b + go (CI.HaveOutput src _ a) b = do + !b' <- f b a + go src b' + go (CI.NeedInput _ c) b = go (c ()) b + go (CI.Leftover src ()) b = go src b + go (CI.PipeM msrc) b = do + src <- msrc + go src b +{-# INLINE connectFoldM #-} +{-# RULES "conduit: $$ foldM" forall src f b. src $$ foldM f b = connectFoldM src f b #-} +----------------------------------------------------------------- + +-- | A monoidal strict left fold. +-- +-- Subject to fusion +-- +-- Since 0.5.3 +foldMap :: (Monad m, Monoid b) + => (a -> b) + -> Consumer a m b +INLINE_RULE(foldMap, f, let combiner accum = mappend accum . f in fold combiner mempty) + +-- | A monoidal strict left fold in a Monad. +-- +-- Since 1.0.8 +foldMapM :: (Monad m, Monoid b) + => (a -> m b) + -> Consumer a m b +INLINE_RULE(foldMapM, f, let combiner accum = liftM (mappend accum) . 
f in foldM combiner mempty) + +-- | Apply the action to all values in the stream. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +mapM_, mapM_C :: Monad m + => (a -> m ()) + -> Consumer a m () +mapM_C f = awaitForever $ lift . f +{-# INLINE mapM_C #-} +STREAMING(mapM_, mapM_C, mapM_S, f) + +srcMapM_ :: Monad m => Source m a -> (a -> m ()) -> m () +srcMapM_ (CI.ConduitM src) f = + go (src CI.Done) + where + go (CI.Done ()) = return () + go (CI.PipeM mp) = mp >>= go + go (CI.Leftover p ()) = go p + go (CI.HaveOutput p _ o) = f o >> go p + go (CI.NeedInput _ c) = go (c ()) +{-# INLINE srcMapM_ #-} +{-# RULES "conduit: connect to mapM_" [2] forall f src. src $$ mapM_ f = srcMapM_ src f #-} + +-- | Ignore a certain number of values in the stream. This function is +-- semantically equivalent to: +-- +-- > drop i = take i >> return () +-- +-- However, @drop@ is more efficient as it does not need to hold values in +-- memory. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +drop, dropC :: Monad m + => Int + -> Consumer a m () +dropC = + loop + where + loop i | i <= 0 = return () + loop count = await >>= maybe (return ()) (\_ -> loop (count - 1)) +{-# INLINE dropC #-} +STREAMING(drop, dropC, dropS, i) + +-- | Take some values from the stream and return as a list. If you want to +-- instead create a conduit that pipes data to another sink, see 'isolate'. +-- This function is semantically equivalent to: +-- +-- > take i = isolate i =$ consume +-- +-- Subject to fusion +-- +-- Since 0.3.0 +take, takeC :: Monad m + => Int + -> Consumer a m [a] +takeC = + loop id + where + loop front count | count <= 0 = return $ front [] + loop front count = await >>= maybe + (return $ front []) + (\x -> loop (front . (x:)) (count - 1)) +{-# INLINE takeC #-} +STREAMING(take, takeC, takeS, i) + +-- | Take a single value from the stream, if available. 
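+--
+-- A minimal sketch (illustrative only; @firstAndRest@ is a hypothetical
+-- name, and the module is assumed to be imported qualified as @CL@). The
+-- value returned by @head@ is removed from the stream, so a later sink only
+-- sees the remainder:
+--
+-- > import Data.Conduit (runConduitPure, (.|))
+-- > import qualified Data.Conduit.List as CL
+-- >
+-- > firstAndRest :: (Maybe Char, String)
+-- > firstAndRest = runConduitPure $ CL.sourceList "abc" .| do
+-- >     x <- CL.head
+-- >     rest <- CL.consume
+-- >     return (x, rest)
+-- > -- firstAndRest == (Just 'a', "bc")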
+-- +-- Subject to fusion +-- +-- Since 0.3.0 +head, headC :: Monad m => Consumer a m (Maybe a) +headC = await +{-# INLINE headC #-} +STREAMING0(head, headC, headS) + +-- | Look at the next value in the stream, if available. This function will not +-- change the state of the stream. +-- +-- Since 0.3.0 +peek :: Monad m => Consumer a m (Maybe a) +peek = await >>= maybe (return Nothing) (\x -> leftover x >> return (Just x)) + +-- | Apply a transformation to all values in a stream. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +map, mapC :: Monad m => (a -> b) -> Conduit a m b +mapC f = awaitForever $ yield . f +{-# INLINE mapC #-} +STREAMING(map, mapC, mapS, f) + +-- Since a Source never has any leftovers, fusion rules on it are safe. +{- +{-# RULES "conduit: source/map fusion =$=" forall f src. src =$= map f = mapFuseRight src f #-} + +mapFuseRight :: Monad m => Source m a -> (a -> b) -> Source m b +mapFuseRight src f = CIC.mapOutput f src +{-# INLINE mapFuseRight #-} +-} + +{- + +It might be nice to include these rewrite rules, but they may have subtle +differences based on leftovers. + +{-# RULES "conduit: map-to-mapOutput pipeL" forall f src. pipeL src (map f) = mapOutput f src #-} +{-# RULES "conduit: map-to-mapOutput $=" forall f src. src $= (map f) = mapOutput f src #-} +{-# RULES "conduit: map-to-mapOutput pipe" forall f src. pipe src (map f) = mapOutput f src #-} +{-# RULES "conduit: map-to-mapOutput >+>" forall f src. src >+> (map f) = mapOutput f src #-} + +{-# RULES "conduit: map-to-mapInput pipeL" forall f sink. pipeL (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-} +{-# RULES "conduit: map-to-mapInput =$" forall f sink. map f =$ sink = mapInput f (Prelude.const Prelude.Nothing) sink #-} +{-# RULES "conduit: map-to-mapInput pipe" forall f sink. pipe (map f) sink = mapInput f (Prelude.const Prelude.Nothing) sink #-} +{-# RULES "conduit: map-to-mapInput >+>" forall f sink. 
map f >+> sink = mapInput f (Prelude.const Prelude.Nothing) sink #-} + +{-# RULES "conduit: map-to-mapOutput =$=" forall f con. con =$= map f = mapOutput f con #-} +{-# RULES "conduit: map-to-mapInput =$=" forall f con. map f =$= con = mapInput f (Prelude.const Prelude.Nothing) con #-} + +{-# INLINE [1] map #-} + +-} + +-- | Apply a monadic transformation to all values in a stream. +-- +-- If you do not need the transformed values, and instead just want the monadic +-- side-effects of running the action, see 'mapM_'. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +mapM, mapMC :: Monad m => (a -> m b) -> Conduit a m b +mapMC f = awaitForever $ \a -> lift (f a) >>= yield +{-# INLINE mapMC #-} +STREAMING(mapM, mapMC, mapMS, f) + +-- | Apply a monadic action on all values in a stream. +-- +-- This @Conduit@ can be used to perform a monadic side-effect for every +-- value, whilst passing the value through the @Conduit@ as-is. +-- +-- > iterM f = mapM (\a -> f a >>= \() -> return a) +-- +-- Subject to fusion +-- +-- Since 0.5.6 +iterM, iterMC :: Monad m => (a -> m ()) -> Conduit a m a +iterMC f = awaitForever $ \a -> lift (f a) >> yield a +{-# INLINE iterMC #-} +STREAMING(iterM, iterMC, iterMS, f) + +-- | Apply a transformation that may fail to all values in a stream, discarding +-- the failures. +-- +-- Subject to fusion +-- +-- Since 0.5.1 +mapMaybe, mapMaybeC :: Monad m => (a -> Maybe b) -> Conduit a m b +mapMaybeC f = awaitForever $ maybe (return ()) yield . f +{-# INLINE mapMaybeC #-} +STREAMING(mapMaybe, mapMaybeC, mapMaybeS, f) + +-- | Apply a monadic transformation that may fail to all values in a stream, +-- discarding the failures. +-- +-- Subject to fusion +-- +-- Since 0.5.1 +mapMaybeM, mapMaybeMC :: Monad m => (a -> m (Maybe b)) -> Conduit a m b +mapMaybeMC f = awaitForever $ maybe (return ()) yield <=< lift . 
f +{-# INLINE mapMaybeMC #-} +STREAMING(mapMaybeM, mapMaybeMC, mapMaybeMS, f) + +-- | Filter the @Just@ values from a stream, discarding the @Nothing@ values. +-- +-- Subject to fusion +-- +-- Since 0.5.1 +catMaybes, catMaybesC :: Monad m => Conduit (Maybe a) m a +catMaybesC = awaitForever $ maybe (return ()) yield +{-# INLINE catMaybesC #-} +STREAMING0(catMaybes, catMaybesC, catMaybesS) + +-- | Generalization of 'catMaybes'. It yields every value contained in each +-- incoming 'F.Foldable' into the stream. +-- +-- Subject to fusion +-- +-- Since 1.0.6 +concat, concatC :: (Monad m, F.Foldable f) => Conduit (f a) m a +concatC = awaitForever $ F.mapM_ yield +{-# INLINE concatC #-} +STREAMING0(concat, concatC, concatS) + +-- | Apply a transformation to all values in a stream, concatenating the output +-- values. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +concatMap, concatMapC :: Monad m => (a -> [b]) -> Conduit a m b +concatMapC f = awaitForever $ sourceList . f +{-# INLINE concatMapC #-} +STREAMING(concatMap, concatMapC, concatMapS, f) + +-- | Apply a monadic transformation to all values in a stream, concatenating +-- the output values. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +concatMapM, concatMapMC :: Monad m => (a -> m [b]) -> Conduit a m b +concatMapMC f = awaitForever $ sourceList <=< lift . f +{-# INLINE concatMapMC #-} +STREAMING(concatMapM, concatMapMC, concatMapMS, f) + +-- | 'concatMap' with a strict accumulator. 
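+--
+-- A minimal sketch (illustrative only; @pairs@ and @step@ are hypothetical
+-- names, and the module is assumed to be imported qualified as @CL@). Note
+-- that the step function receives the element first and the accumulator
+-- second:
+--
+-- > import Data.Conduit (runConduitPure, (.|))
+-- > import qualified Data.Conduit.List as CL
+-- >
+-- > -- Emit the accumulator seen so far, followed by the current element.
+-- > pairs :: [Int]
+-- > pairs = runConduitPure $
+-- >     CL.sourceList [1, 2, 3] .| CL.concatMapAccum step 0 .| CL.consume
+-- >   where
+-- >     step x acc = (acc + x, [acc, x])
+-- > -- pairs == [0,1,1,2,3,3]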
+-- +-- Subject to fusion +-- +-- Since 0.3.0 +concatMapAccum, concatMapAccumC :: Monad m => (a -> accum -> (accum, [b])) -> accum -> Conduit a m b +concatMapAccumC f x0 = void (mapAccum f x0) =$= concat +{-# INLINE concatMapAccumC #-} +STREAMING(concatMapAccum, concatMapAccumC, concatMapAccumS, f x0) + +-- | Deprecated synonym for @mapAccum@ +-- +-- Since 1.0.6 +scanl :: Monad m => (a -> s -> (s, b)) -> s -> Conduit a m b +scanl f s = void $ mapAccum f s +{-# DEPRECATED scanl "Use mapAccum instead" #-} + +-- | Deprecated synonym for @mapAccumM@ +-- +-- Since 1.0.6 +scanlM :: Monad m => (a -> s -> m (s, b)) -> s -> Conduit a m b +scanlM f s = void $ mapAccumM f s +{-# DEPRECATED scanlM "Use mapAccumM instead" #-} + +-- | Analog of @mapAccumL@ for lists. Note that in contrast to @mapAccumL@, the function argument +-- takes the accumulator as its second argument, not its first argument, and the accumulated value +-- is strict. +-- +-- Subject to fusion +-- +-- Since 1.1.1 +mapAccum, mapAccumC :: Monad m => (a -> s -> (s, b)) -> s -> ConduitM a b m s +mapAccumC f = + loop + where + loop !s = await >>= maybe (return s) go + where + go a = case f a s of + (s', b) -> yield b >> loop s' +STREAMING(mapAccum, mapAccumC, mapAccumS, f s) + +-- | Monadic `mapAccum`. +-- +-- Subject to fusion +-- +-- Since 1.1.1 +mapAccumM, mapAccumMC :: Monad m => (a -> s -> m (s, b)) -> s -> ConduitM a b m s +mapAccumMC f = + loop + where + loop !s = await >>= maybe (return s) go + where + go a = do (s', b) <- lift $ f a s + yield b + loop s' +{-# INLINE mapAccumMC #-} +STREAMING(mapAccumM, mapAccumMC, mapAccumMS, f s) + +-- | Analog of 'Prelude.scanl' for lists. +-- +-- Subject to fusion +-- +-- Since 1.1.1 +scan :: Monad m => (a -> b -> b) -> b -> ConduitM a b m b +INLINE_RULE(scan, f, mapAccum (\a b -> let r = f a b in (r, r))) + +-- | Monadic @scanl@. 
+-- +-- Subject to fusion +-- +-- Since 1.1.1 +scanM :: Monad m => (a -> b -> m b) -> b -> ConduitM a b m b +INLINE_RULE(scanM, f, mapAccumM (\a b -> f a b >>= \r -> return (r, r))) + +-- | 'concatMapM' with a strict accumulator. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +concatMapAccumM, concatMapAccumMC :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> Conduit a m b +concatMapAccumMC f x0 = void (mapAccumM f x0) =$= concat +{-# INLINE concatMapAccumMC #-} +STREAMING(concatMapAccumM, concatMapAccumMC, concatMapAccumMS, f x0) + +-- | Generalization of 'mapMaybe' and 'concatMap'. It applies a function +-- to all values in a stream and sends the values inside the resulting +-- 'Foldable' downstream. +-- +-- Subject to fusion +-- +-- Since 1.0.6 +mapFoldable, mapFoldableC :: (Monad m, F.Foldable f) => (a -> f b) -> Conduit a m b +mapFoldableC f = awaitForever $ F.mapM_ yield . f +{-# INLINE mapFoldableC #-} +STREAMING(mapFoldable, mapFoldableC, mapFoldableS, f) + +-- | Monadic variant of 'mapFoldable'. +-- +-- Subject to fusion +-- +-- Since 1.0.6 +mapFoldableM, mapFoldableMC :: (Monad m, F.Foldable f) => (a -> m (f b)) -> Conduit a m b +mapFoldableMC f = awaitForever $ F.mapM_ yield <=< lift . f +{-# INLINE mapFoldableMC #-} +STREAMING(mapFoldableM, mapFoldableMC, mapFoldableMS, f) + +-- | Consume all values from the stream and return them as a list. Note that +-- this will pull all values into memory. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +consume, consumeC :: Monad m => Consumer a m [a] +consumeC = + loop id + where + loop front = await >>= maybe (return $ front []) (\x -> loop $ front . (x:)) +{-# INLINE consumeC #-} +STREAMING0(consume, consumeC, consumeS) + +-- | Group a stream into chunks of a given size. The last chunk may contain +-- fewer than n elements. 
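+--
+-- A minimal usage sketch (illustrative only; @chunks@ is a hypothetical
+-- name, and the module is assumed to be imported qualified as @CL@):
+--
+-- > import Data.Conduit (runConduitPure, (.|))
+-- > import qualified Data.Conduit.List as CL
+-- >
+-- > chunks :: [[Int]]
+-- > chunks = runConduitPure $
+-- >     CL.sourceList [1 .. 5] .| CL.chunksOf 2 .| CL.consume
+-- > -- chunks == [[1,2],[3,4],[5]]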
+-- +-- Subject to fusion +-- +-- Since 1.2.9 +chunksOf :: Monad m => Int -> Conduit a m [a] +chunksOf n = + start + where + start = await >>= maybe (return ()) (\x -> loop n (x:)) + + loop !count rest = + await >>= maybe (yield (rest [])) go + where + go y + | count > 1 = loop (count - 1) (rest . (y:)) + | otherwise = yield (rest []) >> loop n (y:) + +-- | Group input according to an equality function. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +groupBy, groupByC :: Monad m => (a -> a -> Bool) -> Conduit a m [a] +groupByC f = + start + where + start = await >>= maybe (return ()) (loop id) + + loop rest x = + await >>= maybe (yield (x : rest [])) go + where + go y + | f x y = loop (rest . (y:)) x + | otherwise = yield (x : rest []) >> loop id y +STREAMING(groupBy, groupByC, groupByS, f) + +-- | 'groupOn1' is similar to 'groupBy', but compares elements by the result +-- of applying the given function to them. +-- +-- It returns a pair, indicating that there is always at least one item in +-- each group. This is designed to be converted into a @NonEmpty@ structure, +-- but it avoids a dependency on another package. +-- +-- > import Data.List.NonEmpty +-- > +-- > groupOn1 :: (Monad m, Eq b) => (a -> b) -> Conduit a m (NonEmpty a) +-- > groupOn1 f = CL.groupOn1 f =$= CL.map (uncurry (:|)) +-- +-- Subject to fusion +-- +-- Since 1.1.7 +groupOn1, groupOn1C :: (Monad m, Eq b) + => (a -> b) + -> Conduit a m (a, [a]) +groupOn1C f = + start + where + start = await >>= maybe (return ()) (loop id) + + loop rest x = + await >>= maybe (yield (x, rest [])) go + where + go y + | f x == f y = loop (rest . (y:)) x + | otherwise = yield (x, rest []) >> loop id y +STREAMING(groupOn1, groupOn1C, groupOn1S, f) + +-- | Ensure that the inner sink consumes no more than the given number of +-- values. Note that this does /not/ ensure that the sink consumes all of those +-- values. To get the latter behavior, combine with 'sinkNull', e.g.: +-- +-- > src $$ do +-- >     x <- isolate count =$ do +-- >         x <- someSink +-- >         sinkNull +-- >         return x +-- >     someOtherSink +-- >     ... 
+-- +-- Subject to fusion +-- +-- Since 0.3.0 +isolate, isolateC :: Monad m => Int -> Conduit a m a +isolateC = + loop + where + loop count | count <= 0 = return () + loop count = await >>= maybe (return ()) (\x -> yield x >> loop (count - 1)) +STREAMING(isolate, isolateC, isolateS, count) + +-- | Keep only values in the stream passing a given predicate. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +filter, filterC :: Monad m => (a -> Bool) -> Conduit a m a +filterC f = awaitForever $ \i -> when (f i) (yield i) +STREAMING(filter, filterC, filterS, f) + +filterFuseRight :: Monad m => Source m a -> (a -> Bool) -> Source m a +filterFuseRight (CI.ConduitM src) f = CI.ConduitM $ \rest -> let + go (CI.Done ()) = rest () + go (CI.PipeM mp) = CI.PipeM (liftM go mp) + go (CI.Leftover p i) = CI.Leftover (go p) i + go (CI.HaveOutput p c o) + | f o = CI.HaveOutput (go p) c o + | otherwise = go p + go (CI.NeedInput p c) = CI.NeedInput (go . p) (go . c) + in go (src CI.Done) +-- Intermediate finalizers are dropped, but this is acceptable: the next +-- yielded value would be demanded by downstream in any event, and that new +-- finalizer will always override the existing finalizer. +{-# RULES "conduit: source/filter fusion =$=" forall f src. src =$= filter f = filterFuseRight src f #-} +{-# INLINE filterFuseRight #-} + +-- | Ignore the remainder of values in the source. Particularly useful when +-- combined with 'isolate'. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +sinkNull, sinkNullC :: Monad m => Consumer a m () +sinkNullC = awaitForever $ \_ -> return () +{-# INLINE sinkNullC #-} +STREAMING0(sinkNull, sinkNullC, sinkNullS) + +srcSinkNull :: Monad m => Source m a -> m () +srcSinkNull (CI.ConduitM src) = + go (src CI.Done) + where + go (CI.Done ()) = return () + go (CI.PipeM mp) = mp >>= go + go (CI.Leftover p ()) = go p + go (CI.HaveOutput p _ _) = go p + go (CI.NeedInput _ c) = go (c ()) +{-# INLINE srcSinkNull #-} +{-# RULES "conduit: connect to sinkNull" forall src. 
src $$ sinkNull = srcSinkNull src #-} + +-- | A source that outputs no values. Note that this is just a type-restricted +-- synonym for 'mempty'. +-- +-- Subject to fusion +-- +-- Since 0.3.0 +sourceNull, sourceNullC :: Monad m => Producer m a +sourceNullC = return () +{-# INLINE sourceNullC #-} +STREAMING0(sourceNull, sourceNullC, sourceNullS) + +-- | Run a @Pipe@ repeatedly, and output its result value downstream. Stops +-- when no more input is available from upstream. +-- +-- Since 0.5.0 +sequence :: Monad m + => Consumer i m o -- ^ @Pipe@ to run repeatedly + -> Conduit i m o +sequence sink = + self + where + self = awaitForever $ \i -> leftover i >> sink >>= yield diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d9f0417 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2012 Michael Snoyman, http://www.yesodweb.com/ + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..c363b58 --- /dev/null +++ b/README.md @@ -0,0 +1,9 @@ +## conduit + +`conduit` is a solution to the streaming data problem, allowing for production, +transformation, and consumption of streams of data in constant memory. It is an +alternative to lazy I\/O which guarantees deterministic resource handling. + +For more information about conduit in general, and how this package in +particular fits into the ecosystem, see [the conduit +homepage](https://github.com/snoyberg/conduit#readme). diff --git a/Setup.lhs b/Setup.lhs new file mode 100755 index 0000000..06e2708 --- /dev/null +++ b/Setup.lhs @@ -0,0 +1,7 @@ +#!/usr/bin/env runhaskell + +> module Main where +> import Distribution.Simple + +> main :: IO () +> main = defaultMain diff --git a/benchmarks/optimize-201408.hs b/benchmarks/optimize-201408.hs new file mode 100644 index 0000000..eeec2a9 --- /dev/null +++ b/benchmarks/optimize-201408.hs @@ -0,0 +1,428 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE TupleSections #-} +{-# LANGUAGE RankNTypes #-} +-- Collection of three benchmarks: a simple integral sum, monte carlo analysis, +-- and sliding vector. 
+import Control.DeepSeq +import Control.Monad (foldM) +import Control.Monad (when, liftM) +import Control.Monad.Codensity (lowerCodensity) +import Control.Monad.IO.Class (MonadIO, liftIO) +import Control.Monad.Trans.Class (lift) +import Criterion.Main +import Data.Conduit +import Data.Conduit.Internal (ConduitM (..), Pipe (..)) +import qualified Data.Conduit.Internal as CI +import qualified Data.Conduit.List as CL +import qualified Data.Foldable as F +import Data.Functor.Identity (runIdentity) +import Data.IORef +import Data.List (foldl') +import Data.Monoid (mempty) +import qualified Data.Sequence as Seq +import qualified Data.Vector as VB +import qualified Data.Vector.Generic as V +import qualified Data.Vector.Generic.Mutable as VM +import qualified Data.Vector.Unboxed as VU +import System.Environment (withArgs) +import qualified System.Random.MWC as MWC +import Test.Hspec + +data TestBench = TBGroup String [TestBench] + | TBBench Benchmark + | forall a b. (Eq b, Show b) => TBPure String a b (a -> b) + | forall a. (Eq a, Show a) => TBIO String a (IO a) + | forall a. (Eq a, Show a) => TBIOTest String (a -> IO ()) (IO a) + | forall a. 
(Eq a, Show a) => TBIOBench String a (IO a) (IO ()) + +toSpec :: TestBench -> Spec +toSpec (TBGroup name tbs) = describe name $ mapM_ toSpec tbs +toSpec (TBBench _) = return () +toSpec (TBPure name a b f) = it name $ f a `shouldBe` b +toSpec (TBIO name a f) = it name $ f >>= (`shouldBe` a) +toSpec (TBIOTest name spec f) = it name $ f >>= spec +toSpec (TBIOBench name a f _) = it name $ f >>= (`shouldBe` a) + +toBench :: TestBench -> Benchmark +toBench (TBGroup name tbs) = bgroup name $ map toBench tbs +toBench (TBBench b) = b +toBench (TBPure name a _ f) = bench name $ whnf f a +toBench (TBIO name _ f) = bench name $ whnfIO f +toBench (TBIOTest name _ f) = bench name $ whnfIO f +toBench (TBIOBench name _ _ f) = bench name $ whnfIO f + +runTestBench :: [TestBench] -> IO () +runTestBench tbs = do + withArgs [] $ hspec $ mapM_ toSpec tbs + defaultMain $ map toBench tbs + +main :: IO () +main = runTestBench =<< sequence + [ sumTB + , mapSumTB + , monteCarloTB + , fmap (TBGroup "sliding window") $ sequence + [ slidingWindow 10 + , slidingWindow 30 + , slidingWindow 100 + , slidingWindow 1000 + ] + ] + +----------------------------------------------------------------------- + +sumTB :: IO TestBench +sumTB = do + upperRef <- newIORef upper0 + return $ TBGroup "sum" + [ TBPure "Data.List.foldl'" upper0 expected + $ \upper -> foldl' (+) 0 [1..upper] + , TBIO "Control.Monad.foldM" expected $ do + upper <- readIORef upperRef + foldM plusM 0 [1..upper] + , TBPure "low level" upper0 expected $ \upper -> + let go x !t + | x > upper = t + | otherwise = go (x + 1) (t + x) + in go 1 0 + , TBIO "boxed vectors, I/O" expected $ do + upper <- readIORef upperRef + VB.foldM' plusM 0 $ VB.enumFromTo 1 upper + , TBPure "boxed vectors" upper0 expected + $ \upper -> VB.foldl' (+) 0 (VB.enumFromTo 1 upper) + , TBPure "unboxed vectors" upper0 expected + $ \upper -> VU.foldl' (+) 0 (VU.enumFromTo 1 upper) + , TBPure "conduit, pure, fold" upper0 expected + $ \upper -> runIdentity $ CL.enumFromTo 
1 upper $$ CL.fold (+) 0 + , TBPure "conduit, pure, foldM" upper0 expected + $ \upper -> runIdentity $ CL.enumFromTo 1 upper $$ CL.foldM plusM 0 + , TBIO "conduit, IO, fold" expected $ do + upper <- readIORef upperRef + CL.enumFromTo 1 upper $$ CL.fold (+) 0 + , TBIO "conduit, IO, foldM" expected $ do + upper <- readIORef upperRef + CL.enumFromTo 1 upper $$ CL.foldM plusM 0 + ] + where + upper0 = 10000 :: Int + expected = sum [1..upper0] + + plusM x y = return $! x + y + +----------------------------------------------------------------------- + +mapSumTB :: IO TestBench +mapSumTB = return $ TBGroup "map + sum" + [ TBPure "boxed vectors" upper0 expected + $ \upper -> VB.foldl' (+) 0 + $ VB.map (+ 1) + $ VB.map (* 2) + $ VB.enumFromTo 1 upper + , TBPure "unboxed vectors" upper0 expected + $ \upper -> VU.foldl' (+) 0 + $ VU.map (+ 1) + $ VU.map (* 2) + $ VU.enumFromTo 1 upper + , TBPure "conduit, connect1" upper0 expected $ \upper -> runIdentity + $ CL.enumFromTo 1 upper + $$ CL.map (* 2) + =$= CL.map (+ 1) + =$= CL.fold (+) 0 + , TBPure "conduit, connect2" upper0 expected $ \upper -> runIdentity + $ CL.enumFromTo 1 upper + =$= CL.map (* 2) + $$ CL.map (+ 1) + =$= CL.fold (+) 0 + , TBPure "conduit, connect3" upper0 expected $ \upper -> runIdentity + $ CL.enumFromTo 1 upper + =$= CL.map (* 2) + =$= CL.map (+ 1) + $$ CL.fold (+) 0 + , TBPure "conduit, inner fuse" upper0 expected $ \upper -> runIdentity + $ CL.enumFromTo 1 upper + =$= (CL.map (* 2) + =$= CL.map (+ 1)) + $$ CL.fold (+) 0 + ] + where + upper0 = 10000 :: Int + expected = sum $ map (+ 1) $ map (* 2) [1..upper0] + +----------------------------------------------------------------------- + +monteCarloTB :: IO TestBench +monteCarloTB = return $ TBGroup "monte carlo" + [ TBIOTest "conduit" closeEnough $ do + gen <- MWC.createSystemRandom + successes <- CL.replicateM count (MWC.uniform gen) + $$ CL.fold (\t (x, y) -> + if (x*x + y*(y :: Double) < 1) + then t + 1 + else t) + (0 :: Int) + return $ fromIntegral 
successes / fromIntegral count * 4 + , TBIOTest "low level" closeEnough $ do + gen <- MWC.createSystemRandom + let go :: Int -> Int -> IO Double + go 0 !t = return $! fromIntegral t / fromIntegral count * 4 + go i !t = do + (x, y) <- MWC.uniform gen + let t' + | x*x + y*(y :: Double) < 1 = t + 1 + | otherwise = t + go (i - 1) t' + go count (0 :: Int) + ] + where + count = 100000 :: Int + + closeEnough x + | abs (x - 3.14159 :: Double) < 0.2 = return () + | otherwise = error $ "Monte carlo analysis too inaccurate: " ++ show x + +----------------------------------------------------------------------- + +slidingWindow :: Int -> IO TestBench +slidingWindow window = do + upperRef <- newIORef upper0 + return $ TBGroup (show window) + [ TBIOBench "low level, Seq" expected + (swLowLevelSeq window upperRef id (\x y -> x . (F.toList y:)) ($ [])) + (swLowLevelSeq window upperRef () (\() y -> rnf y) id) + , TBIOBench "conduit, Seq" expected + (swConduitSeq window upperRef id (\x y -> x . (F.toList y:)) ($ [])) + (swConduitSeq window upperRef () (\() y -> rnf y) id) + {- https://ghc.haskell.org/trac/ghc/ticket/9446 + , TBIOBench "low level, boxed Vector" expected + (swLowLevelVector window upperRef id (\x y -> x . (VB.toList y:)) ($ [])) + (swLowLevelVector window upperRef () (\() y -> rnf (y :: VB.Vector Int)) id) + -} + , TBBench $ bench "low level, boxed Vector" $ whnfIO $ + swLowLevelVector window upperRef () (\() y -> rnf (y :: VB.Vector Int)) id + + {- https://ghc.haskell.org/trac/ghc/ticket/9446 + , TBIOBench "conduit, boxed Vector" expected + (swConduitVector window upperRef id (\x y -> x . (VB.toList y:)) ($ [])) + (swConduitVector window upperRef () (\() y -> rnf (y :: VB.Vector Int)) id) + -} + + , TBBench $ bench "conduit, boxed Vector" $ whnfIO $ + swConduitVector window upperRef () (\() y -> rnf (y :: VB.Vector Int)) id + + + , TBIOBench "low level, unboxed Vector" expected + (swLowLevelVector window upperRef id (\x y -> x . 
(VU.toList y:)) ($ [])) + (swLowLevelVector window upperRef () (\() y -> rnf (y :: VU.Vector Int)) id) + , TBIOBench "conduit, unboxed Vector" expected + (swConduitVector window upperRef id (\x y -> x . (VU.toList y:)) ($ [])) + (swConduitVector window upperRef () (\() y -> rnf (y :: VU.Vector Int)) id) + ] + where + upper0 = 10000 + expected = + loop [1..upper0] + where + loop input + | length x == window = x : loop y + | otherwise = [] + where + x = take window input + y = drop 1 input + +swLowLevelSeq :: Int -> IORef Int -> t -> (t -> Seq.Seq Int -> t) -> (t -> t') -> IO t' +swLowLevelSeq window upperRef t0 f final = do + upper <- readIORef upperRef + + let phase1 i !s + | i > window = phase2 i s t0 + | otherwise = phase1 (i + 1) (s Seq.|> i) + + phase2 i !s !t + | i > upper = t' + | otherwise = phase2 (i + 1) s' t' + where + t' = f t s + s' = Seq.drop 1 s Seq.|> i + + return $! final $! phase1 1 mempty + +swLowLevelVector :: V.Vector v Int + => Int + -> IORef Int + -> t + -> (t -> v Int -> t) + -> (t -> t') + -> IO t' +swLowLevelVector window upperRef t0 f final = do + upper <- readIORef upperRef + + let go !i !t _ _ _ | i > upper = return $! final $! t + go !i !t !end _mv mv2 | end == bufSz = newBuf >>= go i t sz mv2 + go !i !t !end mv mv2 = do + VM.unsafeWrite mv end i + when (end > sz) $ VM.unsafeWrite mv2 (end - sz) i + let end' = end + 1 + t' <- + if end' < sz + then return t + else do + v <- V.unsafeFreeze $ VM.unsafeSlice (end' - sz) sz mv + return $! f t v + go (i + 1) t' end' mv mv2 + + mv <- newBuf + mv2 <- newBuf + go 1 t0 0 mv mv2 + where + sz = window + bufSz = 2 * window + newBuf = VM.new bufSz + +swConduitSeq :: Int + -> IORef Int + -> t + -> (t -> Seq.Seq Int -> t) + -> (t -> t') + -> IO t' +swConduitSeq window upperRef t0 f final = do + upper <- readIORef upperRef + + t <- CL.enumFromTo 1 upper + $= slidingWindowC window + $$ CL.fold f t0 + return $! 
final t + +swConduitVector :: V.Vector v Int + => Int + -> IORef Int + -> t + -> (t -> v Int -> t) + -> (t -> t') + -> IO t' +swConduitVector window upperRef t0 f final = do + upper <- readIORef upperRef + + t <- CL.enumFromTo 1 upper + $= slidingVectorC window + $$ CL.fold f t0 + return $! final t + +slidingWindowC :: Monad m => Int -> Conduit a m (Seq.Seq a) +slidingWindowC = slidingWindowCC +{-# INLINE [0] slidingWindowC #-} +{-# RULES "unstream slidingWindowC" + forall i. slidingWindowC i = CI.unstream (CI.streamConduit (slidingWindowCC i) (slidingWindowS i)) + #-} + +slidingWindowCC :: Monad m => Int -> Conduit a m (Seq.Seq a) +slidingWindowCC sz = + go sz mempty + where + goContinue st = await >>= + maybe (return ()) + (\x -> do + let st' = st Seq.|> x + yield st' >> goContinue (Seq.drop 1 st') + ) + go 0 st = yield st >> goContinue (Seq.drop 1 st) + go !n st = CL.head >>= \m -> + case m of + Nothing | n < sz -> yield st + | otherwise -> return () + Just x -> go (n-1) (st Seq.|> x) +{-# INLINE slidingWindowCC #-} + +slidingWindowS :: Monad m => Int -> CI.Stream m a () -> CI.Stream m (Seq.Seq a) () +slidingWindowS sz (CI.Stream step ms0) = + CI.Stream step' $ liftM (\s -> Left (s, sz, mempty)) ms0 + where + step' (Left (s, 0, st)) = return $ CI.Emit (Right (s, st)) st + step' (Left (s, i, st)) = do + res <- step s + return $ case res of + CI.Stop () -> CI.Stop () + CI.Skip s' -> CI.Skip $ Left (s', i, st) + CI.Emit s' a -> CI.Skip $ Left (s', i - 1, st Seq.|> a) + step' (Right (s, st)) = do + res <- step s + return $ case res of + CI.Stop () -> CI.Stop () + CI.Skip s' -> CI.Skip $ Right (s', st) + CI.Emit s' a -> + let st' = Seq.drop 1 st Seq.|> a + in CI.Emit (Right (s', st')) st' +{-# INLINE slidingWindowS #-} + +slidingVectorC :: V.Vector v a => Int -> Conduit a IO (v a) +slidingVectorC = slidingVectorCC +{-# INLINE [0] slidingVectorC #-} +{-# RULES "unstream slidingVectorC" + forall i. 
slidingVectorC i = CI.unstream (CI.streamConduit (slidingVectorCC i) (slidingVectorS i)) + #-} + +slidingVectorCC :: V.Vector v a => Int -> Conduit a IO (v a) +slidingVectorCC sz = do + mv <- newBuf + mv2 <- newBuf + go 0 mv mv2 + where + bufSz = 2 * sz + newBuf = liftIO (VM.new bufSz) + + go !end _mv mv2 | end == bufSz = newBuf >>= go sz mv2 + go !end mv mv2 = do + mx <- await + case mx of + Nothing -> when (end > 0 && end < sz) $ do + v <- liftIO $ V.unsafeFreeze $ VM.take end mv + yield v + Just x -> do + liftIO $ do + VM.unsafeWrite mv end x + when (end > sz) $ VM.unsafeWrite mv2 (end - sz) x + let end' = end + 1 + when (end' >= sz) $ do + v <- liftIO $ V.unsafeFreeze $ VM.unsafeSlice (end' - sz) sz mv + yield v + go end' mv mv2 + +slidingVectorS :: V.Vector v a => Int -> CI.Stream IO a () -> CI.Stream IO (v a) () +slidingVectorS sz (CI.Stream step ms0) = + CI.Stream step' ms1 + where + bufSz = 2 * sz + newBuf = liftIO (VM.new bufSz) + + ms1 = do + s <- ms0 + mv <- newBuf + mv2 <- newBuf + return (s, 0, mv, mv2) + + step' (_, -1, _, _) = return $ CI.Stop () + step' (s, end, _mv, mv2) | end == bufSz = do + mv3 <- newBuf + return $ CI.Skip (s, sz, mv2, mv3) + step' (s, end, mv, mv2) = do + res <- step s + case res of + CI.Stop () + | end > 0 && end < sz -> do + v <- liftIO $ V.unsafeFreeze $ VM.take end mv + return $ CI.Emit (s, -1, mv, mv2) v + | otherwise -> return $ CI.Stop () + CI.Skip s' -> return $ CI.Skip (s', end, mv, mv2) + CI.Emit s' x -> liftIO $ do + VM.unsafeWrite mv end x + when (end > sz) $ VM.unsafeWrite mv2 (end - sz) x + let end' = end + 1 + state = (s', end', mv, mv2) + if end' >= sz + then do + v <- V.unsafeFreeze $ VM.unsafeSlice (end' - sz) sz mv + return $ CI.Emit state v + else return $ CI.Skip state +{-# INLINE slidingVectorS #-} diff --git a/benchmarks/unfused.hs b/benchmarks/unfused.hs new file mode 100644 index 0000000..50548a3 --- /dev/null +++ b/benchmarks/unfused.hs @@ -0,0 +1,80 @@ +{-# LANGUAGE RankNTypes, BangPatterns #-} +-- 
Compare low-level, fused, unfused, and partially fused +import Data.Conduit +import qualified Data.Conduit.List as CL +import Data.Conduit.Internal (Step (..), Stream (..), unstream, StreamConduit (..)) +import Criterion.Main +import Data.Functor.Identity (runIdentity) + +-- | unfused +enumFromToC :: (Eq a, Monad m, Enum a) => a -> a -> Producer m a +enumFromToC x0 y = + loop x0 + where + loop x + | x == y = yield x + | otherwise = yield x >> loop (succ x) +{-# INLINE enumFromToC #-} + +-- | unfused +mapC :: Monad m => (a -> b) -> Conduit a m b +mapC f = awaitForever $ yield . f +{-# INLINE mapC #-} + +-- | unfused +foldC :: Monad m => (b -> a -> b) -> b -> Consumer a m b +foldC f = + loop + where + loop !b = await >>= maybe (return b) (loop . f b) +{-# INLINE foldC #-} + +main :: IO () +main = defaultMain + [ bench "low level" $ flip whnf upper0 $ \upper -> + let loop x t + | x > upper = t + | otherwise = loop (x + 1) (t + ((x * 2) + 1)) + in loop 1 0 + , bench "completely fused" $ flip whnf upper0 $ \upper -> + runIdentity + $ CL.enumFromTo 1 upper + $$ CL.map (* 2) + =$ CL.map (+ 1) + =$ CL.fold (+) 0 + , bench "runConduit, completely fused" $ flip whnf upper0 $ \upper -> + runIdentity + $ runConduit + $ CL.enumFromTo 1 upper + =$= CL.map (* 2) + =$= CL.map (+ 1) + =$= CL.fold (+) 0 + , bench "completely unfused" $ flip whnf upper0 $ \upper -> + runIdentity + $ enumFromToC 1 upper + $$ mapC (* 2) + =$ mapC (+ 1) + =$ foldC (+) 0 + , bench "beginning fusion" $ flip whnf upper0 $ \upper -> + runIdentity + $ (CL.enumFromTo 1 upper $= CL.map (* 2)) + $$ mapC (+ 1) + =$ foldC (+) 0 + , bench "middle fusion" $ flip whnf upper0 $ \upper -> + runIdentity + $ enumFromToC 1 upper + $$ (CL.map (* 2) =$= CL.map (+ 1)) + =$ foldC (+) 0 + , bench "ending fusion" $ flip whnf upper0 $ \upper -> + runIdentity + $ enumFromToC 1 upper + $= mapC (* 2) + $$ (CL.map (+ 1) =$ CL.fold (+) 0) + , bench "performance of CL.enumFromTo without fusion" $ flip whnf upper0 $ \upper -> + 
runIdentity + $ CL.enumFromTo 1 upper + $= mapC (* 2) + $$ (CL.map (+ 1) =$ CL.fold (+) 0) + ] + where + upper0 = 100000 :: Int diff --git a/conduit.cabal b/conduit.cabal new file mode 100644 index 0000000..52b18b0 --- /dev/null +++ b/conduit.cabal @@ -0,0 +1,120 @@ +Name: conduit +Version: 1.2.12.1 +Synopsis: Streaming data processing library. +description: + `conduit` is a solution to the streaming data problem, allowing for production, + transformation, and consumption of streams of data in constant memory. It is an + alternative to lazy I\/O which guarantees deterministic resource handling. + . + For more information about conduit in general, and how this package in + particular fits into the ecosystem, see [the conduit + homepage](https://github.com/snoyberg/conduit#readme). + . + Hackage documentation generation is not reliable. For up to date documentation, please see: . +License: MIT +License-file: LICENSE +Author: Michael Snoyman +Maintainer: michael@snoyman.com +Category: Data, Conduit +Build-type: Simple +Cabal-version: >=1.8 +Homepage: http://github.com/snoyberg/conduit +extra-source-files: test/main.hs + , README.md + , ChangeLog.md + , fusion-macros.h + +Library + Exposed-modules: Data.Conduit + Data.Conduit.List + Data.Conduit.Internal + Data.Conduit.Lift + Data.Conduit.Internal.Fusion + Data.Conduit.Internal.List.Stream + other-modules: Data.Conduit.Internal.Pipe + Data.Conduit.Internal.Conduit + Build-depends: base >= 4.5 && < 5 + , resourcet >= 1.1 && < 1.2 + , exceptions >= 0.6 + , lifted-base >= 0.1 + , transformers-base >= 0.4.1 && < 0.5 + , transformers >= 0.2.2 + , transformers-compat >= 0.3 + , mtl + , mmorph + , monad-control + , primitive + if !impl(ghc>=7.9) + build-depends: void >= 0.5.5 + ghc-options: -Wall + include-dirs: . 
+ +test-suite test + hs-source-dirs: test + main-is: main.hs + other-modules: Data.Conduit.Extra.ZipConduitSpec + , Data.Conduit.StreamSpec + type: exitcode-stdio-1.0 + cpp-options: -DTEST + build-depends: conduit + , base + , hspec >= 1.3 + , QuickCheck >= 2.7 + , transformers + , mtl + , resourcet + , containers + , exceptions >= 0.6 + , safe + , split >= 0.2.0.0 + if !impl(ghc>=7.9) + build-depends: void + ghc-options: -Wall + +--test-suite doctests +-- hs-source-dirs: test +-- main-is: doctests.hs +-- type: exitcode-stdio-1.0 +-- ghc-options: -threaded +-- build-depends: base, directory, doctest >= 0.8 + +-- benchmark utf8-memory-usage +-- type: exitcode-stdio-1.0 +-- hs-source-dirs: benchmarks +-- build-depends: base +-- , text-stream-decode +-- , bytestring +-- , text +-- , conduit +-- main-is: utf8-memory-usage.hs +-- ghc-options: -Wall -O2 -with-rtsopts=-s + +benchmark optimize-201408 + type: exitcode-stdio-1.0 + hs-source-dirs: benchmarks + build-depends: base + , conduit + , vector + , deepseq + , containers + , transformers + , hspec + , mwc-random + , criterion + , kan-extensions + main-is: optimize-201408.hs + ghc-options: -Wall -O2 -rtsopts + +benchmark unfused + type: exitcode-stdio-1.0 + hs-source-dirs: benchmarks + build-depends: base + , conduit + , criterion + , transformers + main-is: unfused.hs + ghc-options: -Wall -O2 -rtsopts + +source-repository head + type: git + location: git://github.com/snoyberg/conduit.git diff --git a/fusion-macros.h b/fusion-macros.h new file mode 100644 index 0000000..fecd35e --- /dev/null +++ b/fusion-macros.h @@ -0,0 +1,23 @@ +#define INLINE_RULE0(new,old) ;\ + new = old ;\ + {-# INLINE [0] new #-} ;\ + {-# RULES "inline new" new = old #-} + +#define INLINE_RULE(new,vars,body) ;\ + new vars = body ;\ + {-# INLINE [0] new #-} ;\ + {-# RULES "inline new" forall vars. 
new vars = body #-} + +#define STREAMING0(name, nameC, nameS) ;\ + name = nameC ;\ + {-# INLINE [0] name #-} ;\ + {-# RULES "unstream name" \ + name = unstream (streamConduit nameC nameS) \ + #-} + +#define STREAMING(name, nameC, nameS, vars) ;\ + name = nameC ;\ + {-# INLINE [0] name #-} ;\ + {-# RULES "unstream name" forall vars. \ + name vars = unstream (streamConduit (nameC vars) (nameS vars)) \ + #-} diff --git a/test/Data/Conduit/Extra/ZipConduitSpec.hs b/test/Data/Conduit/Extra/ZipConduitSpec.hs new file mode 100644 index 0000000..91b5c75 --- /dev/null +++ b/test/Data/Conduit/Extra/ZipConduitSpec.hs @@ -0,0 +1,34 @@ +module Data.Conduit.Extra.ZipConduitSpec (spec) where +import Test.Hspec +import Data.Conduit +import qualified Data.Conduit.List as CL +import Control.Applicative ((<*), pure) + +spec :: Spec +spec = describe "Data.Conduit.Extra.ZipConduit" $ do + it "ZipConduit" $ do + let src = mapM_ yield [1..3 :: Int] + conduit1 = CL.map (+1) + conduit2 = CL.concatMap (replicate 2) + conduit = getZipConduit $ ZipConduit conduit1 <* ZipConduit conduit2 + sink = CL.consume + res <- src $$ conduit =$ sink + res `shouldBe` [2, 1, 1, 3, 2, 2, 4, 3, 3] + it "sequenceConduits" $ do + let src = mapM_ yield [1..3 :: Int] + conduit1 = CL.map (+1) + conduit2 = CL.concatMap (replicate 2) + conduit = do + x <- sequenceConduits [conduit1, conduit2] + yield $ length x + 10 + sink = CL.consume + res <- src $$ conduit =$ sink + res `shouldBe` [2, 1, 1, 3, 2, 2, 4, 3, 3, 12] + it "ZipConduitMonad" $ do + let src = mapM_ yield [1..3 :: Int] + conduit1 = CL.mapM (pure . 
(+1)) + conduit2 = CL.map id + conduit = getZipConduit $ ZipConduit conduit1 <* ZipConduit conduit2 + sink = CL.consume + res <- src $$ conduit =$ sink + res `shouldBe` [2, 1, 3, 2, 4, 3] diff --git a/test/Data/Conduit/StreamSpec.hs b/test/Data/Conduit/StreamSpec.hs new file mode 100644 index 0000000..8a0e808 --- /dev/null +++ b/test/Data/Conduit/StreamSpec.hs @@ -0,0 +1,596 @@ +{-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE CPP #-} +module Data.Conduit.StreamSpec where + +import Control.Applicative +import qualified Control.Monad +import Control.Monad (MonadPlus(..), liftM) +import Control.Monad.Identity (Identity, runIdentity) +import Control.Monad.State (StateT(..), get, put) +import Data.Conduit +import Data.Conduit.Internal.Fusion +import Data.Conduit.Internal.List.Stream +import Data.Conduit.List +import qualified Data.Foldable as F +import Data.Function (on) +import qualified Data.List +import qualified Data.Maybe +import Data.Monoid (Monoid(..)) +import Prelude + ((.), ($), (>>=), (=<<), return, (==), Int, id, Maybe(..), Monad, + Eq, Show, String, Functor, fst, snd) +import qualified Prelude +import qualified Safe +import Test.Hspec +import Test.QuickCheck + +spec :: Spec +spec = describe "Comparing list function to" $ do + qit "unfold" $ + \(getBlind -> f, initial :: Int) -> + unfold f initial `checkInfiniteProducer` + (Data.List.unfoldr f initial :: [Int]) + qit "unfoldS" $ + \(getBlind -> f, initial :: Int) -> + unfoldS f initial `checkInfiniteStreamProducer` + (Data.List.unfoldr f initial :: [Int]) + qit "unfoldM" $ + \(getBlind -> f, initial :: Int) -> + unfoldM f initial `checkInfiniteProducerM` + (unfoldrM f initial :: M [Int]) + qit "unfoldMS" $ + \(getBlind -> f, initial :: Int) -> + unfoldMS f initial `checkInfiniteStreamProducerM` + (unfoldrM f initial :: M [Int]) + qit "sourceList" $ + \(xs :: [Int]) -> + sourceList xs `checkProducer` xs 
+ qit "sourceListS" $ + \(xs :: [Int]) -> + sourceListS xs `checkStreamProducer` xs + qit "enumFromTo" $ + \(fr :: Small Int, to :: Small Int) -> + enumFromTo fr to `checkProducer` + Prelude.enumFromTo fr to + qit "enumFromToS" $ + \(fr :: Small Int, to :: Small Int) -> + enumFromToS fr to `checkStreamProducer` + Prelude.enumFromTo fr to + qit "enumFromToS_int" $ + \(getSmall -> fr :: Int, getSmall -> to :: Int) -> + enumFromToS_int fr to `checkStreamProducer` + Prelude.enumFromTo fr to + qit "iterate" $ + \(getBlind -> f, initial :: Int) -> + iterate f initial `checkInfiniteProducer` + Prelude.iterate f initial + qit "iterateS" $ + \(getBlind -> f, initial :: Int) -> + iterateS f initial `checkInfiniteStreamProducer` + Prelude.iterate f initial + qit "replicate" $ + \(getSmall -> n, getSmall -> x) -> + replicate n x `checkProducer` + (Prelude.replicate n x :: [Int]) + qit "replicateS" $ + \(getSmall -> n, getSmall -> x) -> + replicateS n x `checkStreamProducer` + (Prelude.replicate n x :: [Int]) + qit "replicateM" $ + \(getSmall -> n, getBlind -> f) -> + replicateM n f `checkProducerM` + (Control.Monad.replicateM n f :: M [Int]) + qit "replicateMS" $ + \(getSmall -> n, getBlind -> f) -> + replicateMS n f `checkStreamProducerM` + (Control.Monad.replicateM n f :: M [Int]) + qit "fold" $ + \(getBlind -> f, initial :: Int) -> + fold f initial `checkConsumer` + Data.List.foldl' f initial + qit "foldS" $ + \(getBlind -> f, initial :: Int) -> + foldS f initial `checkStreamConsumer` + Data.List.foldl' f initial + qit "foldM" $ + \(getBlind -> f, initial :: Int) -> + foldM f initial `checkConsumerM` + (Control.Monad.foldM f initial :: [Int] -> M Int) + qit "foldMS" $ + \(getBlind -> f, initial :: Int) -> + foldMS f initial `checkStreamConsumerM` + (Control.Monad.foldM f initial :: [Int] -> M Int) + qit "foldMap" $ + \(getBlind -> (f :: Int -> Sum Int)) -> + foldMap f `checkConsumer` + F.foldMap f + qit "mapM_" $ + \(getBlind -> (f :: Int -> M ())) -> + mapM_ f 
`checkConsumerM` + Prelude.mapM_ f + qit "mapM_S" $ + \(getBlind -> (f :: Int -> M ())) -> + mapM_S f `checkStreamConsumerM` + Prelude.mapM_ f + qit "take" $ + \(getSmall -> n) -> + take n `checkConsumer` + Prelude.take n + qit "takeS" $ + \(getSmall -> n) -> + takeS n `checkStreamConsumer` + Prelude.take n + qit "head" $ + \() -> + head `checkConsumer` + Safe.headMay + qit "headS" $ + \() -> + headS `checkStreamConsumer` + Safe.headMay + qit "peek" $ + \() -> + peek `checkConsumer` + Safe.headMay + qit "map" $ + \(getBlind -> (f :: Int -> Int)) -> + map f `checkConduit` + Prelude.map f + qit "mapS" $ + \(getBlind -> (f :: Int -> Int)) -> + mapS f `checkStreamConduit` + Prelude.map f + qit "mapM" $ + \(getBlind -> (f :: Int -> M Int)) -> + mapM f `checkConduitM` + Prelude.mapM f + qit "mapMS" $ + \(getBlind -> (f :: Int -> M Int)) -> + mapMS f `checkStreamConduitM` + Prelude.mapM f + qit "iterM" $ + \(getBlind -> (f :: Int -> M ())) -> + iterM f `checkConduitM` + iterML f + qit "iterMS" $ + \(getBlind -> (f :: Int -> M ())) -> + iterMS f `checkStreamConduitM` + iterML f + qit "mapMaybe" $ + \(getBlind -> (f :: Int -> Maybe Int)) -> + mapMaybe f `checkConduit` + Data.Maybe.mapMaybe f + qit "mapMaybeS" $ + \(getBlind -> (f :: Int -> Maybe Int)) -> + mapMaybeS f `checkStreamConduit` + Data.Maybe.mapMaybe f + qit "mapMaybeM" $ + \(getBlind -> (f :: Int -> M (Maybe Int))) -> + mapMaybeM f `checkConduitM` + mapMaybeML f + qit "mapMaybeMS" $ + \(getBlind -> (f :: Int -> M (Maybe Int))) -> + mapMaybeMS f `checkStreamConduitM` + mapMaybeML f + qit "catMaybes" $ + \() -> + catMaybes `checkConduit` + (Data.Maybe.catMaybes :: [Maybe Int] -> [Int]) + qit "catMaybesS" $ + \() -> + catMaybesS `checkStreamConduit` + (Data.Maybe.catMaybes :: [Maybe Int] -> [Int]) + qit "concat" $ + \() -> + concat `checkConduit` + (Prelude.concat :: [[Int]] -> [Int]) + qit "concatS" $ + \() -> + concatS `checkStreamConduit` + (Prelude.concat :: [[Int]] -> [Int]) + qit "concatMap" $ + \(getBlind -> 
f) -> + concatMap f `checkConduit` + (Prelude.concatMap f :: [Int] -> [Int]) + qit "concatMapS" $ + \(getBlind -> f) -> + concatMapS f `checkStreamConduit` + (Prelude.concatMap f :: [Int] -> [Int]) + qit "concatMapM" $ + \(getBlind -> (f :: Int -> M [Int])) -> + concatMapM f `checkConduitM` + concatMapML f + qit "concatMapMS" $ + \(getBlind -> (f :: Int -> M [Int])) -> + concatMapMS f `checkStreamConduitM` + concatMapML f + qit "concatMapAccum" $ + \(getBlind -> (f :: Int -> Int -> (Int, [Int])), initial :: Int) -> + concatMapAccum f initial `checkConduit` + concatMapAccumL f initial + qit "concatMapAccumS" $ + \(getBlind -> (f :: Int -> Int -> (Int, [Int])), initial :: Int) -> + concatMapAccumS f initial `checkStreamConduit` + concatMapAccumL f initial + {-qit "mapAccum" $ + \(getBlind -> (f :: Int -> Int -> (Int, [Int])), initial :: Int) -> + mapAccum f initial `checkConduitResult` + mapAccumL f initial-} + qit "mapAccumS" $ + \(getBlind -> (f :: Int -> Int -> (Int, [Int])), initial :: Int) -> + mapAccumS f initial `checkStreamConduitResult` + mapAccumL f initial + {-qit "mapAccumM" $ + \(getBlind -> (f :: Int -> Int -> M (Int, [Int])), initial :: Int) -> + mapAccumM f initial `checkConduitResultM` + mapAccumML f initial-} + qit "mapAccumMS" $ + \(getBlind -> (f :: Int -> Int -> M (Int, [Int])), initial :: Int) -> + mapAccumMS f initial `checkStreamConduitResultM` + mapAccumML f initial + {-qit "scan" $ + \(getBlind -> (f :: Int -> Int -> Int), initial :: Int) -> + scan f initial `checkConduitResult` + scanL f initial-} + {-qit "scanM" $ + \(getBlind -> (f :: Int -> Int -> M Int), initial :: Int) -> + scanM f initial `checkConduitResultM` + scanML f initial-} + qit "mapFoldable" $ + \(getBlind -> (f :: Int -> [Int])) -> + mapFoldable f `checkConduit` + mapFoldableL f + qit "mapFoldableS" $ + \(getBlind -> (f :: Int -> [Int])) -> + mapFoldableS f `checkStreamConduit` + mapFoldableL f + qit "mapFoldableM" $ + \(getBlind -> (f :: Int -> M [Int])) -> + mapFoldableM f 
`checkConduitM` + mapFoldableML f + qit "mapFoldableMS" $ + \(getBlind -> (f :: Int -> M [Int])) -> + mapFoldableMS f `checkStreamConduitM` + mapFoldableML f + qit "consume" $ + \() -> + consume `checkConsumer` + id + qit "consumeS" $ + \() -> + consumeS `checkStreamConsumer` + id + qit "groupBy" $ + \(getBlind -> f) -> + groupBy f `checkConduit` + (Data.List.groupBy f :: [Int] -> [[Int]]) + qit "groupByS" $ + \(getBlind -> f) -> + groupByS f `checkStreamConduit` + (Data.List.groupBy f :: [Int] -> [[Int]]) + qit "groupOn1" $ + \(getBlind -> (f :: Int -> Int)) -> + groupOn1 f `checkConduit` + groupOn1L f + qit "groupOn1S" $ + \(getBlind -> (f :: Int -> Int)) -> + groupOn1S f `checkStreamConduit` + groupOn1L f + qit "isolate" $ + \n -> + isolate n `checkConduit` + (Data.List.take n :: [Int] -> [Int]) + qit "isolateS" $ + \n -> + isolateS n `checkStreamConduit` + (Data.List.take n :: [Int] -> [Int]) + qit "filter" $ + \(getBlind -> f) -> + filter f `checkConduit` + (Data.List.filter f :: [Int] -> [Int]) + qit "filterS" $ + \(getBlind -> f) -> + filterS f `checkStreamConduit` + (Data.List.filter f :: [Int] -> [Int]) + qit "sourceNull" $ + \() -> + sourceNull `checkProducer` + ([] :: [Int]) + qit "sourceNullS" $ + \() -> + sourceNullS `checkStreamProducer` + ([] :: [Int]) + +qit :: (Arbitrary a, Testable prop, Show a) + => String -> (a -> prop) -> Spec +qit n f = it n $ property $ forAll arbitrary f + +-------------------------------------------------------------------------------- +-- Quickcheck utilities for pure conduits / streams + +checkProducer :: (Show a, Eq a) => Source Identity a -> [a] -> Property +checkProducer c l = checkProducerM' runIdentity c (return l) + +checkStreamProducer :: (Show a, Eq a) => StreamSource Identity a -> [a] -> Property +checkStreamProducer s l = checkStreamProducerM' runIdentity s (return l) + +checkInfiniteProducer :: (Show a, Eq a) => Source Identity a -> [a] -> Property +checkInfiniteProducer c l = checkInfiniteProducerM' 
runIdentity c (return l) + +checkInfiniteStreamProducer :: (Show a, Eq a) => StreamSource Identity a -> [a] -> Property +checkInfiniteStreamProducer s l = checkInfiniteStreamProducerM' runIdentity s (return l) + +checkConsumer :: (Show b, Eq b) => Consumer Int Identity b -> ([Int] -> b) -> Property +checkConsumer c l = checkConsumerM' runIdentity c (return . l) + +checkStreamConsumer :: (Show b, Eq b) => StreamConsumer Int Identity b -> ([Int] -> b) -> Property +checkStreamConsumer c l = checkStreamConsumerM' runIdentity c (return . l) + +checkConduit :: (Show a, Arbitrary a, Show b, Eq b) => Conduit a Identity b -> ([a] -> [b]) -> Property +checkConduit c l = checkConduitM' runIdentity c (return . l) + +checkStreamConduit :: (Show a, Arbitrary a, Show b, Eq b) => StreamConduit a Identity b -> ([a] -> [b]) -> Property +checkStreamConduit c l = checkStreamConduitM' runIdentity c (return . l) + +-- checkConduitResult :: (Show a, Arbitrary a, Show b, Eq b, Show r, Eq r) => ConduitM a b Identity r -> ([a] -> ([b], r)) -> Property +-- checkConduitResult c l = checkConduitResultM' runIdentity c (return . l) + +checkStreamConduitResult :: (Show a, Arbitrary a, Show b, Eq b, Show r, Eq r) => StreamConduitM a b Identity r -> ([a] -> ([b], r)) -> Property +checkStreamConduitResult c l = checkStreamConduitResultM' runIdentity c (return . l) + +-------------------------------------------------------------------------------- +-- Quickcheck utilities for conduits / streams in the M monad. + +checkProducerM :: (Show a, Eq a) => Source M a -> M [a] -> Property +checkProducerM = checkProducerM' runM + +checkStreamProducerM :: (Show a, Eq a) => StreamSource M a -> M [a] -> Property +checkStreamProducerM = checkStreamProducerM' runM + +checkInfiniteProducerM :: (Show a, Eq a) => Source M a -> M [a] -> Property +checkInfiniteProducerM = checkInfiniteProducerM' (fst . 
runM) + +checkInfiniteStreamProducerM :: (Show a, Eq a) => StreamSource M a -> M [a] -> Property +checkInfiniteStreamProducerM = checkInfiniteStreamProducerM' (fst . runM) + +checkConsumerM :: (Show b, Eq b) => Consumer Int M b -> ([Int] -> M b) -> Property +checkConsumerM = checkConsumerM' runM + +checkStreamConsumerM :: (Show b, Eq b) => StreamConsumer Int M b -> ([Int] -> M b) -> Property +checkStreamConsumerM = checkStreamConsumerM' runM + +checkConduitM :: (Show a, Arbitrary a, Show b, Eq b) => Conduit a M b -> ([a] -> M [b]) -> Property +checkConduitM = checkConduitM' runM + +checkStreamConduitM :: (Show a, Arbitrary a, Show b, Eq b) => StreamConduit a M b -> ([a] -> M [b]) -> Property +checkStreamConduitM = checkStreamConduitM' runM + +-- checkConduitResultM :: (Show a, Arbitrary a, Show b, Eq b, Show r, Eq r) => ConduitM a b M r -> ([a] -> M ([b], r)) -> Property +-- checkConduitResultM = checkConduitResultM' runM + +checkStreamConduitResultM :: (Show a, Arbitrary a, Show b, Eq b, Show r, Eq r) => StreamConduitM a b M r -> ([a] -> M ([b], r)) -> Property +checkStreamConduitResultM = checkStreamConduitResultM' runM + +-------------------------------------------------------------------------------- +-- Quickcheck utilities for monadic streams / conduits +-- These are polymorphic in which Monad is used. 
+ +checkProducerM' :: (Show a, Monad m, Show b, Eq b) + => (m [a] -> b) + -> Source m a + -> m [a] + -> Property +checkProducerM' f c l = + f (preventFusion c $$ consume) + === + f l + +checkStreamProducerM' :: (Show a, Monad m, Show b, Eq b) + => (m [a] -> b) + -> StreamSource m a + -> m [a] + -> Property +checkStreamProducerM' f s l = + f (liftM fst $ evalStream $ s emptyStream) + === + f l + +checkInfiniteProducerM' :: (Show a, Monad m, Show b, Eq b) + => (m [a] -> b) + -> Source m a + -> m [a] + -> Property +checkInfiniteProducerM' f s l = + checkProducerM' f + (preventFusion s $= isolate 10) + (liftM (Prelude.take 10) l) + +checkInfiniteStreamProducerM' :: (Show a, Monad m, Show b, Eq b) + => (m [a] -> b) + -> StreamSource m a + -> m [a] + -> Property +checkInfiniteStreamProducerM' f s l = + f (liftM snd $ evalStream $ takeS 10 $ s emptyStream) + === + f (liftM (Prelude.take 10) l) + +checkConsumerM' :: (Show a, Monad m, Show b, Eq b) + => (m a -> b) + -> Consumer Int m a + -> ([Int] -> m a) + -> Property +checkConsumerM' f c l = forAll arbitrary $ \xs -> + f (sourceList xs $$ preventFusion c) + === + f (l xs) + +checkStreamConsumerM' :: (Show a, Monad m, Show b, Eq b) + => (m a -> b) + -> StreamConsumer Int m a + -> ([Int] -> m a) + -> Property +checkStreamConsumerM' f s l = forAll arbitrary $ \xs -> + f (liftM snd $ evalStream $ s $ sourceListS xs emptyStream) + === + f (l xs) + +checkConduitM' :: (Show a, Arbitrary a, Monad m, Show c, Eq c) + => (m [b] -> c) + -> Conduit a m b + -> ([a] -> m [b]) + -> Property +checkConduitM' f c l = forAll arbitrary $ \xs -> + f (sourceList xs $= preventFusion c $$ consume) + === + f (l xs) + +checkStreamConduitM' :: (Show a, Arbitrary a, Monad m, Show c, Eq c) + => (m [b] -> c) + -> StreamConduit a m b + -> ([a] -> m [b]) + -> Property +checkStreamConduitM' f s l = forAll arbitrary $ \xs -> + f (liftM fst $ evalStream $ s $ sourceListS xs emptyStream) + === + f (l xs) + +-- TODO: Fixing this would allow comparing conduit 
consumers against +-- their list versions. +-- +-- checkConduitResultM' :: (Show a, Arbitrary a, Monad m, Show c, Eq c) +-- => (m ([b], r) -> c) +-- -> ConduitM a b m r +-- -> ([a] -> m ([b], r)) +-- -> Property +-- checkConduitResultM' f c l = FIXME forAll arbitrary $ \xs -> +-- f (sourceList xs $= preventFusion c $$ consume) +-- === +-- f (l xs) + +checkStreamConduitResultM' :: (Show a, Arbitrary a, Monad m, Show c, Eq c) + => (m ([b], r) -> c) + -> StreamConduitM a b m r + -> ([a] -> m ([b], r)) + -> Property +checkStreamConduitResultM' f s l = forAll arbitrary $ \xs -> + f (evalStream $ s $ sourceListS xs emptyStream) + === + f (l xs) + +emptyStream :: Monad m => Stream m () () +emptyStream = Stream (\_ -> return $ Stop ()) (return ()) + +evalStream :: Monad m => Stream m o r -> m ([o], r) +evalStream (Stream step s0) = go =<< s0 + where + go s = do + res <- step s + case res of + Stop r -> return ([], r) + Skip s' -> go s' + Emit s' x -> liftM (\(l, r) -> (x:l, r)) (go s') + +-------------------------------------------------------------------------------- +-- Misc utilities + +-- Prefer this to creating an orphan instance for Data.Monoid.Sum: + +newtype Sum a = Sum a + deriving (Eq, Show, Arbitrary) + +instance Prelude.Num a => Monoid (Sum a) where + mempty = Sum 0 + mappend (Sum x) (Sum y) = Sum $ x Prelude.+ y + +preventFusion :: a -> a +preventFusion = id +{-# INLINE [0] preventFusion #-} + +newtype M a = M (StateT Int Identity a) + deriving (Functor, Applicative, Monad) + +instance Arbitrary a => Arbitrary (M a) where + arbitrary = do + f <- arbitrary + return $ do + s <- M get + let (x, s') = f s + M (put s') + return x + +runM :: M a -> (a, Int) +runM (M m) = runIdentity $ runStateT m 0 + +-------------------------------------------------------------------------------- +-- List versions of some functions + +iterML :: Monad m => (a -> m ()) -> [a] -> m [a] +iterML f = Prelude.mapM (\a -> f a >>= \() -> return a) + +mapMaybeML :: Monad m => (a -> m (Maybe 
b)) -> [a] -> m [b] +mapMaybeML f = liftM Data.Maybe.catMaybes . Prelude.mapM f + +concatMapML :: Monad m => (a -> m [b]) -> [a] -> m [b] +concatMapML f = liftM Prelude.concat . Prelude.mapM f + +concatMapAccumL :: (a -> s -> (s, [b])) -> s -> [a] -> [b] +concatMapAccumL f acc0 = + runIdentity . concatMapAccumML (\a acc -> return $ f a acc) acc0 + +mapAccumL :: (a -> s -> (s, b)) -> s -> [a] -> ([b], s) +mapAccumL f acc0 = + runIdentity . mapAccumML (\a acc -> return $ f a acc) acc0 + +concatMapAccumML :: Monad m => (a -> s -> m (s, [b])) -> s -> [a] -> m [b] +concatMapAccumML f acc0 = + liftM (Prelude.concat . fst) . mapAccumML f acc0 + +scanL :: (a -> b -> b) -> b -> [a] -> ([b], b) +scanL f = mapAccumL (\a b -> let r = f a b in (r, r)) + +scanML :: Monad m => (a -> b -> m b) -> b -> [a] -> m ([b], b) +scanML f = mapAccumML (\a b -> f a b >>= \r -> return (r, r)) + +mapFoldableL :: F.Foldable f => (a -> f b) -> [a] -> [b] +mapFoldableL f = runIdentity . mapFoldableML (return . f) + +mapFoldableML :: (Monad m, F.Foldable f) => (a -> m (f b)) -> [a] -> m [b] +mapFoldableML f = concatMapML (liftM F.toList . f) + +groupOn1L :: Eq b => (a -> b) -> [a] -> [(a, [a])] +groupOn1L f = + Data.List.map (\(x:xs) -> (x, xs)) . Data.List.groupBy ((==) `on` f) + +mapAccumML :: Monad m => (a -> s -> m (s, b)) -> s -> [a] -> m ([b], s) +mapAccumML f s0 = go s0 + where + go s [] = return ([], s) + go s (x:xs) = do + (s', r) <- f x s + liftM (\(l, o) -> (r:l, o)) $ go s' xs + +-------------------------------------------------------------------------------- +-- Utilities taken from monad-loops package + +-- http://hackage.haskell.org/package/monad-loops + +-- |See 'Data.List.unfoldr'. This is a monad-friendly version of that. +unfoldrM :: (Monad m) => (a -> m (Maybe (b,a))) -> a -> m [b] +unfoldrM = unfoldrM' + +-- |See 'Data.List.unfoldr'. This is a monad-friendly version of that, with a +-- twist. Rather than returning a list, it returns any MonadPlus type of your +-- choice. 
+unfoldrM' :: (Monad m, MonadPlus f) => (a -> m (Maybe (b,a))) -> a -> m (f b) +unfoldrM' f = go + where go z = do + x <- f z + case x of + Nothing -> return mzero + Just (x', z') -> do + xs <- go z' + return (return x' `mplus` xs) diff --git a/test/main.hs b/test/main.hs new file mode 100644 index 0000000..1e08979 --- /dev/null +++ b/test/main.hs @@ -0,0 +1,1108 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE FlexibleInstances #-} +{-# OPTIONS_GHC -fno-warn-orphans #-} +import Test.Hspec +import Test.Hspec.QuickCheck (prop) +import Test.QuickCheck.Monadic (assert, monadicIO, run) + +import qualified Data.Conduit as C +import qualified Data.Conduit.Lift as C +import qualified Data.Conduit.Internal as CI +import qualified Data.Conduit.List as CL +import Data.Typeable (Typeable) +import Control.Exception (throw) +import Control.Monad.Trans.Resource as C (runResourceT) +import Data.Maybe (fromMaybe,catMaybes,fromJust) +import qualified Data.List as DL +import qualified Data.List.Split as DLS (chunksOf) +import Control.Monad.ST (runST) +import Data.Monoid +import qualified Data.IORef as I +import Control.Monad.Trans.Resource (allocate, resourceForkIO) +import Control.Concurrent (threadDelay, killThread) +import Control.Monad.IO.Class (MonadIO, liftIO) +import Control.Monad.Trans.Class (lift) +import Control.Monad.Trans.Writer (execWriter, tell, runWriterT) +import Control.Monad.Trans.State (evalStateT, get, put, modify) +import Control.Monad.Trans.Maybe (MaybeT (..)) +import qualified Control.Monad.Writer as W +import Control.Applicative (pure, (<$>), (<*>)) +import qualified Control.Monad.Catch as Catch +import Data.Functor.Identity (Identity,runIdentity) +import Control.Monad (forever, void) +import Data.Void (Void) +import qualified Control.Concurrent.MVar as M +import Control.Monad.Error (catchError, throwError, Error) +import qualified Data.Map as Map +import qualified 
Data.Conduit.Extra.ZipConduitSpec as ZipConduit +import qualified Data.Conduit.StreamSpec as Stream + +(@=?) :: (Eq a, Show a) => a -> a -> IO () +(@=?) = flip shouldBe + +-- Quickcheck property for testing equivalence of list processing +-- functions and their conduit counterparts +equivToList :: Eq b => ([a] -> [b]) -> CI.Conduit a Identity b -> [a] -> Bool +equivToList f conduit xs = + f xs == runIdentity (CL.sourceList xs C.$$ conduit C.=$= CL.consume) + + +main :: IO () +main = hspec $ do + describe "data loss rules" $ do + it "consumes the source too quickly" $ do + x <- runResourceT $ CL.sourceList [1..10 :: Int] C.$$ do + strings <- CL.map show C.=$ CL.take 5 + liftIO $ putStr $ unlines strings + CL.fold (+) 0 + 40 `shouldBe` x + + it "correctly consumes a chunked resource" $ do + x <- runResourceT $ (CL.sourceList [1..5 :: Int] `mappend` CL.sourceList [6..10]) C.$$ do + strings <- CL.map show C.=$ CL.take 5 + liftIO $ putStr $ unlines strings + CL.fold (+) 0 + 40 `shouldBe` x + + describe "filter" $ do + it "even" $ do + x <- runResourceT $ CL.sourceList [1..10] C.$$ CL.filter even C.=$ CL.consume + x `shouldBe` filter even [1..10 :: Int] + + prop "concat" $ equivToList (concat :: [[Int]]->[Int]) CL.concat + + describe "mapFoldable" $ do + prop "list" $ + equivToList (concatMap (:[]) :: [Int]->[Int]) (CL.mapFoldable (:[])) + let f x = if odd x then Just x else Nothing + prop "Maybe" $ + equivToList (catMaybes . map f :: [Int]->[Int]) (CL.mapFoldable f) + + prop "scan" $ equivToList (tail . scanl (+) 0 :: [Int]->[Int]) (void $ CL.scan (+) 0) + + -- mapFoldableM and scanM are fully polymorphic in the type of the monad, + -- so it suffices to check only with Identity. + describe "mapFoldableM" $ do + prop "list" $ + equivToList (concatMap (:[]) :: [Int]->[Int]) (CL.mapFoldableM (return . (:[]))) + let f x = if odd x then Just x else Nothing + prop "Maybe" $ + equivToList (catMaybes . map f :: [Int]->[Int]) (CL.mapFoldableM (return . 
f)) + + prop "scanM" $ equivToList (tail . scanl (+) 0) (void $ CL.scanM (\a s -> return $ a + s) (0 :: Int)) + + describe "ResourceT" $ do + it "resourceForkIO" $ do + counter <- I.newIORef 0 + let w = allocate + (I.atomicModifyIORef counter $ \i -> + (i + 1, ())) + (const $ I.atomicModifyIORef counter $ \i -> + (i - 1, ())) + runResourceT $ do + _ <- w + _ <- resourceForkIO $ return () + _ <- resourceForkIO $ return () + sequence_ $ replicate 1000 $ do + tid <- resourceForkIO $ return () + liftIO $ killThread tid + _ <- resourceForkIO $ return () + _ <- resourceForkIO $ return () + return () + + -- give enough of a chance to the cleanup code to finish + threadDelay 1000 + res <- I.readIORef counter + res `shouldBe` (0 :: Int) + + describe "sum" $ do + it "works for 1..10" $ do + x <- runResourceT $ CL.sourceList [1..10] C.$$ CL.fold (+) (0 :: Int) + x `shouldBe` sum [1..10] + prop "is idempotent" $ \list -> + (runST $ CL.sourceList list C.$$ CL.fold (+) (0 :: Int)) + == sum list + + describe "foldMap" $ do + it "sums 1..10" $ do + Sum x <- CL.sourceList [1..(10 :: Int)] C.$$ CL.foldMap Sum + x `shouldBe` sum [1..10] + + it "preserves order" $ do + x <- CL.sourceList [[4],[2],[3],[1]] C.$$ CL.foldMap (++[(9 :: Int)]) + x `shouldBe` [4,9,2,9,3,9,1,9] + + describe "foldMapM" $ do + it "sums 1..10" $ do + Sum x <- CL.sourceList [1..(10 :: Int)] C.$$ CL.foldMapM (return . Sum) + x `shouldBe` sum [1..10] + + it "preserves order" $ do + x <- CL.sourceList [[4],[2],[3],[1]] C.$$ CL.foldMapM (return . (++[(9 :: Int)])) + x `shouldBe` [4,9,2,9,3,9,1,9] + + describe "unfold" $ do + it "works" $ do + let f 0 = Nothing + f i = Just (show i, i - 1) + seed = 10 :: Int + x <- CL.unfold f seed C.$$ CL.consume + let y = DL.unfoldr f seed + x `shouldBe` y + + describe "unfoldM" $ do + it "works" $ do + let f 0 = Nothing + f i = Just (show i, i - 1) + seed = 10 :: Int + x <- CL.unfoldM (return . 
f) seed C.$$ CL.consume + let y = DL.unfoldr f seed + x `shouldBe` y + + describe "Monoid instance for Source" $ do + it "mappend" $ do + x <- runResourceT $ (CL.sourceList [1..5 :: Int] `mappend` CL.sourceList [6..10]) C.$$ CL.fold (+) 0 + x `shouldBe` sum [1..10] + it "mconcat" $ do + x <- runResourceT $ mconcat + [ CL.sourceList [1..5 :: Int] + , CL.sourceList [6..10] + , CL.sourceList [11..20] + ] C.$$ CL.fold (+) 0 + x `shouldBe` sum [1..20] + + describe "zipping" $ do + it "zipping two small lists" $ do + res <- runResourceT $ CI.zipSources (CL.sourceList [1..10]) (CL.sourceList [11..12]) C.$$ CL.consume + res @=? zip [1..10 :: Int] [11..12 :: Int] + + describe "zipping sinks" $ do + it "take all" $ do + res <- runResourceT $ CL.sourceList [1..10] C.$$ CI.zipSinks CL.consume CL.consume + res @=? ([1..10 :: Int], [1..10 :: Int]) + it "take fewer on left" $ do + res <- runResourceT $ CL.sourceList [1..10] C.$$ CI.zipSinks (CL.take 4) CL.consume + res @=? ([1..4 :: Int], [1..10 :: Int]) + it "take fewer on right" $ do + res <- runResourceT $ CL.sourceList [1..10] C.$$ CI.zipSinks CL.consume (CL.take 4) + res @=? 
([1..10 :: Int], [1..4 :: Int]) + + describe "Monad instance for Sink" $ do + it "binding" $ do + x <- runResourceT $ CL.sourceList [1..10] C.$$ do + _ <- CL.take 5 + CL.fold (+) (0 :: Int) + x `shouldBe` sum [6..10] + + describe "Applicative instance for Sink" $ do + it "<$> and <*>" $ do + x <- runResourceT $ CL.sourceList [1..10] C.$$ + (+) <$> pure 5 <*> CL.fold (+) (0 :: Int) + x `shouldBe` sum [1..10] + 5 + + describe "resumable sources" $ do + it "simple" $ do + (x, y, z) <- runResourceT $ do + let src1 = CL.sourceList [1..10 :: Int] + (src2, x) <- src1 C.$$+ CL.take 5 + (src3, y) <- src2 C.$$++ CL.fold (+) 0 + z <- src3 C.$$+- CL.consume + return (x, y, z) + x `shouldBe` [1..5] :: IO () + y `shouldBe` sum [6..10] + z `shouldBe` [] + + describe "conduits" $ do + it "map, left" $ do + x <- runResourceT $ + CL.sourceList [1..10] + C.$= CL.map (* 2) + C.$$ CL.fold (+) 0 + x `shouldBe` 2 * sum [1..10 :: Int] + + it "map, left >+>" $ do + x <- runResourceT $ + CI.ConduitM + ((CI.unConduitM (CL.sourceList [1..10]) CI.Done + CI.>+> CI.injectLeftovers (flip CI.unConduitM CI.Done $ CL.map (* 2))) >>=) + C.$$ CL.fold (+) 0 + x `shouldBe` 2 * sum [1..10 :: Int] + + it "map, right" $ do + x <- runResourceT $ + CL.sourceList [1..10] + C.$$ CL.map (* 2) + C.=$ CL.fold (+) 0 + x `shouldBe` 2 * sum [1..10 :: Int] + + prop "chunksOf" $ equivToList + (DLS.chunksOf 5 :: [Int]->[[Int]]) (CL.chunksOf 5) + + prop "chunksOf (negative)" $ equivToList + (map (:[]) :: [Int]->[[Int]]) (CL.chunksOf (-5)) + + it "groupBy" $ do + let input = [1::Int, 1, 2, 3, 3, 3, 4, 5, 5] + x <- runResourceT $ CL.sourceList input + C.$$ CL.groupBy (==) + C.=$ CL.consume + x `shouldBe` DL.groupBy (==) input + + it "groupBy (nondup begin/end)" $ do + let input = [1::Int, 2, 3, 3, 3, 4, 5] + x <- runResourceT $ CL.sourceList input + C.$$ CL.groupBy (==) + C.=$ CL.consume + x `shouldBe` DL.groupBy (==) input + + it "groupOn1" $ do + let input = [1::Int, 1, 2, 3, 3, 3, 4, 5, 5] + x <- runResourceT $ 
CL.sourceList input + C.$$ CL.groupOn1 id + C.=$ CL.consume + x `shouldBe` [(1,[1]), (2, []), (3,[3,3]), (4,[]), (5, [5])] + + it "groupOn1 (nondup begin/end)" $ do + let input = [1::Int, 2, 3, 3, 3, 4, 5] + x <- runResourceT $ CL.sourceList input + C.$$ CL.groupOn1 id + C.=$ CL.consume + x `shouldBe` [(1,[]), (2, []), (3,[3,3]), (4,[]), (5, [])] + + + it "mapMaybe" $ do + let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3] + x <- runResourceT $ CL.sourceList input + C.$$ CL.mapMaybe ((+2) <$>) + C.=$ CL.consume + x `shouldBe` [3, 4, 5] + + it "mapMaybeM" $ do + let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3] + x <- runResourceT $ CL.sourceList input + C.$$ CL.mapMaybeM (return . ((+2) <$>)) + C.=$ CL.consume + x `shouldBe` [3, 4, 5] + + it "catMaybes" $ do + let input = [Just (1::Int), Nothing, Just 2, Nothing, Just 3] + x <- runResourceT $ CL.sourceList input + C.$$ CL.catMaybes + C.=$ CL.consume + x `shouldBe` [1, 2, 3] + + it "concatMap" $ do + let input = [1, 11, 21] + x <- runResourceT $ CL.sourceList input + C.$$ CL.concatMap (\i -> enumFromTo i (i + 9)) + C.=$ CL.fold (+) (0 :: Int) + x `shouldBe` sum [1..30] + + it "bind together" $ do + let conduit = CL.map (+ 5) C.=$= CL.map (* 2) + x <- runResourceT $ CL.sourceList [1..10] C.$= conduit C.$$ CL.fold (+) 0 + x `shouldBe` sum (map (* 2) $ map (+ 5) [1..10 :: Int]) + +#if !FAST + describe "isolate" $ do + it "bound to resumable source" $ do + (x, y) <- runResourceT $ do + let src1 = CL.sourceList [1..10 :: Int] + (src2, x) <- src1 C.$= CL.isolate 5 C.$$+ CL.consume + y <- src2 C.$$+- CL.consume + return (x, y) + x `shouldBe` [1..5] + y `shouldBe` [] + + it "bound to sink, non-resumable" $ do + (x, y) <- runResourceT $ do + CL.sourceList [1..10 :: Int] C.$$ do + x <- CL.isolate 5 C.=$ CL.consume + y <- CL.consume + return (x, y) + x `shouldBe` [1..5] + y `shouldBe` [6..10] + + it "bound to sink, resumable" $ do + (x, y) <- runResourceT $ do + let src1 = CL.sourceList [1..10 :: Int] + 
(src2, x) <- src1 C.$$+ CL.isolate 5 C.=$ CL.consume + y <- src2 C.$$+- CL.consume + return (x, y) + x `shouldBe` [1..5] + y `shouldBe` [6..10] + + it "consumes all data" $ do + x <- runResourceT $ CL.sourceList [1..10 :: Int] C.$$ do + CL.isolate 5 C.=$ CL.sinkNull + CL.consume + x `shouldBe` [6..10] + + describe "sequence" $ do + it "simple sink" $ do + let sumSink = do + ma <- CL.head + case ma of + Nothing -> return 0 + Just a -> (+a) . fromMaybe 0 <$> CL.head + + res <- runResourceT $ CL.sourceList [1..11 :: Int] + C.$= CL.sequence sumSink + C.$$ CL.consume + res `shouldBe` [3, 7, 11, 15, 19, 11] + + it "sink with unpull behaviour" $ do + let sumSink = do + ma <- CL.head + case ma of + Nothing -> return 0 + Just a -> (+a) . fromMaybe 0 <$> CL.peek + + res <- runResourceT $ CL.sourceList [1..11 :: Int] + C.$= CL.sequence sumSink + C.$$ CL.consume + res `shouldBe` [3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 11] + +#endif + + describe "peek" $ do + it "works" $ do + (a, b) <- runResourceT $ CL.sourceList [1..10 :: Int] C.$$ do + a <- CL.peek + b <- CL.consume + return (a, b) + (a, b) `shouldBe` (Just 1, [1..10]) + + describe "unbuffering" $ do + it "works" $ do + x <- runResourceT $ do + let src1 = CL.sourceList [1..10 :: Int] + (src2, ()) <- src1 C.$$+ CL.drop 5 + src2 C.$$+- CL.fold (+) 0 + x `shouldBe` sum [6..10] + + describe "operators" $ do + it "only use =$=" $ + runIdentity + ( CL.sourceList [1..10 :: Int] + C.$$ CL.map (+ 1) + C.=$ CL.map (subtract 1) + C.=$ CL.mapM (return . 
(* 2)) + C.=$ CL.map (`div` 2) + C.=$ CL.fold (+) 0 + ) `shouldBe` sum [1..10] + it "only use =$" $ + runIdentity + ( CL.sourceList [1..10 :: Int] + C.$$ CL.map (+ 1) + C.=$ CL.map (subtract 1) + C.=$ CL.map (* 2) + C.=$ CL.map (`div` 2) + C.=$ CL.fold (+) 0 + ) `shouldBe` sum [1..10] + it "chain" $ do + x <- CL.sourceList [1..10 :: Int] + C.$= CL.map (+ 1) + C.$= CL.map (+ 1) + C.$= CL.map (+ 1) + C.$= CL.map (subtract 3) + C.$= CL.map (* 2) + C.$$ CL.map (`div` 2) + C.=$ CL.map (+ 1) + C.=$ CL.map (+ 1) + C.=$ CL.map (+ 1) + C.=$ CL.map (subtract 3) + C.=$ CL.fold (+) 0 + x `shouldBe` sum [1..10] + + + describe "termination" $ do + it "terminates early" $ do + let src = forever $ C.yield () + x <- src C.$$ CL.head + x `shouldBe` Just () + it "bracket" $ do + ref <- I.newIORef (0 :: Int) + let src = C.bracketP + (I.modifyIORef ref (+ 1)) + (\() -> I.modifyIORef ref (+ 2)) + (\() -> forever $ C.yield (1 :: Int)) + val <- C.runResourceT $ src C.$$ CL.isolate 10 C.=$ CL.fold (+) 0 + val `shouldBe` 10 + i <- I.readIORef ref + i `shouldBe` 3 + it "bracket skipped if not needed" $ do + ref <- I.newIORef (0 :: Int) + let src = C.bracketP + (I.modifyIORef ref (+ 1)) + (\() -> I.modifyIORef ref (+ 2)) + (\() -> forever $ C.yield (1 :: Int)) + src' = CL.sourceList $ repeat 1 + val <- C.runResourceT $ (src' >> src) C.$$ CL.isolate 10 C.=$ CL.fold (+) 0 + val `shouldBe` 10 + i <- I.readIORef ref + i `shouldBe` 0 + it "bracket + toPipe" $ do + ref <- I.newIORef (0 :: Int) + let src = C.bracketP + (I.modifyIORef ref (+ 1)) + (\() -> I.modifyIORef ref (+ 2)) + (\() -> forever $ C.yield (1 :: Int)) + val <- C.runResourceT $ src C.$$ CL.isolate 10 C.=$ CL.fold (+) 0 + val `shouldBe` 10 + i <- I.readIORef ref + i `shouldBe` 3 + it "bracket skipped if not needed" $ do + ref <- I.newIORef (0 :: Int) + let src = C.bracketP + (I.modifyIORef ref (+ 1)) + (\() -> I.modifyIORef ref (+ 2)) + (\() -> forever $ C.yield (1 :: Int)) + src' = CL.sourceList $ repeat 1 + val <- C.runResourceT $ 
(src' >> src) C.$$ CL.isolate 10 C.=$ CL.fold (+) 0 + val `shouldBe` 10 + i <- I.readIORef ref + i `shouldBe` 0 + + describe "invariant violations" $ do + it "leftovers without input" $ do + ref <- I.newIORef [] + let add x = I.modifyIORef ref (x:) + adder' = CI.NeedInput (\a -> liftIO (add a) >> adder') return + adder = CI.ConduitM (adder' >>=) + residue x = CI.ConduitM $ \rest -> CI.Leftover (rest ()) x + + _ <- C.yield 1 C.$$ adder + x <- I.readIORef ref + x `shouldBe` [1 :: Int] + I.writeIORef ref [] + + _ <- C.yield 1 C.$$ (residue 2 >> residue 3) >> adder + y <- I.readIORef ref + y `shouldBe` [1, 2, 3] + I.writeIORef ref [] + + _ <- C.yield 1 C.$$ residue 2 >> (residue 3 >> adder) + z <- I.readIORef ref + z `shouldBe` [1, 2, 3] + I.writeIORef ref [] + + describe "sane yield/await'" $ do + it' "yield terminates" $ do + let is = [1..10] ++ undefined + src [] = return () + src (x:xs) = C.yield x >> src xs + x <- src is C.$$ CL.take 10 + x `shouldBe` [1..10 :: Int] + it' "yield terminates (2)" $ do + let is = [1..10] ++ undefined + x <- mapM_ C.yield is C.$$ CL.take 10 + x `shouldBe` [1..10 :: Int] + it' "yieldOr finalizer called" $ do + iref <- I.newIORef (0 :: Int) + let src = mapM_ (\i -> C.yieldOr i $ I.writeIORef iref i) [1..] 
+ src C.$$ CL.isolate 10 C.=$ CL.sinkNull + x <- I.readIORef iref + x `shouldBe` 10 + + describe "upstream results" $ do + it' "works" $ do + let foldUp :: (b -> a -> b) -> b -> CI.Pipe l a Void u IO (u, b) + foldUp f b = CI.awaitE >>= either (\u -> return (u, b)) (\a -> let b' = f b a in b' `seq` foldUp f b') + passFold :: (b -> a -> b) -> b -> CI.Pipe l a a () IO b + passFold f b = CI.await >>= maybe (return b) (\a -> let b' = f b a in b' `seq` CI.yield a >> passFold f b') + (x, y) <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> passFold (+) 0 CI.>+> foldUp (*) 1 + (x, y) `shouldBe` (sum [1..10], product [1..10]) + + describe "input/output mapping" $ do + it' "mapOutput" $ do + x <- C.mapOutput (+ 1) (CL.sourceList [1..10 :: Int]) C.$$ CL.fold (+) 0 + x `shouldBe` sum [2..11] + it' "mapOutputMaybe" $ do + x <- C.mapOutputMaybe (\i -> if even i then Just i else Nothing) (CL.sourceList [1..10 :: Int]) C.$$ CL.fold (+) 0 + x `shouldBe` sum [2, 4..10] + it' "mapInput" $ do + xyz <- (CL.sourceList $ map show [1..10 :: Int]) C.$$ do + (x, y) <- C.mapInput read (Just . 
show) $ ((do + x <- CL.isolate 5 C.=$ CL.fold (+) 0 + y <- CL.peek + return (x :: Int, y :: Maybe Int)) :: C.Sink Int IO (Int, Maybe Int)) + z <- CL.consume + return (x, y, concat z) + + xyz `shouldBe` (sum [1..5], Just 6, "678910") + + describe "left/right identity" $ do + it' "left identity" $ do + x <- CL.sourceList [1..10 :: Int] C.$$ CI.ConduitM (CI.idP >>=) C.=$ CL.fold (+) 0 + y <- CL.sourceList [1..10 :: Int] C.$$ CL.fold (+) 0 + x `shouldBe` y + it' "right identity" $ do + x <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ flip CI.unConduitM CI.Done $ CL.fold (+) 0) CI.>+> CI.idP + y <- CI.runPipe $ mapM_ CI.yield [1..10 :: Int] CI.>+> (CI.injectLeftovers $ flip CI.unConduitM CI.Done $ CL.fold (+) 0) + x `shouldBe` y + + describe "generalizing" $ do + it' "works" $ do + x <- CI.runPipe + $ CI.sourceToPipe (CL.sourceList [1..10 :: Int]) + CI.>+> CI.conduitToPipe (CL.map (+ 1)) + CI.>+> CI.sinkToPipe (CL.fold (+) 0) + x `shouldBe` sum [2..11] + + describe "withUpstream" $ do + it' "works" $ do + let src = mapM_ CI.yield [1..10 :: Int] >> return True + fold f = + loop + where + loop accum = + CI.await >>= maybe (return accum) go + where + go a = + let accum' = f accum a + in accum' `seq` loop accum' + sink = CI.withUpstream $ fold (+) 0 + res <- CI.runPipe $ src CI.>+> sink + res `shouldBe` (True, sum [1..10]) + + describe "iterate" $ do + it' "works" $ do + res <- CL.iterate (+ 1) (1 :: Int) C.$$ CL.isolate 10 C.=$ CL.fold (+) 0 + res `shouldBe` sum [1..10] + + prop "replicate" $ \cnt' -> do + let cnt = min cnt' 100 + res <- CL.replicate cnt () C.$$ CL.consume + res `shouldBe` replicate cnt () + + prop "replicateM" $ \cnt' -> do + ref <- I.newIORef 0 + let cnt = min cnt' 100 + res <- CL.replicateM cnt (I.modifyIORef ref (+ 1)) C.$$ CL.consume + res `shouldBe` replicate cnt () + + ref' <- I.readIORef ref + ref' `shouldBe` (if cnt' <= 0 then 0 else cnt) + + describe "unwrapResumable" $ do + it' "works" $ do + ref <- I.newIORef (0 :: 
Int) + let src0 = do + C.yieldOr () $ I.writeIORef ref 1 + C.yieldOr () $ I.writeIORef ref 2 + C.yieldOr () $ I.writeIORef ref 3 + (rsrc0, Just ()) <- src0 C.$$+ CL.head + + x0 <- I.readIORef ref + x0 `shouldBe` 0 + + (_, final) <- C.unwrapResumable rsrc0 + + x1 <- I.readIORef ref + x1 `shouldBe` 0 + + final + + x2 <- I.readIORef ref + x2 `shouldBe` 1 + + it' "isn't called twice" $ do + ref <- I.newIORef (0 :: Int) + let src0 = do + C.yieldOr () $ I.writeIORef ref 1 + C.yieldOr () $ I.writeIORef ref 2 + (rsrc0, Just ()) <- src0 C.$$+ CL.head + + x0 <- I.readIORef ref + x0 `shouldBe` 0 + + (src1, final) <- C.unwrapResumable rsrc0 + + x1 <- I.readIORef ref + x1 `shouldBe` 0 + + Just () <- src1 C.$$ CL.head + + x2 <- I.readIORef ref + x2 `shouldBe` 2 + + final + + x3 <- I.readIORef ref + x3 `shouldBe` 2 + + it' "source isn't used" $ do + ref <- I.newIORef (0 :: Int) + let src0 = do + C.yieldOr () $ I.writeIORef ref 1 + C.yieldOr () $ I.writeIORef ref 2 + (rsrc0, Just ()) <- src0 C.$$+ CL.head + + x0 <- I.readIORef ref + x0 `shouldBe` 0 + + (src1, final) <- C.unwrapResumable rsrc0 + + x1 <- I.readIORef ref + x1 `shouldBe` 0 + + () <- src1 C.$$ return () + + x2 <- I.readIORef ref + x2 `shouldBe` 0 + + final + + x3 <- I.readIORef ref + x3 `shouldBe` 1 + describe "injectLeftovers" $ do + it "works" $ do + let src = mapM_ CI.yield [1..10 :: Int] + conduit = CI.injectLeftovers $ flip CI.unConduitM CI.Done $ C.awaitForever $ \i -> do + js <- CL.take 2 + mapM_ C.leftover $ reverse js + C.yield i + res <- CI.ConduitM ((src CI.>+> CI.injectLeftovers conduit) >>=) C.$$ CL.consume + res `shouldBe` [1..10] + describe "up-upstream finalizers" $ do + it "pipe" $ do + let p1 = CI.await >>= maybe (return ()) CI.yield + p2 = idMsg "p2-final" + p3 = idMsg "p3-final" + idMsg msg = CI.addCleanup (const $ tell [msg]) $ CI.awaitForever CI.yield + printer = CI.awaitForever $ lift . tell . return . show + src = mapM_ CI.yield [1 :: Int ..] 
+ let run' p = execWriter $ CI.runPipe $ printer CI.<+< p CI.<+< src + run' (p1 CI.<+< (p2 CI.<+< p3)) `shouldBe` run' ((p1 CI.<+< p2) CI.<+< p3) + it "conduit" $ do + let p1 = C.await >>= maybe (return ()) C.yield + p2 = idMsg "p2-final" + p3 = idMsg "p3-final" + idMsg msg = C.addCleanup (const $ tell [msg]) $ C.awaitForever C.yield + printer = C.awaitForever $ lift . tell . return . show + src = CL.sourceList [1 :: Int ..] + let run' p = execWriter $ src C.$$ p C.=$ printer + run' ((p3 C.=$= p2) C.=$= p1) `shouldBe` run' (p3 C.=$= (p2 C.=$= p1)) + describe "monad transformer laws" $ do + it "transPipe" $ do + let source = CL.sourceList $ replicate 10 () + let tell' x = tell [x :: Int] + + let replaceNum1 = C.awaitForever $ \() -> do + i <- lift get + lift $ (put $ i + 1) >> (get >>= lift . tell') + C.yield i + + let replaceNum2 = C.awaitForever $ \() -> do + i <- lift get + lift $ put $ i + 1 + lift $ get >>= lift . tell' + C.yield i + + x <- runWriterT $ source C.$$ C.transPipe (`evalStateT` 1) replaceNum1 C.=$ CL.consume + y <- runWriterT $ source C.$$ C.transPipe (`evalStateT` 1) replaceNum2 C.=$ CL.consume + x `shouldBe` y + describe "iterM" $ do + prop "behavior" $ \l -> monadicIO $ do + let counter ref = CL.iterM (const $ liftIO $ M.modifyMVar_ ref (\i -> return $! i + 1)) + v <- run $ do + ref <- M.newMVar 0 + CL.sourceList l C.$= counter ref C.$$ CL.mapM_ (const $ return ()) + M.readMVar ref + + assert $ v == length (l :: [Int]) + prop "mapM_ equivalence" $ \l -> monadicIO $ do + let runTest h = run $ do + ref <- M.newMVar (0 :: Int) + let f = action ref + s <- CL.sourceList (l :: [Int]) C.$= h f C.$$ CL.fold (+) 0 + c <- M.readMVar ref + + return (c, s) + + action ref = const $ liftIO $ M.modifyMVar_ ref (\i -> return $! 
i + 1) + (c1, s1) <- runTest CL.iterM + (c2, s2) <- runTest (\f -> CL.mapM (\a -> f a >>= \() -> return a)) + + assert $ c1 == c2 + assert $ s1 == s2 + + describe "generalizing" $ do + it "works" $ do + let src :: Int -> C.Source IO Int + src i = CL.sourceList [1..i] + sink :: C.Sink Int IO Int + sink = CL.fold (+) 0 + res <- C.yield 10 C.$$ C.awaitForever (C.toProducer . src) C.=$ (C.toConsumer sink >>= C.yield) C.=$ C.await + res `shouldBe` Just (sum [1..10]) + + describe "mergeSource" $ do + it "works" $ do + let src :: C.Source IO String + src = CL.sourceList ["A", "B", "C"] + withIndex :: C.Conduit String IO (Integer, String) + withIndex = CI.mergeSource (CL.sourceList [1..]) + output <- src C.$= withIndex C.$$ CL.consume + output `shouldBe` [(1, "A"), (2, "B"), (3, "C")] + it "does stop processing when the source exhausted" $ do + let src :: C.Source IO Integer + src = CL.sourceList [1..] + withShortAlphaIndex :: C.Conduit Integer IO (String, Integer) + withShortAlphaIndex = CI.mergeSource (CL.sourceList ["A", "B", "C"]) + output <- src C.$= withShortAlphaIndex C.$$ CL.consume + output `shouldBe` [("A", 1), ("B", 2), ("C", 3)] + + let modFlag ref cur next = do + prev <- I.atomicModifyIORef ref $ (,) next + prev `shouldBe` cur + flagShouldBe ref expect = do + cur <- I.readIORef ref + cur `shouldBe` expect + it "properly run the finalizer - When the main Conduit is fully consumed" $ do + called <- I.newIORef ("RawC" :: String) + let src :: MonadIO m => C.Source m String + src = CL.sourceList ["A", "B", "C"] + withIndex :: MonadIO m => C.Conduit String m (Integer, String) + withIndex = C.addCleanup (\f -> liftIO $ modFlag called "AllocC-3" ("FinalC:" ++ show f)) . 
CI.mergeSource $ do + liftIO $ modFlag called "RawC" "AllocC-1" + C.yield 1 + liftIO $ modFlag called "AllocC-1" "AllocC-2" + C.yield 2 + liftIO $ modFlag called "AllocC-2" "AllocC-3" + C.yield 3 + liftIO $ modFlag called "AllocC-3" "AllocC-4" + C.yield 4 + output <- src C.$= withIndex C.$$ CL.consume + output `shouldBe` [(1, "A"), (2, "B"), (3, "C")] + called `flagShouldBe` "FinalC:True" + it "properly run the finalizer - When the branch Source is fully consumed" $ do + called <- I.newIORef ("RawS" :: String) + let src :: MonadIO m => C.Source m Integer + src = CL.sourceList [1..] + withIndex :: MonadIO m => C.Conduit Integer m (String, Integer) + withIndex = C.addCleanup (\f -> liftIO $ modFlag called "AllocS-C" ("FinalS:" ++ show f)) . CI.mergeSource $ do + liftIO $ modFlag called "RawS" "AllocS-A" + C.yield "A" + liftIO $ modFlag called "AllocS-A" "AllocS-B" + C.yield "B" + liftIO $ modFlag called "AllocS-B" "AllocS-C" + C.yield "C" + output <- src C.$= withIndex C.$$ CL.consume + output `shouldBe` [("A", 1), ("B", 2), ("C", 3)] + called `flagShouldBe` "FinalS:True" + it "properly DO NOT run the finalizer - When nothing consumed" $ do + called <- I.newIORef ("Raw0" :: String) + let src :: MonadIO m => C.Source m String + src = CL.sourceList ["A", "B", "C"] + withIndex :: MonadIO m => C.Conduit String m (Integer, String) + withIndex = C.addCleanup (\f -> liftIO $ modFlag called "WONT CALLED" ("Final0:" ++ show f)) . CI.mergeSource $ do + liftIO $ modFlag called "Raw0" "Alloc0-1" + C.yield 1 + output <- src C.$= withIndex C.$$ return () + output `shouldBe` () + called `flagShouldBe` "Raw0" + it "properly run the finalizer - When only one item consumed" $ do + called <- I.newIORef ("Raw1" :: String) + let src :: MonadIO m => C.Source m Integer + src = CL.sourceList [1..] + withIndex :: MonadIO m => C.Conduit Integer m (String, Integer) + withIndex = C.addCleanup (\f -> liftIO $ modFlag called "Alloc1-A" ("Final1:" ++ show f)) . 
CI.mergeSource $ do + liftIO $ modFlag called "Raw1" "Alloc1-A" + C.yield "A" + liftIO $ modFlag called "Alloc1-A" "Alloc1-B" + C.yield "B" + liftIO $ modFlag called "Alloc1-B" "Alloc1-C" + C.yield "C" + output <- src C.$= withIndex C.$= CL.isolate 1 C.$$ CL.consume + output `shouldBe` [("A", 1)] + called `flagShouldBe` "Final1:False" + + it "handles finalizers" $ do + ref <- I.newIORef (0 :: Int) + let src1 = C.addCleanup + (const $ I.modifyIORef ref (+1)) + (mapM_ C.yield [1 :: Int ..]) + src2 = mapM_ C.yield ("hi" :: String) + res1 <- src1 C.$$ C.mergeSource src2 C.=$ CL.consume + res1 `shouldBe` [('h', 1), ('i', 2)] + i1 <- I.readIORef ref + i1 `shouldBe` 1 + + res2 <- src2 C.$$ C.mergeSource src1 C.=$ CL.consume + res2 `shouldBe` [(1, 'h'), (2, 'i')] + i2 <- I.readIORef ref + i2 `shouldBe` 2 + + describe "passthroughSink" $ do + it "works" $ do + ref <- I.newIORef (-1) + let sink = CL.fold (+) (0 :: Int) + conduit = C.passthroughSink sink (I.writeIORef ref) + input = [1..10] + output <- mapM_ C.yield input C.$$ conduit C.=$ CL.consume + output `shouldBe` input + x <- I.readIORef ref + x `shouldBe` sum input + it "does nothing when downstream does nothing" $ do + ref <- I.newIORef (-1) + let sink = CL.fold (+) (0 :: Int) + conduit = C.passthroughSink sink (I.writeIORef ref) + input = [undefined] + mapM_ C.yield input C.$$ conduit C.=$ return () + x <- I.readIORef ref + x `shouldBe` (-1) + + it "handles the last input correctly #304" $ do + ref <- I.newIORef (-1 :: Int) + let sink = CL.mapM_ (I.writeIORef ref) + conduit = C.passthroughSink sink (const (return ())) + res <- mapM_ C.yield [1..] 
C.$$ conduit C.=$ CL.take 5 + res `shouldBe` [1..5] + x <- I.readIORef ref + x `shouldBe` 5 + + describe "mtl instances" $ do + it "ErrorT" $ do + let src = flip catchError (const $ C.yield 4) $ do + lift $ return () + C.yield 1 + lift $ return () + C.yield 2 + lift $ return () + () <- throwError DummyError + lift $ return () + C.yield 3 + lift $ return () + (src C.$$ CL.consume) `shouldBe` Right [1, 2, 4 :: Int] + describe "WriterT" $ + it "pass" $ + let writer = W.pass $ do + W.tell [1 :: Int] + pure ((), (2:)) + in execWriter (C.runConduit writer) `shouldBe` [2, 1] + + describe "finalizers" $ do + it "promptness" $ do + imsgs <- I.newIORef [] + let add x = liftIO $ do + msgs <- I.readIORef imsgs + I.writeIORef imsgs $ msgs ++ [x] + src' = C.bracketP + (add "acquire") + (const $ add "release") + (const $ C.addCleanup (const $ add "inside") (mapM_ C.yield [1..5])) + src = do + src' C.$= CL.isolate 4 + add "computation" + sink = CL.mapM (\x -> add (show x) >> return x) C.=$ CL.consume + + res <- C.runResourceT $ src C.$$ sink + + msgs <- I.readIORef imsgs + -- FIXME this would be better msgs `shouldBe` words "acquire 1 2 3 4 inside release computation" + msgs `shouldBe` words "acquire 1 2 3 4 release inside computation" + + res `shouldBe` [1..4 :: Int] + + it "left associative" $ do + imsgs <- I.newIORef [] + let add x = liftIO $ do + msgs <- I.readIORef imsgs + I.writeIORef imsgs $ msgs ++ [x] + p1 = C.bracketP (add "start1") (const $ add "stop1") (const $ add "inside1" >> C.yield ()) + p2 = C.bracketP (add "start2") (const $ add "stop2") (const $ add "inside2" >> C.await >>= maybe (return ()) C.yield) + p3 = C.bracketP (add "start3") (const $ add "stop3") (const $ add "inside3" >> C.await) + + res <- C.runResourceT $ (p1 C.$= p2) C.$$ p3 + res `shouldBe` Just () + + msgs <- I.readIORef imsgs + msgs `shouldBe` words "start3 inside3 start2 inside2 start1 inside1 stop3 stop2 stop1" + + it "right associative" $ do + imsgs <- I.newIORef [] + let add x = liftIO $ do + 
msgs <- I.readIORef imsgs + I.writeIORef imsgs $ msgs ++ [x] + p1 = C.bracketP (add "start1") (const $ add "stop1") (const $ add "inside1" >> C.yield ()) + p2 = C.bracketP (add "start2") (const $ add "stop2") (const $ add "inside2" >> C.await >>= maybe (return ()) C.yield) + p3 = C.bracketP (add "start3") (const $ add "stop3") (const $ add "inside3" >> C.await) + + res <- C.runResourceT $ p1 C.$$ (p2 C.=$ p3) + res `shouldBe` Just () + + msgs <- I.readIORef imsgs + msgs `shouldBe` words "start3 inside3 start2 inside2 start1 inside1 stop3 stop2 stop1" + + describe "dan burton's associative tests" $ do + let tellLn = tell . (++ "\n") + finallyP fin = CI.addCleanup (const fin) + printer = CI.awaitForever $ lift . tellLn . show + idMsg msg = finallyP (tellLn msg) CI.idP + takeP 0 = return () + takeP n = CI.awaitE >>= \ex -> case ex of + Left _u -> return () + Right i -> CI.yield i >> takeP (pred n) + + testPipe p = execWriter $ runPipe $ printer <+< p <+< CI.sourceList ([1..] :: [Int]) + + p1 = takeP (1 :: Int) + p2 = idMsg "foo" + p3 = idMsg "bar" + + (<+<) = (CI.<+<) + runPipe = CI.runPipe + + test1L = testPipe $ (p1 <+< p2) <+< p3 + test1R = testPipe $ p1 <+< (p2 <+< p3) + + _test2L = testPipe $ (p2 <+< p1) <+< p3 + _test2R = testPipe $ p2 <+< (p1 <+< p3) + + test3L = testPipe $ (p2 <+< p3) <+< p1 + test3R = testPipe $ p2 <+< (p3 <+< p1) + + verify testL testR p1' p2' p3' + | testL == testR = return () :: IO () + | otherwise = error $ unlines + [ "FAILURE" + , "" + , "(" ++ p1' ++ " <+< " ++ p2' ++ ") <+< " ++ p3' + , "------------------" + , testL + , "" + , p1' ++ " <+< (" ++ p2' ++ " <+< " ++ p3' ++ ")" + , "------------------" + , testR + ] + + it "test1" $ verify test1L test1R "p1" "p2" "p3" + -- FIXME this is broken it "test2" $ verify test2L test2R "p2" "p1" "p3" + it "test3" $ verify test3L test3R "p2" "p3" "p1" + + describe "Data.Conduit.Lift" $ do + it "execStateC" $ do + let sink = C.execStateLC 0 $ CL.mapM_ $ modify . 
(+) + src = mapM_ C.yield [1..10 :: Int] + res <- src C.$$ sink + res `shouldBe` sum [1..10] + + it "execWriterC" $ do + let sink = C.execWriterLC $ CL.mapM_ $ tell . return + src = mapM_ C.yield [1..10 :: Int] + res <- src C.$$ sink + res `shouldBe` [1..10] + + it "runErrorC" $ do + let sink = C.runErrorC $ do + x <- C.catchErrorC (lift $ throwError "foo") return + return $ x ++ "bar" + res <- return () C.$$ sink + res `shouldBe` Right ("foobar" :: String) + + it "runMaybeC" $ do + let src = void $ C.runMaybeC $ do + C.yield 1 + () <- lift $ MaybeT $ return Nothing + C.yield 2 + sink = CL.consume + res <- src C.$$ sink + res `shouldBe` [1 :: Int] + + describe "sequenceSources" $ do + it "works" $ do + let src1 = mapM_ C.yield [1, 2, 3 :: Int] + src2 = mapM_ C.yield [3, 2, 1] + src3 = mapM_ C.yield $ repeat 2 + srcs = C.sequenceSources $ Map.fromList + [ (1 :: Int, src1) + , (2, src2) + , (3, src3) + ] + res <- srcs C.$$ CL.consume + res `shouldBe` + [ Map.fromList [(1, 1), (2, 3), (3, 2)] + , Map.fromList [(1, 2), (2, 2), (3, 2)] + , Map.fromList [(1, 3), (2, 1), (3, 2)] + ] + describe "zipSink" $ do + it "zip equal-sized" $ do + x <- runResourceT $ + CL.sourceList [1..100] C.$$ + C.sequenceSinks [ CL.fold (+) 0, + (`mod` 101) <$> CL.fold (*) 1 ] + x `shouldBe` [5050, 100 :: Integer] + + it "zip distinct sizes" $ do + let sink = C.getZipSink $ + (*) <$> C.ZipSink (CL.fold (+) 0) + <*> C.ZipSink (Data.Maybe.fromJust <$> C.await) + x <- C.runResourceT $ CL.sourceList [100,99..1] C.$$ sink + x `shouldBe` (505000 :: Integer) + + describe "upstream results" $ do + it "fuseBoth" $ do + let upstream = do + C.yield ("hello" :: String) + CL.isolate 5 C.=$= CL.fold (+) 0 + downstream = C.fuseBoth upstream CL.consume + res <- CL.sourceList [1..10 :: Int] C.$$ do + (x, y) <- downstream + z <- CL.consume + return (x, y, z) + res `shouldBe` (sum [1..5], ["hello"], [6..10]) + + it "fuseBothMaybe with no result" $ do + let src = mapM_ C.yield [1 :: Int ..] 
+ sink = CL.isolate 5 C.=$= CL.fold (+) 0 + (mup, down) <- C.runConduit $ C.fuseBothMaybe src sink + mup `shouldBe` (Nothing :: Maybe ()) + down `shouldBe` sum [1..5] + + it "fuseBothMaybe with result" $ do + let src = mapM_ C.yield [1 :: Int .. 5] + sink = CL.isolate 6 C.=$= CL.fold (+) 0 + (mup, down) <- C.runConduit $ C.fuseBothMaybe src sink + mup `shouldBe` Just () + down `shouldBe` sum [1..5] + + it "fuseBothMaybe with almost result" $ do + let src = mapM_ C.yield [1 :: Int .. 5] + sink = CL.isolate 5 C.=$= CL.fold (+) 0 + (mup, down) <- C.runConduit $ C.fuseBothMaybe src sink + mup `shouldBe` (Nothing :: Maybe ()) + down `shouldBe` sum [1..5] + + describe "catching exceptions" $ do + it "works" $ do + let src = do + C.yield 1 + () <- Catch.throwM DummyError + C.yield 2 + src' = do + Catch.catch src (\DummyError -> C.yield (3 :: Int)) + res <- src' C.$$ CL.consume + res `shouldBe` [1, 3] + + describe "sourceToList" $ do + it "works lazily in Identity" $ do + let src = C.yield 1 >> C.yield 2 >> throw DummyError + let res = runIdentity $ C.sourceToList src + take 2 res `shouldBe` [1, 2 :: Int] + it "is not lazy in IO" $ do + let src = C.yield 1 >> C.yield (2 :: Int) >> throw DummyError + C.sourceToList src `shouldThrow` (==DummyError) + + ZipConduit.spec + Stream.spec + +it' :: String -> IO () -> Spec +it' = it + +data DummyError = DummyError + deriving (Show, Eq, Typeable) +instance Error DummyError +instance Catch.Exception DummyError