diff --git a/core/pom.xml b/core/pom.xml index 3d70a195841342103dc41a19ee261c498791cd60..53696367e984f88682071abf03001337f76a92f2 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -106,6 +106,10 @@ <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </dependency> + <dependency> + <groupId>net.liftweb</groupId> + <artifactId>lift-json_2.9.2</artifactId> + </dependency> <dependency> <groupId>it.unimi.dsi</groupId> <artifactId>fastutil</artifactId> diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala index 6b71b953dd782947b2bbc61fadb85bd51fc7078a..f8dcf025b44ecc4e0c65bb367199939cce5cbc4e 100644 --- a/core/src/main/scala/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala @@ -17,7 +17,7 @@ package spark.deploy -import scala.util.parsing.json.{JSONArray, JSONObject, JSONType} +import net.liftweb.json.JsonDSL._ import spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} import spark.deploy.master.{ApplicationInfo, WorkerInfo} @@ -25,63 +25,62 @@ import spark.deploy.worker.ExecutorRunner private[spark] object JsonProtocol { + def writeWorkerInfo(obj: WorkerInfo) = { + ("id" -> obj.id) ~ + ("host" -> obj.host) ~ + ("port" -> obj.port) ~ + ("webuiaddress" -> obj.webUiAddress) ~ + ("cores" -> obj.cores) ~ + ("coresused" -> obj.coresUsed) ~ + ("memory" -> obj.memory) ~ + ("memoryused" -> obj.memoryUsed) ~ + ("state" -> obj.state.toString) + } - def writeWorkerInfo(obj: WorkerInfo): JSONType = JSONObject(Map( - "id" -> obj.id, - "host" -> obj.host, - "port" -> obj.port, - "webuiaddress" -> obj.webUiAddress, - "cores" -> obj.cores, - "coresused" -> obj.coresUsed, - "memory" -> obj.memory, - "memoryused" -> obj.memoryUsed, - "state" -> obj.state.toString - )) + def writeApplicationInfo(obj: ApplicationInfo) = { + ("starttime" -> obj.startTime) ~ + ("id" -> obj.id) ~ + ("name" -> obj.desc.name) ~ + ("cores" -> obj.desc.maxCores) ~ + ("user" -> obj.desc.user) ~ + ("memoryperslave" -> obj.desc.memoryPerSlave) ~ + ("submitdate" -> obj.submitDate.toString) + } - def writeApplicationInfo(obj: ApplicationInfo): JSONType = JSONObject(Map( - "starttime" -> obj.startTime, - "id" -> obj.id, - "name" -> obj.desc.name, - "cores" -> obj.desc.maxCores, - "user" -> obj.desc.user, - "memoryperslave" -> obj.desc.memoryPerSlave, - "submitdate" -> obj.submitDate.toString - )) + def writeApplicationDescription(obj: ApplicationDescription) = { + ("name" -> obj.name) ~ + ("cores" -> obj.maxCores) ~ + ("memoryperslave" -> obj.memoryPerSlave) ~ + ("user" -> obj.user) + } - def writeApplicationDescription(obj: ApplicationDescription): JSONType = JSONObject(Map( - "name" -> obj.name, - "cores" -> obj.maxCores, - "memoryperslave" -> obj.memoryPerSlave, - "user" -> obj.user - )) + def writeExecutorRunner(obj: ExecutorRunner) = { + ("id" -> obj.execId) ~ + ("memory" -> obj.memory) ~ + ("appid" -> obj.appId) ~ + ("appdesc" -> writeApplicationDescription(obj.appDesc)) + } - def writeExecutorRunner(obj: ExecutorRunner): JSONType = JSONObject(Map( - "id" -> obj.execId, - "memory" -> obj.memory, - "appid" -> obj.appId, - "appdesc" -> writeApplicationDescription(obj.appDesc) - )) + def writeMasterState(obj: MasterStateResponse) = { + ("url" -> ("spark://" + obj.uri)) ~ + ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ + ("cores" -> obj.workers.map(_.cores).sum) ~ + ("coresused" -> obj.workers.map(_.coresUsed).sum) ~ + ("memory" -> obj.workers.map(_.memory).sum) ~ + ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~ + ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~ + ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) + } - def writeMasterState(obj: MasterStateResponse): JSONType = JSONObject(Map( - "url" -> ("spark://" + obj.uri), - "workers" -> obj.workers.toList.map(writeWorkerInfo), - "cores" -> obj.workers.map(_.cores).sum, - "coresused" -> obj.workers.map(_.coresUsed).sum, - "memory" -> obj.workers.map(_.memory).sum, - "memoryused" -> obj.workers.map(_.memoryUsed).sum, - "activeapps" -> JSONArray(obj.activeApps.toList.map(writeApplicationInfo)), - "completedapps" -> JSONArray(obj.completedApps.toList.map(writeApplicationInfo)) - )) - - def writeWorkerState(obj: WorkerStateResponse): JSONType = JSONObject(Map( - "id" -> obj.workerId, - "masterurl" -> obj.masterUrl, - "masterwebuiurl" -> obj.masterWebUiUrl, - "cores" -> obj.cores, - "coresused" -> obj.coresUsed, - "memory" -> obj.memory, - "memoryused" -> obj.memoryUsed, - "executors" -> JSONArray(obj.executors.toList.map(writeExecutorRunner)), - "finishedexecutors" -> JSONArray(obj.finishedExecutors.toList.map(writeExecutorRunner)) - )) + def writeWorkerState(obj: WorkerStateResponse) = { + ("id" -> obj.workerId) ~ + ("masterurl" -> obj.masterUrl) ~ + ("masterwebuiurl" -> obj.masterWebUiUrl) ~ + ("cores" -> obj.cores) ~ + ("coresused" -> obj.coresUsed) ~ + ("memory" -> obj.memory) ~ + ("memoryused" -> obj.memoryUsed) ~ + ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~ + ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner)) + } } diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala index 494a9b914d753ffe2d814f4fc2dea3aa673bcf1e..405a1ec3a60cafc50fa548c609fb6729fa9bb6f7 100644 --- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala @@ -17,7 +17,6 @@ package spark.deploy.master.ui -import scala.util.parsing.json.JSONType import scala.xml.Node import akka.dispatch.Await @@ -26,6 +25,8 @@ import akka.util.duration._ import javax.servlet.http.HttpServletRequest +import net.liftweb.json.JsonAST.JValue + import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import spark.deploy.JsonProtocol import spark.deploy.master.ExecutorInfo @@ -36,7 +37,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { implicit val timeout = parent.timeout /** Executor details for a particular application */ - def renderJson(request: HttpServletRequest): JSONType = { + def renderJson(request: HttpServletRequest): JValue = { val appId = request.getParameter("appId") val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, 30 seconds) diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala index 28e421e3bc1c60278bfb7cc9f56050a346257a39..2000211b983df44cfac5a27dd34cc7157c53ecba 100644 --- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala @@ -19,13 +19,14 @@ package spark.deploy.master.ui import javax.servlet.http.HttpServletRequest -import scala.util.parsing.json.JSONType import scala.xml.Node import akka.dispatch.Await import akka.pattern.ask import akka.util.duration._ +import net.liftweb.json.JsonAST.JValue + import spark.Utils import spark.deploy.DeployWebUI import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} @@ -37,7 +38,7 @@ private[spark] class IndexPage(parent: MasterWebUI) { val master = parent.masterActorRef implicit val timeout = parent.timeout - def renderJson(request: HttpServletRequest): JSONType = { + def renderJson(request: HttpServletRequest): JValue = { val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, 30 seconds) JsonProtocol.writeMasterState(state) diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala index 02993d58a0ab1d4af240f79954de70acff0c41b3..b67059068bf1bac92f9295a144ed072734f10858 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala @@ -19,13 +19,14 @@ package spark.deploy.worker.ui import javax.servlet.http.HttpServletRequest -import scala.util.parsing.json.JSONType import scala.xml.Node import akka.dispatch.Await import akka.pattern.ask import akka.util.duration._ +import net.liftweb.json.JsonAST.JValue + import spark.Utils import spark.deploy.JsonProtocol import spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} @@ -38,7 +39,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) { val worker = parent.worker val timeout = parent.timeout - def renderJson(request: HttpServletRequest): JSONType = { + def renderJson(request: HttpServletRequest): JValue = { val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val workerState = Await.result(stateFuture, 30 seconds) JsonProtocol.writeWorkerState(workerState) diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala index ba58f3572991517157bc0f7548069404bd1143fd..f66fe39905228e3167dabf33a1ab87740e7c01f5 100644 --- a/core/src/main/scala/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/spark/ui/JettyUtils.scala @@ -21,9 +21,10 @@ import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import scala.annotation.tailrec import scala.util.{Try, Success, Failure} -import scala.util.parsing.json.JSONType import scala.xml.Node +import net.liftweb.json.{JValue, pretty, render} + import org.eclipse.jetty.server.{Server, Request, Handler} import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler} import org.eclipse.jetty.util.thread.QueuedThreadPool @@ -38,8 +39,8 @@ private[spark] object JettyUtils extends Logging { type Responder[T] = HttpServletRequest => T // Conversions from various types of Responder's to jetty Handlers - implicit def jsonResponderToHandler(responder: Responder[JSONType]): Handler = - createHandler(responder, "text/json", (in: JSONType) => in.toString) + implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler = + createHandler(responder, "text/json", (in: JValue) => pretty(render(in))) implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler = createHandler(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString) diff --git a/pom.xml b/pom.xml index de883e2abce6a3113d92450789ac3118e80df056..85bcd8696c2faacff5398bbd479bf6eaed7bf522 100644 --- a/pom.xml +++ b/pom.xml @@ -260,6 +260,11 @@ <version>10.4.2.0</version> <scope>test</scope> </dependency> + <dependency> + <groupId>net.liftweb</groupId> + <artifactId>lift-json_2.9.2</artifactId> + <version>2.5</version> + </dependency> <dependency> <groupId>com.codahale.metrics</groupId> <artifactId>metrics-core</artifactId> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index fbeae277076cc3112c97b5e7709bea2c2ab4022d..5fdcf19b62401e7fb742bd545b75da0fb5c1819a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -182,6 +182,7 @@ object SparkBuild extends Build { "com.typesafe.akka" % "akka-slf4j" % "2.0.5" excludeAll(excludeNetty), "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", + "net.liftweb" % "lift-json_2.9.2" % "2.5", "org.apache.mesos" % "mesos" % "0.12.1", "io.netty" % "netty-all" % "4.0.0.Beta2", "org.apache.derby" % "derby" % "10.4.2.0" % "test",