/*
   Copyright 2010 Aaron J. Radke

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
package cc.drx

object Stat{  //Todo generallize this to a Typeclass of Statable that works on Doubles, Vec, and maybe others???
   // 2017-07-30("use the apply constructor instead","v0.2.11")
   // def from(ds:Iterable[Double]):Stat = apply(ds)
   // 2017-07-30("use Stat(xs map f) instead of Stat.by(xs)(f) (but make sure it is an traversable view intead of maying a new collection)","v0.2.15")
   // def by[A](as:Iterable[A])(f:A=>Double):Stat = (Stat() /: as){(stat,a) => stat + f(a)} //used in collection statsBy

   //TODO deprecate to use StatVec.apply instead??
   // def apply(ds:Iterable[Vec])(implicit dummy:DummyImplicit):StatVec = StatVec(ds)
   // def by[A](as:Iterable[A])(f:A=>Vec)(implicit dummy:DummyImplicit):StatVec = (StatVec() /: as){(stat,a) => stat + f(a)} //TODO make this guy work

   def apply = new Stat //TODO test if this helps the ETA expansion problem
   def empty = new Stat
   def apply(v:Double) = new Stat + v
   def harmonicMean(a:Double,b:Double):Double = 2.0*a*b/(a+b)

   def from(stringMap:StringMap):Option[Stat] = {
     for(count <- stringMap.get[Long]("count");
       mean  <- stringMap.get[Double]("mean");
       M2    <- stringMap.get[Double]("M2");
       min   <- stringMap.get[Double]("min");
       max   <- stringMap.get[Double]("max");
       last  =  stringMap.get[Double]("last")  getOrElse Double.NaN;
       first =  stringMap.get[Double]("first") getOrElse Double.NaN
     ) yield Stat(count,mean,M2,min,max,last,first)
   }


   //TODO make the normalizer an option type for more generality like the StatTest example
  // def norm[A](xs:Iterable[A],b:Bound[A])(f:A=>Double):Iterable[Double] = {val stat = Stat.by(xs)(f); xs map {x => stat norm f(x)}}
  /*
   def norm[A,K](xs:Iterable[A], keys:Iterable[K], get:(A,K)=>Double):Iterable[Map[K,Double]] = {
     val stats:Map[K,Stat] = keys mapWith {k => Stat.by(xs){x => get(x,k)} }
     def norms(x:A):Map[K,Double] = keys mapWith {k => stats(k) norm get(x,k)}
     xs map norms
   }
   */

   //--optimized single add loop without creating new objects and without checking for NaN //TODO add number of elements checks
   // def apply(it:Iterator[Numeric])(implicit d:DummyImplicit) = apply(it.map{_.toDouble})
   def apply(it:Iterator[Double]):Stat = {
     val s = new StatBuilder
     while(it.hasNext) s += it.next()
     s.get
   }
   def apply(xs:Iterable[Double]):Stat = apply(xs.iterator)
   def apply[A](xs:Iterable[A])(implicit num:Numeric[A]):Stat = apply(xs.iterator map num.toDouble)
   def apply[A](it:Iterator[A])(implicit num:Numeric[A]):Stat = apply(it map num.toDouble)


   //----generic helper funtions
   /**generalized jsd with auto minVariance based on number of sig digits for 1 sigma*/
   // https://en.wikipedia.org/wiki/Jensen%E2%80%93Shannon_divergence#Definition
   def jsd(stats:Iterable[Stat], sigDigits:Int=16):Double = {
     val m  = join(stats)
     val hs = stats.map{_.entropy(sigDigits)}
     m.entropy(sigDigits) - Stat(hs).mean
   }
   def join(stats:Iterable[Stat]):Stat = stats.foldLeft(Stat()){_ ++ _}

   val minVarianceDouble:Double = {
     // considering half
     // def xs(n:Int) = Seq.fill(n){-Eps.double} ++ Seq.fill(n){Eps.double} ++ Seq.fill(n){0d}
     // Stat(xs(100)).std      = 1.87E-16
     // Stat(xs(100)).variance = 3.5E-32   ~= Eps.double**2
     3.5E-32 //Treturn to double precision minVariance
     // 0.001 //min variance
   }


}
/**due to erasure, types within types need extra construction types*/
object StatN{
  /**An empty StatN needs to know the dimmension ahead of time*/
  def empty(n:Int):StatN = new StatN(Array.fill(n)(Stat.empty))

  /**fast construct with mutables under the hood*/
  // def apply(it:Iterator[Array[Double]])(implicit d:DummyImplicit):StatN = 
  //   apply(it.map(_.iterator))

