Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
spark
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
cs525-sp18-g07
spark
Commits
42d20fa8
Commit
42d20fa8
authored
12 years ago
by
Reynold Xin
Browse files
Options
Downloads
Patches
Plain Diff
Added a method to report slave memory status.
parent
63fe4e9d
No related branches found
No related tags found
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
core/src/main/scala/spark/SparkContext.scala
+12
-3
12 additions, 3 deletions
core/src/main/scala/spark/SparkContext.scala
core/src/main/scala/spark/storage/BlockManagerMaster.scala
+76
-64
76 additions, 64 deletions
core/src/main/scala/spark/storage/BlockManagerMaster.scala
with
88 additions
and
67 deletions
core/src/main/scala/spark/SparkContext.scala
+
12
−
3
View file @
42d20fa8
...
...
@@ -45,7 +45,6 @@ import spark.scheduler.TaskScheduler
import
spark.scheduler.local.LocalScheduler
import
spark.scheduler.cluster.
{
SparkDeploySchedulerBackend
,
SchedulerBackend
,
ClusterScheduler
}
import
spark.scheduler.mesos.
{
CoarseMesosSchedulerBackend
,
MesosSchedulerBackend
}
import
spark.storage.BlockManagerMaster
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
...
...
@@ -199,7 +198,7 @@ class SparkContext(
parallelize
(
seq
,
numSlices
)
}
/**
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
...
...
@@ -400,7 +399,7 @@ class SparkContext(
new
Accumulable
(
initialValue
,
param
)
}
/**
/**
* Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
* reading it in distributed functions. The variable will be sent to each cluster only once.
*/
...
...
@@ -426,6 +425,16 @@ class SparkContext(
logInfo
(
"Added file "
+
path
+
" at "
+
key
+
" with timestamp "
+
addedFiles
(
key
))
}
/**
 * Report the caching memory of each slave known to the block manager master.
 *
 * @return a map keyed by the "ip:port" of each block manager; every value is the pair
 *         (max memory available for caching, remaining memory available for caching)
 */
def getSlavesMemoryStatus: Map[String, (Long, Long)] = {
  val statusByManager = env.blockManager.master.getMemoryStatus
  statusByManager.map { entry =>
    val (managerId, memory) = entry
    // Re-key by a printable "ip:port" string; the memory pair is passed through untouched.
    (managerId.ip + ":" + managerId.port, memory)
  }
}
/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
...
...
This diff is collapsed.
Click to expand it.
core/src/main/scala/spark/storage/BlockManagerMaster.scala
+
76
−
64
View file @
42d20fa8
...
...
@@ -71,76 +71,79 @@ object HeartBeat {
Some
((
h
.
blockManagerId
,
h
.
blockId
,
h
.
storageLevel
,
h
.
memSize
,
h
.
diskSize
))
}
}
/** Message carrying the id of a single block (presumably a location lookup; handler not shown here). */
private[spark] case class GetLocations(blockId: String) extends ToBlockManagerMaster

/** Message carrying several block ids; the master's answer is read back as one Seq of block manager ids per block. */
private[spark] case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster

/** Asks the master for `size` peers of `blockManagerId`; served by `getPeersDeterministic`. */
private[spark] case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster

/** Asks the master to remove `host`; the master calls `removeHost` and replies `true`. */
private[spark] case class RemoveHost(host: String) extends ToBlockManagerMaster

/** Sent by `BlockManagerMaster.stop()` to shut the master actor down. */
private[spark] case object StopBlockManagerMaster extends ToBlockManagerMaster

