# This file is part of cloud-init. See LICENSE file for license information.
"""Base EC2 platform."""
from datetime import datetime
import os
import boto3
import botocore
from botocore import session, handlers
import base64
from ..platforms import Platform
from .image import EC2Image
from .instance import EC2Instance
from tests.cloud_tests import LOG
class EC2Platform(Platform):
    """EC2 test platform.

    Constructing the platform uploads an SSH public key and creates a VPC
    with an internet gateway, subnet, routing table and security group.
    The key pair and security group are named ``self.tag``; the other
    resources carry it as their 'Name' tag.  ``destroy`` removes them again.
    """

    platform_name = 'ec2'
    # IPv4 range passed to both the VPC and the subnet created inside it.
    ipv4_cidr = '192.168.1.0/20'

    def __init__(self, config):
        """Set up platform.

        @param config: platform configuration; the keys read here are
                       'tag', 'instance-type', 'data_dir' and 'public_key'
        @raises RuntimeError: if boto has no default region or no
                              credentials configured, or if VPC creation
                              is rejected by EC2
        """
        super(EC2Platform, self).__init__(config)
        # Used for unique VPC, SSH key, and custom AMI generation naming
        self.tag = '%s-%s' % (
            config['tag'], datetime.now().strftime('%Y%m%d%H%M%S'))
        self.instance_type = config['instance-type']

        try:
            b3session = get_session()
            self.ec2_client = b3session.client('ec2')
            self.ec2_resource = b3session.resource('ec2')
            self.ec2_region = b3session.region_name
            # Uploading the key is the first real API request made here, so
            # it stays inside the try block with the client setup.
            self.key_name = self._upload_public_key(config)
        except botocore.exceptions.NoRegionError as e:
            raise RuntimeError(
                'Please configure default region in $HOME/.aws/config'
            ) from e
        except botocore.exceptions.NoCredentialsError as e:
            raise RuntimeError(
                'Please configure ec2 credentials in $HOME/.aws/credentials'
            ) from e

        # Order matters: each step below reads attributes set by the
        # previous ones (gateway and subnet need the VPC; the routing table
        # needs the gateway and the subnet).
        self.vpc = self._create_vpc()
        self.internet_gateway = self._create_internet_gateway()
        self.subnet = self._create_subnet()
        self.routing_table = self._create_routing_table()
        self.security_group = self._create_security_group()

    def create_instance(self, properties, config, features,
                        image_ami, user_data=None):
        """Create an instance

        @param properties: image properties
        @param config: image configuration
        @param features: image features
        @param image_ami: string of image ami ID
        @param user_data: test user-data to pass to instance
        @return_value: cloud_tests.instances instance
        """
        return EC2Instance(self, properties, config, features,
                           image_ami, user_data)

    def destroy(self):
        """Delete SSH keys, terminate all instances, and delete VPC.

        Instances are terminated first; afterwards the key pair, security
        group, subnet, routing table, internet gateway and VPC are deleted
        in that order.  Each resource is skipped when its attribute is
        falsy.
        """
        if self.vpc:
            instances = list(self.vpc.instances.all())
            # Request termination of every instance before waiting on any
            # of them, so they shut down concurrently instead of one by one.
            for instance in instances:
                instance.terminate()
            for instance in instances:
                LOG.debug('waiting for instance %s termination', instance.id)
                instance.wait_until_terminated()

        if self.key_name:
            LOG.debug('deleting SSH key %s', self.key_name)
            self.ec2_client.delete_key_pair(KeyName=self.key_name)

        if self.security_group:
            LOG.debug('deleting security group %s', self.security_group.id)
            self.security_group.delete()

        if self.subnet:
            LOG.debug('deleting subnet %s', self.subnet.id)
            self.subnet.delete()

        if self.routing_table:
            LOG.debug('deleting routing table %s', self.routing_table.id)
            self.routing_table.delete()

        if self.internet_gateway:
            LOG.debug('deleting internet gateway %s', self.internet_gateway.id)
            # Only detach when there is a VPC to detach from.
            if self.vpc:
                self.internet_gateway.detach_from_vpc(VpcId=self.vpc.id)
            self.internet_gateway.delete()

        if self.vpc:
            LOG.debug('deleting vpc %s', self.vpc.id)
            self.vpc.delete()

    def get_image(self, img_conf):
        """Get image using specified image configuration.

        Hard coded for 'amd64' based images.

        @param img_conf: configuration for image; 'root-store' and
                         'release' are read here
        @return_value: cloud_tests.images instance
        @raises RuntimeError: on an unknown 'root-store' value, or when the
                              stream entry found has no 'id'
        """
        if img_conf['root-store'] == 'ebs':
            root_store = 'ssd'
        elif img_conf['root-store'] == 'instance-store':
            root_store = 'instance'
        else:
            raise RuntimeError('Unknown root-store type: %s' %
                               (img_conf['root-store']))

        filters = [
            'arch=amd64',
            'endpoint=https://ec2.%s.amazonaws.com' % self.ec2_region,
            'region=%s' % self.ec2_region,
            'release=%s' % img_conf['release'],
            'root_store=%s' % root_store,
            'virt=hvm',
        ]

        LOG.debug('finding image using streams')
        image = self._query_streams(img_conf, filters)

        try:
            image_ami = image['id']
        except KeyError as e:
            raise RuntimeError(
                'No images found for %s!' % img_conf['release']
            ) from e

        LOG.debug('found image: %s', image_ami)
        return EC2Image(self, img_conf, image_ami)

    def _create_internet_gateway(self):
        """Create Internet Gateway and assign to VPC.

        @return: the created, attached and tagged internet gateway
        """
        LOG.debug('creating internet gateway')
        # pylint: disable=no-member
        internet_gateway = self.ec2_resource.create_internet_gateway()
        internet_gateway.attach_to_vpc(VpcId=self.vpc.id)
        self._tag_resource(internet_gateway)

        return internet_gateway

    def _create_routing_table(self):
        """Create a routing table that sends traffic to the gateway.

        A new route table is created in the VPC with default IPv4
        (0.0.0.0/0) and IPv6 (::/0) routes through the internet gateway,
        and is then associated with the subnet.

        @return: the created and tagged route table
        """
        LOG.debug('creating routing table')
        route_table = self.vpc.create_route_table()
        route_table.create_route(DestinationCidrBlock='0.0.0.0/0',
                                 GatewayId=self.internet_gateway.id)
        route_table.create_route(DestinationIpv6CidrBlock='::/0',
                                 GatewayId=self.internet_gateway.id)
        route_table.associate_with_subnet(SubnetId=self.subnet.id)
        self._tag_resource(route_table)

        return route_table

    def _create_security_group(self):
        """Create a security group in the VPC that allows all ingress.

        The group is named after ``self.tag`` and authorizes every
        protocol and port from 0.0.0.0/0.

        @return: the created and tagged security group
        """
        LOG.debug('creating security group')
        security_group = self.vpc.create_security_group(
            GroupName=self.tag, Description='integration test security group')
        security_group.authorize_ingress(
            IpProtocol='-1', FromPort=-1, ToPort=-1, CidrIp='0.0.0.0/0')
        self._tag_resource(security_group)

        return security_group

    def _create_subnet(self):
        """Generate IPv4 and IPv6 subnets for use.

        @return: the created and tagged subnet, with public IPv4 mapping
                 on launch enabled
        """
        # Replace the last two characters of the VPC's IPv6 block with
        # '64'.  NOTE(review): presumably the VPC block ends in '/56' and
        # the subnet needs a '/64' prefix -- confirm against AWS docs.
        ipv6_cidr = self.vpc.ipv6_cidr_block_association_set[0][
            'Ipv6CidrBlock'][:-2] + '64'

        LOG.debug('creating subnet with following ranges:')
        LOG.debug('ipv4: %s', self.ipv4_cidr)
        LOG.debug('ipv6: %s', ipv6_cidr)
        subnet = self.vpc.create_subnet(CidrBlock=self.ipv4_cidr,
                                        Ipv6CidrBlock=ipv6_cidr)
        modify_subnet = subnet.meta.client.modify_subnet_attribute
        modify_subnet(SubnetId=subnet.id,
                      MapPublicIpOnLaunch={'Value': True})
        self._tag_resource(subnet)

        return subnet

    def _create_vpc(self):
        """Create a new AWS EC2 VPC with an Amazon-provided IPv6 block.

        @return: the created and tagged VPC, once it is available
        @raises RuntimeError: if EC2 rejects the create request
        """
        LOG.debug('creating new vpc')
        try:
            vpc = self.ec2_resource.create_vpc(  # pylint: disable=no-member
                CidrBlock=self.ipv4_cidr,
                AmazonProvidedIpv6CidrBlock=True)
        except botocore.exceptions.ClientError as e:
            raise RuntimeError(e) from e

        vpc.wait_until_available()
        self._tag_resource(vpc)

        return vpc

    def _tag_resource(self, resource):
        """Tag a resource with the specified tag.

        Setting the 'Name' tag to ``self.tag`` makes the resources created
        by this test run much easier to find and delete.

        @param resource: resource to tag
        """
        tag = {
            'Key': 'Name',
            'Value': self.tag
        }
        resource.create_tags(Tags=[tag])

    def _upload_public_key(self, config):
        """Upload the SSH public key under the name ``self.tag``.

        @param config: platform config; 'data_dir' and 'public_key' locate
                       the key file
        @return: string of ssh key name
        """
        key_file = os.path.join(config['data_dir'], config['public_key'])
        with open(key_file, 'r') as file:
            public_key = file.read().strip('\n')

        LOG.debug('uploading SSH key %s', self.tag)
        self.ec2_client.import_key_pair(KeyName=self.tag,
                                        PublicKeyMaterial=public_key)

        return self.tag
def _decode_console_output_as_bytes(parsed, **kwargs):
"""Provide console output as bytes in OutputBytes.
For this to be useful, the session has to have had the
decode_console_output handler unregistered already.
https://github.com/boto/botocore/issues/1351 ."""
if 'Output' not in parsed:
return
orig = parsed['Output']
handlers.decode_console_output(parsed, **kwargs)
parsed['OutputBytes'] = base64.b64decode(orig)
def get_session():
    """Return a boto3 session that also exposes console output as bytes.

    On the underlying botocore session, the stock decode_console_output
    handler for the GetConsoleOutput after-call event is swapped for
    _decode_console_output_as_bytes.

    @return: boto3.Session wrapping the adjusted botocore session
    """
    event_name = 'after-call.ec2.GetConsoleOutput'
    core_session = session.get_session()
    core_session.unregister(event_name, handlers.decode_console_output)
    core_session.register(event_name, _decode_console_output_as_bytes)
    return boto3.Session(botocore_session=core_session)
# vi: ts=4 expandtab