Skip to content
Snippets Groups Projects
Commit 03d0b858 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Made use of spark.executor.memory setting consistent and documented it

Conflicts:

	core/src/main/scala/spark/SparkContext.scala
parent ccfe953a
No related branches found
No related tags found
No related merge requests found
...@@ -115,13 +115,14 @@ class SparkContext( ...@@ -115,13 +115,14 @@ class SparkContext(
// Environment variables to pass to our executors // Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]() private[spark] val executorEnvs = HashMap[String, String]()
// Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
"SPARK_TESTING")) {
val value = System.getenv(key) val value = System.getenv(key)
if (value != null) { if (value != null) {
executorEnvs(key) = value executorEnvs(key) = value
} }
} }
// Since memory can be set with a system property too, use that
executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
if (environment != null) { if (environment != null) {
executorEnvs ++= environment executorEnvs ++= environment
} }
...@@ -156,14 +157,12 @@ class SparkContext( ...@@ -156,14 +157,12 @@ class SparkContext(
scheduler scheduler
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang. // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt val memoryPerSlaveInt = memoryPerSlave.toInt
val sparkMemEnv = System.getenv("SPARK_MEM") if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
if (sparkMemEnvInt > memoryPerSlaveInt) {
throw new SparkException( throw new SparkException(
"Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format( "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
memoryPerSlaveInt, sparkMemEnvInt)) memoryPerSlaveInt, SparkContext.executorMemoryRequested))
} }
val scheduler = new ClusterScheduler(this) val scheduler = new ClusterScheduler(this)
...@@ -881,6 +880,15 @@ object SparkContext { ...@@ -881,6 +880,15 @@ object SparkContext {
/** Find the JAR that contains the class of a particular object */ /** Find the JAR that contains the class of a particular object */
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass) def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
/** Get the amount of memory per executor requested through system properties or SPARK_MEM */
private[spark] val executorMemoryRequested = {
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
Option(System.getProperty("spark.executor.memory"))
.orElse(Option(System.getenv("SPARK_MEM")))
.map(Utils.memoryStringToMb)
.getOrElse(512)
}
} }
......
package spark.scheduler.cluster package spark.scheduler.cluster
import spark.Utils import spark.{SparkContext, Utils}
/** /**
* A backend interface for cluster scheduling systems that allows plugging in different ones under * A backend interface for cluster scheduling systems that allows plugging in different ones under
...@@ -14,14 +14,7 @@ private[spark] trait SchedulerBackend { ...@@ -14,14 +14,7 @@ private[spark] trait SchedulerBackend {
def defaultParallelism(): Int def defaultParallelism(): Int
// Memory used by each executor (in megabytes) // Memory used by each executor (in megabytes)
protected val executorMemory = { protected val executorMemory: Int = SparkContext.executorMemoryRequested
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
Option(System.getProperty("spark.executor.memory"))
.orElse(Option(System.getenv("SPARK_MEM")))
.map(Utils.memoryStringToMb)
.getOrElse(512)
}
// TODO: Probably want to add a killTask too // TODO: Probably want to add a killTask too
} }
...@@ -25,23 +25,25 @@ Inside `spark-env.sh`, you *must* set at least the following two variables: ...@@ -25,23 +25,25 @@ Inside `spark-env.sh`, you *must* set at least the following two variables:
* `SCALA_HOME`, to point to your Scala installation. * `SCALA_HOME`, to point to your Scala installation.
* `MESOS_NATIVE_LIBRARY`, if you are [running on a Mesos cluster](running-on-mesos.html). * `MESOS_NATIVE_LIBRARY`, if you are [running on a Mesos cluster](running-on-mesos.html).
In addition, there are four other variables that control execution. These can be set *either in `spark-env.sh` In addition, there are four other variables that control execution. These should be set *in the environment that
or in each job's driver program*, because they will automatically be propagated to workers from the driver. launches the job's driver program* instead of `spark-env.sh`, because they will be automatically propagated to
For a multi-user environment, we recommend setting them in the driver program instead of `spark-env.sh`, so workers. Setting these per-job instead of in `spark-env.sh` ensures that different jobs can have different settings
that different user jobs can use different amounts of memory, JVM options, etc. for these variables.
* `SPARK_MEM`, to set the amount of memory used per node (this should be in the same format as the
JVM's -Xmx option, e.g. `300m` or `1g`)
* `SPARK_JAVA_OPTS`, to add JVM options. This includes any system properties that you'd like to pass with `-D`. * `SPARK_JAVA_OPTS`, to add JVM options. This includes any system properties that you'd like to pass with `-D`.
* `SPARK_CLASSPATH`, to add elements to Spark's classpath. * `SPARK_CLASSPATH`, to add elements to Spark's classpath.
* `SPARK_LIBRARY_PATH`, to add search directories for native libraries. * `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the
JVM's -Xmx option, e.g. `300m` or `1g`. Note that this option will soon be deprecated in favor of
the `spark.executor.memory` system property, so we recommend using that in new code.
Note that if you do set these in `spark-env.sh`, they will override the values set by user programs, which Beware that if you do set these variables in `spark-env.sh`, they will override the values set by user programs,
is undesirable; you can choose to have `spark-env.sh` set them only if the user program hasn't, as follows: which is undesirable; if you prefer, you can choose to have `spark-env.sh` set them only if the user program
hasn't, as follows:
{% highlight bash %} {% highlight bash %}
if [ -z "$SPARK_MEM" ] ; then if [ -z "$SPARK_JAVA_OPTS" ] ; then
SPARK_MEM="1g" SPARK_JAVA_OPTS="-verbose:gc"
fi fi
{% endhighlight %} {% endhighlight %}
...@@ -55,10 +57,17 @@ val sc = new SparkContext(...) ...@@ -55,10 +57,17 @@ val sc = new SparkContext(...)
{% endhighlight %} {% endhighlight %}
Most of the configurable system properties control internal settings that have reasonable default values. However, Most of the configurable system properties control internal settings that have reasonable default values. However,
there are at least four properties that you will commonly want to control: there are at least five properties that you will commonly want to control:
<table class="table"> <table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td>spark.executor.memory</td>
<td>512m</td>
<td>
Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. `512m`, `2g`).
</td>
</tr>
<tr> <tr>
<td>spark.serializer</td> <td>spark.serializer</td>
<td>spark.JavaSerializer</td> <td>spark.JavaSerializer</td>
......
...@@ -106,9 +106,8 @@ permissions on your private key file, you can run `launch` with the ...@@ -106,9 +106,8 @@ permissions on your private key file, you can run `launch` with the
# Configuration # Configuration
You can edit `/root/spark/conf/spark-env.sh` on each machine to set Spark configuration options, such You can edit `/root/spark/conf/spark-env.sh` on each machine to set Spark configuration options, such
as JVM options and, most crucially, the amount of memory to use per machine (`SPARK_MEM`). as JVM options. This file needs to be copied to **every machine** to reflect the change. The easiest way to
This file needs to be copied to **every machine** to reflect the change. The easiest way to do this do this is to use a script we provide called `copy-dir`. First edit your `spark-env.sh` file on the master,
is to use a script we provide called `copy-dir`. First edit your `spark-env.sh` file on the master,
then run `~/spark-ec2/copy-dir /root/spark/conf` to RSYNC it to all the workers. then run `~/spark-ec2/copy-dir /root/spark/conf` to RSYNC it to all the workers.
The [configuration guide](configuration.html) describes the available configuration options. The [configuration guide](configuration.html) describes the available configuration options.
......
...@@ -157,9 +157,9 @@ their work directories), *not* on your driver program. ...@@ -157,9 +157,9 @@ their work directories), *not* on your driver program.
**Cache Size Tuning** **Cache Size Tuning**
One important configuration parameter for GC is the amount of memory that should be used for One important configuration parameter for GC is the amount of memory that should be used for caching RDDs.
caching RDDs. By default, Spark uses 66% of the configured memory (`SPARK_MEM`) to cache RDDs. This means that By default, Spark uses 66% of the configured executor memory (`spark.executor.memory` or `SPARK_MEM`) to
33% of memory is available for any objects created during task execution. cache RDDs. This means that 33% of memory is available for any objects created during task execution.
In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of
memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment