diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
index f19648ec6865ff78c8c80d527f19616f6c956fef..6a0617cc063cd2615becf3dbc885fa8c0badcc07 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -27,6 +27,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 
   private var yarnAllocator: YarnAllocationHandler = null
+  private var isFinished: Boolean = false
 
   def run() {
@@ -68,10 +69,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
     // Wait for the user class to finish
     userThread.join()
-
-    // Finish the ApplicationMaster
-    finishApplicationMaster()
-    // TODO: Exit based on success/failure
+    System.exit(0)
   }
 
@@ -124,17 +122,30 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       }
     }
   }
-  
+
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
       .getMethod("main", classOf[Array[String]])
     val t = new Thread {
       override def run() {
-        // Copy
-        var mainArgs: Array[String] = new Array[String](args.userArgs.size())
-        args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
-        mainMethod.invoke(null, mainArgs)
+        var succeeded = false
+        try {
+          // Copy
+          var mainArgs: Array[String] = new Array[String](args.userArgs.size())
+          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+          mainMethod.invoke(null, mainArgs)
+          // Some user programs (e.g. SparkPi, SparkLR) end by calling System.exit(0), in which
+          // case this thread dies inside invoke() and never reaches the line below; the
+          // shutdown hook is then responsible for reporting SUCCEEDED.
+          succeeded = true
+        } finally {
+          if (succeeded) {
+            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+          } else {
+            ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
+          }
+        }
       }
     }
     t.start()
@@ -179,7 +190,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     logInfo("All workers have launched.")
 
     // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    if (userThread.isAlive){
+    if (userThread.isAlive) {
       // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
       val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
@@ -197,7 +208,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
 
     val t = new Thread {
       override def run() {
-        while (userThread.isAlive){
+        while (userThread.isAlive) {
           val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
           if (missingWorkerCount > 0) {
             logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
@@ -235,14 +246,23 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       }
     }
   }
   */
-  
-  def finishApplicationMaster() {
+
+  def finishApplicationMaster(status: FinalApplicationStatus) {
+
+    synchronized {
+      if (isFinished) {
+        return
+      }
+      isFinished = true
+    }
+
+    logInfo("finishApplicationMaster with " + status)
     val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
       .asInstanceOf[FinishApplicationMasterRequest]
     finishReq.setAppAttemptId(appAttemptId)
-    // TODO: Check if the application has failed or succeeded
-    finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED)
+    finishReq.setFinishApplicationStatus(status)
     resourceManager.finishApplicationMaster(finishReq)
+
   }
 
@@ -256,7 +276,7 @@ object ApplicationMaster {
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   def incrementAllocatorLoop(by: Int) {
     val count = yarnAllocatorLoop.getAndAdd(by)
-    if (count >= ALLOCATOR_LOOP_WAIT_COUNT){
+    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
       yarnAllocatorLoop.synchronized {
         // to wake threads off wait ...
         yarnAllocatorLoop.notifyAll()
@@ -291,14 +311,16 @@ object ApplicationMaster {
         logInfo("Invoking sc stop from shutdown hook")
         sc.stop()
         // best case ...
-        for (master <- applicationMasters) master.finishApplicationMaster
+        for (master <- applicationMasters) {
+          master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
+        }
       }
     } )
   }
 
   // Wait for initialization to complete and at least 'some' nodes can get allocated
   yarnAllocatorLoop.synchronized {
-    while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT){
+    while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
      yarnAllocatorLoop.wait(1000L)
    }
  }
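The key idea in this file is that the final YARN status is now reported exactly once, whichever of the user thread's `finally` block or the driver's shutdown hook gets there first, with `isFinished` as the guard. A minimal standalone sketch of the same report-once pattern, where `reportFinalStatus` is a hypothetical stand-in for the `resourceManager.finishApplicationMaster` RPC:

```scala
object ReportOnce {
  private var isFinished = false

  // Hypothetical stand-in for the YARN finishApplicationMaster RPC.
  private def reportFinalStatus(status: String) {
    println("final status: " + status)
  }

  def finish(status: String) {
    // Decide under the lock whether we are the first caller...
    val first = synchronized {
      if (isFinished) false else { isFinished = true; true }
    }
    // ...but do the actual reporting outside the lock, at most once.
    if (first) reportFinalStatus(status)
  }

  def main(args: Array[String]) {
    finish("SUCCEEDED")
    finish("FAILED")  // no-op: a status has already been reported
  }
}
```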
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index d723ab7b1ec67e8da81f92acfc0d3cfcce2e3843..c7dbcc6fbcaa6278d56389cacec6bf606cfe0870 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -210,6 +210,10 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
       val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
       reg.registerClasses(kryo)
     }
+
+    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
+    kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
+
     kryo
   }
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index 7545ecf86851ccc2ba2116ffb77ff5760aafb600..5e3c5e064fd34cd518560ca50def0987a1832549 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -103,7 +103,9 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td>
         <a href={"app?appId=" + app.id}>{app.id}</a>
       </td>
-      <td>{app.desc.name}</td>
+      <td>
+        <a href={app.appUiUrl}>{app.desc.name}</a>
+      </td>
       <td>
         {app.coresGranted}
       </td>
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 30a648f50b4d2ef8e7c77dcb32fc6df5de388829..64ed91f5a05c2cb583bd1fabd08ee340a98a64a9 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -257,6 +257,15 @@ class DAGScheduler(
     if (partitions.size == 0) {
       return
     }
+
+    // Check to make sure we are not launching a task on a partition that does not exist.
+    val maxPartitions = finalRdd.partitions.length
+    partitions.find(p => p >= maxPartitions).foreach { p =>
+      throw new IllegalArgumentException(
+        "Attempting to access a non-existent partition: " + p + ". " +
+          "Total number of partitions: " + maxPartitions)
+    }
+
     val (toSubmit, waiter) = prepareJob(
       finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
     eventQueue.put(toSubmit)
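This makes an out-of-range partition index fail at submission time with a clear message instead of surfacing later inside the scheduler; the `RDDSuite` test further down exercises exactly this path through `sc.runJob`. A small sketch of the guard in isolation (plain sequences, not the scheduler API):

```scala
object PartitionGuard {
  // Same check as the diff: reject any requested index beyond the last partition.
  def validate(partitions: Seq[Int], maxPartitions: Int) {
    partitions.find(p => p >= maxPartitions).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }
  }

  def main(args: Array[String]) {
    validate(Seq(0, 1), 2)     // fine: both partitions exist
    validate(Seq(0, 1, 2), 2)  // throws: partition 2 does not exist
  }
}
```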
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index 36d9c472451272a404a4b5d502231b0a2313aab2..fa46e2487df13745bced2c5438de203c791817b5 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -31,7 +31,7 @@ private[spark] object UIUtils {
         <link rel="stylesheet" href="/static/webui.css" type="text/css" />
         <link rel="stylesheet" href="/static/bootstrap-responsive.min.css" type="text/css" />
         <script src="/static/sorttable.js"></script>
-        <title>{title}</title>
+        <title>{sc.appName} - {title}</title>
         <style type="text/css">
           table.sortable thead {{ cursor: pointer; }}
         </style>
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index aa3ee5f5eea17273ada672806c8d572580a99c5a..7f7d4c821154eafc181fbcab6a2fe9c96b394eea 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -302,4 +302,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
       assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
     }
   }
+
+  test("runJob on an invalid partition") {
+    intercept[IllegalArgumentException] {
+      sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
+    }
+  }
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 5a805109593372df236a28e0a35595738f219f37..5c06897cae428fffe635dfaa35ed4c33a463776b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -197,9 +197,19 @@ Apart from these, the following properties are also available, and may be useful
     (e.g. map functions) reference large objects in the driver program.
   </td>
 </tr>
+<tr>
+  <td>spark.kryo.referenceTracking</td>
+  <td>true</td>
+  <td>
+    Whether to track references to the same object when serializing data with Kryo, which is
+    necessary if your object graphs have loops and useful for efficiency if they contain multiple
+    copies of the same object. Can be disabled to improve performance if you know this is not the
+    case.
+  </td>
+</tr>
 <tr>
   <td>spark.kryoserializer.buffer.mb</td>
-  <td>32</td>
+  <td>2</td>
   <td>
     Maximum object size to allow within Kryo (the library needs to create a buffer at least
     as large as the largest single object you'll serialize). Increase this if you get a "buffer limit
     exceeded" exception inside Kryo.
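Since `KryoSerializer` reads `spark.kryo.referenceTracking` through `System.getProperty`, it is configured the same way as the other Kryo settings, before the `SparkContext` is created. A hedged sketch of a driver using the new property alongside the resized buffer; the master string, app name, and buffer value are illustrative placeholders:

```scala
object KryoConfigSketch {
  def main(args: Array[String]) {
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    // Safe only when no serialized object graph contains cycles; Kryo then
    // skips per-object reference bookkeeping on every write.
    System.setProperty("spark.kryo.referenceTracking", "false")
    // The default buffer is now 2 MB; raise it if single objects are larger.
    System.setProperty("spark.kryoserializer.buffer.mb", "8")

    val sc = new spark.SparkContext("local", "KryoConfigSketch")
    println(sc.parallelize(1 to 1000).map(_ * 2).reduce(_ + _))
    sc.stop()
  }
}
```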
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
index 21eb21276ec166d3524510376d290b45a2fd854b..4c18cbdc6bc32494e5fb8f82de7f67be13863f32 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -6,8 +6,10 @@ import scala.util.Sorting
 
 import spark.{HashPartitioner, Partitioner, SparkContext, RDD}
 import spark.storage.StorageLevel
+import spark.KryoRegistrator
 import spark.SparkContext._
 
+import com.esotericsoftware.kryo.Kryo
 import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
 
@@ -91,15 +93,15 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
    */
   def train(ratings: RDD[(Int, Int, Double)]): MatrixFactorizationModel = {
     val numBlocks = if (this.numBlocks == -1) {
-      math.max(ratings.context.defaultParallelism, ratings.partitions.size)
+      math.max(ratings.context.defaultParallelism, ratings.partitions.size / 2)
     } else {
       this.numBlocks
     }
 
     val partitioner = new HashPartitioner(numBlocks)
 
-    val ratingsByUserBlock = ratings.map{ case (u, p, r) => (u % numBlocks, (u, p, r)) }
-    val ratingsByProductBlock = ratings.map{ case (u, p, r) => (p % numBlocks, (p, u, r)) }
+    val ratingsByUserBlock = ratings.map{ case (u, p, r) => (u % numBlocks, Rating(u, p, r)) }
+    val ratingsByProductBlock = ratings.map{ case (u, p, r) => (p % numBlocks, Rating(p, u, r)) }
 
     val (userInLinks, userOutLinks) = makeLinkRDDs(numBlocks, ratingsByUserBlock)
     val (productInLinks, productOutLinks) = makeLinkRDDs(numBlocks, ratingsByProductBlock)
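Two effects of this hunk: ratings are now carried as `Rating` objects (registered with Kryo in `ALSRegistrator` below) rather than `(Int, Int, Double)` tuples, and the default block count is halved relative to the input partition count. With made-up numbers, the change to the default looks like this:

```scala
object BlockCountSketch {
  def main(args: Array[String]) {
    // Hypothetical values: defaultParallelism = 8, a ratings file with 64 partitions.
    val defaultParallelism = 8
    val inputPartitions = 64
    val before = math.max(defaultParallelism, inputPartitions)      // 64 blocks
    val after = math.max(defaultParallelism, inputPartitions / 2)   // 32 blocks
    println("numBlocks before: " + before + ", after: " + after)
  }
}
```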
@@ -179,12 +181,12 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
    * the users (or (blockId, (p, u, r)) for the products). We create these simultaneously to avoid
    * having to shuffle the (blockId, (u, p, r)) RDD twice, or to cache it.
    */
-  private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, (Int, Int, Double))])
+  private def makeLinkRDDs(numBlocks: Int, ratings: RDD[(Int, Rating)])
     : (RDD[(Int, InLinkBlock)], RDD[(Int, OutLinkBlock)]) = {
     val grouped = ratings.partitionBy(new HashPartitioner(numBlocks))
     val links = grouped.mapPartitionsWithIndex((blockId, elements) => {
-      val ratings = elements.map{case (k, t) => Rating(t._1, t._2, t._3)}.toArray
+      val ratings = elements.map{_._2}.toArray
       val inLinkBlock = makeInLinkBlock(numBlocks, ratings)
       val outLinkBlock = makeOutLinkBlock(numBlocks, ratings)
       Iterator.single((blockId, (inLinkBlock, outLinkBlock)))
@@ -383,21 +385,30 @@ object ALS {
     train(ratings, rank, iterations, 0.01, -1)
   }
 
+  private class ALSRegistrator extends KryoRegistrator {
+    override def registerClasses(kryo: Kryo) {
+      kryo.register(classOf[Rating])
+    }
+  }
+
   def main(args: Array[String]) {
-    if (args.length != 5) {
-      println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir>")
+    if (args.length != 5 && args.length != 6) {
+      println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]")
       System.exit(1)
     }
     val (master, ratingsFile, rank, iters, outputDir) =
       (args(0), args(1), args(2).toInt, args(3).toInt, args(4))
+    val blocks = if (args.length == 6) args(5).toInt else -1
     System.setProperty("spark.serializer", "spark.KryoSerializer")
+    System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
+    System.setProperty("spark.kryo.referenceTracking", "false")
     System.setProperty("spark.locality.wait", "10000")
     val sc = new SparkContext(master, "ALS")
     val ratings = sc.textFile(ratingsFile).map { line =>
       val fields = line.split(',')
       (fields(0).toInt, fields(1).toInt, fields(2).toDouble)
     }
-    val model = ALS.train(ratings, rank, iters)
+    val model = ALS.train(ratings, rank, iters, 0.01, blocks)
     model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
       .saveAsTextFile(outputDir + "/userFeatures")
     model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
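The driver now also accepts an optional sixth `<blocks>` argument, passed through to `ALS.train`. For user code, the same registrator pattern the ALS driver uses looks roughly like this; `MyRecord` and `MyRegistrator` are made-up names standing in for `Rating` and `ALSRegistrator`:

```scala
import com.esotericsoftware.kryo.Kryo
import spark.{KryoRegistrator, SparkContext}

// Made-up record type standing in for ALS's Rating.
case class MyRecord(user: Int, product: Int, rating: Double)

// Registering the class lets Kryo write a small numeric ID per record
// instead of the full class name.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyRecord])
  }
}

object RegistratorSketch {
  def main(args: Array[String]) {
    System.setProperty("spark.serializer", "spark.KryoSerializer")
    System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
    // Rating-style records form no cycles, so tracking can be disabled as in ALS.
    System.setProperty("spark.kryo.referenceTracking", "false")
    val sc = new SparkContext("local", "RegistratorSketch")
    val total = sc.parallelize(1 to 100).map(i => MyRecord(i, i, 1.0)).map(_.rating).reduce(_ + _)
    println(total)
    sc.stop()
  }
}
```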