Skip to content
Snippets Groups Projects
Commit 0402be90 authored by Ankur Dave's avatar Ankur Dave Committed by Reynold Xin
Browse files

Internal cleanup for aggregateMessages

1. Add EdgeActiveness enum to represent activeness criteria more cleanly than using booleans.
2. Comments and whitespace.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #3231 from ankurdave/aggregateMessages-followup and squashes the following commits:

3d485c3 [Ankur Dave] Internal cleanup for aggregateMessages
parent aa43a8da
No related branches found
No related tags found
No related merge requests found
......@@ -207,8 +207,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* }}}
*
*/
def mapTriplets[ED2: ClassTag](
map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.graphx.impl;
/**
* Criteria for filtering edges based on activeness. For internal use only.
*/
public enum EdgeActiveness {
/** Neither the source vertex nor the destination vertex need be active. */
Neither,
/** The source vertex must be active. */
SrcOnly,
/** The destination vertex must be active. */
DstOnly,
/** Both vertices must be active. */
Both,
/** At least one vertex must be active. */
Either
}
......@@ -64,6 +64,7 @@ class EdgePartition[
activeSet: Option[VertexSet])
extends Serializable {
/** No-arg constructor for serialization. */
private def this() = this(null, null, null, null, null, null, null, null)
/** Return a new `EdgePartition` with the specified edge data. */
......@@ -375,12 +376,7 @@ class EdgePartition[
* @param sendMsg generates messages to neighboring vertices of an edge
* @param mergeMsg the combiner applied to messages destined to the same vertex
* @param tripletFields which triplet fields `sendMsg` uses
* @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
* active set
* @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
* the active set
* @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
* considered
* @param activeness criteria for filtering edges based on activeness
*
* @return iterator aggregated messages keyed by the receiving vertex id
*/
......@@ -388,9 +384,7 @@ class EdgePartition[
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields,
srcMustBeActive: Boolean,
dstMustBeActive: Boolean,
maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)
......@@ -401,10 +395,13 @@ class EdgePartition[
val srcId = local2global(localSrcId)
val localDstId = localDstIds(i)
val dstId = local2global(localDstId)
val srcIsActive = !srcMustBeActive || isActive(srcId)
val dstIsActive = !dstMustBeActive || isActive(dstId)
val edgeIsActive =
if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
if (activeness == EdgeActiveness.Neither) true
else if (activeness == EdgeActiveness.SrcOnly) isActive(srcId)
else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
else if (activeness == EdgeActiveness.Both) isActive(srcId) && isActive(dstId)
else if (activeness == EdgeActiveness.Either) isActive(srcId) || isActive(dstId)
else throw new Exception("unreachable")
if (edgeIsActive) {
val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD]
val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
......@@ -424,12 +421,7 @@ class EdgePartition[
* @param sendMsg generates messages to neighboring vertices of an edge
* @param mergeMsg the combiner applied to messages destined to the same vertex
* @param tripletFields which triplet fields `sendMsg` uses
* @param srcMustBeActive if true, edges will only be considered if their source vertex is in the
* active set
* @param dstMustBeActive if true, edges will only be considered if their destination vertex is in
* the active set
* @param maySatisfyEither if true, only one vertex need be in the active set for an edge to be
* considered
* @param activeness criteria for filtering edges based on activeness
*
* @return iterator aggregated messages keyed by the receiving vertex id
*/
......@@ -437,9 +429,7 @@ class EdgePartition[
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields,
srcMustBeActive: Boolean,
dstMustBeActive: Boolean,
maySatisfyEither: Boolean): Iterator[(VertexId, A)] = {
activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
val aggregates = new Array[A](vertexAttrs.length)
val bitset = new BitSet(vertexAttrs.length)
......@@ -448,8 +438,16 @@ class EdgePartition[
val clusterSrcId = cluster._1
val clusterPos = cluster._2
val clusterLocalSrcId = localSrcIds(clusterPos)
val srcIsActive = !srcMustBeActive || isActive(clusterSrcId)
if (srcIsActive || maySatisfyEither) {
val scanCluster =
if (activeness == EdgeActiveness.Neither) true
else if (activeness == EdgeActiveness.SrcOnly) isActive(clusterSrcId)
else if (activeness == EdgeActiveness.DstOnly) true
else if (activeness == EdgeActiveness.Both) isActive(clusterSrcId)
else if (activeness == EdgeActiveness.Either) true
else throw new Exception("unreachable")
if (scanCluster) {
var pos = clusterPos
val srcAttr =
if (tripletFields.useSrc) vertexAttrs(clusterLocalSrcId) else null.asInstanceOf[VD]
......@@ -457,9 +455,13 @@ class EdgePartition[
while (pos < size && localSrcIds(pos) == clusterLocalSrcId) {
val localDstId = localDstIds(pos)
val dstId = local2global(localDstId)
val dstIsActive = !dstMustBeActive || isActive(dstId)
val edgeIsActive =
if (maySatisfyEither) srcIsActive || dstIsActive else srcIsActive && dstIsActive
if (activeness == EdgeActiveness.Neither) true
else if (activeness == EdgeActiveness.SrcOnly) true
else if (activeness == EdgeActiveness.DstOnly) isActive(dstId)
else if (activeness == EdgeActiveness.Both) isActive(dstId)
else if (activeness == EdgeActiveness.Either) isActive(clusterSrcId) || isActive(dstId)
else throw new Exception("unreachable")
if (edgeIsActive) {
val dstAttr =
if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
......
......@@ -218,30 +218,30 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
case Some(EdgeDirection.Both) =>
if (activeFraction < 0.8) {
edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
true, true, false)
EdgeActiveness.Both)
} else {
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
true, true, false)
EdgeActiveness.Both)
}
case Some(EdgeDirection.Either) =>
// TODO: Because we only have a clustered index on the source vertex ID, we can't filter
// the index here. Instead we have to scan all edges and then do the filter.
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
true, true, true)
EdgeActiveness.Either)
case Some(EdgeDirection.Out) =>
if (activeFraction < 0.8) {
edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
true, false, false)
EdgeActiveness.SrcOnly)
} else {
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
true, false, false)
EdgeActiveness.SrcOnly)
}
case Some(EdgeDirection.In) =>
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
false, true, false)
EdgeActiveness.DstOnly)
case _ => // None
edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
false, false, false)
EdgeActiveness.Neither)
}
}).setName("GraphImpl.aggregateMessages - preAgg")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment