Blame Network/TCP.hs

Packit acf257
{-# LANGUAGE TypeSynonymInstances #-}
Packit acf257
-----------------------------------------------------------------------------
Packit acf257
-- |
Packit acf257
-- Module      :  Network.TCP
Packit acf257
-- Copyright   :  See LICENSE file
Packit acf257
-- License     :  BSD
Packit acf257
--
Packit acf257
-- Maintainer  :  Ganesh Sittampalam <ganesh@earth.li>
Packit acf257
-- Stability   :  experimental
Packit acf257
-- Portability :  non-portable (not tested)
Packit acf257
--
Packit acf257
-- Some utility functions for working with the Haskell @network@ package. Mostly
Packit acf257
-- for internal use by the @Network.HTTP@ code.
Packit acf257
--
Packit acf257
-----------------------------------------------------------------------------
Packit acf257
module Network.TCP
Packit acf257
   ( Connection
Packit acf257
   , EndPoint(..)
Packit acf257
   , openTCPPort
Packit acf257
   , isConnectedTo
Packit acf257
Packit acf257
   , openTCPConnection
Packit acf257
   , socketConnection
Packit acf257
   , isTCPConnectedTo
Packit acf257
   
Packit acf257
   , HandleStream
Packit acf257
   , HStream(..)
Packit acf257
   
Packit acf257
   , StreamHooks(..)
Packit acf257
   , nullHooks
Packit acf257
   , setStreamHooks
Packit acf257
   , getStreamHooks
Packit acf257
   , hstreamToConnection
Packit acf257
Packit acf257
   ) where
Packit acf257
Packit acf257
import Network.Socket
Packit acf257
   ( Socket, SocketOption(KeepAlive)
Packit acf257
   , SocketType(Stream), connect
Packit acf257
   , shutdown, ShutdownCmd(..)
Packit acf257
   , sClose, setSocketOption, getPeerName
Packit acf257
   , socket, Family(AF_UNSPEC), defaultProtocol, getAddrInfo
Packit acf257
   , defaultHints, addrFamily, withSocketsDo
Packit acf257
   , addrSocketType, addrAddress
Packit acf257
   )
Packit acf257
import qualified Network.Stream as Stream
Packit acf257
   ( Stream(readBlock, readLine, writeBlock, close, closeOnEnd) )
Packit acf257
import Network.Stream
Packit acf257
   ( ConnError(..)
Packit acf257
   , Result
Packit acf257
   , failWith
Packit acf257
   , failMisc
Packit acf257
   )
Packit acf257
import Network.BufferType
Packit acf257
Packit acf257
import Network.HTTP.Base ( catchIO )
Packit acf257
import Network.Socket ( socketToHandle )
Packit acf257
Packit acf257
import Data.Char  ( toLower )
Packit acf257
import Data.Word  ( Word8 )
Packit acf257
import Control.Concurrent
Packit acf257
import Control.Exception ( onException )
Packit acf257
import Control.Monad ( liftM, when )
Packit acf257
import System.IO ( Handle, hFlush, IOMode(..), hClose )
Packit acf257
import System.IO.Error ( isEOFError )
Packit acf257
Packit acf257
import qualified Data.ByteString      as Strict
Packit acf257
import qualified Data.ByteString.Lazy as Lazy
Packit acf257
Packit acf257
-----------------------------------------------------------------
Packit acf257
------------------ TCP Connections ------------------------------
Packit acf257
-----------------------------------------------------------------
Packit acf257
Packit acf257
-- | A TCP connection that carries 'String' payloads.
--
-- This is a thin @newtype@ around a 'HandleStream' of 'String'; the wrapper
-- exists so that the type can be made an instance of the @Stream@ class
-- without relying on GHC extensions.  The name sounds transport-agnostic,
-- but the @Stream Connection@ instance delegates to the TCP-specific
-- operations defined in this module.
newtype Connection = Connection (HandleStream String)
Packit acf257
Packit acf257
-- | A handle-backed stream whose payload type is @a@.  All of its state
-- lives in one 'MVar', which holds either the live connection record or the
-- closed marker.
newtype HandleStream a = HandleStream
  { getRef :: MVar (Conn a) -- ^ the shared, mutable connection state
  }
Packit acf257
Packit acf257
-- | A remote peer, identified by a host name and a port number.
data EndPoint = EndPoint
  { epHost :: String -- ^ host name
  , epPort :: Int    -- ^ port number
  }

-- | Two end points are equal when their host names agree ignoring case
-- and their ports are identical.
instance Eq EndPoint where
  EndPoint hostA portA == EndPoint hostB portB =
      lower hostA == lower hostB && portA == portB
    where
      lower = map toLower
Packit acf257
Packit acf257
-- | The state stored behind a 'HandleStream': either a live connection
-- ('MkConn') or the marker that replaces it once the connection has been
-- closed ('ConnClosed').
data Conn a
  = MkConn
      { connSock     :: !Socket
        -- ^ the underlying socket (strict field)
      , connHandle   :: Handle
        -- ^ the 'Handle' through which reads and writes are performed
      , connBuffer   :: BufferOp a
        -- ^ the buffer operations for payload type @a@
      , connInput    :: Maybe a
        -- ^ @Just@ the not-yet-consumed input when the input was stashed
        -- when the connection was set up; @Nothing@ when reads go to the
        -- handle directly
      , connEndPoint :: EndPoint
        -- ^ the host and port recorded for this connection
      , connHooks    :: Maybe (StreamHooks a)
        -- ^ optional hooks run after stream operations
      , connCloseEOF :: Bool
        -- ^ True => close socket upon reaching end-of-stream.
      }
  | ConnClosed
  deriving (Eq)
Packit acf257
Packit acf257
-- | Wrap a 'String'-carrying 'HandleStream' as a 'Connection'.
hstreamToConnection :: HandleStream String -> Connection
hstreamToConnection = Connection
Packit acf257
Packit acf257
-- | Total variant of the 'connHooks' selector: a closed connection has no
-- hooks, whereas the plain selector would fail on 'ConnClosed'.
connHooks' :: Conn a -> Maybe (StreamHooks a)
connHooks' conn =
  case conn of
    ConnClosed -> Nothing
    MkConn{}   -> connHooks conn
Packit acf257
Packit acf257
-- | Callbacks attached to a stream.  All of these are post-op hooks: the
-- read and write hooks are handed the result of the operation that has
-- just been performed, together with a function that renders the payload
-- type as a 'String'.
data StreamHooks ty = StreamHooks
  { hook_readLine   :: (ty -> String) -> Result ty -> IO ()
    -- ^ run after a line read, with the result of that read
  , hook_readBlock  :: (ty -> String) -> Int -> Result ty -> IO ()
    -- ^ run after a block read, with the requested size and the result
  , hook_writeBlock :: (ty -> String) -> ty -> Result () -> IO ()
    -- ^ run after a block write, with the payload written and the result
  , hook_close      :: IO ()
    -- ^ action associated with closing the stream
  , hook_name       :: String
    -- ^ hack alert: name of the hook itself.
  }
Packit acf257
Packit acf257
-- | Degenerate equality: any two sets of hooks compare equal.  The hooks
-- are functions and cannot be compared; this instance is what lets 'Conn'
-- derive 'Eq' despite carrying a 'StreamHooks' field.
instance Eq ty => Eq (StreamHooks ty) where
  _ == _ = True
Packit acf257
Packit acf257
-- | Hooks that do nothing: every callback returns immediately and the
-- hook name is the empty string.
nullHooks :: StreamHooks ty
nullHooks =
  StreamHooks
    { hook_readLine   = ignore2
    , hook_readBlock  = ignore3
    , hook_writeBlock = ignore3
    , hook_close      = return ()
    , hook_name       = ""
    }
  where
    -- No-op callbacks of two and three arguments respectively.
    ignore2 _ _   = return ()
    ignore3 _ _ _ = return ()
Packit acf257
Packit acf257
-- | Install the given hooks on a stream, replacing any hooks that were
-- previously set.
--
-- If the stream has already been closed this is a no-op: 'ConnClosed' has
-- no @connHooks@ field, so a plain record update on it would raise a
-- runtime error.
setStreamHooks :: HandleStream ty -> StreamHooks ty -> IO ()
setStreamHooks h sh = modifyMVar_ (getRef h) (return . install)
  where
    install ConnClosed = ConnClosed
    install c          = c{connHooks=Just sh}
Packit acf257
Packit acf257
-- | Fetch the hooks currently installed on a stream, if any.
--
-- Returns 'Nothing' both when no hooks are set and when the stream has
-- been closed.  The total accessor @connHooks'@ is used because the plain
-- @connHooks@ selector fails on 'ConnClosed'.
getStreamHooks :: HandleStream ty -> IO (Maybe (StreamHooks ty))
getStreamHooks h = liftM connHooks' (readMVar (getRef h))
Packit acf257
Packit acf257
-- | @HStream@ overloads the operations on a 'HandleStream' over the type
-- of payload that travels across the handle.  This is what lets
-- @Network.HTTP@ 'Request's and 'Response's leave their payload
-- representation open rather than fixing it.
--
-- Instances are provided for strict and lazy @ByteString@s and for
-- @String@.  To plug in another payload representation, defining your own
-- @HStream@ instance _should_ be all that it takes.
--
class BufferType bufType => HStream bufType where
  -- | Open a stream to the given host name and port.
  openStream       :: String -> Int -> IO (HandleStream bufType)
  -- | Build a stream on top of an existing 'Socket'; the host name and
  -- port are passed alongside it.
  openSocketStream :: String -> Int -> Socket -> IO (HandleStream bufType)
  -- | Read one line from the stream.
  readLine         :: HandleStream bufType -> IO (Result bufType)
  -- | Read a block of the requested size from the stream.
  readBlock        :: HandleStream bufType -> Int -> IO (Result bufType)
  -- | Write a payload to the stream.
  writeBlock       :: HandleStream bufType -> bufType -> IO (Result ())
  -- | Close the stream.  In the instances defined in this module the
  -- remaining input is read off before the handle is closed.
  close            :: HandleStream bufType -> IO ()
  -- | Close the stream without reading off the remaining input first (as
  -- implemented by the instances defined in this module).
  closeQuick       :: HandleStream bufType -> IO ()
  -- | Set whether the stream is closed once end-of-stream is reached.
  closeOnEnd       :: HandleStream bufType -> Bool -> IO ()
Packit acf257
  
Packit acf257
-- | Streams of strict 'Strict.ByteString's.  Connections are opened without
-- stashing the input, so every read goes to the handle.
instance HStream Strict.ByteString where
  openStream host port            = openTCPConnection host port
  openSocketStream host port sock = socketConnection host port sock
  readBlock        = readBlockBS
  readLine         = readLineBS
  writeBlock       = writeBlockBS
  -- Drain the peer's remaining lines (until an empty one) before closing.
  close stream      = closeIt stream Strict.null True
  -- Close straight away, without draining.
  closeQuick stream = closeIt stream Strict.null False
  closeOnEnd       = closeEOF
Packit acf257
Packit acf257
-- | Streams of lazy 'Lazy.ByteString's.  Unlike the other instances, both
-- ways of opening a stream pass @True@ for the stash-input flag, so the
-- handle's contents are stashed in the connection and reads are served
-- from that stash.
instance HStream Lazy.ByteString where
    openStream host port            = openTCPConnection_ host port True
    openSocketStream host port sock = socketConnection_ host port sock True
    readBlock        = readBlockBS
    readLine         = readLineBS
    writeBlock       = writeBlockBS
    -- Drain the peer's remaining lines (until an empty one) before closing.
    close stream      = closeIt stream Lazy.null True
    -- Close straight away, without draining.
    closeQuick stream = closeIt stream Lazy.null False
    closeOnEnd       = closeEOF
Packit acf257
Packit acf257
-- | The @Stream@ operations on a 'Connection' unwrap it and delegate to the
-- 'HStream' operations on the 'String' stream inside.
instance Stream.Stream Connection where
  readBlock (Connection hs) n      = Network.TCP.readBlock hs n
  readLine (Connection hs)         = Network.TCP.readLine hs
  writeBlock (Connection hs) str   = Network.TCP.writeBlock hs str
  close (Connection hs)            = Network.TCP.close hs
  closeOnEnd (Connection hs) flg   = Network.TCP.closeEOF hs flg
Packit acf257
  
Packit acf257
-- | Streams of plain 'String's.  Connections are opened without stashing
-- the input, so every read goes to the handle.
instance HStream String where
    openStream host port            = openTCPConnection host port
    openSocketStream host port sock = socketConnection host port sock

    readBlock = readBlockBS

    -- NOTE(review): an earlier comment here said this read goes through a
    -- buffer of 1000 characters (however many bytes that is being left for
    -- the user to decipher); nothing in this module establishes that size,
    -- so confirm against the buffer operations before relying on it.
    readLine = readLineBS

    -- The 'Connection' object allows no outward buffering,
    -- since in general messages are serialised in their entirety.
    writeBlock = writeBlockBS

    -- Closes a Connection.  Afterwards the Connection will no longer allow
    -- any of the other Stream functions.  Note that a Connection may close
    -- at any time before this function is called.  This function is
    -- idempotent.  (I think the behaviour here is TCP specific)
    close stream = closeIt stream null True

    -- Closes a Connection without munching the rest of the stream.
    closeQuick stream = closeIt stream null False

    closeOnEnd = closeEOF
Packit acf257
    
Packit acf257
-- | @openTCPPort uri port@ establishes a connection to a remote host.
-- The host name is resolved with 'getAddrInfo', which possibly queries the
-- DNS system and hence may trigger a network connection of its own.
openTCPPort :: String -> Int -> IO Connection
openTCPPort uri port = liftM Connection (openTCPConnection uri port)
Packit acf257
Packit acf257
-- Add a "persistent" option?  Currently persistent is the default.
-- Use the "Result" type for synchronous exception reporting?

-- | Open a TCP connection to the given host and port, with reads going
-- directly to the handle (the input is not stashed).
openTCPConnection :: BufferType ty => String -> Int -> IO (HandleStream ty)
openTCPConnection uri port = openTCPConnection_ uri port stashInput
  where
    stashInput = False
Packit acf257
Packit acf257
-- | Resolve the host, connect a fresh socket to it and wrap the socket as a
-- 'HandleStream'.
--
-- The 'Bool' is handed on to 'socketConnection_' as its stash-input flag.
--
-- Every address returned by 'getAddrInfo' is tried in order until one of
-- them yields a connection.  Previously only the first address was tried,
-- so a host whose first address was unreachable could not be contacted even
-- though a later address would have worked.  If all attempts fail, the
-- 'IOError' from the last attempt is re-thrown.
--
-- Fails (via 'fail') when name resolution returns no addresses, and calls
-- 'error' for a bracketed host that starts with @v@/@V@.
openTCPConnection_ :: BufferType ty => String -> Int -> Bool -> IO (HandleStream ty)
openTCPConnection_ uri port stashInput = do
    -- use withSocketsDo here in case the caller hasn't used it, which would make getAddrInfo fail on Windows
    -- although withSocketsDo is supposed to wrap the entire program, in practice it is safe to use it locally
    -- like this as it just does a once-only installation of a shutdown handler to run at program exit,
    -- rather than actually shutting down after the action
    addrinfos <- withSocketsDo $ getAddrInfo (Just hints) (Just fixedUri) (Just . show $ port)
    case addrinfos of
        []     -> fail "openTCPConnection: getAddrInfo returned no address information"
        (a:as) -> connectFirst a as
  where
    hints = defaultHints { addrFamily = AF_UNSPEC, addrSocketType = Stream }

    -- HACK: uri is sometimes obtained by calling Network.URI.uriRegName, and this includes
    -- the surrounding square brackets for an RFC 2732 host like [::1]. It's not clear whether
    -- it should, or whether all call sites should be using something different instead, but
    -- the simplest short-term fix is to strip any surrounding square brackets here.
    -- It shouldn't affect anything else, as this is the only situation in which they can
    -- occur - see RFC 3986.
    -- ('last' and 'init' are safe here: the pattern guarantees 'rest' is non-empty.)
    fixedUri =
         case uri of
            '[':(rest@(c:_)) | last rest == ']'
              -> if c == 'v' || c == 'V'
                     then error $ "Unsupported post-IPv6 address " ++ uri
                     else init rest
            _ -> uri

    -- Try the given address; on an I/O failure move on to the next
    -- candidate, re-throwing the failure once the candidates run out.
    connectFirst a rest =
        connectTo a `catchIO` (\ e ->
            case rest of
              []     -> ioError e
              (b:bs) -> connectFirst b bs)

    -- Connect one socket to one address.  The socket is closed again if
    -- anything after its creation throws, so failed attempts do not leak it.
    connectTo a = do
        s <- socket (addrFamily a) Stream defaultProtocol
        onException (do
                    setSocketOption s KeepAlive 1
                    connect s (addrAddress a)
                    socketConnection_ fixedUri port s stashInput
                    ) (sClose s)
Packit acf257
Packit acf257
-- | @socketConnection@ is like 'openTCPConnection', but builds the stream
-- on a pre-existing 'Socket' instead of opening a new one.  The host name
-- and port are recorded as the connection's end point; the input is not
-- stashed.
socketConnection :: BufferType ty
                 => String
                 -> Int
                 -> Socket
                 -> IO (HandleStream ty)
socketConnection hst port sock = socketConnection_ hst port sock stashInput
  where
    stashInput = False
Packit acf257
Packit acf257
-- Internal worker behind 'socketConnection', used to control the on-demand
-- streaming of input for /lazy/ streams.
--
-- The socket is converted to a read/write 'Handle'.  When the 'Bool' is
-- True the handle's contents are fetched with @buf_hGetContents@ and
-- stashed in the connection; otherwise no input is stashed.  The new
-- connection records the given host and port as its end point, has no
-- hooks, and has close-on-end-of-stream switched off.
socketConnection_ :: BufferType ty
                  => String
                  -> Int
                  -> Socket
                  -> Bool
                  -> IO (HandleStream ty)
socketConnection_ hst port sock stashInput = do
    h <- socketToHandle sock ReadWriteMode
    stashed <- if stashInput
                 then liftM Just (buf_hGetContents bufferOps h)
                 else return Nothing
    let conn = MkConn
         { connSock     = sock
         , connHandle   = h
         , connBuffer   = bufferOps
         , connInput    = stashed
         , connEndPoint = EndPoint hst port
         , connHooks    = Nothing
         , connCloseEOF = False
         }
    liftM HandleStream (newMVar conn)
Packit acf257
Packit acf257
-- | Close the connection behind a stream and mark the stream as closed.
--
-- The second argument is the drain action: it is run repeatedly after the
-- send side has been shut down, until it returns True.
--
-- I/O errors raised while closing are swallowed, and the stream always ends
-- up as 'ConnClosed'.  Closing an already-closed stream does nothing.
closeConnection :: HStream a => HandleStream a -> IO Bool -> IO ()
closeConnection ref readL = do
    -- won't hold onto the lock for the duration
    -- we are draining it...ToDo: have Connection
    -- go into a shutting-down state so that other
    -- threads will simply back off if/when attempting
    -- to also close it.
  c <- readMVar (getRef ref)
  closeConn c `catchIO` (\_ -> return ())
  modifyMVar_ (getRef ref) (\ _ -> return ConnClosed)
 where
   -- Be kind to peer & close gracefully.
  closeConn ConnClosed = return ()
  closeConn conn = do
    let sk = connSock conn
        h  = connHandle conn
        -- The polite part: flush pending output, stop sending, then
        -- drain whatever the peer still has to say.
        sayGoodbye = hFlush h >> shutdown sk ShutdownSend >> suck readL
    -- The polite part is best effort only.  If it fails (for instance
    -- because the peer has already gone away) the handle must still be
    -- closed: the stream is marked ConnClosed afterwards regardless, so
    -- nothing else would ever close it.
    sayGoodbye `catchIO` (\_ -> return ())
    hClose h
    shutdown sk ShutdownReceive
    sClose sk

  -- Run the drain action until it reports that it is done.
  suck :: IO Bool -> IO ()
  suck rd = do
    f <- rd
    if f then return () else suck rd
Packit acf257
Packit acf257
-- | Checks both that the underlying Socket is connected and that the
-- connection's peer matches the given end point (the peer's host name and
-- port are recorded locally when the connection is created).
isConnectedTo :: Connection -> EndPoint -> IO Bool
isConnectedTo (Connection conn) = isTCPConnectedTo conn
Packit acf257
Packit acf257
-- | True when the stream is still open, its recorded end point equals the
-- given one, and asking the socket for its peer name succeeds.  A closed
-- stream, a different end point, or an I/O error from the peer-name query
-- all give False.
isTCPConnectedTo :: HandleStream ty -> EndPoint -> IO Bool
isTCPConnectedTo conn endPoint = readMVar (getRef conn) >>= check
  where
    check ConnClosed = return False
    check c
      | connEndPoint c == endPoint =
          (getPeerName (connSock c) >> return True) `catchIO` (\_ -> return False)
      | otherwise = return False
