diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1069e27513aaf8c5d80a9136d1ed3bffc2dbfec8..80c65dfebd2ad1dd872c919ed9104bb91ed32d52 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -267,16 +267,20 @@ class SparkContext(
       localProperties.value = new Properties()
   }
 
-  def addLocalProperty(key: String, value: String) {
-    if(localProperties.value == null) {
+  def setLocalProperty(key: String, value: String) {
+    if (localProperties.value == null) {
       localProperties.value = new Properties()
     }
-    localProperties.value.setProperty(key,value)
+    if (value == null) {
+      localProperties.value.remove(key)
+    } else {
+      localProperties.value.setProperty(key, value)
+    }
   }
 
   /** Set a human readable description of the current job. */
-  def setDescription(value: String) {
-    addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
+  def setJobDescription(value: String) {
+    setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
   }
 
   // Post init
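
A minimal usage sketch of the renamed API (the pool name and job description are hypothetical, and a local master is assumed). Note that `setLocalProperty(key, null)` now removes the key instead of passing a null value to `Properties.setProperty`, and `setDescription` becomes `setJobDescription`:

```scala
import spark.SparkContext

object LocalPropertyExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "local-property-example")

    // Local properties are attached to jobs submitted from this thread.
    sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "reports")  // hypothetical pool name
    sc.setJobDescription("count the first hundred integers")
    sc.parallelize(1 to 100).count()

    // Passing null now clears the property rather than failing inside Properties.setProperty.
    sc.setLocalProperty("spark.scheduler.cluster.fair.pool", null)
    sc.stop()
  }
}
```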
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
index b2d089f31d9d01c0399006cb8b9e54ceb536003d..2fc8a76a0575b868fad3f076bac068879fafdb42 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -17,19 +17,14 @@
 
 package spark.scheduler.cluster
 
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
+import java.util.Properties
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.util.control.Breaks._
-import scala.xml._
+import scala.xml.XML
 
 import spark.Logging
 import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
-import java.util.Properties
 
 /**
  * An interface to build Schedulable tree
@@ -56,7 +51,7 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
 private[spark] class FairSchedulableBuilder(val rootPool: Pool)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
+  val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file")
   val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
   val DEFAULT_POOL_NAME = "default"
   val MINIMUM_SHARES_PROPERTY = "minShare"
@@ -69,39 +64,44 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
   val DEFAULT_WEIGHT = 1
 
   override def buildPools() {
+    if (schedulerAllocFile != null) {
-    val file = new File(schedulerAllocFile)
+      val file = new File(schedulerAllocFile)
-    if (file.exists()) {
-      val xml = XML.loadFile(file)
-      for (poolNode <- (xml \\ POOLS_PROPERTY)) {
-
-        val poolName = (poolNode \ POOL_NAME_PROPERTY).text
-        var schedulingMode = DEFAULT_SCHEDULING_MODE
-        var minShare = DEFAULT_MINIMUM_SHARE
-        var weight = DEFAULT_WEIGHT
-
-        val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
-        if (xmlSchedulingMode != "") {
-          try {
-            schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
-          } catch {
-            case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
+      if (file.exists()) {
+        val xml = XML.loadFile(file)
+        for (poolNode <- (xml \\ POOLS_PROPERTY)) {
+
+          val poolName = (poolNode \ POOL_NAME_PROPERTY).text
+          var schedulingMode = DEFAULT_SCHEDULING_MODE
+          var minShare = DEFAULT_MINIMUM_SHARE
+          var weight = DEFAULT_WEIGHT
+
+          val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
+          if (xmlSchedulingMode != "") {
+            try {
+              schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
+            } catch {
+              case e: Exception => logInfo("Error parsing XML schedulingMode, using the default")
+            }
           }
-        }
 
-        val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
-        if (xmlMinShare != "") {
-          minShare = xmlMinShare.toInt
-        }
+          val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
+          if (xmlMinShare != "") {
+            minShare = xmlMinShare.toInt
+          }
 
-        val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
-        if (xmlWeight != "") {
-          weight = xmlWeight.toInt
-        }
+          val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
+          if (xmlWeight != "") {
+            weight = xmlWeight.toInt
+          }
 
-        val pool = new Pool(poolName, schedulingMode, minShare, weight)
-        rootPool.addSchedulable(pool)
-        logInfo("Create new pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
-          poolName, schedulingMode, minShare, weight))
+          val pool = new Pool(poolName, schedulingMode, minShare, weight)
+          rootPool.addSchedulable(pool)
+          logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+            poolName, schedulingMode, minShare, weight))
+        }
+      } else {
+        throw new FileNotFoundException(
+          "Fair scheduler allocation file not found: " + schedulerAllocFile)
       }
     }
 
@@ -110,7 +110,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
       val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
         DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
       rootPool.addSchedulable(pool)
-      logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
+      logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
         DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
     }
   }
@@ -127,7 +127,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool)
         parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
           DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
         rootPool.addSchedulable(parentPool)
-        logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
+        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
           poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
       }
     }
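
A rough sketch of how the changed behavior surfaces to users (the file path and pool name are hypothetical). With the property unset, only the default pool is built; with it set to a missing file, pool construction now fails fast with a `FileNotFoundException` instead of silently falling back:

```scala
import java.io.FileNotFoundException
import spark.SparkContext

object FairPoolSketch {
  def main(args: Array[String]) {
    // Hypothetical configuration; omit the allocation file to get only the default pool.
    System.setProperty("spark.cluster.schedulingmode", "FAIR")
    System.setProperty("spark.fairscheduler.allocation.file", "/etc/spark/fairscheduler.xml")

    try {
      val sc = new SparkContext("local[4]", "fair-pool-sketch")
      // Route this thread's jobs to a pool declared in the allocation file.
      sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "production")
      sc.parallelize(1 to 1000, 4).map(_ * 2).count()
      sc.stop()
    } catch {
      // After this patch, pointing the property at a missing file raises
      // FileNotFoundException when the pools are built.
      case e: FileNotFoundException => System.err.println(e.getMessage)
    }
  }
}
```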
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index 97ea644021fff6c7c4490064850c3b5a3e73df4b..0dfb1a064ccd697f5baed9d01353f25bff46e637 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -48,9 +48,9 @@ private[spark] object UIWorkloadGenerator {
 
     def setProperties(s: String) = {
       if(schedulingMode == SchedulingMode.FAIR) {
-        sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
+        sc.setLocalProperty("spark.scheduler.cluster.fair.pool", s)
       }
-      sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
+      sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
     }
 
     val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
index 66fd59e8bbee62853c13dc77e2ba2c0546307eae..a79b8bf256064e8834174ea1ba7ff916ff7c6d0a 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
@@ -57,23 +57,23 @@ object TaskThreadInfo {
  * 1. each thread contains one job.
  * 2. each job contains one stage.
  * 3. each stage only contains one task.
- * 4. each task(launched) must be lanched orderly(using threadToStarted) to make sure 
- *    it will get cpu core resource, and will wait to finished after user manually 
- *    release "Lock" and then cluster will contain another free cpu cores. 
- * 5. each task(pending) must use "sleep" to  make sure it has been added to taskSetManager queue, 
+ * 4. each task (launched) must be launched in order (using threadToStarted) to make sure
+ *    it will get a cpu core, and will wait to finish until the user manually
+ *    releases "Lock", after which the cluster will have another free cpu core.
+ * 5. each task (pending) must use "sleep" to make sure it has been added to the taskSetManager queue,
  *    thus it will be scheduled later when cluster has free cpu cores.
  */
 class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
 
   def createThread(threadIndex: Int, poolName: String, sc: SparkContext, sem: Semaphore) {
-   
+
     TaskThreadInfo.threadToRunning(threadIndex) = false
     val nums = sc.parallelize(threadIndex to threadIndex, 1)
     TaskThreadInfo.threadToLock(threadIndex) = new Lock()
     TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
     new Thread {
       if (poolName != null) {
-        sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
+        sc.setLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
       }
       override def run() {
         val ans = nums.map(number => {
@@ -88,7 +88,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
       }
     }.start()
   }
-  
+
   test("Local FIFO scheduler end-to-end test") {
     System.setProperty("spark.cluster.schedulingmode", "FIFO")
     sc = new SparkContext("local[4]", "test")
@@ -103,8 +103,8 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
     createThread(4,null,sc,sem)
     TaskThreadInfo.threadToStarted(4).await()
     // thread 5 and 6 (stage pending)must meet following two points
-    // 1. stages (taskSetManager) of jobs in thread 5 and 6 should be add to taskSetManager 
-    //    queue before executing TaskThreadInfo.threadToLock(1).jobFinished() 
+    // 1. stages (taskSetManager) of jobs in thread 5 and 6 should be added to the taskSetManager
+    //    queue before executing TaskThreadInfo.threadToLock(1).jobFinished()
     // 2. priority of stage in thread 5 should be prior to priority of stage in thread 6
     // So I just use "sleep" 1s here for each thread.
     // TODO: any better solution?
@@ -112,24 +112,24 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
     Thread.sleep(1000)
     createThread(6,null,sc,sem)
     Thread.sleep(1000)
-    
+
     assert(TaskThreadInfo.threadToRunning(1) === true)
     assert(TaskThreadInfo.threadToRunning(2) === true)
     assert(TaskThreadInfo.threadToRunning(3) === true)
     assert(TaskThreadInfo.threadToRunning(4) === true)
     assert(TaskThreadInfo.threadToRunning(5) === false)
     assert(TaskThreadInfo.threadToRunning(6) === false)
-    
+
     TaskThreadInfo.threadToLock(1).jobFinished()
     TaskThreadInfo.threadToStarted(5).await()
-    
+
     assert(TaskThreadInfo.threadToRunning(1) === false)
     assert(TaskThreadInfo.threadToRunning(2) === true)
     assert(TaskThreadInfo.threadToRunning(3) === true)
     assert(TaskThreadInfo.threadToRunning(4) === true)
     assert(TaskThreadInfo.threadToRunning(5) === true)
     assert(TaskThreadInfo.threadToRunning(6) === false)
-    
+
     TaskThreadInfo.threadToLock(3).jobFinished()
     TaskThreadInfo.threadToStarted(6).await()
 
@@ -139,7 +139,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
     assert(TaskThreadInfo.threadToRunning(4) === true)
     assert(TaskThreadInfo.threadToRunning(5) === true)
     assert(TaskThreadInfo.threadToRunning(6) === true)
-   
+
     TaskThreadInfo.threadToLock(2).jobFinished()
     TaskThreadInfo.threadToLock(4).jobFinished()
     TaskThreadInfo.threadToLock(5).jobFinished()
@@ -160,18 +160,18 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
     TaskThreadInfo.threadToStarted(20).await()
     createThread(30,"3",sc,sem)
     TaskThreadInfo.threadToStarted(30).await()
-   
+
     assert(TaskThreadInfo.threadToRunning(10) === true)
     assert(TaskThreadInfo.threadToRunning(20) === true)
     assert(TaskThreadInfo.threadToRunning(30) === true)
-    
+
     createThread(11,"1",sc,sem)
     TaskThreadInfo.threadToStarted(11).await()
     createThread(21,"2",sc,sem)
     TaskThreadInfo.threadToStarted(21).await()
     createThread(31,"3",sc,sem)
     TaskThreadInfo.threadToStarted(31).await()
-    
+
     assert(TaskThreadInfo.threadToRunning(11) === true)
     assert(TaskThreadInfo.threadToRunning(21) === true)
     assert(TaskThreadInfo.threadToRunning(31) === true)
@@ -185,19 +185,19 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
     assert(TaskThreadInfo.threadToRunning(12) === true)
     assert(TaskThreadInfo.threadToRunning(22) === true)
     assert(TaskThreadInfo.threadToRunning(32) === false)
-    
+
     TaskThreadInfo.threadToLock(10).jobFinished()
     TaskThreadInfo.threadToStarted(32).await()
-    
+
     assert(TaskThreadInfo.threadToRunning(32) === true)
 
-    //1. Similar with above scenario, sleep 1s for stage of 23 and 33 to be added to taskSetManager 
+    //1. Similar to the scenario above, sleep 1s for stages 23 and 33 to be added to the taskSetManager
     //   queue so that cluster will assign free cpu core to stage 23 after stage 11 finished.
     //2. priority of 23 and 33 will be meaningless as using fair scheduler here.
     createThread(23,"2",sc,sem)
     createThread(33,"3",sc,sem)
     Thread.sleep(1000)
-    
+
     TaskThreadInfo.threadToLock(11).jobFinished()
     TaskThreadInfo.threadToStarted(23).await()
 
@@ -206,7 +206,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
 
     TaskThreadInfo.threadToLock(12).jobFinished()
     TaskThreadInfo.threadToStarted(33).await()
-    
+
     assert(TaskThreadInfo.threadToRunning(33) === true)
 
     TaskThreadInfo.threadToLock(20).jobFinished()
@@ -217,7 +217,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
     TaskThreadInfo.threadToLock(31).jobFinished()
     TaskThreadInfo.threadToLock(32).jobFinished()
     TaskThreadInfo.threadToLock(33).jobFinished()
-    
-    sem.acquire(11) 
+
+    sem.acquire(11)
   }
 }
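
Outside the test harness, the per-thread pattern the suite exercises looks roughly like the sketch below (assuming FAIR mode and pools named "1", "2", "3" as in the suite; the exact cross-thread inheritance of local properties follows the SparkContext implementation):

```scala
import spark.SparkContext

object PerThreadPools {
  def main(args: Array[String]) {
    System.setProperty("spark.cluster.schedulingmode", "FAIR")
    val sc = new SparkContext("local[4]", "per-thread-pools")

    // Each thread tags the jobs it submits with its own pool, as the suite above does.
    val threads = for (pool <- Seq("1", "2", "3")) yield new Thread {
      override def run() {
        sc.setLocalProperty("spark.scheduler.cluster.fair.pool", pool)
        sc.parallelize(1 to 100, 2).map(_ + 1).count()
      }
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    sc.stop()
  }
}
```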