package scalaz
package concurrent
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
import Scalaz._
/**
 * A placeholder for a value of type `A` that is supplied later.
 *
 * Every operation that changes or observes the state (`to`, `fulfill`,
 * `break`) is turned into a `Signal` and sent to the internal actor `e`,
 * which calls `eval` on each signal it receives.
 *
 * @param strategy the evaluation strategy; it is the implicit picked up when
 *                 this class creates further promises and its actor
 */
sealed class Promise[A](implicit val strategy: Strategy) extends Function0[A] {
  import Promise._

  // Counted down when the promise is fulfilled, has thrown, or is broken
  // while still unfulfilled; `get` blocks on it.
  private val latch = new CountDownLatch(1)
  // Continuations registered through `to` before a value was available.
  private val waiting = new ConcurrentLinkedQueue[A => Unit]
  @volatile private var v: Promise.State[A] = Promise.Unfulfilled
  @volatile private var borked: Boolean = false
  // Receives the signals and evaluates them.
  private val e = actor[Signal[A]](_.eval)

  /**
   * Blocks until the latch is released, then returns the value.
   *
   * Rethrows the stored exception if the computation threw, and throws
   * `BrokenException` if the promise was broken while still unfulfilled.
   */
  def get: A = {
    latch.await()
    v.get
  }

  /** Registers `k` to be called with the value of this promise. */
  def to(k: A => Unit): Unit = e ! new Cont(k, this)

  /** Requests that this promise be fulfilled with `a` (passed by name). */
  def fulfill(a: => A): Unit = e ! new Done(a, this)

  /** True once the current state reports itself as fulfilled. */
  def fulfilled: Boolean = v.fulfilled

  /** True if the current state holds a thrown exception. */
  def threw: Boolean = v.threw

  /** True once `break` has been called on this promise. */
  def broken: Boolean = borked

  /**
   * Marks this promise as broken and sends a `Break` signal.
   * The flag is set immediately, before the signal is processed.
   */
  def break: Unit = {
    borked = true
    e ! new Break(this)
  }

  /** A promise of `f` applied to this promise's value. */
  def map[B](f: A => B): Promise[B] = flatMap(a => Promise(f(a)))

  /** A promise fulfilled with the value of the promise `f` returns for this promise's value. */
  def flatMap[B](f: A => Promise[B]): Promise[B] = {
    val r = new Promise[B]()
    to(a => f(a) to effect[B](b => r fulfill b))
    r
  }

  /**
   * A promise fulfilled with this promise's value when `p` holds for it.
   *
   * When `p` does not hold the result is broken, so that `get` on it throws
   * `BrokenException` instead of blocking forever on a promise that can
   * never complete.
   */
  def filter(p: A => Boolean): Promise[A] = {
    val r = new Promise[A]()
    to(a => promise(p(a)) to effect[Boolean](b => if (b) r.fulfill(a) else r.break))
    r
  }

  /** `Function0` application; same as `get`. */
  def apply: A = get

  override def toString = "<promise>"

  /**
   * Speculative execution: treats this promise as a guess at the value of
   * `actual` and starts mapping `f` over the guess straight away.
   *
   * Once both values are known, the speculative result is returned if they
   * are equal; otherwise the speculation is broken and `f` is applied to the
   * actual value.
   *
   * @param f        the function to apply
   * @param actual   the promise of the real value
   * @param equality used to compare the guess with the actual value
   */
  def spec[B](f: A => B, actual: Promise[A])(implicit equality: Equal[A]): Promise[B] = {
    val speculation = this map f
    actual flatMap (a => this flatMap (g => if (a === g) speculation else {
      speculation.break
      promise(f(a))
    }))
  }
}
/** Mix-in providing the `promise` constructor function. */
trait Promises {
  /**
   * Builds a promise for the by-name expression `a` by delegating to
   * `Promise.apply` with the given strategy.
   */
  def promise[A](a: => A)(implicit s: Strategy): Promise[A] =
    Promise.apply(a)(s)
}
/** Companion holding the promise constructor, the internal states and the signals. */
object Promise {
  /**
   * Creates a promise and sends its actor a `Done` signal carrying the
   * by-name expression `a`.
   */
  def apply[A](a: => A)(implicit s: Strategy): Promise[A] = {
    val p = new Promise[A]()(s)
    p.e ! new Done(a, p)
    p
  }