Packit acf257
Packit acf257
-- | Read a block of the requested size, then pass the outcome to the
-- stream's @hook_readBlock@ (if hooks are installed).  Yields the
-- closed-connection failure when the stream is already closed.
readBlockBS :: HStream a => HandleStream a -> Int -> IO (Result a)
readBlockBS ref n = onNonClosedDo ref $ \ conn -> do
    result <- bufferGetBlock ref n
    case connHooks' conn of
      Nothing    -> return ()
      Just hooks -> hook_readBlock hooks (buf_toStr (connBuffer conn)) n result
    return result
Packit acf257
Packit acf257
-- | Read one line, then pass the outcome to the stream's @hook_readLine@
-- (if hooks are installed).  Yields the closed-connection failure when the
-- stream is already closed.
--
-- NOTE(review): an earlier comment said this read uses a buffer of 1000
-- characters (however many bytes that is being left for the user to
-- decipher); nothing in this module establishes that size.
readLineBS :: HStream a => HandleStream a -> IO (Result a)
readLineBS ref = onNonClosedDo ref $ \ conn -> do
    result <- bufferReadLine ref
    case connHooks' conn of
      Nothing    -> return ()
      Just hooks -> hook_readLine hooks (buf_toStr (connBuffer conn)) result
    return result
Packit acf257
Packit acf257
-- | Write a payload to the stream, then pass the outcome to the stream's
-- @hook_writeBlock@ (if hooks are installed).  Yields the closed-connection
-- failure when the stream is already closed.
--
-- The 'Connection' object allows no outward buffering,
-- since in general messages are serialised in their entirety.
writeBlockBS :: HandleStream a -> a -> IO (Result ())
writeBlockBS ref b = onNonClosedDo ref $ \ conn -> do
    outcome <- bufferPutBlock (connBuffer conn) (connHandle conn) b
    case connHooks' conn of
      Nothing    -> return ()
      Just hooks -> hook_writeBlock hooks (buf_toStr (connBuffer conn)) b outcome
    return outcome