  /**fast construct with mutables under the hood*/
  def apply(xs:Iterable[Iterable[Double]]):StatN = apply(xs.iterator.map{_.toArray})

  /**fast construct with mutables under the hood*/
  def apply(xs:Iterator[Iterable[Double]])(implicit d:DummyImplicit):StatN = apply(xs.map{_.toArray})
  def apply(xs:Iterable[Array[Double]])(implicit d:DummyImplicit):StatN = apply(xs.iterator)

  /**fast construct with mutables under the hood*/
  // def apply(it:Iterator[Iterable[Double]])(implicit d:DummyImplicit):StatN = {
  //   if(!it.hasNext) StatN.empty(0) //empty condition
  //   else {
  //     val x0 = it.next.toArray //array is needed only one the first pass to get a size
  //     val stats = new StatBuilderN(x0.size)
  //     stats ! x0
  //     while(it.hasNext) stats ! it.next
  //     stats.get
  //   }
  // }

  /**fast construct with mutables under the hood*/
  def apply(xss:Iterator[Array[Double]]):StatN = {
    if(xss.isEmpty) StatN.empty(0)
    else {
      var init = true;
      var stats:StatBuilderN = null //don't load it yet because we don't know the size
      xss.foreach{xs =>
        if(init){
          init = false
          stats = new StatBuilderN(xs.size)
        }
        stats ! xs
      }
      stats.get
    }
  }
}
class StatBuilderN(n:Int){
  private val stats = Array.fill(n)(new StatBuilder)
  def !(ds:Iterable[Double]) = for( (stat,d) <- stats zip ds) stat += d
  // def !(ds:Iterable[Double]) = {
  //   var i = 0;
  //   for( d <- ds){ stats(i) += d; i += 1}
  // }
  def get:StatN = new StatN(stats.map(_.get))
}
class StatN(val stats:Array[Stat]){// extends Iterable[Stat]{
  // def iterator = stats.iterator
  def apply(i:Int) = stats(i)
  def size = stats.size
  def +(ds:Iterable[Double]):StatN = {
    // require(this.size == ds.size)
    new StatN(stats zip ds map {case (s,d) => s+d})
  }
  def ++(that:StatN):StatN = {
    if(this.stats.size == 0) that
    else if(that.stats.size == 0) this
    else {
      require(this.size == that.size)
      new StatN(this.stats zip that.stats map {case (a,b) => a ++ b})
    }
  }
  // 2017-07-30("use StatN.stats.map{_.mean} instead", "dk")
  def means = stats.map{_.mean}
}

/**a mutable stat collector to provide memory efficient collections without intermediate classes to support iterators, and traversables, and custom values*/
class StatBuilder extends StatTrait[Double]{
  //--private mutables
  private[drx] var n:Long      = 0L //count of objects
  private[drx] var m:Double    = Double.NaN //mean
  private[drx] var m2:Double   = Double.NaN //moment
  private[drx] var mn:Double   = Double.NaN
  private[drx] var mx:Double   = Double.NaN
  private[drx] var last:Double = Double.NaN
  private[drx] var first:Double = Double.NaN

  //--methods
  //TODO maybe make this add thread safe so it behaves like an actor however the user should use it safely since the user here should be a more advanced user
  //TODO add kurtosis https://www.johndcook.com/blog/skewness_kurtosis
  2017-07-30("use += instead for more symantic mutations","do")
  def !(x:Double):Unit = +=(x)
  def +=(x:Double):Unit = {
    if(n == 0){n  = 1; m = x; m2 = 0; mn=x; mx = x; last=x; first=x}
    else{
      //Note: the order in this mutable mess(efficiency) is extremely important
      //--intermediates
      val nlast = n  //last n or (n-1)
      n += 1 //new count
      val d = x - m //delta
      val dn = d/n //delta/n
      // val dn2 = dn*dn //(delta/n)^2  //used for M4 calc
      val t  = d*dn*nlast  //term1 for a happier numeric stability
      //--updates
      m += dn //numerically happy increment versus m := m*p + x/n   //adjusted mean
      // m4 += t*dn2*(n*n-3*n+3) + 6*dn2*m2 - 4*dn*m3  //update the skewness moment
      // m3 += t*dn*(n-2) - 3*dn*m2  //update the skewness moment
      m2 += t  // numerically happy increment instead of m2 += (m-x).sq/((n-1)/n) //adjusted moment
      //--extremes
      if(x < mn) mn = x
      if(x > mx) mx = x
      //--setup next
      last = x; // the last value to be processed
    }
  }
  def get:Stat = Stat(count=n, mean=m, M2=m2, min=mn, max=mx, last=last,first=first)
  // def stat:Stat = get

