Thursday, March 7, 2013

Adding Concurrency to Frege (part III)

In the second part of this series, we implemented MVars and noted some differences in the concurrency support of Haskell and Frege.

In this third and last part, we turn to another example taken from chapter 5 of Simon Marlows upcoming book on parallelism and concurrency in Haskell and conclude with a discussion of the lessons learned.

Overlapping IO without error handling

The following program loads two websites concurrently and prints their sizes:

main _ =  do
    m1 <- MVar.newEmpty
    m2 <- MVar.newEmpty
    
    forkIO do
        r <- getURL "http://www.wikipedia.org/wiki/Shovel"
        m1.put r
    
    forkIO do
        r <- getURL "http://www.wikipedia.org/wiki/Spade"
        m2.put r
    
    r1 <- m1.take
    r2 <- m2.take
    println (sum (map length r1), sum (map length r2))

The output will be:

(80278, 66924)

This is, however, sure to produce deadlock whenever, for some reason, one of the threads cannot complete to the point where it writes the result to its MVar - or, in other words, if the getURL action throws exceptions. We need not go into the details of the implementation of getURL, suffice it to say that it is a composition of several native methods that each can throw exceptions for a variety of reasons: bad URL syntax, IO error, network failure, and so on. For example, if we change the protocol in the second URL to htto (simulating a typo, where someone typed 'o' instead of 'p'), the output will look like:

Exception in thread "Thread-1" frege.runtime.WrappedCheckedException:
        at frege.runtime.WrappedCheckedException.wrapIfNeeded(WrappedCh
        ...
Caused by: java.net.MalformedURLException: unknown protocol: htto

In this case, the main thread will forever block in the attempt to get the result of the second thread.

Overlapping IO with error handling


To correct this issue, we can follow the approaches Simon Marlow explains in his book. Remember though, that exceptions in Frege are based on Java exceptions. Introducing new exceptions is not yet possible in Frege. Instead, one simply introduces appropriate Java classes that are subclasses of java.lang.Throwable. However, exceptions thrown from native methods are usually already implemented, and so is the "catch all" type Throwable. Hence all we need to make the examples work is the following:

type SomeException = Throwable

try :: IO a -> IO (SomeException|a)
try action = action >>= return . Right
        `catch` any
    where
        any :: SomeException -> IO (SomeException|a)
        any = return . Left 

To show this in action without using the more elaborate and better abstracted approaches of Simon Marlow, here is a non-deadlocking version of the program above:

main _ =  do
    m1 <- MVar.newEmpty
    m2 <- MVar.newEmpty
    
    forkIO do
        r <- (try . getURL) "http://www.wikipedia.org/wiki/Shovel"
        m1.put r
    
    forkIO do
        r <- (try . getURL) "htto://www.wikipedia.org/wiki/Spade"
        m2.put r
    
    r1 <- m1.take
    r2 <- m2.take
    println (result r1, result r2)
  where
    result :: (SomeException|[String]) -> String
    result (Left x)  = x.getClass.getName
    result (Right y) = (show . sum . map length)  y


When we run this, it produces the following output:

("80278", "java.net.MalformedURLException")


Not very nice, but it does now terminate despite of errors.

Lessons learned

It turns out that it was easy to add concurrency to Frege, thanks to the concurrency support that comes with Java and the JVM and the ability of Frege to make use of those feature in a seamless way.

It was even easy to do this in such a way that Haskell programmers would feel comfortable, by approximating the abstractions they are used to: forkIO, MVars, exceptions, etc. to such an extend that porting Haskell code via copy/paste was possible.

But we also observed important differences in the respective run time concurrency support:
  • Termination of the main thread does not terminate any other thread in the JVM.
  • MVars as implemented here are strict.
  • The JVM does not detect deadlocks.
But there is more, and it could not be easily ignored:
  • The forkIO we implemented in Frege is really what Haskell knows as forkOS, as genuine OS threads are forked, as opposed to Haskell's lightweight "green" threads. The Java concurrency API has nothing comparable to the latter, as far as I know. One could probably use thread pools, executor services and asynchronous tasks to achieve something like it, but there remains a basic difference: if an asynchronous task performs a blocking action like reading from a MVar, it will block the OS thread it is running on. Hence, in the Frege/Java/JVM concurrency model, a fixed number of OS threads cannot - on principle - support an arbitrary number of asynchronous tasks that may potentially block.
  • In the JVM, there is no way to throw an asynchronous exception in another thread. The only possibility is to interrupt another thread, which the other thread may or may not honor. But, if one interrupts a thread that is in a blocking operation, that operation will throw an InterruptedException.
  • In the JVM world, there is - to my knowledge - no support for software transactional memory (STM) out of the box, one would need to implement this manually and it would probably not be easy.
  • ...
The conclusion is, unfortunately, that one could not get very far in Frege if one only knows the Haskell concurrency model. To do serious concurrent work  in an JVM environment is different and it will thus require different knowledge. One way to obtain this knowledge would be to read books like "Java Concurrency in Practice".

Sunday, March 3, 2013

Adding Concurrency to Frege (part II)

In the first part we demonstrated how to implement the important forkIO operation. You can now implement the remaining simple examples Simon Marlow gives us in chapter 4 of his upcoming book. They  deal with threads that need not communicate with each other or with the main program.

Whenever we need some additional primitive operation, we look up the Java API if similar functionality is available, and add appropriate native definitions and perhaps some glue code to adapt the interface. For example, in the "reminders" example, Simon introduces an operation "that waits for some time to elapse"

--| do nothing for some microseconds
threadDelay    :: Int -> IO ()

What we have in Java is a similar function we can make usable in Frege through:

--- let the current thread sleep for some milliseconds
native sleep   java.lang.Thread.sleep
               :: Long   -> IO () throws InterruptedException

and then use it to implement the Haskell interface, that differs in the type of the argument and, very important, in that it interprets its argument as microseconds, whereas the Java API doesn't allow us such fine grained naps, and thinks it is better for our health to sleep in amounts of milliseconds.

--- convert microseconds to milliseconds, and Int to Long
threadDelay   :: Int -> IO ()
threadDelay n =  sleep ((n.long + 500L) `div` 1000L)  -- give or take some 

However, despite the many similarities and easy adoption of Haskell stuff,
it is important to keep in mind that there are differences that are not that
easy to bridge. For example, as Simon demonstrates us, the Haskell run time system terminates all threads as soon as the main program ends. This is not so in Java (whose run time system we are using in Frege), where all (non deamon) threads are born equal, and the JVM won't exit until all of them are finished. It is possible to simulate the Haskell runtime behaviour by running

System.exit 0

instead of

return ()

in the main function. This will exit the JVM and all still running threads.

Implementing MVars

A MVar is a data structure that serves as basic communication mechanism between threads in Concurrent Haskell. It is like a box that can be empty or full. Trying to take something from an empty box as well as trying to put something into a full box will cause the corresponding thread to block, until some other thread puts or takes something there.

In Java, we don't have anything like that, but we have blocking queues in the java.util.concurrent package, and we can make blocking queues with limited capacity. One way of implementing MVars would thus be to regard them as degenerated blocking queues with a maximum capacity of 1. We first "import" the interface java.util.concurrent.BlockingQueue with the operations we are interested in to Frege:

data BlockingQueue e = mutable native java.util.concurrent.BlockingQueue where
    --- add element to blocking queue, return false if not possible
    native offer    :: BlockingQueue e -> e -> IO Bool
    --- add element to blocking queue, block until possible
    native put      :: BlockingQueue e -> e -> IO () throws InterruptedException
    
    --- get and remove element from blocking queue, return null if it is empty
    native poll     :: BlockingQueue e -> IO (Maybe e)
    --- get and remove element from blocking queue, block until something is available
    native take     :: BlockingQueue e -> IO e throws InterruptedException

Because this is an interface and not a class, we do not have a constructor that allows us to actually create a blocking queue. For this we need to import a class that actually implements the interface, like:

data ArrayBlockingQueue e = mutable native java.util.concurrent.ArrayBlockingQueue where
    --- make a blocking queue with the given maximum capacity
    native new      :: Int -> IO (ArrayBlockingQueue e)

As a side note: while the Frege compiler does not explore the available Java classes and interfaces on its own, it automatically knows the relations between Java classes and Java interfaces we introduced through native data declarations and it applies this knowledge during type checking. It will thus be possible to pass a value of type ArrayBlockingQueue e when BlockingQueue e is expected (but not the other way around). This greatly simplifies and reduces the native boilerplate code we have to write.

The implementation of MVar is now straightforward:

abstract data MVar a = MV (BlockingQueue a) where
    newEmpty        = ArrayBlockingQueue.new 1 >>= return . MV
    new a           = do m <- newEmpty; m.put a; return m
    put   (MV q) a  = q.put a         
    take  (MV q)    = q.take
    offer (MV q) a  = q.offer a
    poll  (MV q)    = q.poll  

The abstract keyword basically makes the MV value constructor inaccessible, except for the code that lives in the MVar namespace. For this reason, we give the basic MVar operations in the where clause of the data definition. Note that a data definition with a single constructor that has a single field is equivalent to a Haskell newtype definition - there will be no runtime overhead.

The only ways to create MVars are MVar.newEmpty and MVar.new, which uses the former. In newEmpty, the capacity of the underlying blocking queue is limited to 1 element, thus giving us the desired functionality.

It is worth mentioning a big difference to Haskell's MVars: the values put into them will be evaluated to weak head normal form in the process of doing so. This is because the MVar operations are written in terms of native functions and arguments to native functions must all be strict.

We can give the following definitions, if only to make it easier to copy/paste Simons examples:

-- Haskell compatibility
newEmptyMVar    = MVar.newEmpty
newMVar         = MVar.new 
tryTakeMVar     = MVar.poll
tryPutMVar      = MVar.offer
takeMVar        = MVar.take
putMVar         = MVar.put 

Let's check if it works, and at the same time show the dangers that come with MVars:

main _ = do
    m <- newEmptyMVar
    m.take  -- or, if you like: takeMVar m

Here we find another difference between the Haskell and the Java runtime regarding concurrency: The Haskell runtime detects the deadlock, i.e., it notices that there is no other thread that could possibly put something into m and terminates the program. The JVM, on the other hand, does not care. It just sits there and waits. One will need to get a thread dump (using some tools like jvisualvm or the like) to diagnose deadlock.

Now that we have MVars, it should be easy to also implement the other examples that demonstrate usage of MVars as simple channels and shared state, by merely copying the Haskell code. Of course, we won't really use MVars as building blocks for unbounded channels in Frege, as we already have native unbounded blocking queues.

In the next part, we will turn to chapter 5, where Simon Marlow shows us overlapping input/output.