  /** The internal state of a promise. */
  private sealed abstract class State[+A] {
    /** The value, or an exception when there is none. */
    def get: A
    /** Attempts to move `promise` to a completed state using `a`. */
    def fulfill[B>:A](a: => B, promise: Promise[B]): Unit
    val fulfilled: Boolean
    val threw: Boolean
    /** Reacts to a `Break` signal for `promise`. */
    def break(promise: Promise[_]): Unit
  }

  /** Completed with an exception; `get` rethrows it. Fulfil and break are no-ops. */
  private class Thrown(e: Throwable) extends State[Nothing] {
    def get: Nothing = throw e
    def fulfill[B](a: => B, promise: Promise[B]) {
    }
    // Reports fulfilled = true, so continuations registered later call `get`
    // and the stored exception is rethrown to them.
    val fulfilled = true
    val threw = true
    def break(promise: Promise[_]) {}
  }

  /** Completed with a value. Fulfil and break are no-ops. */
  private class Fulfilled[A](val get: A) extends State[A] {
    def fulfill[B>:A](a: => B, promise: Promise[B]) {}
    val fulfilled = true
    val threw = false
    def break(promise: Promise[_]) {}
  }

  /** The initial state: no value yet. */
  private object Unfulfilled extends State[Nothing] {
    def get: Nothing = throw new BrokenException

    /**
     * Evaluates `a` exactly once, stores it, releases the latch and runs the
     * waiting continuations with the stored value. Does nothing when the
     * promise has already been marked broken.
     *
     * If evaluating `a` or running a continuation throws, the promise moves
     * to `Thrown`, the latch is released and the remaining continuations are
     * discarded.
     */
    def fulfill[B](a: => B, promise: Promise[B]) {
      if (!promise.borked) {
        try {
          // `a` is by-name: force it once here and reuse the stored result,
          // so the computation is not re-run for every waiting continuation.
          val done = new Fulfilled(a)
          promise.v = done
          promise.latch.countDown()
          val pending = promise.waiting
          while (!pending.isEmpty) pending.remove()(done.get)
        } catch {
          case e : Throwable => {
            promise.v = new Thrown(e)
            promise.latch.countDown()
            promise.waiting.clear()
          }
        }
      }
    }

    val fulfilled = false
    val threw = false

    /** Releases the latch so that blocked `get` calls wake up and throw `BrokenException`. */
    def break(promise: Promise[_]) {
      promise.latch.countDown()
    }
  }

  /** Thrown by `get` on a promise that holds no value after its latch was released. */
  sealed class BrokenException extends Exception

  /** A message evaluated by a promise's actor. */
  private abstract sealed class Signal[+A] {
    def eval: Unit
  }

  /** Asks the promise's current state to fulfil it with `a`. */
  private class Done[+A](a: => A, promise: Promise[A]) extends Signal[A] {
    def eval {
      promise.v.fulfill(a, promise)
    }
  }

  /** Runs `k` with the value if the state reports fulfilled, otherwise queues it. */
  private class Cont[+A](k: A => Unit, promise: Promise[A]) extends Signal[A] {
    def eval {
      if (promise.v.fulfilled) k(promise.v.get)
      else promise.waiting.offer(k)
    }
  }

  /** Forwards a break request to the promise's current state. */
  private class Break(promise: Promise[_]) extends Signal[Nothing] {
    def eval {
      promise.v.break(promise)
    }
  }
}