  def count:Long  = n
  def mean:Double = m
  def max:Double  = mx
  def min:Double  = mn
  // def moment:Double = m2
  def variance:Double = if(n== 1) 0 else m2/(n-1)
  def std:Double  = variance.sqrt

  override def toString = get.toString
}


class StatRate extends StatBaseValue[Time]{
  private[drx] var lastTickTime:Long = System.nanoTime //keep the measures as Long
  protected val baseStat:StatBuilder = new StatBuilder  //note the explicit type annotation is required to prevent reduction to a plain Stat
  protected val baseValueBuilder = Time.BaseValueBuilderTime
  def tick:Unit = {
    val now:Long = System.nanoTime
    val dt:Long = now - lastTickTime
    baseStat += dt.toDouble/1E9 //nanosecods to the baseValue of time given in seconds
    lastTickTime = now
  }
}

//TODO use this mechanism for all stat types so even the basic Stat is really a Stat[Double], 
trait StatTrait[A]{
  //--required
  def count:Long
  def std:A
  def mean:A
  def min:A
  def max:A

  //--implmented
  // def s2 = std.sq
  // def stat:Stat = Stat(count=count, mean=mean, M2=s2*count-1d, min=min, max=max, last=g.mean,first=g.mean)

  override def toString = f"StatTrait(count:$count min:$min max:$max mean:$mean std:$std)"
}
trait StatBaseValue[A] extends StatTrait[A]{
  //--required
  protected def baseStat:StatTrait[Double]
  protected def baseValueBuilder:BaseValueBuilder[A]
  //--derived
  def count:Long = baseStat.count
  def std:A      = baseValueBuilder.apply(baseStat.std)
  def mean:A     = baseValueBuilder.apply(baseStat.mean)
  def min:A      = baseValueBuilder.apply(baseStat.min)
  def max:A      = baseValueBuilder.apply(baseStat.max)
}

//TODO make all Stats builder types so the extraction doesn't create overhead and isn't required, however side effects are then possible how to hide that fact??  
//TODO make a mutable/builder parent that does not use the mutable ! op?
class StatBuilderBaseValue[A](implicit b:BaseValueBuilder[A]) extends StatBaseValue[A]{
  protected val baseValueBuilder = b
  protected val baseStat:StatBuilder = new StatBuilder
  2017-07-30("use += instead for more symantic mutations","do")
  def !(x:A):Unit = +=(x)
  def +=(x:A):Unit = baseStat += baseValueBuilder.baseValueOf(x)
}

