Skip to content
Snippets Groups Projects
Commit edf8a56a authored by Kay Ousterhout's avatar Kay Ousterhout Committed by Patrick Wendell
Browse files

Remote BlockFetchTracker trait

This trait seems to have been created a while ago when there
were multiple implementations; now that there's just one, I think it
makes sense to merge it into the BlockFetcherIterator trait.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #39 from kayousterhout/remove_tracker and squashes the following commits:

8173939 [Kay Ousterhout] Remote BlockFetchTracker.
parent 40e080a6
No related branches found
No related tags found
No related merge requests found
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.storage
private[spark] trait BlockFetchTracker {
def totalBlocks : Int
def numLocalBlocks: Int
def numRemoteBlocks: Int
def remoteFetchTime : Long
def fetchWaitTime: Long
def remoteBytesRead : Long
}
......@@ -44,9 +44,14 @@ import org.apache.spark.util.Utils
*/
private[storage]
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])]
with Logging with BlockFetchTracker {
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
def initialize()
def totalBlocks: Int
def numLocalBlocks: Int
def numRemoteBlocks: Int
def remoteFetchTime: Long
def fetchWaitTime: Long
def remoteBytesRead: Long
}
......@@ -233,7 +238,16 @@ object BlockFetcherIterator {
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
}
//an iterator that will read fetched blocks off the queue as they arrive.
override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote
override def remoteFetchTime: Long = _remoteFetchTime
override def fetchWaitTime: Long = _fetchWaitTime
override def remoteBytesRead: Long = _remoteBytesRead
// Implementing the Iterator methods with an iterator that reads fetched blocks off the queue
// as they arrive.
@volatile protected var resultsGotten = 0
override def hasNext: Boolean = resultsGotten < _numBlocksToFetch
......@@ -251,14 +265,6 @@ object BlockFetcherIterator {
}
(result.blockId, if (result.failed) None else Some(result.deserialize()))
}
// Implementing BlockFetchTracker trait.
override def totalBlocks: Int = numLocal + numRemote
override def numLocalBlocks: Int = numLocal
override def numRemoteBlocks: Int = numRemote
override def remoteFetchTime: Long = _remoteFetchTime
override def fetchWaitTime: Long = _fetchWaitTime
override def remoteBytesRead: Long = _remoteBytesRead
}
// End of BasicBlockFetcherIterator
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment