|
Packit |
4b2029 |
{-# LANGUAGE FlexibleInstances #-}
|
|
Packit |
4b2029 |
{-# LANGUAGE TypeFamilies #-}
|
|
Packit |
4b2029 |
{-# LANGUAGE RankNTypes #-}
|
|
Packit |
4b2029 |
{-# LANGUAGE CPP #-}
|
|
Packit |
4b2029 |
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
|
Packit |
4b2029 |
-- | A full tutorial for this module is available at:
|
|
Packit |
4b2029 |
-- <https://github.com/snoyberg/conduit/blob/master/PROCESS.md>.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- Note that this is a very thin layer around the @Data.Streaming.Process@ module. In particular, it:
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- * Provides orphan instances for conduit
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- * Provides some useful helper functions
|
|
Packit |
4b2029 |
module Data.Conduit.Process
|
|
Packit |
4b2029 |
( -- * Functions
|
|
Packit |
4b2029 |
sourceCmdWithConsumer
|
|
Packit |
4b2029 |
, sourceProcessWithConsumer
|
|
Packit |
4b2029 |
, sourceCmdWithStreams
|
|
Packit |
4b2029 |
, sourceProcessWithStreams
|
|
Packit |
4b2029 |
, withCheckedProcessCleanup
|
|
Packit |
4b2029 |
-- * Reexport
|
|
Packit |
4b2029 |
, module Data.Streaming.Process
|
|
Packit |
4b2029 |
) where
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
import Data.Streaming.Process
|
|
Packit |
4b2029 |
import Data.Streaming.Process.Internal
|
|
Packit |
4b2029 |
import System.Exit (ExitCode (..))
|
|
Packit |
4b2029 |
import Control.Monad.IO.Class (MonadIO, liftIO)
|
|
Packit |
4b2029 |
import System.IO (hClose)
|
|
Packit |
4b2029 |
import Data.Conduit
|
|
Packit |
4b2029 |
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
|
|
Packit |
4b2029 |
import Data.ByteString (ByteString)
|
|
Packit |
4b2029 |
import Data.ByteString.Builder (Builder)
|
|
Packit |
4b2029 |
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
|
|
Packit |
4b2029 |
import Control.Monad.Catch (MonadMask, onException, throwM, finally, bracket)
|
|
Packit |
4b2029 |
#if (__GLASGOW_HASKELL__ < 710)
|
|
Packit |
4b2029 |
import Control.Applicative ((<$>), (<*>))
|
|
Packit |
4b2029 |
#endif
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
|
|
Packit |
4b2029 |
isStdStream = (\(Just h) -> return $ sinkHandle h, Just CreatePipe)
|
|
Packit |
4b2029 |
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
|
|
Packit |
4b2029 |
isStdStream = (\(Just h) -> return (sinkHandle h, liftIO $ hClose h), Just CreatePipe)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Wrapper for input source which accepts 'Data.ByteString.Builder.Builder's.
|
|
Packit |
4b2029 |
-- You can pass 'Data.ByteString.Builder.Extra.flush' to flush the input. Note
|
|
Packit |
4b2029 |
-- that the pipe will /not/ automatically close when the processing completes.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- @since 1.2.2
|
|
Packit |
4b2029 |
newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Wrapper for input source which accepts @Flush@es. Note that the pipe
|
|
Packit |
4b2029 |
-- will /not/ automatically close then processing completes.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- @since 1.2.2
|
|
Packit |
4b2029 |
newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
|
|
Packit |
4b2029 |
isStdStream = (\(Just h) -> return $ BuilderInput $ sinkHandleBuilder h, Just CreatePipe)
|
|
Packit |
4b2029 |
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
|
|
Packit |
4b2029 |
isStdStream = (\(Just h) -> return (BuilderInput $ sinkHandleBuilder h, liftIO $ hClose h), Just CreatePipe)
|
|
Packit |
4b2029 |
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
|
|
Packit |
4b2029 |
isStdStream = (\(Just h) -> return $ FlushInput $ sinkHandleFlush h, Just CreatePipe)
|
|
Packit |
4b2029 |
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
|
|
Packit |
4b2029 |
isStdStream = (\(Just h) -> return (FlushInput $ sinkHandleFlush h, liftIO $ hClose h), Just CreatePipe)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
|
|
Packit |
4b2029 |
osStdStream = (\(Just h) -> return $ sourceHandle h, Just CreatePipe)
|
|
Packit |
4b2029 |
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
|
|
Packit |
4b2029 |
osStdStream = (\(Just h) -> return (sourceHandle h, liftIO $ hClose h), Just CreatePipe)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Given a @CreateProcess@, run the process, with its output being used as a
|
|
Packit |
4b2029 |
-- @Source@ to feed the provided @Consumer@. Once the process has completed,
|
|
Packit |
4b2029 |
-- return a tuple of the @ExitCode@ from the process and the output collected
|
|
Packit |
4b2029 |
-- from the @Consumer@.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- Note that, if an exception is raised by the consumer, the process is /not/
|
|
Packit |
4b2029 |
-- terminated. This behavior is different from 'sourceProcessWithStreams' due
|
|
Packit |
4b2029 |
-- to historical reasons.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- Since 1.1.2
|
|
Packit |
4b2029 |
sourceProcessWithConsumer :: MonadIO m
|
|
Packit |
4b2029 |
=> CreateProcess
|
|
Packit |
4b2029 |
-> Consumer ByteString m a -- ^ stdout
|
|
Packit |
4b2029 |
-> m (ExitCode, a)
|
|
Packit |
4b2029 |
sourceProcessWithConsumer cp consumer = do
|
|
Packit |
4b2029 |
(ClosedStream, (source, close), ClosedStream, cph) <- streamingProcess cp
|
|
Packit |
4b2029 |
res <- source $$ consumer
|
|
Packit |
4b2029 |
close
|
|
Packit |
4b2029 |
ec <- waitForStreamingProcess cph
|
|
Packit |
4b2029 |
return (ec, res)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Like @sourceProcessWithConsumer@ but providing the command to be run as
|
|
Packit |
4b2029 |
-- a @String@.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- Since 1.1.2
|
|
Packit |
4b2029 |
sourceCmdWithConsumer :: MonadIO m
|
|
Packit |
4b2029 |
=> String -- ^command
|
|
Packit |
4b2029 |
-> Consumer ByteString m a -- ^stdout
|
|
Packit |
4b2029 |
-> m (ExitCode, a)
|
|
Packit |
4b2029 |
sourceCmdWithConsumer cmd = sourceProcessWithConsumer (shell cmd)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Given a @CreateProcess@, run the process
|
|
Packit |
4b2029 |
-- and feed the provided @Producer@
|
|
Packit |
4b2029 |
-- to the stdin @Sink@ of the process.
|
|
Packit |
4b2029 |
-- Use the process outputs (stdout, stderr) as @Source@s
|
|
Packit |
4b2029 |
-- and feed it to the provided @Consumer@s.
|
|
Packit |
4b2029 |
-- Once the process has completed,
|
|
Packit |
4b2029 |
-- return a tuple of the @ExitCode@ from the process
|
|
Packit |
4b2029 |
-- and the results collected from the @Consumer@s.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- If an exception is raised by any of the streams,
|
|
Packit |
4b2029 |
-- the process is terminated.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- IO is required because the streams are run concurrently
|
|
Packit |
4b2029 |
-- using the <https://hackage.haskell.org/package/async async> package
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- @since 1.1.12
|
|
Packit |
4b2029 |
sourceProcessWithStreams :: CreateProcess
|
|
Packit |
4b2029 |
-> Producer IO ByteString -- ^stdin
|
|
Packit |
4b2029 |
-> Consumer ByteString IO a -- ^stdout
|
|
Packit |
4b2029 |
-> Consumer ByteString IO b -- ^stderr
|
|
Packit |
4b2029 |
-> IO (ExitCode, a, b)
|
|
Packit |
4b2029 |
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr = do
|
|
Packit |
4b2029 |
( (sinkStdin, closeStdin)
|
|
Packit |
4b2029 |
, (sourceStdout, closeStdout)
|
|
Packit |
4b2029 |
, (sourceStderr, closeStderr)
|
|
Packit |
4b2029 |
, sph) <- streamingProcess cp
|
|
Packit |
4b2029 |
(_, resStdout, resStderr) <-
|
|
Packit |
4b2029 |
runConcurrently (
|
|
Packit |
4b2029 |
(,,)
|
|
Packit |
4b2029 |
<$> Concurrently ((producerStdin $$ sinkStdin) `finally` closeStdin)
|
|
Packit |
4b2029 |
<*> Concurrently (sourceStdout $$ consumerStdout)
|
|
Packit |
4b2029 |
<*> Concurrently (sourceStderr $$ consumerStderr))
|
|
Packit |
4b2029 |
`finally` (closeStdout >> closeStderr)
|
|
Packit |
4b2029 |
`onException` terminateStreamingProcess sph
|
|
Packit |
4b2029 |
ec <- waitForStreamingProcess sph
|
|
Packit |
4b2029 |
return (ec, resStdout, resStderr)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Like @sourceProcessWithStreams@ but providing the command to be run as
|
|
Packit |
4b2029 |
-- a @String@.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- @since 1.1.12
|
|
Packit |
4b2029 |
sourceCmdWithStreams :: String -- ^command
|
|
Packit |
4b2029 |
-> Producer IO ByteString -- ^stdin
|
|
Packit |
4b2029 |
-> Consumer ByteString IO a -- ^stdout
|
|
Packit |
4b2029 |
-> Consumer ByteString IO b -- ^stderr
|
|
Packit |
4b2029 |
-> IO (ExitCode, a, b)
|
|
Packit |
4b2029 |
sourceCmdWithStreams cmd = sourceProcessWithStreams (shell cmd)
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
-- | Same as 'withCheckedProcess', but kills the child process in the case of
|
|
Packit |
4b2029 |
-- an exception being thrown by the provided callback function.
|
|
Packit |
4b2029 |
--
|
|
Packit |
4b2029 |
-- @since 1.1.11
|
|
Packit |
4b2029 |
withCheckedProcessCleanup
|
|
Packit |
4b2029 |
:: ( InputSource stdin
|
|
Packit |
4b2029 |
, OutputSink stderr
|
|
Packit |
4b2029 |
, OutputSink stdout
|
|
Packit |
4b2029 |
, MonadIO m
|
|
Packit |
4b2029 |
, MonadMask m
|
|
Packit |
4b2029 |
)
|
|
Packit |
4b2029 |
=> CreateProcess
|
|
Packit |
4b2029 |
-> (stdin -> stdout -> stderr -> m b)
|
|
Packit |
4b2029 |
-> m b
|
|
Packit |
4b2029 |
withCheckedProcessCleanup cp f = bracket
|
|
Packit |
4b2029 |
(streamingProcess cp)
|
|
Packit |
4b2029 |
(\(_, _, _, sph) -> closeStreamingProcessHandle sph)
|
|
Packit |
4b2029 |
$ \(x, y, z, sph) -> do
|
|
Packit |
4b2029 |
res <- f x y z `onException` liftIO (terminateStreamingProcess sph)
|
|
Packit |
4b2029 |
ec <- waitForStreamingProcess sph
|
|
Packit |
4b2029 |
if ec == ExitSuccess
|
|
Packit |
4b2029 |
then return res
|
|
Packit |
4b2029 |
else throwM $ ProcessExitedUnsuccessfully cp ec
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
|
|
Packit |
4b2029 |
terminateStreamingProcess :: StreamingProcessHandle -> IO ()
|
|
Packit |
4b2029 |
terminateStreamingProcess = terminateProcess . streamingProcessHandleRaw
|