/** Asks the master for a map from block manager id to (max memory, remaining memory). */
private[spark] case object GetMemoryStatus extends ToBlockManagerMaster
private
[
spark
]
class
BlockManagerMasterActor
(
val
isLocal
:
Boolean
)
extends
Actor
with
Logging
{
class
BlockManagerInfo
(
val
blockManagerId
:
BlockManagerId
,
timeMs
:
Long
,
val
maxMem
:
Long
)
{
private
var
lastSeenMs
=
timeMs
private
var
remainingMem
=
maxMem
private
val
blocks
=
new
JHashMap
[
String
,
StorageLevel
]
private
var
_
lastSeenMs
=
timeMs
private
var
_
remainingMem
=
maxMem
private
val
_
blocks
=
new
JHashMap
[
String
,
StorageLevel
]
logInfo
(
"Registering block manager %s:%d with %s RAM"
.
format
(
blockManagerId
.
ip
,
blockManagerId
.
port
,
Utils
.
memoryBytesToString
(
maxMem
)))
def
updateLastSeenMs
()
{
lastSeenMs
=
System
.
currentTimeMillis
()
/
1000
_
lastSeenMs
=
System
.
currentTimeMillis
()
/
1000
}
def
updateBlockInfo
(
blockId
:
String
,
storageLevel
:
StorageLevel
,
memSize
:
Long
,
diskSize
:
Long
)
:
Unit
=
synchronized
{
updateLastSeenMs
()
if
(
blocks
.
containsKey
(
blockId
))
{
if
(
_
blocks
.
containsKey
(
blockId
))
{
// The block exists on the slave already.
val
originalLevel
:
StorageLevel
=
blocks
.
get
(
blockId
)
val
originalLevel
:
StorageLevel
=
_
blocks
.
get
(
blockId
)
if
(
originalLevel
.
useMemory
)
{
remainingMem
+=
memSize
_
remainingMem
+=
memSize
}
}
if
(
storageLevel
.
isValid
)
{
// isValid means it is either stored in-memory or on-disk.
blocks
.
put
(
blockId
,
storageLevel
)
_
blocks
.
put
(
blockId
,
storageLevel
)
if
(
storageLevel
.
useMemory
)
{
remainingMem
-=
memSize
_
remainingMem
-=
memSize
logInfo
(
"Added %s in memory on %s:%d (size: %s, free: %s)"
.
format
(
blockId
,
blockManagerId
.
ip
,
blockManagerId
.
port
,
Utils
.
memoryBytesToString
(
memSize
),
Utils
.
memoryBytesToString
(
remainingMem
)))
Utils
.
memoryBytesToString
(
_
remainingMem
)))
}
if
(
storageLevel
.
useDisk
)
{
logInfo
(
"Added %s on disk on %s:%d (size: %s)"
.
format
(
blockId
,
blockManagerId
.
ip
,
blockManagerId
.
port
,
Utils
.
memoryBytesToString
(
diskSize
)))
}
}
else
if
(
blocks
.
containsKey
(
blockId
))
{
}
else
if
(
_
blocks
.
containsKey
(
blockId
))
{
// If isValid is not true, drop the block.
val
originalLevel
:
StorageLevel
=
blocks
.
get
(
blockId
)
blocks
.
remove
(
blockId
)
val
originalLevel
:
StorageLevel
=
_
blocks
.
get
(
blockId
)
_
blocks
.
remove
(
blockId
)
if
(
originalLevel
.
useMemory
)
{
remainingMem
+=
memSize
_
remainingMem
+=
memSize
logInfo
(
"Removed %s on %s:%d in memory (size: %s, free: %s)"
.
format
(
blockId
,
blockManagerId
.
ip
,
blockManagerId
.
port
,
Utils
.
memoryBytesToString
(
memSize
),
Utils
.
memoryBytesToString
(
remainingMem
)))
Utils
.
memoryBytesToString
(
_
remainingMem
)))
}
if
(
originalLevel
.
useDisk
)
{
logInfo
(
"Removed %s on %s:%d on disk (size: %s)"
.
format
(
...
...
@@ -149,20 +152,14 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
}
def
getLastSeenMs
:
Long
=
{
return
lastSeenMs
}
def
getRemainedMem
:
Long
=
{
return
remainingMem
}
def
remainingMem
:
Long
=
_remainingMem
override
def
toString
:
String
=
{
return
"BlockManagerInfo "
+
timeMs
+
" "
+
remainingMem
}
def
lastSeenMs
:
Long
=
_lastSeenMs
override
def
toString
:
String
=
"BlockManagerInfo "
+
timeMs
+
" "
+
_remainingMem
def
clear
()
{
blocks
.
clear
()
_
blocks
.
clear
()
}
}
...
...
@@ -170,7 +167,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
private
val
blockInfo
=
new
JHashMap
[
String
,
Pair
[
Int
,
HashSet
[
BlockManagerId
]]]
initLogging
()
def
removeHost
(
host
:
String
)
{
logInfo
(
"Trying to remove the host: "
+
host
+
" from BlockManagerMaster."
)
logInfo
(
"Previous hosts: "
+
blockManagerInfo
.
keySet
.
toSeq
)
...
...
@@ -197,7 +194,10 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case
GetPeers
(
blockManagerId
,
size
)
=>
getPeersDeterministic
(
blockManagerId
,
size
)
/*getPeers(blockManagerId, size)*/
case
GetMemoryStatus
=>
getMemoryStatus
case
RemoveHost
(
host
)
=>
removeHost
(
host
)
sender
!
true
...
...
@@ -207,10 +207,18 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender
!
true
context
.
stop
(
self
)
case
other
=>
case
other
=>
logInfo
(
"Got unknown message: "
+
other
)
}
// Reply to the sender with a map from each block manager id to its
// (max memory, remaining memory) pair.
private def getMemoryStatus() {
  val statusById = blockManagerInfo.map { entry =>
    val (managerId, managerInfo) = entry
    (managerId, (managerInfo.maxMem, managerInfo.remainingMem))
  }.toMap
  sender ! statusById
}
private
def
register
(
blockManagerId
:
BlockManagerId
,
maxMemSize
:
Long
)
{
val
startTimeMs
=
System
.
currentTimeMillis
()
val
tmp
=
" "
+
blockManagerId
+
" "
...
...
@@ -224,25 +232,25 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
logDebug
(
"Got in register 1"
+
tmp
+
Utils
.
getUsedTimeMs
(
startTimeMs
))
sender
!
true
}
private
def
heartBeat
(
blockManagerId
:
BlockManagerId
,
blockId
:
String
,
storageLevel
:
StorageLevel
,
memSize
:
Long
,
diskSize
:
Long
)
{
val
startTimeMs
=
System
.
currentTimeMillis
()
val
tmp
=
" "
+
blockManagerId
+
" "
+
blockId
+
" "
if
(
blockId
==
null
)
{
blockManagerInfo
(
blockManagerId
).
updateLastSeenMs
()
logDebug
(
"Got in heartBeat 1"
+
tmp
+
" used "
+
Utils
.
getUsedTimeMs
(
startTimeMs
))
sender
!
true
}
blockManagerInfo
(
blockManagerId
).
updateBlockInfo
(
blockId
,
storageLevel
,
memSize
,
diskSize
)
var
locations
:
HashSet
[
BlockManagerId
]
=
null
if
(
blockInfo
.
containsKey
(
blockId
))
{
locations
=
blockInfo
.
get
(
blockId
).
_2
...
...
@@ -250,19 +258,19 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
locations
=
new
HashSet
[
BlockManagerId
]
blockInfo
.
put
(
blockId
,
(
storageLevel
.
replication
,
locations
))
}
if
(
storageLevel
.
isValid
)
{
locations
+=
blockManagerId
}
else
{
locations
.
remove
(
blockManagerId
)
}
if
(
locations
.
size
==
0
)
{
blockInfo
.
remove
(
blockId
)
}
sender
!
true
}
private
def
getLocations
(
blockId
:
String
)
{
val
startTimeMs
=
System
.
currentTimeMillis
()
val
tmp
=
" "
+
blockId
+
" "
...
...
@@ -270,7 +278,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
if
(
blockInfo
.
containsKey
(
blockId
))
{
var
res
:
ArrayBuffer
[
BlockManagerId
]
=
new
ArrayBuffer
[
BlockManagerId
]
res
.
appendAll
(
blockInfo
.
get
(
blockId
).
_2
)
logDebug
(
"Got in getLocations 1"
+
tmp
+
" as "
+
res
.
toSeq
+
" at "
logDebug
(
"Got in getLocations 1"
+
tmp
+
" as "
+
res
.
toSeq
+
" at "
+
Utils
.
getUsedTimeMs
(
startTimeMs
))
sender
!
res
.
toSeq
}
else
{
...
...
@@ -279,7 +287,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
sender
!
res
}
}
private
def
getLocationsMultipleBlockIds
(
blockIds
:
Array
[
String
])
{
def
getLocations
(
blockId
:
String
)
:
Seq
[
BlockManagerId
]
=
{
val
tmp
=
blockId
...
...
@@ -295,7 +303,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
return
res
.
toSeq
}
}
logDebug
(
"Got in getLocationsMultipleBlockIds "
+
blockIds
.
toSeq
)
var
res
:
ArrayBuffer
[
Seq
[
BlockManagerId
]]
=
new
ArrayBuffer
[
Seq
[
BlockManagerId
]]
for
(
blockId
<-
blockIds
)
{
...
...
@@ -316,7 +324,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
sender
!
res
.
toSeq
}
private
def
getPeersDeterministic
(
blockManagerId
:
BlockManagerId
,
size
:
Int
)
{
var
peers
:
Array
[
BlockManagerId
]
=
blockManagerInfo
.
keySet
.
toArray
var
res
:
ArrayBuffer
[
BlockManagerId
]
=
new
ArrayBuffer
[
BlockManagerId
]
...
...
@@ -362,7 +370,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
logInfo
(
"Connecting to BlockManagerMaster: "
+
url
)
masterActor
=
actorSystem
.
actorFor
(
url
)
}
def
stop
()
{
if
(
masterActor
!=
null
)
{
communicate
(
StopBlockManagerMaster
)
...
...
@@ -389,7 +397,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
throw
new
SparkException
(
"Error reply received from BlockManagerMaster"
)
}
}
def
notifyADeadHost
(
host
:
String
)
{
communicate
(
RemoveHost
(
host
+
":"
+
DEFAULT_MANAGER_PORT
))
logInfo
(
"Removed "
+
host
+
" successfully in notifyADeadHost"
)
...
...
@@ -409,7 +417,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
val
startTimeMs
=
System
.
currentTimeMillis
()
val
tmp
=
" msg "
+
msg
+
" "
logDebug
(
"Got in syncRegisterBlockManager 0 "
+
tmp
+
Utils
.
getUsedTimeMs
(
startTimeMs
))
try
{
communicate
(
msg
)
logInfo
(
"BlockManager registered successfully @ syncRegisterBlockManager"
)
...
...
@@ -421,19 +429,19 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return
false
}
}
/**
 * Send `msg` through `syncHeartBeat` until an attempt reports success.
 *
 * After every failed attempt a warning is logged and the calling thread sleeps for
 * REQUEST_RETRY_INTERVAL_MS before trying again. There is no retry limit, so this
 * method does not return while attempts keep failing.
 */
def mustHeartBeat(msg: HeartBeat) {
  var delivered = syncHeartBeat(msg)
  while (!delivered) {
    logWarning("Failed to send heartbeat" + msg)
    Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
    delivered = syncHeartBeat(msg)
  }
}
def
syncHeartBeat
(
msg
:
HeartBeat
)
:
Boolean
=
{
val
startTimeMs
=
System
.
currentTimeMillis
()
val
tmp
=
" msg "
+
msg
+
" "
logDebug
(
"Got in syncHeartBeat "
+
tmp
+
" 0 "
+
Utils
.
getUsedTimeMs
(
startTimeMs
))
try
{
communicate
(
msg
)
logDebug
(
"Heartbeat sent successfully"
)
...
...
@@ -445,7 +453,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return
false
}
}
def
mustGetLocations
(
msg
:
GetLocations
)
:
Seq
[
BlockManagerId
]
=
{
var
res
=
syncGetLocations
(
msg
)
while
(
res
==
null
)
{
...
...
@@ -455,7 +463,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
return
res
}
def
syncGetLocations
(
msg
:
GetLocations
)
:
Seq
[
BlockManagerId
]
=
{
val
startTimeMs
=
System
.
currentTimeMillis
()
val
tmp
=
" msg "
+
msg
+
" "
...
...
@@ -488,13 +496,13 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
return
res
}
def
syncGetLocationsMultipleBlockIds
(
msg
:
GetLocationsMultipleBlockIds
)
:
Seq
[
Seq
[
BlockManagerId
]]
=
{
val
startTimeMs
=
System
.
currentTimeMillis
val
tmp
=
" msg "
+
msg
+
" "
logDebug
(
"Got in syncGetLocationsMultipleBlockIds 0 "
+
tmp
+
Utils
.
getUsedTimeMs
(
startTimeMs
))
try
{
val
answer
=
askMaster
(
msg
).
asInstanceOf
[
Seq
[
Seq
[
BlockManagerId
]]]
if
(
answer
!=
null
)
{
...
...
@@ -512,7 +520,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return
null
}
}
def
mustGetPeers
(
msg
:
GetPeers
)
:
Seq
[
BlockManagerId
]
=
{
var
res
=
syncGetPeers
(
msg
)
while
((
res
==
null
)
||
(
res
.
length
!=
msg
.
size
))
{
...
...
@@ -520,10 +528,10 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
Thread
.
sleep
(
REQUEST_RETRY_INTERVAL_MS
)
res
=
syncGetPeers
(
msg
)
}
return
res
}
def
syncGetPeers
(
msg
:
GetPeers
)
:
Seq
[
BlockManagerId
]
=
{
val
startTimeMs
=
System
.
currentTimeMillis
val
tmp
=
" msg "
+
msg
+
" "
...
...
@@ -545,4 +553,8 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
return
null
}
}
/**
 * Ask the master for the memory status of the registered block managers.
 *
 * @return the master's reply to `GetMemoryStatus`, cast to a map from block manager id
 *         to a pair of Longs (the master actor builds it as (maxMem, remainingMem))
 */
def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
  val reply = askMaster(GetMemoryStatus)
  // Unchecked cast: the reply's static type is not visible here, and the
  // type arguments are erased at runtime.
  reply.asInstanceOf[Map[BlockManagerId, (Long, Long)]]
}
}
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment