Skip to content
Snippets Groups Projects
Commit 40afe0d2 authored by Jey Kottalam's avatar Jey Kottalam
Browse files

Add Python timing instrumentation

parent 1057fccf
No related branches found
No related tags found
No related merge requests found
...@@ -47,6 +47,7 @@ private[spark] class PythonRDD[T: ClassManifest]( ...@@ -47,6 +47,7 @@ private[spark] class PythonRDD[T: ClassManifest](
currentEnvVars.put(variable, value) currentEnvVars.put(variable, value)
} }
val startTime = System.currentTimeMillis
val proc = pb.start() val proc = pb.start()
val env = SparkEnv.get val env = SparkEnv.get
...@@ -108,6 +109,17 @@ private[spark] class PythonRDD[T: ClassManifest]( ...@@ -108,6 +109,17 @@ private[spark] class PythonRDD[T: ClassManifest](
val obj = new Array[Byte](length) val obj = new Array[Byte](length)
stream.readFully(obj) stream.readFully(obj)
obj obj
case -3 =>
// Timing data from child
val bootTime = stream.readLong()
val initTime = stream.readLong()
val finishTime = stream.readLong()
val boot = bootTime - startTime
val init = initTime - bootTime
val finish = finishTime - initTime
val total = finishTime - startTime
logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot, init, finish))
read
case -2 => case -2 =>
// Signals that an exception has been thrown in python // Signals that an exception has been thrown in python
val exLength = stream.readInt() val exLength = stream.readInt()
......
...@@ -46,6 +46,10 @@ def read_long(stream): ...@@ -46,6 +46,10 @@ def read_long(stream):
return struct.unpack("!q", length)[0] return struct.unpack("!q", length)[0]
def write_long(value, stream):
    """Serialize *value* to *stream* as a big-endian signed 64-bit integer.

    Counterpart of ``read_long``; uses the network byte-order ``!q``
    struct format (8 bytes).
    """
    packed = struct.pack("!q", value)
    stream.write(packed)
def read_int(stream): def read_int(stream):
length = stream.read(4) length = stream.read(4)
if length == "": if length == "":
......
""" """
Worker that receives input from Piped RDD. Worker that receives input from Piped RDD.
""" """
import time
preboot_time = time.time()
import os import os
import sys import sys
import traceback import traceback
...@@ -12,7 +14,7 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry ...@@ -12,7 +14,7 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
from pyspark.cloudpickle import CloudPickler from pyspark.cloudpickle import CloudPickler
from pyspark.files import SparkFiles from pyspark.files import SparkFiles
from pyspark.serializers import write_with_length, read_with_length, write_int, \ from pyspark.serializers import write_with_length, read_with_length, write_int, \
read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
# Redirect stdout to stderr so that users must return values from functions. # Redirect stdout to stderr so that users must return values from functions.
...@@ -24,7 +26,16 @@ def load_obj(): ...@@ -24,7 +26,16 @@ def load_obj():
return load_pickle(standard_b64decode(sys.stdin.readline().strip())) return load_pickle(standard_b64decode(sys.stdin.readline().strip()))
def report_times(preboot, boot, init, finish):
    """Report worker timing checkpoints back to the JVM via old_stdout.

    Writes the special marker -3 followed by the four timestamps
    (given in seconds, as from time.time()) converted to milliseconds,
    each as a big-endian signed long.

    NOTE(review): this sends four longs after the -3 marker, but the
    visible Scala `case -3` handler appears to read only three
    (boot/init/finish) -- confirm the two sides of the protocol agree.
    """
    write_int(-3, old_stdout)
    for seconds in (preboot, boot, init, finish):
        write_long(1000 * seconds, old_stdout)
def main(): def main():
boot_time = time.time()
split_index = read_int(sys.stdin) split_index = read_int(sys.stdin)
spark_files_dir = load_pickle(read_with_length(sys.stdin)) spark_files_dir = load_pickle(read_with_length(sys.stdin))
SparkFiles._root_directory = spark_files_dir SparkFiles._root_directory = spark_files_dir
...@@ -41,6 +52,7 @@ def main(): ...@@ -41,6 +52,7 @@ def main():
dumps = lambda x: x dumps = lambda x: x
else: else:
dumps = dump_pickle dumps = dump_pickle
init_time = time.time()
iterator = read_from_pickle_file(sys.stdin) iterator = read_from_pickle_file(sys.stdin)
try: try:
for obj in func(split_index, iterator): for obj in func(split_index, iterator):
...@@ -49,6 +61,8 @@ def main(): ...@@ -49,6 +61,8 @@ def main():
write_int(-2, old_stdout) write_int(-2, old_stdout)
write_with_length(traceback.format_exc(), old_stdout) write_with_length(traceback.format_exc(), old_stdout)
sys.exit(-1) sys.exit(-1)
finish_time = time.time()
report_times(preboot_time, boot_time, init_time, finish_time)
# Mark the beginning of the accumulators section of the output # Mark the beginning of the accumulators section of the output
write_int(-1, old_stdout) write_int(-1, old_stdout)
for aid, accum in _accumulatorRegistry.items(): for aid, accum in _accumulatorRegistry.items():
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment