Sunday, March 3, 2013

Adding Concurrency to Frege (part II)

In the first part we demonstrated how to implement the important forkIO operation. You can now implement the remaining simple examples Simon Marlow gives us in chapter 4 of his upcoming book. They  deal with threads that need not communicate with each other or with the main program.

Whenever we need some additional primitive operation, we look up the Java API if similar functionality is available, and add appropriate native definitions and perhaps some glue code to adapt the interface. For example, in the "reminders" example, Simon introduces an operation "that waits for some time to elapse"

--| do nothing for some microseconds
threadDelay    :: Int -> IO ()

What we have in Java is a similar function we can make usable in Frege through:

--- let the current thread sleep for some milliseconds
native sleep   java.lang.Thread.sleep
               :: Long   -> IO () throws InterruptedException

and then use it to implement the Haskell interface, that differs in the type of the argument and, very important, in that it interprets its argument as microseconds, whereas the Java API doesn't allow us such fine grained naps, and thinks it is better for our health to sleep in amounts of milliseconds.

--- convert microseconds to milliseconds, and Int to Long
threadDelay   :: Int -> IO ()
threadDelay n =  sleep ((n.long + 500L) `div` 1000L)  -- give or take some 

However, despite the many similarities and easy adoption of Haskell stuff,
it is important to keep in mind that there are differences that are not that
easy to bridge. For example, as Simon demonstrates us, the Haskell run time system terminates all threads as soon as the main program ends. This is not so in Java (whose run time system we are using in Frege), where all (non deamon) threads are born equal, and the JVM won't exit until all of them are finished. It is possible to simulate the Haskell runtime behaviour by running

System.exit 0

instead of

return ()

in the main function. This will exit the JVM and all still running threads.

Implementing MVars

A MVar is a data structure that serves as basic communication mechanism between threads in Concurrent Haskell. It is like a box that can be empty or full. Trying to take something from an empty box as well as trying to put something into a full box will cause the corresponding thread to block, until some other thread puts or takes something there.

In Java, we don't have anything like that, but we have blocking queues in the java.util.concurrent package, and we can make blocking queues with limited capacity. One way of implementing MVars would thus be to regard them as degenerated blocking queues with a maximum capacity of 1. We first "import" the interface java.util.concurrent.BlockingQueue with the operations we are interested in to Frege:

data BlockingQueue e = mutable native java.util.concurrent.BlockingQueue where
    --- add element to blocking queue, return false if not possible
    native offer    :: BlockingQueue e -> e -> IO Bool
    --- add element to blocking queue, block until possible
    native put      :: BlockingQueue e -> e -> IO () throws InterruptedException
    --- get and remove element from blocking queue, return null if it is empty
    native poll     :: BlockingQueue e -> IO (Maybe e)
    --- get and remove element from blocking queue, block until something is available
    native take     :: BlockingQueue e -> IO e throws InterruptedException

Because this is an interface and not a class, we do not have a constructor that allows us to actually create a blocking queue. For this we need to import a class that actually implements the interface, like:

data ArrayBlockingQueue e = mutable native java.util.concurrent.ArrayBlockingQueue where
    --- make a blocking queue with the given maximum capacity
    native new      :: Int -> IO (ArrayBlockingQueue e)

As a side note: while the Frege compiler does not explore the available Java classes and interfaces on its own, it automatically knows the relations between Java classes and Java interfaces we introduced through native data declarations and it applies this knowledge during type checking. It will thus be possible to pass a value of type ArrayBlockingQueue e when BlockingQueue e is expected (but not the other way around). This greatly simplifies and reduces the native boilerplate code we have to write.

The implementation of MVar is now straightforward:

abstract data MVar a = MV (BlockingQueue a) where
    newEmpty        = 1 >>= return . MV
    new a           = do m <- newEmpty; m.put a; return m
    put   (MV q) a  = q.put a         
    take  (MV q)    = q.take
    offer (MV q) a  = q.offer a
    poll  (MV q)    = q.poll  

The abstract keyword basically makes the MV value constructor inaccessible, except for the code that lives in the MVar namespace. For this reason, we give the basic MVar operations in the where clause of the data definition. Note that a data definition with a single constructor that has a single field is equivalent to a Haskell newtype definition - there will be no runtime overhead.

The only ways to create MVars are MVar.newEmpty and, which uses the former. In newEmpty, the capacity of the underlying blocking queue is limited to 1 element, thus giving us the desired functionality.

It is worth mentioning a big difference to Haskell's MVars: the values put into them will be evaluated to weak head normal form in the process of doing so. This is because the MVar operations are written in terms of native functions and arguments to native functions must all be strict.

We can give the following definitions, if only to make it easier to copy/paste Simons examples:

-- Haskell compatibility
newEmptyMVar    = MVar.newEmpty
newMVar         = 
tryTakeMVar     = MVar.poll
tryPutMVar      = MVar.offer
takeMVar        = MVar.take
putMVar         = MVar.put 

Let's check if it works, and at the same time show the dangers that come with MVars:

main _ = do
    m <- newEmptyMVar
    m.take  -- or, if you like: takeMVar m

Here we find another difference between the Haskell and the Java runtime regarding concurrency: The Haskell runtime detects the deadlock, i.e., it notices that there is no other thread that could possibly put something into m and terminates the program. The JVM, on the other hand, does not care. It just sits there and waits. One will need to get a thread dump (using some tools like jvisualvm or the like) to diagnose deadlock.

Now that we have MVars, it should be easy to also implement the other examples that demonstrate usage of MVars as simple channels and shared state, by merely copying the Haskell code. Of course, we won't really use MVars as building blocks for unbounded channels in Frege, as we already have native unbounded blocking queues.

In the next part, we will turn to chapter 5, where Simon Marlow shows us overlapping input/output.

1 comment:

  1. >> In Java, we don't have anything like that
    java.util.concurrent.SynchronousQueue comes to mind, but of course it's implementation of the mentioned java.util.concurrent.BlockingQueue


Comments will be censored by me as I see fit, most likely if they contain insults or propaganda for ideologies I do not like. Comments that are on topic will not be censored. If I leave a comment uncensored this does not imply that I agree with the opinions expressed therein.