Skip to content
Snippets Groups Projects
Commit 4a47d1a4 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #297 from JoshRosen/fix/ec2-spot-instances

Cancel spot instance requests when exiting spark-ec2
parents 51477e88 96c9bcfd
No related branches found
No related tags found
No related merge requests found
...@@ -233,27 +233,37 @@ def launch_cluster(conn, opts, cluster_name): ...@@ -233,27 +233,37 @@ def launch_cluster(conn, opts, cluster_name):
block_device_map = block_map) block_device_map = block_map)
my_req_ids = [req.id for req in slave_reqs] my_req_ids = [req.id for req in slave_reqs]
print "Waiting for spot instances to be granted..." print "Waiting for spot instances to be granted..."
while True: try:
time.sleep(10) while True:
reqs = conn.get_all_spot_instance_requests() time.sleep(10)
id_to_req = {} reqs = conn.get_all_spot_instance_requests()
for r in reqs: id_to_req = {}
id_to_req[r.id] = r for r in reqs:
active = 0 id_to_req[r.id] = r
instance_ids = [] active_instance_ids = []
for i in my_req_ids: for i in my_req_ids:
if id_to_req[i].state == "active": if i in id_to_req and id_to_req[i].state == "active":
active += 1 active_instance_ids.append(id_to_req[i].instance_id)
instance_ids.append(id_to_req[i].instance_id) if len(active_instance_ids) == opts.slaves:
if active == opts.slaves: print "All %d slaves granted" % opts.slaves
print "All %d slaves granted" % opts.slaves reservations = conn.get_all_instances(active_instance_ids)
reservations = conn.get_all_instances(instance_ids) slave_nodes = []
slave_nodes = [] for r in reservations:
for r in reservations: slave_nodes += r.instances
slave_nodes += r.instances break
break else:
else: print "%d of %d slaves granted, waiting longer" % (
print "%d of %d slaves granted, waiting longer" % (active, opts.slaves) len(active_instance_ids), opts.slaves)
except:
print "Canceling spot instance requests"
conn.cancel_spot_instance_requests(my_req_ids)
# Log a warning if any of these requests actually launched instances:
(master_nodes, slave_nodes, zoo_nodes) = get_existing_cluster(
conn, opts, cluster_name, die_on_error=False)
running = len(master_nodes) + len(slave_nodes) + len(zoo_nodes)
if running:
print >> stderr, ("WARNING: %d instances are still running" % running)
sys.exit(0)
else: else:
# Launch non-spot instances # Launch non-spot instances
slave_res = image.run(key_name = opts.key_pair, slave_res = image.run(key_name = opts.key_pair,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment