Weird try/catch behaviour with Scala + Akka

2

I'm trying to use Akka to implement a TCP server for a custom application protocol. I'm trying to follow the example given here: http://doc.akka.io/docs/akka/2.0/scala/io.html to do non-blocking IO inside a for...yield loop.

I find that when I throw an exception from inside the yield block, I can't catch it from outside the block. I think I've got a fundamental misunderstanding of how Akka or Scala is working here and I'd appreciate any tips.

I've boiled down the code to this:

import akka.actor._
import java.net.InetSocketAddress

class EchoServer(port: Int) extends Actor {

  val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher)

  override def preStart {
    IOManager(context.system) listen new InetSocketAddress(port)
  }

  def receive = {
    case IO.NewClient(server) =>
      val socket = server.accept()
      state(socket) flatMap (_ => EchoServer.processRequest(socket))
    case IO.Read(socket, bytes) =>
      state(socket)(IO.Chunk(bytes))
    case IO.Closed(socket, cause) =>
      state(socket)(IO.EOF(None))
      state -= socket
  }
}

object EchoServer extends App
{
  def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
  {
    println( "In process request")
    try {
      for {
        bs <- IO take 1
      } yield {
        println("I'll get here")
        throw new Exception("Hey-o!")
        println("But not here ... as expected")
      }
    } catch {
      case e: Exception => println("And not here ... wtf?"); IO.Done()  // NEVER GETS HERE
    }
  }

  ActorSystem().actorOf(Props(new EchoServer(8080)))
}

Maybe more convenient to follow the gist here: https://gist.github.com/2296554

Can anybody explain why control does not reach my catch block in this situation?

I noticed that if I turn on debug logging in Akka, I see this message in the output:

[DEBUG] [04/03/2012 22:42:25.106] [EchoServerActorSystem-akka.actor.default-dispatcher-1] [Future] Hey-o!

So I guess the exception is getting handled by the Akka dispatcher? Can anybody explain how that's possible?

2012-04-04 05:09
by Russ Weeks


6

The point of non-blocking IO is of course that there is no guarantee when and where it is executed. Remember that one can write the for comprehension as

(IO take 1).map(bs => {
  println("I'll get here"); throw // ...
})

What does this code do? IO take 1 returns a non-blocking, Future-like value, and the map method attaches a transforming function to it. That is, whenever (and wherever) the result of IO take 1 becomes available, the function passed to map is applied to it.

All of this happens in some other thread (or through some other mechanism that implements the non-blocking semantics), so the try/catch has no way to react to exceptions thrown there: by the time the function actually runs, the try block has already returned. Nor does the function bs => println(…) … know anything about your exception handling. All it knows is that it should transform some input bs and produce a result when it is finished.
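To make this concrete without Akka, here is a minimal sketch. Deferred is a hypothetical stand-in written for this illustration, not an Akka class; it only stores the function handed to map and runs it later, when input is fed in. The try/catch in build has the same shape as the one in processRequest.

```scala
// Hypothetical stand-in for a non-blocking value: it stores a function
// and only runs it later, when some input is fed in.
final class Deferred[A](run: String => A) {
  def map[B](f: A => B): Deferred[B] = new Deferred[B](in => f(run(in)))
  def feed(input: String): A = run(input)
}

object DeferredDemo {
  // Same shape as processRequest: a try/catch wrapped around a map call.
  def build(log: StringBuilder): Deferred[Unit] =
    try {
      new Deferred[String](in => in).map[Unit] { bs =>
        log.append("mapped;")
        throw new Exception("Hey-o!")
      }
    } catch {
      case e: Exception =>
        // Not reached: map only registered the function, nothing has run yet.
        log.append("caught-in-build;")
        new Deferred[Unit](_ => ())
    }

  // Builds the pipeline, then feeds it input the way an IO loop would later.
  def outcome(): String = {
    val log = new StringBuilder
    val pipeline = build(log)
    log.append("built;")
    try {
      pipeline.feed("x")
    } catch {
      case e: Exception =>
        log.append("caught-at-feed:" + e.getMessage)
        ()
    }
    log.toString
  }
}
```

Calling DeferredDemo.outcome() should show that the handler inside build is never recorded, while the exception surfaces at whoever feeds the input. In the original code that role is played by Akka, which would fit the "[Future] Hey-o!" line in the debug log.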

The lesson to be learned: when using non-blocking code, avoid side effects, especially side effects that are used to change the flow of execution.

In order to actually catch the exception, I think you’ll have to structure it as follows (untested; see API):

def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
  {
    println( "In process request")
    (
      for {
        bs <- IO take 1
      } yield {
        println("I'll get here")
        throw new Exception("Hey-o!")
        println("But not here ... as expected")
      }
    ) recover {
      case e: Exception => println("And here ...?"); IO.Done()
    }
  }
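For comparison, here is how the same idea looks with the standard library's scala.concurrent.Future, which does have a recover method. This is an analogy only: it assumes a recent Scala version and does not show that Akka 2.0's IO.Iteratee offers recover, so check the API for that. RecoverDemo and its method names are made up for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverDemo {
  // The handler is attached to the asynchronous value itself, so it runs
  // wherever (and whenever) the mapped function fails.
  def process(): Future[String] =
    Future("x")
      .map[String](bs => throw new Exception("Hey-o!"))
      .recover { case e: Exception => "recovered: " + e.getMessage }

  // Blocks only so the demo can inspect the result; real code would not.
  def result(): String = Await.result(process(), 5.seconds)
}
```

Here RecoverDemo.result() should yield the recovered value rather than rethrowing, because the failure travels inside the Future until recover picks it up.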
2012-04-04 07:10
by Debilski
Thanks for the very helpful response, it really clears things up - Russ Weeks 2012-04-10 18:26