Mark Hopkins
@antiselfdual
antiselfdual.com
20091230,GOOG,618.07,622.73,618.01,622.73,14663
20091231,GOOG,624.86,625.4,619.98,619.98,12202
20100104,GOOG,627.46,629.51,624.24,626.75,19579
20100105,GOOG,627.84,627.84,621.54,623.99,30078
20100106,GOOG,625.08,625.86,606.36,608.26,39806
...
Columns are: date, ticker symbol, opening, highest, lowest and closing prices, and volume of shares traded
Perform computations on this data
We want to be pretty flexible about the exact computation we perform
And do it in a single pass (there's a lot of data)
scala> (1 + 2) + 3 == 1 + (2 + 3)
res0: Boolean = true

scala> 9 + 0
res1: Int = 9

scala> 0 + 9
res2: Int = 9
scala> ("philip" + "k") + "dick" == "philip" + ("k" + "dick")
res1: Boolean = true

scala> "something" + ""
res2: String = something

scala> "" + "something"
res3: String = something
scala> (BigInt(30) gcd (BigInt(42) gcd BigInt(70))) == ((BigInt(30) gcd BigInt(42)) gcd BigInt(70))
res0: Boolean = true

scala> BigInt(345) gcd BigInt(0)
res1: scala.math.BigInt = 345

scala> BigInt(0) gcd BigInt(345)
res2: scala.math.BigInt = 345
What's similar in each case? In each we've got a binary operation, plus a special element that leaves any other element unchanged when combined with it
Usually we call this special element the unit
In other words, the final result is independent of the way we "associate" the elements together, provided we keep them in the same order. This property is called associativity.
Something that satisfies this definition is called a monoid
Or a semigroup if we don't require a unit
mono is Greek for "one"
Because there's only one operation
As for the "-oid"...
...you'll have to ask the guy who came up with the name and popularised the concept
These last two are only a semigroup — it's not clear what the unit should be...
If we wanted to extend to a monoid, for, say, the maximum on Int, we could:
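One standard trick can be sketched in plain Scala (the names here are mine, not scalaz's): either pick Int.MinValue as the unit, since Int.MinValue max x == x, or adjoin a brand-new unit by wrapping values in Option — the latter works for any semigroup:

```scala
// Two ways to turn the "max" semigroup on Int into a monoid
// (an illustrative sketch, not scalaz's actual encoding).

// (1) Int.MinValue already behaves as a unit for max.
def maxAppend(a: Int, b: Int): Int = a max b
val maxZero: Int = Int.MinValue

// (2) Adjoin a fresh unit with Option: None is the unit.
//     This construction works for *any* semigroup, not just max.
def optMax(a: Option[Int], b: Option[Int]): Option[Int] = (a, b) match {
  case (Some(x), Some(y)) => Some(x max y)
  case (x, None)          => x
  case (None, y)          => y
}
```

The Option trick is visible later in these slides, where results appear as Max(Some(6)) rather than bare values.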
123 ⊹ 456 = 123456
Now do the same in base 16, to build up binary data incrementally
Now do the same in binary, to build up bit flags incrementally
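A sketch of the digit-concatenation operation for an arbitrary base (the name concat is mine): shift the left operand one place per digit of the right operand, then add. Treating 0 as the empty digit string makes it the unit on both sides.

```scala
// x ⊹ y in base b: x * b^(number of base-b digits in y) + y.
// Convention: 0 has no digits, so 0 acts as the unit (the empty digit string).
def concat(base: Int)(x: Long, y: Long): Long = {
  var shift = 1L
  var n = y
  while (n != 0) { shift *= base; n /= base } // one place per digit of y
  x * shift + y
}
```

concat(10)(123, 456) gives 123456; in base 16 the same operation glues bytes onto binary data, and in base 2 it appends bit strings.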
Ordering consists of the constants LT, GT and EQ
EQ |+| x == x
LT |+| x == LT
GT |+| x == GT
This lets us write chained comparisons with immaculate ease.
def compareUsers(a: User, b: User): Ordering =
  (a.followers ?|? b.followers) |+|
  (a.tweets    ?|? b.tweets)    |+|
  (a.name      ?|? b.name)

Note: ?|? is just a little more scalaz syntactic sugar:
scala> 10 ?|? 12 res0: scalaz.Ordering = LT
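The same monoid is easy to sketch without scalaz. Here plain Int comparison results stand in for LT/EQ/GT (with 0 playing EQ's role), and the User fields are the ones compareUsers above assumes:

```scala
// First non-equal comparison wins; 0 (the "EQ" of Int comparisons) is the
// unit. The by-name second argument means later comparisons are only
// evaluated when earlier ones tie.
def andThenCompare(a: Int, b: => Int): Int = if (a != 0) a else b

case class User(name: String, followers: Int, tweets: Int)

def compareUsers(a: User, b: User): Int =
  andThenCompare(a.followers compare b.followers,
    andThenCompare(a.tweets compare b.tweets,
      a.name compare b.name))
```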
The trivial monoid on a type T:
Choose any z of type T and define x ⊹ y = z
eg. on Int we might have x ⊹ y = 0 for all x and y
(Strictly, this only has a unit when T has a single element; in general it's a semigroup.)
This operation discards both operands.
These can actually be quite useful.
Tuples of monoids create new monoids:
scala> ("hi", List(3, 4, 5), 1.2) |+| (" Mum", List(7, 8, 9), 4.6)
res0: (String, List[Int], Double) = (hi Mum,List(3, 4, 5, 7, 8, 9),5.8)
The |+| function here is just scalaz syntactic sugar for append.
If M is a monoid, then functions A => M are automatically a monoid:
implicit def pointwiseMonoid[A, M](implicit m: Monoid[M]) = new Monoid[A => M] {
  def zero = a => m.zero
  def append(f: A => M, g: A => M): A => M =
    a => m.append(f(a), g(a))
}
If V is a monoid, then Map[K, V] gets a monoid structure
scala> Map('finn -> 3, 'jake -> 4, 'beemo -> 22) |+|
     | Map('finn -> 1, 'princess_bubblegum -> 8, 'beemo -> -1)
res0: scala.collection.immutable.Map[Symbol,Int] =
  Map('finn -> 4, 'princess_bubblegum -> 8, 'beemo -> 21, 'jake -> 4)
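The instance merges the key sets and combines colliding values with V's monoid. A self-contained sketch, with Int addition standing in for the value monoid:

```scala
// Map monoid, specialised to Int values for illustration: keys from both
// maps survive, and values under the same key are combined with the
// value monoid (here, addition).
def mergeMaps[K](a: Map[K, Int], b: Map[K, Int]): Map[K, Int] =
  b.foldLeft(a) { case (acc, (k, v)) =>
    acc.updated(k, acc.getOrElse(k, 0) + v)
  }
```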
Use "tags" (phantom types) to define a new monoid instance for a class that already has one
type Tagged[T] = { type Tag = T }
type @@[+T, Tag] = T with Tagged[Tag]

sealed trait GCD

implicit val GCDMonoid = new Monoid[BigInt @@ GCD] {
  def zero = Tag(BigInt(0))
  def append(a: BigInt @@ GCD, b: => BigInt @@ GCD) = Tag(a gcd b)
}
scala> GCD(16) |+| GCD(12) res0: scalaz.@@[BigInt,GCD] = 4
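For comparison, the same effect can be sketched with an ordinary wrapper class (the Gcd name is mine). Unlike @@, this costs an allocation per value — which is exactly what tags avoid:

```scala
// A plain wrapper playing the GCD tag's role: the same underlying BigInt,
// but carrying a different monoid. gcd(0, n) == n, so 0 is the unit.
case class Gcd(value: BigInt) {
  def |+|(that: Gcd): Gcd = Gcd(value gcd that.value)
}
val gcdZero = Gcd(BigInt(0))
```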
What are monoids about?
They abstract the notion of accumulation
Why are we using
def foldLeft[B](z: B)(op: (B, A) => B): B
In most cases, z is the unit of a monoid, and op is just collecting things onto an accumulator
val totalSize = collections foldMap (x => x.size)
instead of
val totalSize = collections foldLeft (0) ((a,x) => a + x.size)
If we define
def foldMap[B](f: A => B)(implicit F: Monoid[B]) =
  foldLeft[B](F.zero) { (b, a) => F.append(b, f(a)) }
foldMap is provided in scalaz's Foldable trait
Foldable instances are provided for subclasses of Iterable (and some scalaz types)
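Stripped of the type-class machinery, foldMap is a one-liner. This explicit-argument sketch (names mine) shows there's nothing more to it:

```scala
// foldMap with the monoid passed explicitly instead of implicitly:
// map each element through f, accumulating with the monoid's append,
// starting from its zero.
def foldMap[A, B](as: List[A])(f: A => B)(zero: B)(append: (B, B) => B): B =
  as.foldLeft(zero)((b, a) => append(b, f(a)))

// the totalSize example from above, via foldMap
val totalSize = foldMap(List(List(1, 2), List(3), List.empty[Int]))(_.size)(0)(_ + _)
```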
Why "foldMap"?
It conceptually does a map(f): F[A] => F[M] followed by a fold: F[M] => M
(just as flatMap(f) conceptually does map(f) followed by flatten).
Some kinds of calculations don't work as monoids. eg. averages
We can't combine two averages unless we know the size of the datasets they came from
But it's the ratio of the sum and the count of elements, and they're both monoids...
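A quick arithmetic check of why raw averages don't combine but (sum, count) pairs do:

```scala
// Naively averaging two averages is wrong when the groups differ in size...
val a = List(1.0, 2.0, 3.0) // mean 2.0
val b = List(10.0)          // mean 10.0
val naive = (2.0 + 10.0) / 2
// ...but (sum, count) pairs combine componentwise (each component is a
// monoid), and dividing at the very end recovers the true mean.
val (s, c)  = (a.sum + b.sum, a.size + b.size)
val correct = s / c
```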
Let's define a type class that allows some post-processing
abstract class Aggregation[T: Monoid] {
  type Result
  def result(a: T): Result
}
And a helper method:
def aggregate(fa: F[A])(k: A => M) = fa.foldMap(k).result
Actually, it's a little messier:
def aggregate[F[_], A, M](fa: F[A])(k: A => M)
    (implicit f: Foldable[F], a: Aggregation[M], m: Monoid[M]): a.Result =
  a.result(fa.foldMap(k)(m))
Now let's implement Mean
case class Mean[N: Fractional](sum: N, count: Int)

object Mean {
  def apply[N: Fractional](n: N): Mean[N] = Mean(n, 1)
}
implicit def MeanMonoid[N](implicit F: Fractional[N]) = new Monoid[Mean[N]] {
  import F._
  def zero = Mean(F.zero, 0)
  def append(a: Mean[N], b: => Mean[N]) =
    Mean(a.sum + b.sum, a.count + b.count)
}
implicit def MeanAggregation[N](implicit F: Fractional[N]) = new Aggregation[Mean[N]] {
  type Result = N
  import F._
  def result(a: Mean[N]) = a.sum / fromInt(a.count)
}
Now it works!
scala> aggregate(List[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10))(Mean(_))
res0: Double = 5.5
def filter[A, B: Monoid](p: A => Boolean)(f: A => B): A => B =
  a => if (p(a)) f(a) else implicitly[Monoid[B]].zero
scala> (1 to 30).toList.foldMap(filter (even) (_.toString))
res0: String = 24681012141618202224262830
def groupBy[A, K, M: Monoid](createKey: A => K, monoidValuedFunction: A => M): A => Map[K, M] =
  a => Map[K, M](createKey(a) -> monoidValuedFunction(a))
scala> val as = "monoids are a pretty simple concept really but are pretty handy in practice and find a wide range of pragmatic applications in programming".split("\\W").toList

scala> as.foldMap(groupBy((_: String).head, a3(count, maxLength, minLength)))
res0: Map[Char,(Int, aggregations.Aggregations.Max[Int], aggregations.Aggregations.Min[Int])] =
  Map(s -> (1,Max(Some(6)),Min(Some(6))), f -> (1,Max(Some(4)),Min(Some(4))),
      a -> (6,Max(Some(12)),Min(Some(1))), m -> (1,Max(Some(7)),Min(Some(7))),
      i -> (2,Max(Some(2)),Min(Some(2))), b -> (1,Max(Some(3)),Min(Some(3))),
      p -> (5,Max(Some(11)),Min(Some(6))), c -> (1,Max(Some(7)),Min(Some(7))),
      h -> (1,Max(Some(5)),Min(Some(5))), r -> (2,Max(Some(6)),Min(Some(5))),
      w -> (1,Max(Some(4)),Min(Some(4))), o -> (1,Max(Some(2)),Min(Some(2))))
def a2[X,A,B](f: X => A,g: X => B): X => (A, B) = x => (f(x), g(x))
Similarly a3, a4, ...
scala> val l = List[Double](1, 2, 3, 4, 5, 6)
l: List[Double] = List(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)

scala> aggregate(l)( a2(
     |   filter((_: Double) >= 3) (sum),
     |   filter((_: Double) <= 4) (product)) )
res0: (Double, Double) = (18.0,24.0)

scala> aggregate(l)( a3(max, min, mean) )
res1: (Double, Double, Double) = (6.0,1.0,3.5)
Instead of writing a2, a3 etc by hand...
Define a_{n} for all n using some Shapeless magic.
scala> val s = sum[Double]

scala> aggregate(l)(flatten(s &&& s &&& s &&& s &&& s &&& s))
res0: (Double, Double, Double, Double, Double, Double) = (21.0,21.0,21.0,21.0,21.0,21.0)
&&& is an Arrow operator; it does just the same thing as our a2.
def linesOf(f: File) = new FileLineTraversable(f).view
class FileLineTraversable(file: File) extends Traversable[String] {
  def foreach[U](f: String => U) {
    val reader = new BufferedReader(new FileReader(file))
    try {
      var line = reader.readLine()
      while (line != null) {
        f(line)
        line = reader.readLine()
      }
    } finally {
      reader.close()
    }
  }
  override def toString() = s"[lines of ${file.getAbsolutePath}]"
}
case class Prices(
  date: LocalDate,
  ticker: String,
  open: BigDecimal,
  high: BigDecimal,
  low: BigDecimal,
  close: BigDecimal,
  volume: Int
)
def parsePrices(line: String): Option[Prices] = {
  val csvColumn = "([^,]+)"
  val P = (List.fill(7)(csvColumn) mkString ",").r
  def option[T](t: => T) = catching(classOf[IllegalArgumentException]) opt t
  for {
    P(d, t, o, h, l, c, v) <- Some(line)
    date   <- option(LocalDate.parse(d, YYYYMMdd))
    ticker <- Some(t)
    open   <- option(BigDecimal(o))
    high   <- option(BigDecimal(h))
    low    <- option(BigDecimal(l))
    close  <- option(BigDecimal(c))
    volume <- option(v.toInt)
  } yield Prices(date, ticker, open, high, low, close, volume)
}
def aggregateFile[M, P](file: File)(parse: String => Option[P])(f: P => M)
    (implicit a: Aggregation[M], m: Monoid[M]): a.Result = {
  val ps = for {
    line <- linesOf(file)
    p    <- parse(line)
  } yield p
  aggregate(ps)(f)
}
def aggregatePrices[M: Monoid: Aggregation](file: File)(f: Prices => M) =
  aggregateFile(file)(parsePrices)(f)
Range of opening and closing prices
val opening = (_: Prices).open
val closing = (_: Prices).close
scala> aggregatePrices (f) (
     |   (opening andThen range[BigDecimal]) &&&
     |   (closing andThen range[BigDecimal]))
res0: ((_1.Result, _2.Result), (_1.Result, _2.Result)) =
  ((Some(1.36),Some(627.84)),(Some(1.35),Some(626.75)))
Range of opening and closing prices for Google
aggregatePrices(f)(filter (google) (range(opening) &&& range(closing)))
Range of opening and closing prices for Google, grouped by month
aggregatePrices(f)(filter (google) (groupBy (startOfMonth) (range(opening) &&& range(closing))))
Now that we've got this framework, it's not terribly difficult to adapt to another setting
def filesUnderDirectory(dir: File): Traversable[File] = ...

val files = filesUnderDirectory(...).view
val size: File => Long = _.length

aggregate (files) (size)
val objFile: File => Boolean = _.getName.endsWith(".o")

aggregate (files) (filter (objFile) (size))
Get their total size, and zip them!
val (zipFile, totalSize) = aggregate (files) (filter (objFile) (archive &&& size))