|
Packit |
f0b94e |
#!/usr/bin/env python
|
|
Packit |
f0b94e |
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
Packit |
f0b94e |
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
Packit |
f0b94e |
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
from __future__ import absolute_import, print_function, unicode_literals
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
import argparse
|
|
Packit |
f0b94e |
from contextlib import contextmanager
|
|
Packit |
f0b94e |
import gzip
|
|
Packit |
f0b94e |
import io
|
|
Packit |
f0b94e |
import logging
|
|
Packit |
f0b94e |
from mozbuild.base import MozbuildObject
|
|
Packit |
f0b94e |
from mozbuild.generated_sources import (
|
|
Packit |
f0b94e |
get_filename_with_digest,
|
|
Packit |
f0b94e |
get_s3_region_and_bucket,
|
|
Packit |
f0b94e |
)
|
|
Packit |
f0b94e |
import os
|
|
Packit |
f0b94e |
from Queue import Queue
|
|
Packit |
f0b94e |
import requests
|
|
Packit |
f0b94e |
import sys
|
|
Packit |
f0b94e |
import tarfile
|
|
Packit |
f0b94e |
from requests.packages.urllib3.util.retry import Retry
|
|
Packit |
f0b94e |
from threading import Event, Thread
|
|
Packit |
f0b94e |
import time
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
# Number of concurrent S3 upload threads.
# Arbitrary, should probably measure this.
NUM_WORKER_THREADS = 10

# Module-level logger; handlers/format are configured in main() via
# logging.basicConfig.
log = logging.getLogger('upload-generated-sources')
log.setLevel(logging.INFO)
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
@contextmanager
def timed():
    '''
    Context manager that yields a zero-argument callable reporting the
    number of seconds elapsed since the context was entered.
    '''
    started = time.time()
    yield lambda: time.time() - started
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
def gzip_compress(data):
    '''
    Compress `data` with gzip and return the result as a `BytesIO`
    positioned at the start of the compressed stream.
    '''
    buf = io.BytesIO()
    gz = gzip.GzipFile(fileobj=buf, mode='w')
    try:
        gz.write(data)
    finally:
        # Closing the GzipFile flushes the gzip trailer into `buf`.
        gz.close()
    buf.flush()
    buf.seek(0)
    return buf
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
def upload_worker(queue, event, bucket, session_args):
    '''
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event`.

    Runs forever (until the process exits or `event` is set); intended to be
    the target of a daemon `Thread`.
    '''
    try:
        # Imported lazily: boto3 is pip-installed into the virtualenv by
        # main() before the worker threads are started.
        import boto3
        session = boto3.session.Session(**session_args)
        s3 = session.client('s3')
        while True:
            if event.is_set():
                # Some other thread hit an exception.
                return
            # Blocks until an entry is available.
            (name, contents) = queue.get()
            # Key is the artifact path prefixed with the content digest.
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                'ContentEncoding': 'gzip',
                'ContentType': 'text/plain',
            }
            log.info('Uploading "{}" ({} bytes)'.format(
                pathname, len(compressed.getvalue())))
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket,
                                  pathname, ExtraArgs=extra_args)
            log.info('Finished uploading "{}" in {:0.3f}s'.format(
                pathname, elapsed()))
            # Mark this entry done so do_work's unfinished_tasks check
            # can observe completion.
            queue.task_done()
    except Exception:
        # Log the full traceback and signal all threads (and do_work's
        # monitor loop) to bail out.
        log.exception('Thread encountered exception:')
        event.set()
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
def do_work(artifact, region, bucket):
    '''
    Download the generated-sources tar.gz `artifact`, then upload each file
    it contains to the S3 `bucket` in `region` using a pool of daemon worker
    threads.

    AWS credentials come from the taskcluster secrets service when running
    in a task (TASK_ID set), otherwise from the ambient boto3 credential
    chain.
    '''
    session_args = {'region_name': region}
    session = requests.Session()
    # Retry transient server errors when fetching the artifact/secrets.
    retry = Retry(total=5, backoff_factor=0.1,
                  status_forcelist=[500, 502, 503, 504])
    http_adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    session.mount('https://', http_adapter)
    session.mount('http://', http_adapter)

    if 'TASK_ID' in os.environ:
        # Running in taskcluster: fetch scoped AWS credentials from the
        # secrets service proxy.
        level = os.environ.get('MOZ_SCM_LEVEL', '1')
        secrets_url = 'http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload'.format( # noqa
            level)
        log.info(
            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url))
        res = session.get(secrets_url)
        res.raise_for_status()
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret['secret']['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=secret['secret']['AWS_SECRET_ACCESS_KEY'],
        )
    else:
        # Local run: rely on boto3's default credential resolution.
        log.info('Trying to use your AWS credentials..')

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
    log.info('Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s'.format(
        res.status_code, len(res.content), elapsed()))
    res.raise_for_status()
    # Create a queue and worker threads for uploading.
    q = Queue()
    # Set by any worker that hits an exception; checked everywhere below.
    event = Event()
    log.info('Creating {} worker threads'.format(NUM_WORKER_THREADS))
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        # Daemon threads so the process can exit without joining them.
        t.daemon = True
        t.start()
    with tarfile.open(fileobj=io.BytesIO(res.content), mode='r|gz') as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                break
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error('Worker thread encountered exception, exiting...')
            break
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
def main(argv):
    '''
    Entry point: parse `argv`, activate the build virtualenv (installing
    boto3 into it), and upload the generated-sources artifact. Returns the
    process exit code (0 on success).
    '''
    logging.basicConfig(format='%(levelname)s - %(threadName)s - %(message)s')
    parser = argparse.ArgumentParser(
        description='Upload generated source files in ARTIFACT to BUCKET in S3.')
    parser.add_argument('artifact',
                        help='generated-sources artifact from build task')
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()

    # Activate the mozbuild virtualenv and install boto3 into it; the
    # worker threads import boto3 lazily once it is available.
    config = MozbuildObject.from_environment()
    config._activate_virtualenv()
    config.virtualenv_manager.install_pip_package('boto3==1.4.4')

    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)
    log.info('Finished in {:.03f}s'.format(elapsed()))
    return 0
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
|
|
Packit |
f0b94e |
if __name__ == '__main__':
    # Invoked as a script: exit with main()'s return code.
    sys.exit(main(sys.argv[1:]))
|