I'm trying to use Akka to implement a TCP server for a custom application protocol. I'm trying to follow the example given here: http://doc.akka.io/docs/akka/2.0/scala/io.html to do non-blocking IO inside a for...yield loop.
I find that when I throw an exception from inside the yield block, I can't catch it from outside the block. I think I've got a fundamental misunderstanding of how Akka or Scala is working here and I'd appreciate any tips.
I've boiled down the code to this:
import akka.actor._
import java.net.InetSocketAddress

class EchoServer(port: Int) extends Actor {
  val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher)

  override def preStart {
    IOManager(context.system) listen new InetSocketAddress(port)
  }

  def receive = {
    case IO.NewClient(server) =>
      val socket = server.accept()
      state(socket) flatMap (_ => EchoServer.processRequest(socket))
    case IO.Read(socket, bytes) =>
      state(socket)(IO.Chunk(bytes))
    case IO.Closed(socket, cause) =>
      state(socket)(IO.EOF(None))
      state -= socket
  }
}

object EchoServer extends App
{
  def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
  {
    println( "In process request")
    try {
      for {
        bs <- IO take 1
      } yield {
        println("I'll get here")
        throw new Exception("Hey-o!")
        println("But not here ... as expected")
      }
    } catch {
      case e: Exception => println("And not here ... wtf?"); IO.Done() // NEVER GETS HERE
    }
  }

  ActorSystem().actorOf(Props(new EchoServer(8080)))
}
It may be more convenient to follow the gist here: https://gist.github.com/2296554
Can anybody explain why control does not reach my catch block in this situation?
I noticed that if I turn on debug logging in Akka, I see this message in the output:
[DEBUG] [04/03/2012 22:42:25.106] [EchoServerActorSystem-akka.actor.default-dispatcher-1] [Future] Hey-o!
So I guess the exception is getting handled by the Akka dispatcher? Can anybody explain how that's possible?
The point of non-blocking IO is of course that there is no guarantee when and where it is executed. Remember that one can write the for comprehension as
(IO take 1).map(bs => {
  println("I'll get here"); throw // ...
})
What does this code do? IO take 1 returns some non-blocking, Future-like thing, and the map method attaches a transforming function to it. That is, whenever (and wherever) the result of IO take 1 is ready, the function passed to map will be applied to it.
All of this happens in some other thread (or through some other way of implementing the non-blocking semantics), so by the time the exception is thrown the try/catch block has long since been exited and has no way to react to it. Nor does the function bs => println(…) … know anything about your exception handling. All it knows is that it should transform some input bs and have a result when it's finished.
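The same effect can be reproduced without Akka. The following is a minimal sketch that uses scala.concurrent.Future and Promise from the standard library (Scala 2.10 or later) as a stand-in for the iteratee returned by IO take 1; it is an analogy, not the Akka 2.0 IO API, and the names TryCatchDemo and run are made up for illustration.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object; a Promise plays the role of "input that is not there yet".
object TryCatchDemo {
  // Returns (was the catch block entered?, outcome of the mapped future)
  def run(): (Boolean, Try[Unit]) = {
    val input = Promise[Int]() // no value available yet, like a socket with no bytes
    var caught = false

    val mapped: Future[Unit] =
      try {
        // map only registers the function; nothing runs (or throws) on this line
        input.future.map { _ => throw new Exception("Hey-o!") }
      } catch {
        case e: Exception =>
          caught = true
          Future.successful(())
      }

    // The try block was exited above; only now does the input arrive
    // and the mapped function run, on a thread of the execution context.
    input.success(1)

    val outcome = Try(Await.result(mapped, 2.seconds))
    (caught, outcome)
  }
}
```

Calling TryCatchDemo.run() should report that the catch block was never entered, while the exception shows up as the failed result of the mapped future. That matches the debug line logged by the dispatcher in the question.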
The lesson to be learned: when writing non-blocking code, avoid side effects, especially ones that are used to change the flow of execution.
In order to actually catch the exception, I think you’ll have to structure it as follows (untested; see API):
def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] =
{
  println( "In process request")
  (
    for {
      bs <- IO take 1
    } yield {
      println("I'll get here")
      throw new Exception("Hey-o!")
      println("But not here ... as expected")
    }
  ) recover {
    case e: Exception => println("And here ...?"); IO.Done()
  }
}
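For comparison, here is how the recover pattern looks on scala.concurrent.Future from the standard library (Scala 2.10 or later), where recover is known to exist. This is only a sketch of the idea: whether Akka 2.0's IO.Iteratee offers an equivalent method is not confirmed here, and RecoverDemo, process and run are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object showing error handling attached to the
// asynchronous value itself rather than wrapped around its construction.
object RecoverDemo {
  def process(input: Future[Int]): Future[String] =
    input
      // the mapped function throws when it eventually runs
      .map[String] { _ => throw new Exception("Hey-o!") }
      // recover runs in the same asynchronous pipeline, so it sees the failure
      .recover { case e: Exception => "recovered: " + e.getMessage }

  // Blocks only to make the result observable in a small demo
  def run(): String = Await.result(process(Future.successful(1)), 2.seconds)
}
```

The handler travels with the future, so it is applied wherever and whenever the mapped function actually fails, which is what the try/catch in the question could not do.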