Note that this requires MongoDB 2.1+. At the time of writing, 2.1 is still the unstable development release; the aggregation framework will first ship in a stable release with 2.2. @rit is adding the aggregation framework to the Casbah DSL, which will make this method superfluous. It is still a good example of how to run a MongoDB command from Scala before Casbah or the Java driver supports it directly.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import com.mongodb.casbah.Imports._

/*
* Simple test of the MongoDB Aggregation Framework via Casbah
*
* [Note: only works on MongoDB 2.1+]
*
* The "students" collection consists of student test score records that [simplified] look like this:
*
{
"_id" : ObjectId("4f436e581a8819c9391c9e77"),
"id" : "1000000321",
"lname" : "Simpson",
"fname" : "Bobby",
"scores" : [
{
"score" : 14,
"dname" : "ACADEMICS PLUS SCHOOL DISTRICT",
"sname" : "Test School 6040702",
"tyear" : "2007",
"tname" : "Lit_Scale",
"grade" : "3"
},
{
"score" : 557,
"dname" : "ACADEMICS PLUS SCHOOL DISTRICT",
"sname" : "Test School 6040702",
"tyear" : "2007",
"tname" : "Lit_Perf",
"grade" : "3"
},
{
"score" : 15,
"dname" : "ACADEMICS PLUS SCHOOL DISTRICT",
"sname" : "Test School 6040702",
"tyear" : "2008",
"tname" : "Lit_Scale",
"grade" : "4"
}
]
}
*
* This example unwinds the scores and groups them
* to determine the unique values of the tests, years and grades fields.
*
*/

object AggregationFrameworkTest extends App {
  // Connection to the default host/port. It is closed in the `finally` at the
  // end of this object so the driver's resources are released when the run ends.
  val mongoConn = MongoConnection()
  val db = mongoConn("hivedbmig")

  /**
   * Runs the raw `aggregate` database command. This is used because the
   * Casbah DSL does not (yet) expose the aggregation framework.
   *
   * @param collectionName name of the collection to aggregate over
   * @param pipeline       ordered list of pipeline stage documents
   * @return the `result` field of the command reply. This is presumably
   *         `null` when the reply has no such field (e.g. the command failed);
   *         callers should not assume a list.
   */
  def aggregationResult(collectionName: String, pipeline: MongoDBList) = {
    db.command(MongoDBObject("aggregate" -> collectionName, "pipeline" -> pipeline)).get("result")
  }

  val pipebuilder = MongoDBList.newBuilder
  // Stage 1: emit one document per element of each student's `scores` array.
  pipebuilder += MongoDBObject("$unwind" -> "$scores")
  // Stage 2: a constant `_id` collapses every unwound document into a single
  // group; `$addToSet` collects the distinct values of each field.
  pipebuilder += MongoDBObject("$group" -> MongoDBObject(
      "_id" -> "all", // constant, so we'll just create one bucket
      "tests" -> MongoDBObject("$addToSet" -> "$scores.tname"),
      "years" -> MongoDBObject("$addToSet" -> "$scores.tyear"),
      "grades" -> MongoDBObject("$addToSet" -> "$scores.grade")))

  val pipeline = pipebuilder.result()

  try {
    aggregationResult("students", pipeline) match {
      case list: BasicDBList => list.foreach(println(_))
      // Covers a missing/null `result` (failed command) or an unexpected type.
      case _ => println("didn't work")
    }
  } finally {
    // Previously the connection was never closed; always release it.
    mongoConn.close()
  }
  // { "_id" : "all" , "tests" : [ "ACT_Comp" , "FPQ4_Score" , "Lit_Growth" , "Math_Scale" , "Bio_Scale" , "Geo_Scale" , "Geo_Growth" , "Math_Growth" , "Alg_Scale" , "ACT_Math" , "ACT_Eng" , "Alg_Growth" , "Sci_Scale" , "Lit_Scale"] , "years" : [ "2011" , "2006" , "2008" , "2009" , "2007" , "2010"] , "grades" : [ "5" , "4" , "7" , "11" , "6" , "3" , "8"]}

}
comments powered by Disqus