object StatVec{
   def apply():StatVec = new StatVec(Stat(),Stat(),Double.NaN)
   def apply(v:Vec):StatVec = new StatVec(Stat(v.x),Stat(v.y),0)
   //def applyFold(vs:Iterable[Vec]):StatVec = (StatVec() /: vs){(stat,v) => stat + v}
   // def apply(vs:Iterable[Vec]):StatVec = {
   //Note: IterableOnce is an Iterator like TraversableOnce is a Iterable and it is is an Iterator through `foreach` (it works for a wide range of applicable collections
   def apply(vs:Iterable[Vec]):StatVec = apply(vs.iterator)
   def apply(vs:Iterator[Vec]):StatVec = { //TODO 2d for now so z is ignored 
     val x = new StatBuilder
     val y = new StatBuilder
     var com:Double = 0d //co-moment or covariance S_xy
     for(v <- vs){
      //--use last mean delta's to update cov
      val n = x.n //last n
      val dx = if(n == 0) -v.x else (x.m - v.x)
      val dy = if(n == 0) -v.y else (y.m - v.y)
      val comUpdate = dx*dy*n/(n+1)
      com += comUpdate //co-moment //FIXME not calculating correctly
      //--update next means
      x += v.x
      y += v.y
     }
     new StatVec(x.get,y.get,com)
   }
}
//FIXME make the cov (via co-moment calculation correctly from diffrent schemes)
class StatVec(val x:Stat,val y:Stat, val com:Double/*co-moment*/){ //TODO 2d for now so z is ignored
   lazy val cov:Double = if(count == 1) 0 else com/(count-1) //sample covariance; bessel's correction for sample variance
   // lazy val covPopulation:Double = if(count == 1) 0 else com/count //population covariance
   lazy val variance:Vec = Vec(x.variance, y.variance)
   def sxx:Double = x.variance
   def syy:Double = y.variance
   def sxy:Double = cov
   private val nlast = count-1
   lazy val cor:Double = com / (nlast*x.std*y.std)
   lazy val slope:Double = sxy / sxx
   lazy val intercept:Double = y.mean - slope*x.mean
   def interpolate(xValue:Double):Double = intercept + slope*xValue
   lazy val std:Vec = Vec(x.std, y.std)
   lazy val mid:Vec = Vec(x.min, y.mid)
   lazy val max:Vec = Vec(x.max, y.max)
   lazy val min:Vec = Vec(x.min, y.min)
   lazy val mean:Vec = Vec(x.mean, y.mean)
   lazy val bound:Bound[Vec] = Bound(min,max)
   lazy val rect:Rect = Rect(min,max) //should this be Bound[Vec] or rect, for now we want more rect
   lazy val xBound:Bound[Double] = Bound(x.min,x.max)
   lazy val yBound:Bound[Double] = Bound(y.min,y.max)
   lazy val corr:Double = cov/(x.variance*y.variance) //correlation coefficient
   def count = x.count //x.count should equal y.count
   //assert(x.count == y.count)
   def +(v:Vec):StatVec = this ++ StatVec(v)
   def ++(that:StatVec):StatVec = {
      if     (this.count == 0) that //nothing to merge (use the other one)
      else if(that.count == 0) this //nothing to merge (use the other one)
      else{ //do the merge
        //--merge individual stats
        val mergedX = this.x ++ that.x
        val mergedY = this.y ++ that.y
        val n = mergedX.count //assert(mergedX.count == mergedY.count)
        //--calc cov stats
        val dx = that.x.mean - this.x.mean
        val dy = that.y.mean - this.y.mean
        // val scale = Stat.harmonicMean(this.count,that.count)/2.0 //is harmonic mean right?
        // val scale = this.count*that.count/(this.count + that.count)
        // val mergedCom = this.com + that.com + dxx*dyy*scale  // Wikipedia parallel algorithm for computing cov with co-moment iterative
        // val thisNan = this.com == Double.NaN 
        // val thatNan = that.com == Double.NaN 
        // val comSum = if(!thisNan && !thatNan) this.com + that.com else if(thisNan && thatNan) 0.0 else if(thatNan) this.com else that.com
        val mergedCom = this.com + that.com + dx*dy*this.count*that.count/n // john d. cook better numeric stability

        new StatVec(mergedX, mergedY, if(mergedCom.isNaN) 0 else mergedCom)
      }
   }
   lazy val covMatrix:Matrix.Matrix2 = Matrix(x.variance, cov, cov, y.variance)
   lazy val covEllipse = covMatrix.ellipse

