package scalaz
package concurrent

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.ConcurrentLinkedQueue
import Scalaz._
                  
final case class Actor[A](e: A => Unit, onError: Throwable => Unit = throw(_))(implicit val strategy: Strategy) {
  private val suspended = new AtomicBoolean(true)
  private val mbox = new ConcurrentLinkedQueue[A]

  private def work =
    if (!mbox.isEmpty && suspended.compareAndSet(true, false)) act(())

  val toEffect: Effect[A] = effect[A]((a) => this ! a)

  def !(a: A) {
    mbox offer a
    work
  }

  def apply(a: A) = this ! a
  
  private val act: Effect[Unit] = effect {(u: Unit) =>
    var i = 0
    while (i < 1000) {
      val m = mbox.poll
      if (m != null) try {
        e(m)
        i = i + 1
      } catch { case e => onError(e) }
      else i = 1000
    }
    suspended.set(true)
    work
  }
}

trait Actors {
  def actor[A](e: A => Unit, err: Throwable => Unit = throw(_))(implicit s: Strategy): Actor[A] = Actor[A](e,err)
  implicit def ActorFrom[A](a: Actor[A]): A => Unit = a ! _ 
}