-- | A FIFO work queue with completion tracking.
--
-- Producers enqueue items with 'writeQueue', workers dequeue them with
-- 'readQueue' and acknowledge each one with 'taskDone', and a coordinating
-- thread waits for the acknowledgements with 'joinQueue'.
module Queue where

import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad

-- | State shared between producers, workers and the joining thread.
data TaskQueue a = TaskQueue
  { taskChan  :: Chan a
    -- ^ Pending items, delivered in FIFO order.
  , taskCount :: MVar Int
    -- ^ Number of items written since the last 'joinQueue' claimed them.
  , taskQSem  :: QSem
    -- ^ Holds one unit per 'taskDone' call not yet consumed by 'joinQueue'.
  }

-- | Create a new, empty queue: no pending items, a zero counter and a
-- semaphore with no available units.
newQueue :: IO (TaskQueue a)
newQueue = do
  ch <- newChan
  count <- newMVar 0
  sem <- newQSem 0
  return TaskQueue { taskChan = ch, taskCount = count, taskQSem = sem }

-- | Append an item to the queue and count it as outstanding.
--
-- The underlying 'Chan' is unbounded, so this never blocks waiting for
-- free capacity.
writeQueue :: TaskQueue a -> a -> IO ()
writeQueue queue item = do
  writeChan (taskChan queue) item
  -- Force the new count before storing it so the MVar never accumulates a
  -- chain of unevaluated (+1) thunks.
  modifyMVar_ (taskCount queue) (\n -> return $! n + 1)

-- | Take the next item from the queue, blocking while the queue is empty.
readQueue :: TaskQueue a -> IO a
readQueue queue = readChan (taskChan queue)

-- | Acknowledge that one item obtained via 'readQueue' has been processed.
--
-- Call this exactly once per processed item; each call releases one unit
-- that 'joinQueue' waits for.
taskDone :: TaskQueue a -> IO ()
taskDone queue = signalQSem (taskQSem queue)

-- | Block until every item written so far has been acknowledged with
-- 'taskDone'.
--
-- The outstanding count is claimed and reset to zero atomically, so the
-- queue can be reused afterwards: a later 'joinQueue' waits only for items
-- written after this one claimed the count, instead of waiting again for
-- acknowledgements that were already consumed (which would block forever).
--
-- Items written concurrently with this call may or may not be included in
-- the wait, depending on whether their count was recorded before the claim.
joinQueue :: TaskQueue a -> IO ()
joinQueue queue = do
  outstanding <- modifyMVar (taskCount queue) (\n -> return (0, n))
  replicateM_ outstanding (waitQSem (taskQSem queue))