#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import argparse
from contextlib import contextmanager
import gzip
import io
import logging
from mozbuild.base import MozbuildObject
from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)
import os
from Queue import Queue
import requests
import sys
import tarfile
from requests.packages.urllib3.util.retry import Retry
from threading import Event, Thread
import time

# Arbitrary, should probably measure this.
NUM_WORKER_THREADS = 10
log = logging.getLogger('upload-generated-sources')
log.setLevel(logging.INFO)


@contextmanager
def timed():
    '''
    Yield a function that provides the elapsed time in seconds since this
    function was called.
    '''
    start = time.time()

    def elapsed():
        return time.time() - start
    yield elapsed


def gzip_compress(data):
    '''
    Apply gzip compression to `data` and return the result as a `BytesIO`.
    '''
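    # Compress into an in-memory buffer, then rewind it so callers (e.g.
    # `upload_fileobj` below) read the compressed bytes from the start.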
    b = io.BytesIO()
    with gzip.GzipFile(fileobj=b, mode='w') as f:
        f.write(data)
    b.flush()
    b.seek(0)
    return b


def upload_worker(queue, event, bucket, session_args):
    '''
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression, using `name` prefixed with the SHA-512
    digest of `contents` (as a hex string) as the key. If an exception
    occurs, set `event`.
    '''
    try:
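        # boto3 is only imported here, after main() has installed it into
        # the build virtualenv (see install_pip_package below).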
        import boto3
        session = boto3.session.Session(**session_args)
        s3 = session.client('s3')
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
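            # get() blocks until an entry is available; these daemon
            # threads are simply torn down when the main thread exits.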
            (name, contents) = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
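            # Record on the S3 object that it is gzip-compressed plain text.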
            extra_args = {
                'ContentEncoding': 'gzip',
                'ContentType': 'text/plain',
            }
            log.info('Uploading "{}" ({} bytes)'.format(
                pathname, len(compressed.getvalue())))
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket,
                                  pathname, ExtraArgs=extra_args)
                log.info('Finished uploading "{}" in {:0.3f}s'.format(
                    pathname, elapsed()))
            queue.task_done()
    except Exception:
        log.exception('Thread encountered exception:')
        event.set()


def do_work(artifact, region, bucket):
    session_args = {'region_name': region}
    session = requests.Session()
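    # Retry transient HTTP 5xx errors with a short backoff; this session is
    # used both for the secrets fetch and the artifact download below.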
    retry = Retry(total=5, backoff_factor=0.1,
                  status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount('https://', http_adapter)
    session.mount('http://', http_adapter)
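
    # Inside a Taskcluster task, fetch AWS credentials from the secrets
    # service via the taskcluster proxy; otherwise rely on whatever
    # credentials boto3 can find locally.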
    if 'TASK_ID' in os.environ:
        level = os.environ.get('MOZ_SCM_LEVEL', '1')
        secrets_url = 'http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload'.format( # noqa
            level)
        log.info(
            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url))
        res = session.get(secrets_url)
        res.raise_for_status()
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret['secret']['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=secret['secret']['AWS_SECRET_ACCESS_KEY'],
        )
    else:
        log.info('Trying to use your AWS credentials...')

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
        log.info('Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s'.format(
            res.status_code, len(res.content), elapsed()))
    res.raise_for_status()
    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info('Creating {} worker threads'.format(NUM_WORKER_THREADS))
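    # The workers are daemon threads so they don't keep the process alive;
    # they block forever on an empty queue once uploading is done.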
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        t.daemon = True
        t.start()
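    # The artifact is a gzipped tarball; stream it from the in-memory
    # buffer ('r|gz') and queue each member for upload.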
    with tarfile.open(fileobj=io.BytesIO(res.content), mode='r|gz') as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                break
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error('Worker thread encountered exception, exiting...')
            break


def main(argv):
    logging.basicConfig(format='%(levelname)s - %(threadName)s - %(message)s')
    parser = argparse.ArgumentParser(
        description='Upload generated source files in ARTIFACT to BUCKET in S3.')
    parser.add_argument('artifact',
                        help='generated-sources artifact from build task')
    args = parser.parse_args(argv)
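    # The destination region and bucket come from the build configuration
    # (mozbuild.generated_sources), not from command-line arguments.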
    region, bucket = get_s3_region_and_bucket()
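
    # Install boto3 into the build virtualenv so the lazy import in
    # upload_worker() succeeds.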
    config = MozbuildObject.from_environment()
    config._activate_virtualenv()
    config.virtualenv_manager.install_pip_package('boto3==1.4.4')

    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
        log.info('Finished in {:.03f}s'.format(elapsed()))
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))