queue is:module
Contains the public fields of a [Queue][glib-Double-ended-Queues].
This module uses the immortal library to build a pool of worker threads
that process a queue of tasks asynchronously. First build an
ImmortalQueue for your task type and queue backend. Then you can launch
the pool using processImmortalQueue and stop the pool with
closeImmortalQueue.
    import Control.Concurrent.STM (atomically)
    import Control.Concurrent.STM.TQueue
    import Control.Exception (Exception)
    import Control.Immortal.Queue

    data Task
        = Print String
        deriving (Show)

    queueConfig :: TQueue Task -> ImmortalQueue Task
    queueConfig queue =
        ImmortalQueue
            { qThreadCount = 2
            , qPollWorkerTime = 1000
            , qPop = atomically $ readTQueue queue
            , qPush = atomically . writeTQueue queue
            , qHandler = performTask
            , qFailure = printError
            }
      where
        performTask :: Task -> IO ()
        performTask t = case t of
            Print str ->
                putStrLn str
        printError :: Exception e => Task -> e -> IO ()
        printError t err =
            let description = case t of
                    Print str ->
                        "print"
            in  putStrLn $ "Task `" ++ description ++ "` failed with: " ++ show err

    main :: IO ()
    main = do
        queue <- newTQueueIO
        workers <- processImmortalQueue $ queueConfig queue
        atomically $ mapM_ (writeTQueue queue . Print) ["hello", "world"]
        closeImmortalQueue workers
A queue data structure with O(1) (worst-case) operations, as
described in
- Okasaki, Chris. "Simple and efficient purely functional queues and
deques." Journal of Functional Programming 5.4 (1995): 583-592.
- Okasaki, Chris. Purely Functional Data Structures. Diss.
Princeton University, 1996.
TBQueue is a bounded version of TQueue. The queue has a maximum
capacity set when it is created. If the queue already contains the
maximum number of elements, then writeTBQueue retries until an element
is removed from the queue.
The implementation is based on an array to obtain O(1) enqueue and
dequeue operations.
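The retry behaviour of writeTBQueue can be observed without blocking by combining it with orElse. The following is a minimal sketch, assuming the stm package is available; tryWrite and demo are hypothetical helpers introduced for illustration and are not part of the library.

```haskell
import Control.Concurrent.STM

-- | Hypothetical helper: attempt a write without blocking.
-- 'writeTBQueue' retries when the queue is full, and 'orElse'
-- turns that retry into a 'False' result.
tryWrite :: TBQueue a -> a -> STM Bool
tryWrite q x = (writeTBQueue q x >> pure True) `orElse` pure False

-- | Fill a queue of capacity 2, attempt a third write, free one
-- slot by reading, then attempt the write again.
demo :: IO ([Bool], Int, Bool)
demo = do
  q <- newTBQueueIO 2
  results <- atomically $ mapM (tryWrite q) [1, 2, 3 :: Int]
  first <- atomically $ readTBQueue q
  retried <- atomically $ tryWrite q 3
  pure (results, first, retried)

main :: IO ()
main = demo >>= print
```

In this sketch the third write is rejected while the queue holds two elements and succeeds once readTBQueue has removed one. A plain writeTBQueue in the same position would block the calling thread instead.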
A TQueue is like a TChan, with two important differences:
- it has faster throughput than both TChan and
Chan (although the costs are amortised, so the cost of
individual operations can vary a lot).
- it does not provide equivalents of the dupTChan
and cloneTChan operations.
The implementation is based on the traditional purely-functional queue
representation that uses two lists to obtain amortised O(1) enqueue and
dequeue operations.
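The two-list representation mentioned above can be sketched in pure Haskell. This is an illustration of the general technique only, not the TQueue source: the names Queue, empty, push, pop and toList are chosen here for the example, and the STM machinery is left out entirely.

```haskell
import Data.List (unfoldr)

-- | A queue as a front list (next to leave first) and a back list
-- (most recently added first).
data Queue a = Queue [a] [a]

empty :: Queue a
empty = Queue [] []

-- | O(1): cons onto the back list.
push :: a -> Queue a -> Queue a
push x (Queue front back) = Queue front (x : back)

-- | Amortised O(1): the back list is reversed only when the front
-- list runs out, so each element is moved at most once.
pop :: Queue a -> Maybe (a, Queue a)
pop (Queue (x : front) back) = Just (x, Queue front back)
pop (Queue [] [])            = Nothing
pop (Queue [] back)          = pop (Queue (reverse back) [])

-- | Drain the queue in FIFO order.
toList :: Queue a -> [a]
toList = unfoldr pop

main :: IO ()
main = print (toList (foldl (flip push) empty "abc"))
```

The occasional reverse is what makes individual operations vary in cost: most pops are a single pattern match, while the pop that finds the front list empty pays for all elements pushed since the last reversal.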
An opaque data structure which represents an asynchronous queue. It
should only be accessed through the g_async_queue_* functions.
A version of Control.Concurrent.STM.TQueue where the queue is
closeable. This is similar to a TQueue (Maybe a) with a monotonicity
guarantee that once there's a Nothing there will always be Nothing.
Since: 2.0.0
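The monotonicity guarantee can be modelled with a plain TQueue and a one-way "closed" flag. The sketch below uses only the stm package and is not the described module's implementation: Closeable, newCloseable, close, write, readC and demo are hypothetical names, and dropping writes after closing is an assumption of this sketch.

```haskell
import Control.Concurrent.STM
import Control.Monad (replicateM)

-- | Hypothetical closeable queue: a TQueue plus a "closed" flag.
data Closeable a = Closeable (TQueue a) (TVar Bool)

newCloseable :: IO (Closeable a)
newCloseable = Closeable <$> newTQueueIO <*> newTVarIO False

-- | Closing is one-way: the flag is never reset to False.
close :: Closeable a -> STM ()
close (Closeable _ closed) = writeTVar closed True

-- | Writes after closing are dropped (an assumption of this sketch).
write :: Closeable a -> a -> STM ()
write (Closeable q closed) x = do
  isClosed <- readTVar closed
  if isClosed then pure () else writeTQueue q x

-- | 'Just' while elements remain, 'Nothing' once closed and drained,
-- and a retry (blocking) while the queue is open and empty.
readC :: Closeable a -> STM (Maybe a)
readC (Closeable q closed) = do
  next <- tryReadTQueue q
  case next of
    Just x  -> pure (Just x)
    Nothing -> do
      isClosed <- readTVar closed
      if isClosed then pure Nothing else retry

demo :: IO [Maybe Int]
demo = do
  c <- newCloseable
  atomically $ write c 1 >> close c >> write c 2
  atomically $ replicateM 3 (readC c)

main :: IO ()
main = demo >>= print
```

Here the element written before closing is still delivered, and every read after the queue is drained yields Nothing, which is the property a bare TQueue (Maybe a) cannot enforce on its own.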
Concurrent queue for single reader, single writer
Clone of Control.Concurrent.STM.TQueue with support for mkWeakTQueue.
Not all functionality from the original module is available:
unGetTQueue, peekTQueue and tryPeekTQueue are missing. In order to
implement these we'd need to be able to touch# the write end of the
queue inside unGetTQueue, but that means we need a version of touch#
that works within the STM monad.
Contains a simple source and sink linking together conduits in
different threads. For extended examples of usage and bottlenecks see
TMChan.
TQueue is an amortized FIFO queue that behaves like TChan, with two
important differences:
- it's faster (but the cost is amortized, so the cost of individual
operations may vary a lot)
- it doesn't provide equivalents of the dupTChan and cloneTChan
operations
Here is a short description of the data structures:
- TQueue - unbounded infinite queue
- TBQueue - bounded infinite queue
- TMQueue - unbounded finite (closable) queue
- TBMQueue - bounded finite (closable) queue
Caveats
Infinite operations mean that the source doesn't know when the stream
has ended, so one needs to use other methods of finishing the stream,
such as sending an exception or finishing the conduit downstream.
This internal module is exposed only for testing and benchmarking. You
don't need to import it.
A priority search queue (henceforth queue) efficiently
supports the operations of both a search tree and a priority queue. A
Binding is a product of a key and a priority. Bindings can be
inserted, deleted, modified and queried in logarithmic time, and the
binding with the least priority can be retrieved in constant time. A
queue can be built from a list of bindings, sorted by keys, in linear
time.
This implementation is due to Ralf Hinze.
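The dual interface (lookup by key, minimum by priority) can be illustrated with a deliberately naive stand-in built from the containers package. This is not Hinze's data structure: it pairs a Map with a Set, so finding the minimum is logarithmic rather than constant time. The names PSQ, insert, lookupPrio, findMin and sample are chosen for this example only.

```haskell
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set

-- | Hypothetical stand-in: a key-indexed map plus a set ordered by
-- (priority, key). Not the structure described above.
data PSQ k p = PSQ (Map.Map k p) (Set.Set (p, k))

empty :: PSQ k p
empty = PSQ Map.empty Set.empty

-- | Insert or update a binding, keeping both views in sync.
insert :: (Ord k, Ord p) => k -> p -> PSQ k p -> PSQ k p
insert k p (PSQ byKey byPrio) =
  PSQ (Map.insert k p byKey) (Set.insert (p, k) cleaned)
  where
    -- Remove the stale (priority, key) entry if the key already exists.
    cleaned = case Map.lookup k byKey of
      Just old -> Set.delete (old, k) byPrio
      Nothing  -> byPrio

-- | Search-tree view: look up a priority by key.
lookupPrio :: Ord k => k -> PSQ k p -> Maybe p
lookupPrio k (PSQ byKey _) = Map.lookup k byKey

-- | Priority-queue view: the binding with the least priority.
findMin :: PSQ k p -> Maybe (k, p)
findMin (PSQ _ byPrio) = fmap (\(p, k) -> (k, p)) (Set.lookupMin byPrio)

sample :: PSQ String Int
sample = insert "b" 1 (insert "a" 3 (insert "b" 5 empty))

main :: IO ()
main = print (findMin sample, lookupPrio "a" sample)
```

In sample the key "b" is inserted twice, so the second insert acts as a priority update and "b" becomes the minimum binding.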