Packit acf257
Packit acf257
-- | Close a stream and then run its @hook_close@, if hooks are installed.
--
-- Arguments: the stream; a predicate on a line just read that says when
-- draining is finished; and a flag choosing whether to drain at all.  With
-- the flag set, lines are read until one satisfies the predicate or a read
-- fails; with it clear the connection is closed straight away.
--
-- The hooks are fetched /before/ the connection is closed.  Closing
-- replaces the stream's state with 'ConnClosed', which has no hooks, so
-- looking them up afterwards always gave 'Nothing' and @hook_close@ never
-- ran.  Calling this on an already-closed stream finds no hooks, so the
-- hook runs at most once per connection.
--
-- NOTE(review): when this is reached via close-on-end-of-stream from
-- inside a read, the read hook of that same read still fires afterwards
-- (it uses the state captured before the read), so hooks must tolerate a
-- read callback arriving after @hook_close@.
closeIt :: HStream ty => HandleStream ty -> (ty -> Bool) -> Bool -> IO ()
closeIt c p b = do
    hooks <- liftM connHooks' (readMVar (getRef c))
    closeConnection c drained
    maybe (return ()) hook_close hooks
  where
    -- The drain step handed to closeConnection: True means "stop reading".
    drained
      | b         = liftM (either (const True) p) (readLineBS c)
      | otherwise = return True
