package scalaz
package concurrent

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.ConcurrentLinkedQueue
import Scalaz._
                  
sealed case class Actor[A](val e: A => Unit, val onError: Throwable => Unit = throw(_))(implicit val strategy: Strategy) { 
  private val suspended = new AtomicBoolean(true)
  private val mbox = new ConcurrentLinkedQueue[A]

  private def work = {
    val mt = mbox.isEmpty
    if (mt) () => ()
    else if (suspended.compareAndSet(!mt, false)) act ! (())
    else () => ()
  }

  val toEffect: Effect[A] = effect[A]((a) => this ! a)

  def !(a: A) = if (mbox offer a) work else toEffect ! a

  def apply(a: A) = this ! a
  
  private val act: Effect[Unit] = effect((u: Unit) => {
    var go = true
    var i = 0 
    while (go && i < 1000) {
      val m = mbox.poll
      if (m != null) try {
        e(m)
        i = i + 1
      } catch { case e => onError(e) }
      else {
        suspended.set(true)
        work
        go = false
      }
    }
    if (mbox.peek != null) act ! u else ()
  })
}

trait Actors {
  def actor[A](e: A => Unit, err: Throwable => Unit = throw(_))(implicit s: Strategy): Actor[A] = Actor[A](e,err)
  implicit def ActorFrom[A](a: Actor[A]): A => Unit = a ! _ 
}