From 96c9bcfd8d3f45fe43b3857a80fa1a42f983970b Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@eecs.berkeley.edu>
Date: Tue, 30 Oct 2012 23:32:38 -0700
Subject: [PATCH] Cancel spot instance requests when exiting spark-ec2.

---
 ec2/spark_ec2.py | 52 +++++++++++++++++++++++++++++-------------------
 1 file changed, 31 insertions(+), 21 deletions(-)

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 6a3647b218..c0926e214f 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -233,27 +233,37 @@ def launch_cluster(conn, opts, cluster_name):
         block_device_map = block_map)
     my_req_ids = [req.id for req in slave_reqs]
     print "Waiting for spot instances to be granted..."
-    while True:
-      time.sleep(10)
-      reqs = conn.get_all_spot_instance_requests()
-      id_to_req = {}
-      for r in reqs:
-        id_to_req[r.id] = r
-      active = 0
-      instance_ids = []
-      for i in my_req_ids:
-        if id_to_req[i].state == "active":
-          active += 1
-          instance_ids.append(id_to_req[i].instance_id)
-      if active == opts.slaves:
-        print "All %d slaves granted" % opts.slaves
-        reservations = conn.get_all_instances(instance_ids)
-        slave_nodes = []
-        for r in reservations:
-          slave_nodes += r.instances
-        break
-      else:
-        print "%d of %d slaves granted, waiting longer" % (active, opts.slaves)
+    try:
+      while True:
+        time.sleep(10)
+        reqs = conn.get_all_spot_instance_requests()
+        id_to_req = {}
+        for r in reqs:
+          id_to_req[r.id] = r
+        active_instance_ids = []
+        for i in my_req_ids:
+          if i in id_to_req and id_to_req[i].state == "active":
+            active_instance_ids.append(id_to_req[i].instance_id)
+        if len(active_instance_ids) == opts.slaves:
+          print "All %d slaves granted" % opts.slaves
+          reservations = conn.get_all_instances(active_instance_ids)
+          slave_nodes = []
+          for r in reservations:
+            slave_nodes += r.instances
+          break
+        else:
+          print "%d of %d slaves granted, waiting longer" % (
+            len(active_instance_ids), opts.slaves)
+    except:
+      print "Canceling spot instance requests"
+      conn.cancel_spot_instance_requests(my_req_ids)
+      # Log a warning if any of these requests actually launched instances:
+      (master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
+          conn, opts, cluster_name, die_on_error=False)
+      running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
+      if running:
+        print >> stderr, ("WARNING: %d instances are still running" % running)
+      sys.exit(0)
   else:
     # Launch non-spot instances
     slave_res = image.run(key_name = opts.key_pair,
-- 
GitLab