Packit acf257
Packit acf257
-- | Set whether the stream should be closed when end-of-stream is reached.
--
-- If the stream has already been closed this is a no-op: 'ConnClosed' has
-- no @connCloseEOF@ field, so a plain record update on it would raise a
-- runtime error.
closeEOF :: HandleStream ty -> Bool -> IO ()
closeEOF c flg = modifyMVar_ (getRef c) (return . setFlag)
  where
    setFlag ConnClosed = ConnClosed
    setFlag co         = co{connCloseEOF=flg}
Packit acf257
Packit acf257
-- | Fetch a block of the requested size.
--
-- With stashed input, the block is split off the front of the stash and the
-- remainder is stored back.  Otherwise the block is read from the handle:
-- end-of-file produces an empty buffer (after closing the stream when
-- close-on-end is set), and any other I/O error becomes a failure result
-- carrying the error's text.
bufferGetBlock :: HStream a => HandleStream a -> Int -> IO (Result a)
bufferGetBlock ref n = onNonClosedDo ref $ \ conn ->
    let ops = connBuffer conn in
    case connInput conn of
      Just stashed -> do
        let (wanted, remaining) = buf_splitAt ops n stashed
        modifyMVar_ (getRef ref) (\ co -> return co{connInput=Just remaining})
        return (Right wanted)
      Nothing ->
        liftM Right (buf_hGet ops (connHandle conn) n)
          `catchIO` onFailure conn ops
  where
    onFailure conn ops e
      | isEOFError e = do
          -- Errors from the close itself are deliberately ignored.
          when (connCloseEOF conn) $ closeQuick ref `catchIO` (\ _ -> return ())
          return (Right (buf_empty ops))
      | otherwise = return (failMisc (show e))
