Blob Blame History Raw
# This file is part of cloud-init. See LICENSE file for license information.

"""Base EC2 platform."""
from datetime import datetime
import os

import boto3
import botocore
from botocore import session, handlers
import base64

from ..platforms import Platform
from .image import EC2Image
from .instance import EC2Instance
from tests.cloud_tests import LOG


class EC2Platform(Platform):
    """EC2 test platform."""

    platform_name = 'ec2'
    ipv4_cidr = '192.168.1.0/20'

    def __init__(self, config):
        """Set up platform."""
        super(EC2Platform, self).__init__(config)
        # Used for unique VPC, SSH key, and custom AMI generation naming
        self.tag = '%s-%s' % (
            config['tag'], datetime.now().strftime('%Y%m%d%H%M%S'))
        self.instance_type = config['instance-type']

        try:
            b3session = get_session()
            self.ec2_client = b3session.client('ec2')
            self.ec2_resource = b3session.resource('ec2')
            self.ec2_region = b3session.region_name
            self.key_name = self._upload_public_key(config)
        except botocore.exceptions.NoRegionError as e:
            raise RuntimeError(
                'Please configure default region in $HOME/.aws/config'
            ) from e
        except botocore.exceptions.NoCredentialsError as e:
            raise RuntimeError(
                'Please configure ec2 credentials in $HOME/.aws/credentials'
            ) from e

        self.vpc = self._create_vpc()
        self.internet_gateway = self._create_internet_gateway()
        self.subnet = self._create_subnet()
        self.routing_table = self._create_routing_table()
        self.security_group = self._create_security_group()

    def create_instance(self, properties, config, features,
                        image_ami, user_data=None):
        """Create an instance

        @param src_img_path: image path to launch from
        @param properties: image properties
        @param config: image configuration
        @param features: image features
        @param image_ami: string of image ami ID
        @param user_data: test user-data to pass to instance
        @return_value: cloud_tests.instances instance
        """
        return EC2Instance(self, properties, config, features,
                           image_ami, user_data)

    def destroy(self):
        """Delete SSH keys, terminate all instances, and delete VPC."""
        for instance in self.vpc.instances.all():
            LOG.debug('waiting for instance %s termination', instance.id)
            instance.terminate()
            instance.wait_until_terminated()

        if self.key_name:
            LOG.debug('deleting SSH key %s', self.key_name)
            self.ec2_client.delete_key_pair(KeyName=self.key_name)

        if self.security_group:
            LOG.debug('deleting security group %s', self.security_group.id)
            self.security_group.delete()

        if self.subnet:
            LOG.debug('deleting subnet %s', self.subnet.id)
            self.subnet.delete()

        if self.routing_table:
            LOG.debug('deleting routing table %s', self.routing_table.id)
            self.routing_table.delete()

        if self.internet_gateway:
            LOG.debug('deleting internet gateway %s', self.internet_gateway.id)
            self.internet_gateway.detach_from_vpc(VpcId=self.vpc.id)
            self.internet_gateway.delete()

        if self.vpc:
            LOG.debug('deleting vpc %s', self.vpc.id)
            self.vpc.delete()

    def get_image(self, img_conf):
        """Get image using specified image configuration.

        Hard coded for 'amd64' based images.

        @param img_conf: configuration for image
        @return_value: cloud_tests.images instance
        """
        if img_conf['root-store'] == 'ebs':
            root_store = 'ssd'
        elif img_conf['root-store'] == 'instance-store':
            root_store = 'instance'
        else:
            raise RuntimeError('Unknown root-store type: %s' %
                               (img_conf['root-store']))

        filters = [
            'arch=%s' % 'amd64',
            'endpoint=https://ec2.%s.amazonaws.com' % self.ec2_region,
            'region=%s' % self.ec2_region,
            'release=%s' % img_conf['release'],
            'root_store=%s' % root_store,
            'virt=hvm',
        ]

        LOG.debug('finding image using streams')
        image = self._query_streams(img_conf, filters)

        try:
            image_ami = image['id']
        except KeyError as e:
            raise RuntimeError(
                'No images found for %s!' % img_conf['release']
            ) from e

        LOG.debug('found image: %s', image_ami)
        image = EC2Image(self, img_conf, image_ami)
        return image

    def _create_internet_gateway(self):
        """Create Internet Gateway and assign to VPC."""
        LOG.debug('creating internet gateway')
        # pylint: disable=no-member
        internet_gateway = self.ec2_resource.create_internet_gateway()
        internet_gateway.attach_to_vpc(VpcId=self.vpc.id)
        self._tag_resource(internet_gateway)

        return internet_gateway

    def _create_routing_table(self):
        """Update default routing table with internet gateway.

        This sets up internet access between the VPC via the internet gateway
        by configuring routing tables for IPv4 and IPv6.
        """
        LOG.debug('creating routing table')
        route_table = self.vpc.create_route_table()
        route_table.create_route(DestinationCidrBlock='0.0.0.0/0',
                                 GatewayId=self.internet_gateway.id)
        route_table.create_route(DestinationIpv6CidrBlock='::/0',
                                 GatewayId=self.internet_gateway.id)
        route_table.associate_with_subnet(SubnetId=self.subnet.id)
        self._tag_resource(route_table)

        return route_table

    def _create_security_group(self):
        """Enables ingress to default VPC security group."""
        LOG.debug('creating security group')
        security_group = self.vpc.create_security_group(
            GroupName=self.tag, Description='integration test security group')
        security_group.authorize_ingress(
            IpProtocol='-1', FromPort=-1, ToPort=-1, CidrIp='0.0.0.0/0')
        self._tag_resource(security_group)

        return security_group

    def _create_subnet(self):
        """Generate IPv4 and IPv6 subnets for use."""
        ipv6_cidr = self.vpc.ipv6_cidr_block_association_set[0][
            'Ipv6CidrBlock'][:-2] + '64'

        LOG.debug('creating subnet with following ranges:')
        LOG.debug('ipv4: %s', self.ipv4_cidr)
        LOG.debug('ipv6: %s', ipv6_cidr)
        subnet = self.vpc.create_subnet(CidrBlock=self.ipv4_cidr,
                                        Ipv6CidrBlock=ipv6_cidr)
        modify_subnet = subnet.meta.client.modify_subnet_attribute
        modify_subnet(SubnetId=subnet.id,
                      MapPublicIpOnLaunch={'Value': True})
        self._tag_resource(subnet)

        return subnet

    def _create_vpc(self):
        """Setup AWS EC2 VPC or return existing VPC."""
        LOG.debug('creating new vpc')
        try:
            vpc = self.ec2_resource.create_vpc(  # pylint: disable=no-member
                CidrBlock=self.ipv4_cidr,
                AmazonProvidedIpv6CidrBlock=True)
        except botocore.exceptions.ClientError as e:
            raise RuntimeError(e) from e

        vpc.wait_until_available()
        self._tag_resource(vpc)

        return vpc

    def _tag_resource(self, resource):
        """Tag a resource with the specified tag.

        This makes finding and deleting resources specific to this testing
        much easier to find.

        @param resource: resource to tag
        """
        tag = {
            'Key': 'Name',
            'Value': self.tag
        }
        resource.create_tags(Tags=[tag])

    def _upload_public_key(self, config):
        """Generate random name and upload SSH key with that name.

        @param config: platform config
        @return: string of ssh key name
        """
        key_file = os.path.join(config['data_dir'], config['public_key'])
        with open(key_file, 'r') as file:
            public_key = file.read().strip('\n')

        LOG.debug('uploading SSH key %s', self.tag)
        self.ec2_client.import_key_pair(KeyName=self.tag,
                                        PublicKeyMaterial=public_key)

        return self.tag


def _decode_console_output_as_bytes(parsed, **kwargs):
    """Provide console output as bytes in OutputBytes.

       For this to be useful, the session has to have had the
       decode_console_output handler unregistered already.

       https://github.com/boto/botocore/issues/1351 ."""
    if 'Output' not in parsed:
        return
    orig = parsed['Output']
    handlers.decode_console_output(parsed, **kwargs)
    parsed['OutputBytes'] = base64.b64decode(orig)


def get_session():
    mysess = session.get_session()
    mysess.unregister('after-call.ec2.GetConsoleOutput',
                      handlers.decode_console_output)
    mysess.register('after-call.ec2.GetConsoleOutput',
                    _decode_console_output_as_bytes)
    return boto3.Session(botocore_session=mysess)


# vi: ts=4 expandtab