Blame Data/Conduit/Network.hs

Packit 4b2029
{-# LANGUAGE FlexibleContexts #-}
Packit 4b2029
{-# LANGUAGE RankNTypes #-}
Packit 4b2029
{-# LANGUAGE ScopedTypeVariables #-}
Packit 4b2029
{-# LANGUAGE CPP #-}
Packit 4b2029
module Data.Conduit.Network
Packit 4b2029
    ( -- * Basic utilities
Packit 4b2029
      sourceSocket
Packit 4b2029
    , sinkSocket
Packit 4b2029
      -- * Simple TCP server/client interface.
Packit 4b2029
    , SN.AppData
Packit 4b2029
    , appSource
Packit 4b2029
    , appSink
Packit 4b2029
    , SN.appSockAddr
Packit 4b2029
    , SN.appLocalAddr
Packit 4b2029
      -- ** Server
Packit 4b2029
    , SN.ServerSettings
Packit 4b2029
    , serverSettings
Packit 4b2029
    , SN.runTCPServer
Packit 4b2029
    , SN.runTCPServerWithHandle
Packit 4b2029
    , forkTCPServer
Packit 4b2029
    , runGeneralTCPServer
Packit 4b2029
      -- ** Client
Packit 4b2029
    , SN.ClientSettings
Packit 4b2029
    , clientSettings
Packit 4b2029
    , SN.runTCPClient
Packit 4b2029
    , runGeneralTCPClient
Packit 4b2029
      -- ** Getters
Packit 4b2029
    , SN.getPort
Packit 4b2029
    , SN.getHost
Packit 4b2029
    , SN.getAfterBind
Packit 4b2029
    , SN.getNeedLocalAddr
Packit 4b2029
      -- ** Setters
Packit 4b2029
    , SN.setPort
Packit 4b2029
    , SN.setHost
Packit 4b2029
    , SN.setAfterBind
Packit 4b2029
    , SN.setNeedLocalAddr
Packit 4b2029
      -- * Types
Packit 4b2029
    , SN.HostPreference
Packit 4b2029
    ) where
Packit 4b2029
Packit 4b2029
import Prelude
Packit 4b2029
import Data.Conduit
Packit 4b2029
import Network.Socket (Socket)
Packit 4b2029
import Network.Socket.ByteString (sendAll)
Packit 4b2029
import Data.ByteString (ByteString)
Packit 4b2029
import qualified GHC.Conc as Conc (yield)
Packit 4b2029
import qualified Data.ByteString as S
Packit 4b2029
import Control.Monad.IO.Class (MonadIO (liftIO))
Packit 4b2029
import Control.Monad (unless, void)
Packit 4b2029
import Control.Monad.Trans.Control (MonadBaseControl, control, liftBaseWith)
Packit 4b2029
import Control.Monad.Trans.Class (lift)
Packit 4b2029
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar, MVar, ThreadId)
Packit 4b2029
import qualified Data.Streaming.Network as SN
Packit 4b2029
Packit 4b2029
-- | Produce the bytes received on a socket as a stream of chunks.
--
-- Every chunk is the result of a single 'SN.safeRecv' call that asks for
-- at most 4096 bytes. The stream ends as soon as an empty chunk is
-- received.
--
-- This function does /not/ automatically close the socket.
--
-- Since 0.0.0
sourceSocket :: MonadIO m => Socket -> Producer m ByteString
sourceSocket socket = receive
  where
    -- Read one chunk, emit it downstream, and repeat until an empty read.
    receive = do
        chunk <- lift (liftIO (SN.safeRecv socket 4096))
        unless (S.null chunk) $ do
            yield chunk
            receive
Packit 4b2029
Packit 4b2029
-- | Consume a stream of chunks by sending each one over a socket.
--
-- Every incoming chunk is handed to 'sendAll'. The consumer finishes when
-- upstream has no more input.
--
-- This function does /not/ automatically close the socket.
--
-- Since 0.0.0
sinkSocket :: MonadIO m => Socket -> Consumer ByteString m ()
sinkSocket socket = awaitForever send
  where
    -- Push a single chunk out through the socket.
    send chunk = lift (liftIO (sendAll socket chunk))
Packit 4b2029
Packit 4b2029
-- | Build TCP server settings.
--
-- Thin wrapper that forwards both arguments, unchanged and in order, to
-- 'SN.serverSettingsTCP'.
serverSettings :: Int -> SN.HostPreference -> SN.ServerSettings
serverSettings port host = SN.serverSettingsTCP port host
Packit 4b2029
Packit 4b2029
-- | Build TCP client settings.
--
-- Thin wrapper that forwards both arguments, unchanged and in order, to
-- 'SN.clientSettingsTCP'.
clientSettings :: Int -> ByteString -> SN.ClientSettings
clientSettings port host = SN.clientSettingsTCP port host
Packit 4b2029
Packit 4b2029
-- | Produce the chunks read from an application data value.
--
-- Runs 'SN.appRead' repeatedly and yields every non-empty chunk. The first
-- empty chunk ends the stream.
appSource :: (SN.HasReadWrite ad, MonadIO m) => ad -> Producer m ByteString
appSource ad = pump
  where
    -- Read one chunk; stop on an empty one, otherwise emit it and go again.
    pump = do
        chunk <- liftIO (SN.appRead ad)
        if S.null chunk
            then return ()
            else yield chunk >> pump
Packit 4b2029
Packit 4b2029
-- | Consume a stream of chunks by writing each one with 'SN.appWrite'.
--
-- After every write the handler calls 'Conc.yield' (from "GHC.Conc")
-- before awaiting the next chunk.
appSink :: (SN.HasReadWrite ad, MonadIO m) => ad -> Consumer ByteString m ()
appSink ad = awaitForever (liftIO . writeChunk)
  where
    -- Write one chunk, then yield the running thread.
    writeChunk chunk = do
        SN.appWrite ad chunk
        Conc.yield
Packit 4b2029
Packit 4b2029
-- | Extend the after-bind hook of some server settings so that it also
-- fills the given 'MVar'.
--
-- The hook already stored in the settings runs first; only after it
-- returns is @()@ put into the 'MVar'.
addBoundSignal :: MVar () -> SN.ServerSettings -> SN.ServerSettings
addBoundSignal isBound set = SN.setAfterBind afterBind set
  where
    afterBind :: Socket -> IO ()
    afterBind sock = do
        -- Preserve whatever hook the caller had installed.
        SN.getAfterBind set sock
        putMVar isBound ()
Packit 4b2029
Packit 4b2029
-- | Fork a TCP server onto its own thread.
--
-- Starts 'runGeneralTCPServer' with 'forkIO' and then blocks until the
-- server's after-bind hook has run, so the call only returns once the
-- listening socket has been bound. The result is the 'ThreadId' of the
-- forked server thread.
--
-- NOTE(review): the wake-up signal is sent only from the after-bind hook
-- installed by 'addBoundSignal'. If the server thread fails before that
-- hook completes, nothing fills the 'MVar' this call waits on. Confirm
-- whether callers need an explicit failure path.
--
-- Since 1.1.4
forkTCPServer :: MonadBaseControl IO m
              => SN.ServerSettings
              -> (SN.AppData -> m ())
              -> m ThreadId
forkTCPServer set f = liftBaseWith $ \runInBase -> do
    bound <- newEmptyMVar
    tid <- forkIO $ void $ runInBase $
        runGeneralTCPServer (addBoundSignal bound set) f
    -- Wait for the after-bind hook before handing back the thread id.
    takeMVar bound
    return tid
Packit 4b2029
Packit 4b2029
Packit 4b2029
Packit 4b2029
-- | Run a TCP server whose connection handler lives in any
-- 'MonadBaseControl' 'IO' monad.
--
-- Behaves like 'SN.runTCPServer', but each connection is handled by
-- running the supplied handler through the base-monad runner.
--
-- The monadic state produced by a handler is thrown away when the handler
-- finishes, so state changes made by one client handler are not visible
-- anywhere else. To share mutable state between handlers, use mutable
-- variables instead.
--
-- Since 1.1.3
runGeneralTCPServer :: MonadBaseControl IO m
                    => SN.ServerSettings
                    -> (SN.AppData -> m ())
                    -> m a
runGeneralTCPServer set f = liftBaseWith $ \runInBase ->
    SN.runTCPServer set (\appData -> void (runInBase (f appData)))
Packit 4b2029
Packit 4b2029
-- | Run a TCP client whose body lives in any 'MonadBaseControl' 'IO'
-- monad.
--
-- Behaves like 'SN.runTCPClient'; the result of the supplied action,
-- together with its monadic state, is restored into the calling monad via
-- 'control'.
--
-- Since 1.1.3
runGeneralTCPClient :: MonadBaseControl IO m
                    => SN.ClientSettings
                    -> (SN.AppData -> m a)
                    -> m a
runGeneralTCPClient set f = control $ \runInBase ->
    SN.runTCPClient set (\appData -> runInBase (f appData))