Skip to content
Queue.hs 1.09 KiB
Newer Older
module Queue where

import Control.Monad
import Control.Concurrent.MVar
import Control.Concurrent

-- | A work queue: a channel of items plus the bookkeeping used to find out
-- when the enqueued items have been processed.
data TaskQueue a = TaskQueue
  { taskChan  :: Chan a
    -- ^ Channel that carries the queued items from writers to readers.
  , taskCount :: MVar Int
    -- ^ Counter that 'writeQueue' increments for each item it enqueues.
  , taskQSem  :: QSem
    -- ^ Semaphore signalled once by every 'taskDone' call.
  }

-- | Create an empty queue: a fresh channel, a task counter starting at
-- zero, and a completion semaphore with no units available.
newQueue :: IO (TaskQueue a)
newQueue =
  -- Constructor arguments follow the record's field order:
  -- channel, counter, semaphore.
  TaskQueue
    <$> newChan
    <*> newMVar 0
    <*> newQSem 0

-- | Add an element to the queue and record it as an outstanding task.
--
-- The underlying 'Chan' is unbounded, so this never blocks waiting for
-- free capacity; it only waits briefly for exclusive access to the task
-- counter.
writeQueue :: TaskQueue a -> a -> IO ()
writeQueue queue item = do
  -- Count the task *before* publishing it. If the item were written first,
  -- a concurrent 'joinQueue' could read a counter that does not yet include
  -- an item a worker has already picked up, and return too early.
  -- The '$!' keeps an evaluated Int in the MVar instead of a growing chain
  -- of (+1) thunks.
  modifyMVar_ (taskCount queue) (\n -> return $! n + 1)
  writeChan (taskChan queue) item

-- | Take the next element from the queue's channel, blocking while the
-- channel is empty.
readQueue :: TaskQueue a -> IO a
readQueue = readChan . taskChan

-- | Report that one element obtained from the queue has been processed
-- successfully. Each call releases one unit of the queue's completion
-- semaphore, which is what 'joinQueue' waits on.
taskDone :: TaskQueue a -> IO ()
taskDone = signalQSem . taskQSem

-- | Block until every element counted by 'writeQueue' so far has been
-- acknowledged with a matching 'taskDone'.
--
-- The outstanding count is claimed and reset on each pass, so 'joinQueue'
-- can be called repeatedly on the same queue: previously it left the count
-- untouched while consuming the semaphore units, which made a second call
-- wait forever. Elements enqueued while a join is in progress are picked up
-- by a further pass before this function returns.
--
-- This is intended for a single joining thread at a time; a second thread
-- joining concurrently may observe a zero count and return early.
joinQueue :: TaskQueue a -> IO ()
joinQueue queue = do
  -- Atomically take the number of not-yet-awaited tasks and zero the
  -- counter, so each enqueued task is waited for exactly once.
  pending <- modifyMVar (taskCount queue) (\n -> return (0, n))
  when (pending > 0) $ do
    replicateM_ pending (waitQSem (taskQSem queue))
    -- More work may have been enqueued while we were waiting; go round
    -- again until a pass finds nothing new to wait for.
    joinQueue queue