   override def toString = f"StatVec(count:$count min:$min max:$max mean:$mean stddev:$std var:$variance cov:$cov"
}
case class Stat(
   count:Long=0L,
   mean:Double=Double.NaN,
   M2:Double=Double.NaN,
   min:Double=Double.NaN,
   max:Double=Double.NaN,
   last:Double=Double.NaN,
   first:Double=Double.NaN
) extends StatTrait[Double]{
   //---calculated values
   lazy val variance:Double = if(count == 1) 0 else M2/(count-1)
   lazy val std:Double = variance.sqrt
   lazy val mid:Double = (min + max)/2

   /**alias for standard deviation*/
   def s:Double = std
   /**alias for variance*/
   def s2:Double = variance
   /**alias for mean*/
   def mu:Double = mean


   //lazy val dist:Double = max - min
   2017-07-30("use bound.dist instead","v0.1.18")
   lazy val range:Double = max - min

   def parEquals(that:Stat):Boolean = this.count==that.count && this.mean==that.mean && this.M2==that.M2 && this.min==that.min && this.max==that.max

   /**max and min bound*/
   def bound:Bound[Double] = Bound(min,max)
   /**use Chebyshev's inequality to build around the specified fraction of data
    * where k = 1/sqrt(1-p) where where p = percentOfPopulation 
    * note this better matches unknown distributions of data
    * this calculation also does not require the `erf` calculation if the Gaussian distribution is assumed
    * wp:Standard_deviation
    * wp:Chebyshev%27s_inequality
    * */
   def boundFrac(populationFraction:Double):Bound[Double] = boundSigma(populationFraction.not.sqrt.inv)
   /**bound around the mean by +/- k standard deviations, but don't go beyond the max or min*/
   def boundSigma(kSigma:Double):Bound[Double] = Bound(mean-std*kSigma, mean+std*kSigma).satBy(bound)
   /**3 sigma bound*/
   def boundMost:Bound[Double] = boundSigma(3) //implement the mathematics to convert a std deviation to percent

   def norm(x:Double):Double = (x - min)/(max - min) //TODO include a feature like this in the Stat2 object

   def +(x:Double):Stat = this ++ Stat(
      count = if(x.isNaN) 0 else 1,
      mean = x,
      M2 = 0.0,
      min = x,
      max = x,
      last = x,
      first = x
   )
   def ++(that:Stat):Stat = {
      val (n,m,m2) = stat(this,that)
      Stat(
         count = n,
         mean = m,
         M2 = m2,
         min = if(this.min.isNaN) that.min else if(that.min.isNaN) this.min else if(this.min < that.min) this.min else that.min,
         max = if(this.max.isNaN) that.max else if(that.max.isNaN) this.max else if(this.max > that.max) this.max else that.max,
         last = that.last,
         first = this.first
      )
   }
   def nice = f"[$min%+.2f ($mean%+.2f+-$std%.2fs) $max%+.2f] n=$count%-8d"
   def toKson = s"count:$count min:$min max:$max mean:$mean stddev:$std M2:$M2 last:$last first:$first"
   override def toString = s"Stat($toKson)"

   private def stat(a:Stat, b:Stat):(Long,Double,Double)= {
      //returns (totalCount.toLong, totalMean, totalM2)
      val n = a.count + b.count
      if (n <= 0) (0,Double.NaN,0)
      else if (a.mean.isNaN && b.mean.isNaN) (0,Double.NaN,0) //empty empty
      else if (a.mean.isNaN) (b.count,b.mean,b.M2)  //empty a
      else if (b.mean.isNaN) (a.count,a.mean,a.M2)  //empty b
      else{
         val d = (b.mean - a.mean)
         val d2 = d*d
         val m = (a.mean*a.count +  b.mean*b.count)/n
         val m2 = a.M2 + b.M2 + d2*a.count*b.count/n
         //println(f"a.mean:${a.mean}  b.mean:${b.mean}  d:$d  m:$m  a.M2:${a.M2}  b.M2:${b.M2}  m2:$m2  n:$n")
         (n,m,m2)
      }
   }
   //--optimized add a single value without creating an internal dummy Stat object without checking for isNaN
   2017-07-30("use the Stat batch construtor instead","v0.2.15")
   def addFast(x:Double):Stat = {
     val n = count + 1 //new count
     val d = mean - x //delta
     val m = mean*count/n +  x/n  //adjusted mean
     val m2 = M2 + d*d/n*count

     val newMin = if(x < min) x else min
     val newMax = if(x > max) x else max

     Stat(count=n, mean=m, M2=m2, min=newMin, max=newMax, last=x, first=first)
   }


   //--- entropy / jsd measures
   /**min variance where sigDigits on the mean is 1 sigma */
   //auto sig digits by mean
   //ex 3 sig digits
   // 1000 -> 1
   //  100 -> 0.1
   //   10 -> 0.1
   //    1 -> 0.01
   def minVariance(sigDigits:Int):Double = {
     // val result = mean.sq / (10d**(2*sigDigits)) //  minStd = mean/(10**d); minVar => minStd^2
     val result = (mean / (10**sigDigits)).sq //  minStd = mean/(10**d); minVar => minStd^2
     if(result < Stat.minVarianceDouble) Stat.minVarianceDouble else result
   }

   /**used for easier this vs that binary op comparisons with a & b */
   private val a:Stat = this

   /** KL Divergence between two univariate Gaussian's [units of Nats]
    * url: https://en.wikipedia.org/wiki/Normal_distribution*/
   def kld(b:Stat):Double = {
     //--quick check (which also covers the double zero variance case
     if(a.variance == b.variance && a.mean == b.mean) 0d
     else if (a.variance == 0d) Double.NaN
     else if (b.variance == 0d) Double.NegativeInfinity
     else{
       //-- closed for solution derived at url: https://stats.stackexchange.com/a/7449/156248
       (b.std/a.std).ln + (a.variance+(a.mean-b.mean).sq)/b.variance*0.5 - 0.5
       //-- equivalent more involved closed for solution listed on url: https://en.wikipedia.org/wiki/Normal_distribution
       // 0.5d*((a.std/b.std).sq + (b.mean-a.mean).sq/b.variance - 1d + 2d*(b.std/a.std).ln)
     }
   }

   /** Jensen–Shannon divergence for a Gaussian
    * url: https://en.wikipedia.org/wiki/Jensen%E2%80%93Shannon_divergence */
   def jsdByKLD(b:Stat):Double = {
     // Stat.jsd( Seq(a,b) )
     val m = a ++ b
     0.5*(a kld m) + 0.5*(b kld m)
   }

   /**the jsd distance metric includes a sqrt term*/
   def jsDist(b:Stat):Double = Stat.jsd(Seq(a,b)).sqrt

   /**max Shannon entropy for a Gaussian
    * Default to a min variance of 16significant digits for floating point double
    * url: https://en.wikipedia.org/wiki/Entropy_(information_theory) 
    * */
   def entropy:Double = entropy(sigDigits=16)
   /**max Shannon entropy for a Gaussian
    * Note: entropy of zero variance is infinite.  But rarely is that the case, with Doubles, so use a user defined variance metric
    * url: https://en.wikipedia.org/wiki/Entropy_(information_theory) 
    * */
   def entropy(sigDigits:Int):Double = {
     val minS2 = a.minVariance(sigDigits)
     val s2 = if(a.variance < minS2) minS2 else a.variance
     0.5*(tau*E*s2).ln
     // + a.count.ln  //LDDP suggests adding this log(N) term, but this term dwarfs the second term... https://en.wikipedia.org/wiki/Limiting_density_of_discrete_points
   }

   def isZero:Boolean  = a.mean == 0d && a.variance == 0d
   def nonZero:Boolean = !isZero
}

