Skip to content
Snippets Groups Projects
Commit 94053a7b authored by Vida Ha's avatar Vida Ha Committed by Josh Rosen
Browse files

SPARK-2333 - spark_ec2 script should allow option for existing security group

    - Uses the name tag to identify machines in a cluster.
    - Allows overriding the security group name so it doesn't need to coincide with the cluster name.
    - Outputs the request id's of up to 10 pending spot instance requests.

Author: Vida Ha <vida@databricks.com>

Closes #1899 from vidaha/vida/ec2-reuse-security-group and squashes the following commits:

c80d5c3 [Vida Ha] wrap retries in a try catch block
b2989d5 [Vida Ha] SPARK-2333: spark_ec2 script should allow option for existing security group
parent 31f0b071
No related branches found
No related tags found
No related merge requests found
...@@ -12,14 +12,16 @@ on the [Amazon Web Services site](http://aws.amazon.com/). ...@@ -12,14 +12,16 @@ on the [Amazon Web Services site](http://aws.amazon.com/).
`spark-ec2` is designed to manage multiple named clusters. You can `spark-ec2` is designed to manage multiple named clusters. You can
launch a new cluster (telling the script its size and giving it a name), launch a new cluster (telling the script its size and giving it a name),
shutdown an existing cluster, or log into a cluster. Each cluster is shutdown an existing cluster, or log into a cluster. Each cluster
identified by placing its machines into EC2 security groups whose names launches a set of instances, which are tagged with the cluster name,
are derived from the name of the cluster. For example, a cluster named and placed into EC2 security groups. If you don't specify a security
group, the `spark-ec2` script will create security groups based on the
cluster name you request. For example, a cluster named
`test` will contain a master node in a security group called `test` will contain a master node in a security group called
`test-master`, and a number of slave nodes in a security group called `test-master`, and a number of slave nodes in a security group called
`test-slaves`. The `spark-ec2` script will create these security groups `test-slaves`. You can also specify a security group prefix to be used
for you based on the cluster name you request. You can also use them to in place of the cluster name. Machines in a cluster can be identified
identify machines belonging to each cluster in the Amazon EC2 Console. by looking for the "Name" tag of the instance in the Amazon EC2 Console.
# Before You Start # Before You Start
......
...@@ -124,7 +124,7 @@ def parse_args(): ...@@ -124,7 +124,7 @@ def parse_args():
help="The SSH user you want to connect as (default: root)") help="The SSH user you want to connect as (default: root)")
parser.add_option( parser.add_option(
"--delete-groups", action="store_true", default=False, "--delete-groups", action="store_true", default=False,
help="When destroying a cluster, delete the security groups that were created") help="When destroying a cluster, delete the security groups that were created.")
parser.add_option( parser.add_option(
"--use-existing-master", action="store_true", default=False, "--use-existing-master", action="store_true", default=False,
help="Launch fresh slaves, but use an existing stopped master if possible") help="Launch fresh slaves, but use an existing stopped master if possible")
...@@ -138,7 +138,9 @@ def parse_args(): ...@@ -138,7 +138,9 @@ def parse_args():
parser.add_option( parser.add_option(
"--user-data", type="string", default="", "--user-data", type="string", default="",
help="Path to a user-data file (most AMI's interpret this as an initialization script)") help="Path to a user-data file (most AMI's interpret this as an initialization script)")
parser.add_option(
"--security-group-prefix", type="string", default=None,
help="Use this prefix for the security group rather than the cluster name.")
(opts, args) = parser.parse_args() (opts, args) = parser.parse_args()
if len(args) != 2: if len(args) != 2:
...@@ -285,8 +287,12 @@ def launch_cluster(conn, opts, cluster_name): ...@@ -285,8 +287,12 @@ def launch_cluster(conn, opts, cluster_name):
user_data_content = user_data_file.read() user_data_content = user_data_file.read()
print "Setting up security groups..." print "Setting up security groups..."
master_group = get_or_make_group(conn, cluster_name + "-master") if opts.security_group_prefix is None:
slave_group = get_or_make_group(conn, cluster_name + "-slaves") master_group = get_or_make_group(conn, cluster_name + "-master")
slave_group = get_or_make_group(conn, cluster_name + "-slaves")
else:
master_group = get_or_make_group(conn, opts.security_group_prefix + "-master")
slave_group = get_or_make_group(conn, opts.security_group_prefix + "-slaves")
if master_group.rules == []: # Group was just now created if master_group.rules == []: # Group was just now created
master_group.authorize(src_group=master_group) master_group.authorize(src_group=master_group)
master_group.authorize(src_group=slave_group) master_group.authorize(src_group=slave_group)
...@@ -310,12 +316,11 @@ def launch_cluster(conn, opts, cluster_name): ...@@ -310,12 +316,11 @@ def launch_cluster(conn, opts, cluster_name):
slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0') slave_group.authorize('tcp', 60060, 60060, '0.0.0.0/0')
slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0') slave_group.authorize('tcp', 60075, 60075, '0.0.0.0/0')
# Check if instances are already running in our groups # Check if instances are already running with the cluster name
existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name, existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
die_on_error=False) die_on_error=False)
if existing_slaves or (existing_masters and not opts.use_existing_master): if existing_slaves or (existing_masters and not opts.use_existing_master):
print >> stderr, ("ERROR: There are already instances running in " + print >> stderr, ("ERROR: There are already instances for name: %s " % cluster_name)
"group %s or %s" % (master_group.name, slave_group.name))
sys.exit(1) sys.exit(1)
# Figure out Spark AMI # Figure out Spark AMI
...@@ -371,9 +376,13 @@ def launch_cluster(conn, opts, cluster_name): ...@@ -371,9 +376,13 @@ def launch_cluster(conn, opts, cluster_name):
for r in reqs: for r in reqs:
id_to_req[r.id] = r id_to_req[r.id] = r
active_instance_ids = [] active_instance_ids = []
outstanding_request_ids = []
for i in my_req_ids: for i in my_req_ids:
if i in id_to_req and id_to_req[i].state == "active": if i in id_to_req:
active_instance_ids.append(id_to_req[i].instance_id) if id_to_req[i].state == "active":
active_instance_ids.append(id_to_req[i].instance_id)
else:
outstanding_request_ids.append(i)
if len(active_instance_ids) == opts.slaves: if len(active_instance_ids) == opts.slaves:
print "All %d slaves granted" % opts.slaves print "All %d slaves granted" % opts.slaves
reservations = conn.get_all_instances(active_instance_ids) reservations = conn.get_all_instances(active_instance_ids)
...@@ -382,8 +391,8 @@ def launch_cluster(conn, opts, cluster_name): ...@@ -382,8 +391,8 @@ def launch_cluster(conn, opts, cluster_name):
slave_nodes += r.instances slave_nodes += r.instances
break break
else: else:
print "%d of %d slaves granted, waiting longer" % ( print "%d of %d slaves granted, waiting longer for request ids including %s" % (
len(active_instance_ids), opts.slaves) len(active_instance_ids), opts.slaves, outstanding_request_ids[0:10])
except: except:
print "Canceling spot instance requests" print "Canceling spot instance requests"
conn.cancel_spot_instance_requests(my_req_ids) conn.cancel_spot_instance_requests(my_req_ids)
...@@ -440,14 +449,29 @@ def launch_cluster(conn, opts, cluster_name): ...@@ -440,14 +449,29 @@ def launch_cluster(conn, opts, cluster_name):
print "Launched master in %s, regid = %s" % (zone, master_res.id) print "Launched master in %s, regid = %s" % (zone, master_res.id)
# Give the instances descriptive names # Give the instances descriptive names
# TODO: Add retry logic for tagging with name since it's used to identify a cluster.
for master in master_nodes: for master in master_nodes:
master.add_tag( name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)
key='Name', for i in range(0, 5):
value='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)) try:
master.add_tag(key='Name', value=name)
except:
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
if (i == 5):
raise "Error - failed max attempts to add name tag"
time.sleep(5)
for slave in slave_nodes: for slave in slave_nodes:
slave.add_tag( name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)
key='Name', for i in range(0, 5):
value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)) try:
slave.add_tag(key='Name', value=name)
except:
print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
if (i == 5):
raise "Error - failed max attempts to add name tag"
time.sleep(5)
# Return all the instances # Return all the instances
return (master_nodes, slave_nodes) return (master_nodes, slave_nodes)
...@@ -463,10 +487,10 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): ...@@ -463,10 +487,10 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
for res in reservations: for res in reservations:
active = [i for i in res.instances if is_active(i)] active = [i for i in res.instances if is_active(i)]
for inst in active: for inst in active:
group_names = [g.name for g in inst.groups] name = inst.tags.get(u'Name', "")
if group_names == [cluster_name + "-master"]: if name.startswith(cluster_name + "-master"):
master_nodes.append(inst) master_nodes.append(inst)
elif group_names == [cluster_name + "-slaves"]: elif name.startswith(cluster_name + "-slave"):
slave_nodes.append(inst) slave_nodes.append(inst)
if any((master_nodes, slave_nodes)): if any((master_nodes, slave_nodes)):
print ("Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes))) print ("Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes)))
...@@ -474,7 +498,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True): ...@@ -474,7 +498,7 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
return (master_nodes, slave_nodes) return (master_nodes, slave_nodes)
else: else:
if master_nodes == [] and slave_nodes != []: if master_nodes == [] and slave_nodes != []:
print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name + "-master" print >> sys.stderr, "ERROR: Could not find master in with name " + cluster_name + "-master"
else: else:
print >> sys.stderr, "ERROR: Could not find any existing cluster" print >> sys.stderr, "ERROR: Could not find any existing cluster"
sys.exit(1) sys.exit(1)
...@@ -816,7 +840,10 @@ def real_main(): ...@@ -816,7 +840,10 @@ def real_main():
# Delete security groups as well # Delete security groups as well
if opts.delete_groups: if opts.delete_groups:
print "Deleting security groups (this will take some time)..." print "Deleting security groups (this will take some time)..."
group_names = [cluster_name + "-master", cluster_name + "-slaves"] if opts.security_group_prefix is None:
group_names = [cluster_name + "-master", cluster_name + "-slaves"]
else:
group_names = [opts.security_group_prefix + "-master", opts.security_group_prefix + "-slaves"]
attempt = 1 attempt = 1
while attempt <= 3: while attempt <= 3:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment