Blame Data/Conduit/Process.hs

Packit 4b2029
{-# LANGUAGE FlexibleInstances #-}
Packit 4b2029
{-# LANGUAGE TypeFamilies #-}
Packit 4b2029
{-# LANGUAGE RankNTypes #-}
Packit 4b2029
{-# LANGUAGE CPP #-}
Packit 4b2029
{-# OPTIONS_GHC -fno-warn-orphans #-}
Packit 4b2029
-- | A full tutorial for this module is available at:
Packit 4b2029
-- <https://github.com/snoyberg/conduit/blob/master/PROCESS.md>.
Packit 4b2029
--
Packit 4b2029
-- Note that this is a very thin layer around the @Data.Streaming.Process@ module. In particular, it:
Packit 4b2029
--
Packit 4b2029
-- * Provides orphan instances for conduit
Packit 4b2029
--
Packit 4b2029
-- * Provides some useful helper functions
Packit 4b2029
module Data.Conduit.Process
    ( -- * Functions
      sourceCmdWithConsumer
    , sourceProcessWithConsumer
    , sourceCmdWithStreams
    , sourceProcessWithStreams
    , withCheckedProcessCleanup
      -- * InputSource types
      -- | Wrappers that select the 'InputSource' instances accepting
      -- @Builder@ or @Flush ByteString@ input. They must be exported
      -- (with constructors) so callers can name and unwrap them.
    , FlushInput (..)
    , BuilderInput (..)
      -- * Reexport
    , module Data.Streaming.Process
    ) where
Packit 4b2029
Packit 4b2029
import Data.Streaming.Process
Packit 4b2029
import Data.Streaming.Process.Internal
Packit 4b2029
import System.Exit (ExitCode (..))
Packit 4b2029
import Control.Monad.IO.Class (MonadIO, liftIO)
Packit 4b2029
import System.IO (hClose)
Packit 4b2029
import Data.Conduit
Packit 4b2029
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
Packit 4b2029
import Data.ByteString (ByteString)
Packit 4b2029
import Data.ByteString.Builder (Builder)
Packit 4b2029
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
Packit 4b2029
import Control.Monad.Catch (MonadMask, onException, throwM, finally, bracket)
Packit 4b2029
#if (__GLASGOW_HASKELL__ < 710)
Packit 4b2029
import Control.Applicative ((<$>), (<*>))
Packit 4b2029
#endif
Packit 4b2029
Packit 4b2029
-- Input source backed by a pipe: 'CreatePipe' is requested for the stream
-- and the resulting handle is wrapped with 'sinkHandle'.
-- NOTE(review): the lambda only matches 'Just'; presumably a handle is
-- always supplied when 'CreatePipe' is requested -- confirm against
-- "Data.Streaming.Process".
instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
    isStdStream =
        ( \(Just h) -> return (sinkHandle h)
        , Just CreatePipe
        )

-- Same as the instance above, but paired with an action that closes the
-- pipe handle via 'hClose'.
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
    isStdStream =
        ( \(Just h) -> return (sinkHandle h, liftIO (hClose h))
        , Just CreatePipe
        )
Packit 4b2029
Packit 4b2029
-- | Input-source wrapper around a conduit that consumes
-- 'Data.ByteString.Builder.Builder' values. Pass
-- 'Data.ByteString.Builder.Extra.flush' to flush the input. Note that the
-- pipe will /not/ automatically close when the processing completes.
--
-- @since 1.2.2
newtype BuilderInput o m r
    = BuilderInput (ConduitM Builder o m r)
Packit 4b2029
Packit 4b2029
-- | Input-source wrapper around a conduit that consumes @Flush ByteString@
-- values. Note that the pipe will /not/ automatically close when processing
-- completes.
--
-- @since 1.2.2
newtype FlushInput o m r
    = FlushInput (ConduitM (Flush ByteString) o m r)
Packit 4b2029
Packit 4b2029
-- Input source accepting @Builder@s: requests a pipe ('CreatePipe') and wraps
-- the handle with 'sinkHandleBuilder' inside the 'BuilderInput' newtype.
instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
  isStdStream =
    ( \(Just h) -> return (BuilderInput (sinkHandleBuilder h))
    , Just CreatePipe
    )

-- Same as the instance above, but paired with an action that closes the
-- pipe handle via 'hClose'.
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
  isStdStream =
    ( \(Just h) -> return (BuilderInput (sinkHandleBuilder h), liftIO (hClose h))
    , Just CreatePipe
    )
Packit 4b2029
-- Input source accepting @Flush ByteString@ chunks: requests a pipe
-- ('CreatePipe') and wraps the handle with 'sinkHandleFlush' inside the
-- 'FlushInput' newtype.
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
  isStdStream =
    ( \(Just h) -> return (FlushInput (sinkHandleFlush h))
    , Just CreatePipe
    )

-- Same as the instance above, but paired with an action that closes the
-- pipe handle via 'hClose'.
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
  isStdStream =
    ( \(Just h) -> return (FlushInput (sinkHandleFlush h), liftIO (hClose h))
    , Just CreatePipe
    )
Packit 4b2029
Packit 4b2029
-- Output sink backed by a pipe: 'CreatePipe' is requested for the stream and
-- the resulting handle is exposed as a conduit via 'sourceHandle'.
instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
    osStdStream =
        ( \(Just h) -> return (sourceHandle h)
        , Just CreatePipe
        )

-- Same as the instance above, but paired with an action that closes the
-- pipe handle via 'hClose'.
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
    osStdStream =
        ( \(Just h) -> return (sourceHandle h, liftIO (hClose h))
        , Just CreatePipe
        )
Packit 4b2029
Packit 4b2029
-- | Run the process described by the given @CreateProcess@ and feed its
-- stdout, as a @Source@, into the supplied @Consumer@. Once the consumer has
-- finished, the stdout pipe is closed and the process is waited for; the
-- result pairs the process's @ExitCode@ with the value produced by the
-- @Consumer@. The process's stdin and stderr are set up as 'ClosedStream'.
--
-- Note that, if an exception is raised by the consumer, the process is /not/
-- terminated. This behavior is different from 'sourceProcessWithStreams' due
-- to historical reasons.
--
-- Since 1.1.2
sourceProcessWithConsumer
    :: MonadIO m
    => CreateProcess
    -> Consumer ByteString m a -- ^ stdout
    -> m (ExitCode, a)
sourceProcessWithConsumer cp consumer = do
    (ClosedStream, (stdoutSource, closeStdout), ClosedStream, sph) <-
        streamingProcess cp
    consumed <- stdoutSource $$ consumer
    -- Only reached when the consumer finished without throwing.
    closeStdout
    exitCode <- waitForStreamingProcess sph
    return (exitCode, consumed)
Packit 4b2029
Packit 4b2029
-- | Variant of 'sourceProcessWithConsumer' that takes the command as a
-- @String@, which is turned into a @CreateProcess@ with 'shell'.
--
-- Since 1.1.2
sourceCmdWithConsumer
    :: MonadIO m
    => String                  -- ^command
    -> Consumer ByteString m a -- ^stdout
    -> m (ExitCode, a)
sourceCmdWithConsumer cmd consumer =
    sourceProcessWithConsumer (shell cmd) consumer
Packit 4b2029
Packit 4b2029
Packit 4b2029
-- | Run the process described by the given @CreateProcess@, feeding the
-- provided @Producer@ into the process's stdin @Sink@ and draining the
-- process outputs (stdout, stderr) as @Source@s into the provided
-- @Consumer@s. Once the process has completed, return a tuple of the
-- @ExitCode@ from the process and the results collected from the
-- @Consumer@s.
--
-- If an exception is raised by any of the streams,
-- the process is terminated.
--
-- IO is required because the streams are run concurrently
-- using the <https://hackage.haskell.org/package/async async> package
--
-- @since 1.1.12
sourceProcessWithStreams
    :: CreateProcess
    -> Producer IO ByteString   -- ^stdin
    -> Consumer ByteString IO a -- ^stdout
    -> Consumer ByteString IO b -- ^stderr
    -> IO (ExitCode, a, b)
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr = do
    ( (stdinSink, stdinClose)
      , (stdoutSrc, stdoutClose)
      , (stderrSrc, stderrClose)
      , sph
      ) <- streamingProcess cp
    let -- stdin is closed as soon as the producer is done (or fails).
        feedStdin = (producerStdin $$ stdinSink) `finally` stdinClose
        drainStdout = stdoutSrc $$ consumerStdout
        drainStderr = stderrSrc $$ consumerStderr
        allStreams =
            (,,) <$> Concurrently feedStdin
                 <*> Concurrently drainStdout
                 <*> Concurrently drainStderr
        closeOutputs = stdoutClose >> stderrClose
    -- The output pipes are always closed; the process is terminated only
    -- when running the streams (or closing the pipes) threw.
    (_, outResult, errResult) <-
        (runConcurrently allStreams `finally` closeOutputs)
            `onException` terminateStreamingProcess sph
    exitCode <- waitForStreamingProcess sph
    return (exitCode, outResult, errResult)
Packit 4b2029
Packit 4b2029
-- | Variant of 'sourceProcessWithStreams' that takes the command as a
-- @String@, which is turned into a @CreateProcess@ with 'shell'.
--
-- @since 1.1.12
sourceCmdWithStreams
    :: String                   -- ^command
    -> Producer IO ByteString   -- ^stdin
    -> Consumer ByteString IO a -- ^stdout
    -> Consumer ByteString IO b -- ^stderr
    -> IO (ExitCode, a, b)
sourceCmdWithStreams cmd stdinProducer stdoutConsumer stderrConsumer =
    sourceProcessWithStreams (shell cmd) stdinProducer stdoutConsumer stderrConsumer
Packit 4b2029
Packit 4b2029
-- | Same as 'withCheckedProcess', but kills the child process in the case of
-- an exception being thrown by the provided callback function.
--
-- After the callback returns, the process is waited for; any exit code other
-- than 'ExitSuccess' is reported by throwing 'ProcessExitedUnsuccessfully'.
-- The streaming process handle is closed in every case.
--
-- @since 1.1.11
withCheckedProcessCleanup
    :: ( InputSource stdin
       , OutputSink stderr
       , OutputSink stdout
       , MonadIO m
       , MonadMask m
       )
    => CreateProcess
    -> (stdin -> stdout -> stderr -> m b)
    -> m b
withCheckedProcessCleanup cp f =
    bracket (streamingProcess cp) release run
  where
    release (_, _, _, sph) = closeStreamingProcessHandle sph

    run (procIn, procOut, procErr, sph) = do
        res <- f procIn procOut procErr
            `onException` liftIO (terminateStreamingProcess sph)
        exitCode <- waitForStreamingProcess sph
        case exitCode of
            ExitSuccess -> return res
            _ -> throwM (ProcessExitedUnsuccessfully cp exitCode)
Packit 4b2029
Packit 4b2029
Packit 4b2029
-- | Terminate the process behind a 'StreamingProcessHandle' by calling
-- 'terminateProcess' on its raw process handle.
terminateStreamingProcess :: StreamingProcessHandle -> IO ()
terminateStreamingProcess sph =
    terminateProcess (streamingProcessHandleRaw sph)