Packit acf257
Packit acf257
-- | Write a payload to the handle and flush it.  An I/O error during the
-- write or the flush becomes a failure result carrying the error's text.
bufferPutBlock :: BufferOp a -> Handle -> a -> IO (Result ())
bufferPutBlock ops h b = attempt `catchIO` report
  where
    attempt = do
      buf_hPut ops h b
      hFlush h
      return (Right ())
    report e = return (failMisc (show e))
Packit acf257
Packit acf257
-- | Fetch one line, including its terminating newline.
--
-- With stashed input, everything up to the first newline is taken from the
-- stash together with the one element that follows it, and the remainder is
-- stored back.  Otherwise a line is read from the handle and a newline is
-- appended to it: end-of-file produces an empty buffer (after closing the
-- stream when close-on-end is set), and any other I/O error becomes a
-- failure result carrying the error's text.
bufferReadLine :: HStream a => HandleStream a -> IO (Result a)
bufferReadLine ref = onNonClosedDo ref $ \ conn ->
    let ops = connBuffer conn in
    case connInput conn of
      Just pending -> do
        let (line, afterLine) = buf_span ops (/='\n') pending
            (eol, leftover)   = buf_splitAt ops 1 afterLine
        modifyMVar_ (getRef ref) (\ co -> return co{connInput=Just leftover})
        return (Right (buf_append ops line eol))
      Nothing ->
        liftM (Right . withNewline ops) (buf_hGetLine ops (connHandle conn))
          `catchIO` onFailure conn ops
  where
    onFailure conn ops e
      | isEOFError e = do
          -- Errors from the close itself are deliberately ignored.
          when (connCloseEOF conn) $ closeQuick ref `catchIO` (\ _ -> return ())
          return (Right (buf_empty ops))
      | otherwise = return (failMisc (show e))

    -- yes, this s**ks.. _may_ have to be addressed if perf
    -- suggests worthiness.
    withNewline ops line = buf_snoc ops line newline

    newline :: Word8
    newline = fromIntegral (fromEnum '\n')
Packit acf257
Packit acf257
-- | Run an action on the stream's current connection state, or yield the
-- @ErrorClosed@ failure without running it when the stream is closed.
-- The state is read once, up front; the action receives that snapshot.
onNonClosedDo :: HandleStream a -> (Conn a -> IO (Result b)) -> IO (Result b)
onNonClosedDo h act = readMVar (getRef h) >>= dispatch
  where
    dispatch ConnClosed = return (failWith ErrorClosed)
    dispatch conn       = act conn
Packit acf257