//TODO check to see how much this is used by other libs
//TODO make a better histograming like mechanism
case class StatSet[A](
   items:Map[A,Long]=Map[A,Long]()
){
   def +(newItem:A):StatSet[A] = this ++ StatSet(
      items = Map(newItem->1L)
   )
   def ++(that:StatSet[A]):StatSet[A] = {
     val agg = scala.collection.mutable.Map.empty[A,Long]
     val allItems = this.items.toIterable ++ that.items.toIterable
     allItems.foreach{case (k,count) =>
       agg.get(k) match {
         case Some(n) => agg(k) = count + n
         case None    => agg(k) = count
       }
     }
     StatSet(agg.toMap)
   }
   lazy val sortedByFreq:Seq[(A,Long)] = items.toSeq.sortBy(_._2)
   lazy val size:Long = items.size
   lazy val keys:Seq[A] = items.keys.toSeq
}

object Hist{
  def empty[A] = new Hist[A](Map.empty[A,Long])
  def apply[A](xs:Iterable[A]):Hist[A] = {
    val hist = new HistBuilder[A]
    hist ++= xs
    hist.toHist
  }
  def apply[A](xs:Iterator[A]):Hist[A] = {
    val hist = new HistBuilder[A]
    hist ++= xs
    hist.toHist
  }
}
class HistBuilder[A]{ //TODO add a generic histogram train that makes the builder also a getter
  private val itemCount = TrieMap.empty[A,Long]
  private def add(x:A, count:Long):Unit = {
    itemCount += (x ->  (itemCount.getOrElse(x,0L) + count) )
    ()
  }

  /** add */
  def +=(x:A):Unit = add(x,1L)
  /** batch add */
  def ++=(xs:Iterable[A]):Unit = for(x <- xs) add(x,1L)
  /** batch add */
  def ++=(xs:Iterator[A]):Unit = for(x <- xs) add(x,1L)

  /** pull out an immutable hist result*/
  def toHist:Hist[A] = new Hist(itemCount.toMap)

  /** get the counted items for this element*/
  def apply(x:A):Long = itemCount.getOrElse(x,0L)
  /** get the bins*/
  def keys:Iterable[A] = itemCount.keys

  def iterator:Iterator[A] = itemCount.keysIterator

  def total:Long = itemCount.values.sum

  // def bins = itemCount
  // def size:Long = itemCount.size
}
/**histogram counter*/
class Hist[A](protected val itemCount:Map[A,Long]){
  //helper function for the intermediate add without boxing another Hist
  private def add(map:Map[A,Long], x:A, count:Long):Map[A,Long] =
    map + (x ->  (map.getOrElse(x,0L)+count) )
  /** add */
  def +(x:A):Hist[A] = new Hist(add(itemCount, x, 1L)) //TODO possibly optimize batch add with the HistBuilder
  /** batch add */
  def ++(xs:Iterable[A]):Hist[A] = new Hist(
    xs.foldLeft(itemCount){case (c,x) => add(c,x,1L)}
  )
  /** merge two statCounts*/
  def ++(that:Hist[A]):Hist[A] = new Hist(
    that.itemCount.foldLeft(this.itemCount){ //iterate through `that.itemCount` 
      case (c,(x,xc)) => add(c,x,xc)  //roll them into `this.itemCount`
    }
  )
  /** get the counted items for this element*/
  def apply(x:A):Long = itemCount.getOrElse(x,0L) //default should be zero entries

