Newer
Older
module Queue where
import Control.Monad
import Control.Concurrent.MVar
import Control.Concurrent
-- | A queue of work items bundled with the bookkeeping needed to wait for
-- every enqueued item to be reported as finished.
data TaskQueue a = TaskQueue
  { taskChan  :: Chan a
    -- ^ Channel that carries the queued items.
  , taskCount :: MVar Int
    -- ^ Counter incremented by 'writeQueue' and consulted by 'joinQueue'.
  , taskQSem  :: QSem
    -- ^ Semaphore signalled by 'taskDone' and awaited by 'joinQueue'.
  }
-- | Build an empty 'TaskQueue': a fresh channel, an item counter starting
-- at zero, and a semaphore with no units available.
newQueue :: IO (TaskQueue a)
newQueue =
  -- Positional construction; the arguments follow the record's field
  -- order: taskChan, taskCount, taskQSem.
  TaskQueue <$> newChan <*> newMVar 0 <*> newQSem 0
-- | Append an item to the queue and record it in the item counter.
--
-- The underlying 'Chan' is unbounded, so this never blocks waiting for
-- free space in the queue.
writeQueue :: TaskQueue a -> a -> IO ()
writeQueue queue item = do
  writeChan (taskChan queue) item
  -- Force the new total before storing it, so the MVar never accumulates
  -- a chain of unevaluated (+1) thunks across many writes.
  modifyMVar_ (taskCount queue) (\n -> return $! n + 1)
-- | Take the next item from the queue, blocking while the queue is empty.
readQueue :: TaskQueue a -> IO a
readQueue = readChan . taskChan
-- | Mark one item previously obtained from the queue as successfully
-- processed. Each call releases one unit of the queue's semaphore.
taskDone :: TaskQueue a -> IO ()
taskDone = signalQSem . taskQSem
-- | Block until every item written to the queue so far has been reported
-- as processed via 'taskDone'.
--
-- The item counter is reset to zero as part of taking the snapshot, so the
-- function can be called again later: a subsequent call only waits for
-- items written after the previous call. (Merely reading the counter would
-- make a second call wait for completions whose semaphore units were
-- already consumed, blocking forever.)
--
-- Intended to be called by one thread at a time; a concurrent second
-- caller would see a zero counter and return without waiting.
joinQueue :: TaskQueue a -> IO ()
joinQueue queue = do
  -- Take the number of outstanding items and reset the counter to zero.
  pending <- swapMVar (taskCount queue) 0
  -- One semaphore unit is released per finished item; consume one each.
  replicateM_ pending (waitQSem (taskQSem queue))