Blame internal/upload/azure/azure.go

Packit 63bb0d
package azure
Packit 63bb0d
Packit 63bb0d
import (
Packit 63bb0d
	"bufio"
Packit 63bb0d
	"bytes"
Packit 63bb0d
	"context"
Packit 63bb0d
	"crypto/md5"
Packit 63bb0d
	"errors"
Packit 63bb0d
	"fmt"
Packit 63bb0d
	"io"
Packit 63bb0d
	"net/url"
Packit 63bb0d
	"os"
Packit 63bb0d
	"strings"
Packit 63bb0d
	"sync"
Packit 63bb0d
Packit 63bb0d
	"github.com/Azure/azure-storage-blob-go/azblob"
Packit 63bb0d
)
Packit 63bb0d
Packit 63bb0d
// Credentials contains the storage account name and access key used to
// connect to an Azure storage account. UploadImage passes both values to
// azblob.NewSharedKeyCredential, i.e. it authenticates with the account's
// shared key.
// General overview of the Azure authentication types and methods:
// https://docs.microsoft.com/en-us/azure/go/azure-sdk-go-authorization#available-authentication-types-and-methods
type Credentials struct {
	// StorageAccount is the storage account name. It is also used to build
	// the blob endpoint host name (<StorageAccount>.blob.core.windows.net).
	StorageAccount string
	// StorageAccessKey is the access key handed to the shared key credential.
	StorageAccessKey string
}
Packit 63bb0d
Packit 63bb0d
// ImageMetadata contains information needed to store the image in a proper place.
// In case of Azure cloud storage this includes container name and blob name.
type ImageMetadata struct {
	// ContainerName is the storage container the blob is uploaded into.
	ContainerName string
	// ImageName is the name of the blob. UploadImage appends ".vhd" when the
	// name does not already end with it.
	ImageName string
}
Packit 63bb0d
Packit 63bb0d
// UploadImage takes the metadata and credentials required to upload the image specified by `fileName`
Packit 63bb0d
// It can speed up the upload by using goroutines. The number of parallel goroutines is bounded by
Packit 63bb0d
// the `threads` argument.
Packit 63bb0d
func UploadImage(credentials Credentials, metadata ImageMetadata, fileName string, threads int) error {
Packit 63bb0d
	// Azure cannot create an image from a storage blob without .vhd extension
Packit 63bb0d
	if !strings.HasSuffix(metadata.ImageName, ".vhd") {
Packit 63bb0d
		metadata.ImageName = metadata.ImageName + ".vhd"
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// Create a default request pipeline using your storage account name and account key.
Packit 63bb0d
	credential, err := azblob.NewSharedKeyCredential(credentials.StorageAccount, credentials.StorageAccessKey)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return fmt.Errorf("cannot create azure credentials: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
Packit 63bb0d
Packit 63bb0d
	// get storage account blob service URL endpoint.
Packit 63bb0d
	URL, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net/%s", credentials.StorageAccount, metadata.ContainerName))
Packit 63bb0d
Packit 63bb0d
	// Create a ContainerURL object that wraps the container URL and a request
Packit 63bb0d
	// pipeline to make requests.
Packit 63bb0d
	containerURL := azblob.NewContainerURL(*URL, p)
Packit 63bb0d
Packit 63bb0d
	// Create the container, use a never-expiring context
Packit 63bb0d
	ctx := context.Background()
Packit 63bb0d
Packit 63bb0d
	// Open the image file for reading
Packit 63bb0d
	imageFile, err := os.Open(fileName)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return fmt.Errorf("cannot open the image: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
	defer imageFile.Close()
Packit 63bb0d
Packit 63bb0d
	// Stat image to get the file size
Packit 63bb0d
	stat, err := imageFile.Stat()
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return fmt.Errorf("cannot stat the image: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// Hash the imageFile
Packit 63bb0d
	imageFileHash := md5.New()
Packit 63bb0d
	if _, err := io.Copy(imageFileHash, imageFile); err != nil {
Packit 63bb0d
		return fmt.Errorf("cannot create md5 of the image: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
	// Move the cursor back to the start of the imageFile
Packit 63bb0d
	if _, err := imageFile.Seek(0, 0); err != nil {
Packit 63bb0d
		return fmt.Errorf("cannot seek the image: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// Create page blob URL. Page blob is required for VM images
Packit Service 3a6627
	blobURL := newPageBlobURL(containerURL, metadata.ImageName)
Packit 63bb0d
	_, err = blobURL.Create(ctx, stat.Size(), 0, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{})
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return fmt.Errorf("cannot create the blob URL: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
	// Wrong MD5 does not seem to have any impact on the upload
Packit 63bb0d
	_, err = blobURL.SetHTTPHeaders(ctx, azblob.BlobHTTPHeaders{ContentMD5: imageFileHash.Sum(nil)}, azblob.BlobAccessConditions{})
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return fmt.Errorf("cannot set the HTTP headers on the blob URL: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// Create control variables
Packit 63bb0d
	// This channel simulates behavior of a semaphore and bounds the number of parallel threads
Packit 63bb0d
	var semaphore = make(chan int, threads)
Packit 63bb0d
	// Forward error from goroutine to the caller
Packit 63bb0d
	var errorInGoroutine = make(chan error, 1)
Packit 63bb0d
	var counter int64 = 0
Packit 63bb0d
Packit 63bb0d
	// Create buffered reader to speed up the upload
Packit 63bb0d
	reader := bufio.NewReader(imageFile)
Packit 63bb0d
	// Run the upload
Packit 63bb0d
	run := true
Packit 63bb0d
	var wg sync.WaitGroup
Packit 63bb0d
	for run {
Packit 63bb0d
		buffer := make([]byte, azblob.PageBlobMaxUploadPagesBytes)
Packit 63bb0d
		n, err := reader.Read(buffer)
Packit 63bb0d
		if err != nil {
Packit 63bb0d
			if err == io.EOF {
Packit 63bb0d
				run = false
Packit 63bb0d
			} else {
Packit 63bb0d
				return fmt.Errorf("reading the image failed: %v", err)
Packit 63bb0d
			}
Packit 63bb0d
		}
Packit 63bb0d
		if n == 0 {
Packit 63bb0d
			break
Packit 63bb0d
		}
Packit 63bb0d
		wg.Add(1)
Packit 63bb0d
		semaphore <- 1
Packit 63bb0d
		go func(counter int64, buffer []byte, n int) {
Packit 63bb0d
			defer wg.Done()
Packit 63bb0d
			_, err = blobURL.UploadPages(ctx, counter*azblob.PageBlobMaxUploadPagesBytes, bytes.NewReader(buffer[:n]), azblob.PageBlobAccessConditions{}, nil)
Packit 63bb0d
			if err != nil {
Packit 63bb0d
				err = fmt.Errorf("uploading a page failed: %v", err)
Packit 63bb0d
				// Send the error to the error channel in a non-blocking way. If there is already an error, just discard this one
Packit 63bb0d
				select {
Packit 63bb0d
				case errorInGoroutine <- err:
Packit 63bb0d
				default:
Packit 63bb0d
				}
Packit 63bb0d
			}
Packit 63bb0d
			<-semaphore
Packit 63bb0d
		}(counter, buffer, n)
Packit 63bb0d
		counter++
Packit 63bb0d
	}
Packit 63bb0d
	// Wait for all goroutines to finish
Packit 63bb0d
	wg.Wait()
Packit 63bb0d
	// Check any errors during the transmission using a nonblocking read from the channel
Packit 63bb0d
	select {
Packit 63bb0d
	case err := <-errorInGoroutine:
Packit 63bb0d
		return err
Packit 63bb0d
	default:
Packit 63bb0d
	}
Packit 63bb0d
	// Check properties, specifically MD5 sum of the blob
Packit 63bb0d
	props, err := blobURL.GetProperties(ctx, azblob.BlobAccessConditions{})
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return fmt.Errorf("getting the properties of the new blob failed: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
	var blobChecksum []byte = props.ContentMD5()
Packit 63bb0d
	var fileChecksum []byte = imageFileHash.Sum(nil)
Packit 63bb0d
Packit 63bb0d
	if !bytes.Equal(blobChecksum, fileChecksum) {
Packit 63bb0d
		return errors.New("error during image upload. the image seems to be corrupted")
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return nil
Packit 63bb0d
}