  def get(x:A):Option[Long] = itemCount.get(x)

  /** get the bins*/
  def keys:Iterable[A] = itemCount.keys

  def iterator = itemCount.iterator

  // def foreach //TODO use this to prevent extra keys from Iterable

  lazy val count:Long = if(itemCount.isEmpty) 0L else itemCount.values.sum
  lazy val maxCount:Long = if(itemCount.isEmpty) 0L else itemCount.values.max
  def bins = itemCount
}

//case class StatMaxN[Ordering[A]]() //TODO implement a keep the top N values or percent

/**Gaussian distribution $ τ^-k/2^ |Σ|^-1/2^ e^{ -1/2 (x-μ)' Σ^-1^ (x-u) } $  */
case class Gaussian(u:Double,s2:Double){
  lazy val s:Double = s2.sqrt
  def pdf(x:Double):Double = ((x-u).sq/2/s2).neg.exp / (tau * s2).sqrt
  def cdf(x:Double):Double = ((x-u)/(s2*2.sqrt)).erf.next/2
  def stat(n:Long):Stat = {
     val xSigma = 100d*s
     Stat(count=n, mean=u, M2=s2*n-1, min=u-xSigma, max=u+xSigma, last=u,first=u)
  }
}
/**2d Gaussian (ellipse like structure)
  * t:quantity u:mean s:std-dev */
case class Gaussian2(u:Vec,s:Vec,rho:Double=0d){
  /**t:quantity u:mean s:std-dev*/
  def pdf(t:Vec):Double = { //wikipedia 2d gaussian function pdf
    val qrho = 1.0 - rho*rho
    val sxsy = s.x*s.y
    val a = 1.0/(tau*sxsy*qrho.sqrt)
    val b = -1.0/(2*qrho);
    val dx:Double = t.x - u.x
    val dy:Double = t.y - u.y
    val q = dx*dx/(s.x*s.x)  -  2*rho*dx*dy/(sxsy)  +   dy*dy/(s.y*s.y)
    a*math.exp(b*q)
  }
  /**t:quantity u:mean s2:variance*/
  /*
  def pdf2Angle(t:Vec, u:Vec=Vec(0,0), s2:Vec=Vec(1,1),theta:Angle=Angle(0)):Double = { //wikipedia 2d gaussian function pdf
    val a = 1.0 //TODO what is the a amplitude value here??  integration under the 3d surface curve
    //--
    val cos2 = theta.cos2
    val sin2 = theta.sin2
    val sinDouble = (theta*2).sin
    val _a = (cos2/s2.x + sin2/s2.y)/2
    val _b = (s2.y.inv - s2.x.inv)/4*sinDouble
    val _c = (sin2/s2.x + cos2/s2.y)/2
    //--
    val dx:Double = t.x - u.x
    val dy:Double = t.y - u.y
    val q = _a*dx*dx - 2*_b*dx*dy + _c*dy*dy
    a*math.exp(-q)
  }
  */
}
/**Gaussian distribution $ τ^-k/2^ |Σ|^-1/2^ e^{ -1/2 (x-μ)' Σ^-1^ (x-u) } $  */
object Gaussian{
  def φ(x:Double):Double = pdf(x,0,1)
  def Φ(x:Double):Double = cdf(x,0,1)
  def cdf(x:Double):Double =  cdf(x,0,1)
  def cdf(x:Double,u:Double,s2:Double):Double =  ((x-u)/(s2*2.sqrt)).erf.next/2
  def pdf(x:Double):Double = pdf(x,0,1)
  def pdf(x:Double,u:Double,s2:Double):Double = ((x-u).sq/2/s2).neg.exp / (tau * s2).sqrt
  /**t:quantity u:mean s:std-dev*/
  def pdf2(t:Vec, u:Vec=Vec(0,0), s:Vec=Vec(1,1),rho:Double=0.0):Double = { //wikipedia 2d gaussian function pdf
    val qrho = 1.0 - rho*rho
    val sxsy = s.x*s.y
    val a = 1.0/(tau*sxsy*qrho.sqrt)
    val b = -1.0/(2*qrho);
    val dx:Double = t.x - u.x
    val dy:Double = t.y - u.y
    val q = dx*dx/(s.x*s.x)  -  2*rho*dx*dy/(sxsy)  +   dy*dy/(s.y*s.y)
    a*math.exp(b*q)
  }
}

