Open a connection to the Redis server, register to all channels in the
PubSubController, and process messages and subscription change
requests forever. The only way this will ever exit is if there is an
exception from the network code or an unhandled exception in a
MessageCallback or
PMessageCallback. For example, if the
network connection to Redis dies,
pubSubForever will throw a
ConnectionLost. When such an exception is thrown, you can
recall
pubSubForever with the same
PubSubController
which will open a new connection and resubscribe to all the channels
which are tracked in the
PubSubController.
The general pattern is therefore during program startup create a
PubSubController and fork a thread which calls
pubSubForever in a loop (using an exponential backoff algorithm
such as the
retry package to not hammer the Redis server if it
does die). For example,
myhandler :: ByteString -> IO ()
myhandler msg = putStrLn $ unpack $ decodeUtf8 msg
onInitialComplete :: IO ()
onInitialComplete = putStrLn "Redis acknowledged that mychannel is now subscribed"
main :: IO ()
main = do
conn <- connect defaultConnectInfo
pubSubCtrl <- newPubSubController [("mychannel", myhandler)] []
concurrently ( forever $
pubSubForever conn pubSubCtrl onInitialComplete
`catch` (\(e :: SomeException) -> do
putStrLn $ "Got error: " ++ show e
threadDelay $ 50*1000) -- TODO: use exponential backoff
) $ restOfYourProgram
{- elsewhere in your program, use pubSubCtrl to change subscriptions -}
At most one active
pubSubForever can be running against a
single
PubSubController at any time. If two active calls to
pubSubForever share a single
PubSubController there will
be deadlocks. If you do want to process messages using multiple
connections to Redis, you can create more than one
PubSubController. For example, create one PubSubController for
each
getNumCapabilities and then create a Haskell thread bound
to each capability each calling
pubSubForever in a loop. This
will create one network connection per controller/capability and allow
you to register separate channels and callbacks for each controller,
spreading the load across the capabilities.