object StudentizedRange {
  def cdf(q:Double, k:Int, v:Double):Double = coef(k,v)*intOuter(q,k,v)
  def icdf(p:Double,k:Int,v:Double):Double = Bound(0d,50d).bisect{(q:Double) => cdf(q,k,v) >= p }

  private def inner(u:Double, x:Double, q:Double, k:Int):Double = {
    val e = Gaussian.cdf(u) - Gaussian.cdf(u - q*x)
    Gaussian.pdf(u)*(e ** k.prev)
  }

  private def intInner(x:Double, q:Double, k:Int):Double = -10 ~ 10 take 100 integrate {inner(_, x, q, k)}

  private def outer(x:Double, q:Double, k:Int, v:Double) = x**v.prev * (-v*x.sq/2).exp * intInner(x,q,k)

  private def intOuter(q:Double, k:Int, v:Double) = 0d ~ 10d take 100 integrate {outer(_, q, k, v)}

  private def coef(k:Int, v:Double):Double = { val v2 = v/2d; (v ** v2 * k) / (2d ** v2.prev * v2.gamma) }
}

object Distinct{
  def empty = new Distinct(0)
  def apply(xs:Iterable[Any]):Distinct = empty ++ xs

  val depth = 64
  val correctionFactor = 0.79402
  val probability = 0.5**3

  val binDepth:Int =2
  val binCount:Int = (2**binDepth).toInt
  val binWidth:Int = depth/binCount
  val indexMask:Int = binCount - 1
  val binMask:Int = (2**binWidth - 1).toInt

  // def error =  ???
  // def maxCount =  ???

}
class Distinct(val hash:Long) extends AnyVal{
  import Distinct._

  def ones = (0 until depth) count hash.bit
  def zeros = depth - ones

  def count:Int = ones match {
    case 0 => 0  //depth => maxCount
    case i if i == depth => 10  //depth => maxCount
    case i => ((ones#/depth).not.log / probability.not.log * correctionFactor).round.toInt
  }
  private def bloom(x:Any):Long = {
    val h:Long = x.##.uniformHash & 0xFFFFFFFFL  //TODO directly use MurmurHash instead of going through ## hashCode

    val binIndex:Long = (h >> (32-binDepth)) & indexMask //use upper part of the hash
    val binValue:Long = h & binMask
    binValue << binWidth*binIndex
  }
  def mayContain(x:Any):Boolean = (this + x).hash == hash  //TODO think about the effectiveness of this as a bloom filter, it may not be useful. also is a doesNotContain work always?

  def +(x:Any):Distinct = if(hash == -1L/*already full*/) this else new Distinct(hash | bloom(x))
  def ++(that:Distinct):Distinct = new Distinct(this.hash | that.hash)
  def ++(xs:Iterable[Any]):Distinct = xs.foldLeft(this){_ + _}
  private def binaryString = hash.toByteArray map {b => (b & 0xffL).base(2,8)} mkString ""
  override def toString = f"Distinct(~$count, $ones%2d ones in $binaryString)"
}

class ArrayStatBuilder(n:Int){
  private var _stats = Array.fill(n)(new StatBuilder)
  def +=(xs:Array[Double]):Unit =
    _stats zip xs foreach { case (stat,x) => stat += x }
  def dist(xs:Array[Double]):Double = xs.dist(means)
  def means:Array[Double] = _stats.map{_.m} //FIXME optimize this call so it doesn't have to construct another stat
  def stats:Array[Stat] = _stats.map{_.get}

  def count:Long = _stats.head.count //not assumes n > =0
}
class Kmeans(k:Int, n:Int){
  private val _clusters = Array.fill(k)( new ArrayStatBuilder(n) )

  private var count:Long = 0L

  def +=(xs:Array[Double]):Unit = {
    if(count < k) _clusters(count.toInt) += xs
    else {
      val dists = _clusters.map(_ dist xs)//find closest cluster
      val i = dists.zipWithIndex.sortBy(_._1).head._2 //index to min dist cluster
      _clusters(i) += xs
    }
    count += 1
  }

  def clusters:Array[Array[Stat]] = _clusters.map{_.stats}
}