package main
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"time"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/upload/awsupload"
"github.com/osbuild/osbuild-composer/internal/upload/azure"
"github.com/osbuild/osbuild-composer/internal/worker"
)
// connectionConfig names the PEM files that createTLSConfig reads to build
// the TLS configuration for the worker client.
type connectionConfig struct {
// CACertFile is the path of the PEM file whose certificates become the
// trusted root CAs.
CACertFile string
// ClientKeyFile is the path of the private key that is loaded together
// with ClientCertFile as the client key pair.
ClientKeyFile string
// ClientCertFile is the path of the client certificate.
ClientCertFile string
}
// createTLSConfig builds a TLS configuration from the PEM files named in
// config.
//
// The certificates in config.CACertFile become the root CA pool, and the pair
// config.ClientCertFile / config.ClientKeyFile becomes the only entry in
// Certificates. An error is returned when the CA file cannot be read, when no
// certificate could be appended from it, or when the key pair fails to load.
func createTLSConfig(config *connectionConfig) (*tls.Config, error) {
	pemBytes, err := ioutil.ReadFile(config.CACertFile)
	if err != nil {
		return nil, err
	}

	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pemBytes) {
		return nil, errors.New("failed to append root certificate")
	}

	keyPair, err := tls.LoadX509KeyPair(config.ClientCertFile, config.ClientKeyFile)
	if err != nil {
		return nil, err
	}

	tlsConf := new(tls.Config)
	tlsConf.RootCAs = pool
	tlsConf.Certificates = []tls.Certificate{keyPair}
	return tlsConf, nil
}
// TargetsError bundles the errors that occurred while handling the targets of
// a job into a single error value.
type TargetsError struct {
	// Errors holds the individual failures.
	Errors []error
}

// Error implements the error interface. The message consists of a header line
// stating how many errors are contained, followed by each error's message on
// a line of its own; every line, including the last one, ends with a newline.
func (e *TargetsError) Error() string {
	buf := []byte(fmt.Sprintf("%d target(s) errored:\n", len(e.Errors)))
	for _, cause := range e.Errors {
		buf = append(buf, cause.Error()...)
		buf = append(buf, '\n')
	}
	return string(buf)
}
// openAsStreamOptimizedVmdk converts the image at imagePath into a VMDK with
// the streamOptimized subformat and returns the converted image opened for
// reading.
//
// The conversion is done by /usr/bin/qemu-img, which writes its output to
// imagePath + ".stream". That file is removed again as soon as it has been
// opened, so the returned handle is the only remaining reference to the
// converted data (this relies on an open file staying readable after it has
// been unlinked). The caller owns the returned file and must close it.
//
// On any failure the function returns a nil file and the error, and neither
// the converted file nor an open descriptor is left behind.
func openAsStreamOptimizedVmdk(imagePath string) (*os.File, error) {
	newPath := imagePath + ".stream"
	cmd := exec.Command(
		"/usr/bin/qemu-img", "convert", "-O", "vmdk", "-o", "subformat=streamOptimized",
		imagePath, newPath)
	if err := cmd.Run(); err != nil {
		// The conversion may have left a partial output file. Removal is
		// best effort: the conversion error is the one worth reporting.
		_ = os.Remove(newPath)
		return nil, err
	}

	f, err := os.Open(newPath)
	if err != nil {
		// Best effort for the same reason as above.
		_ = os.Remove(newPath)
		return nil, err
	}

	if err := os.Remove(newPath); err != nil {
		// Do not leak the descriptor; the removal error is what the caller
		// needs to see, so a close error is deliberately dropped.
		_ = f.Close()
		return nil, err
	}

	return f, nil
}
// RunJob builds the image described by job and delivers it to every target
// listed in job.Targets.
//
// The build is run by RunOSBuild with job.Manifest, the given store and a
// fresh temporary output directory below /var/tmp. The output directory is
// removed again before RunJob returns; a failure to remove it is only logged.
//
// uploadFunc is used for local targets: it is called with the job ID, the
// target's file name and a reader over the image file. AWS and Azure targets
// are uploaded with the credentials carried in their options.
//
// If the output directory cannot be created or the build fails, RunJob
// returns a nil result and the error (the RunOSBuild error is passed on
// unmodified). If the build succeeds but one or more targets fail, the
// remaining targets are still attempted and RunJob returns the build result
// together with a *TargetsError listing every failure.
func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io.Reader) error) (*common.ComposeResult, error) {
	outputDirectory, err := ioutil.TempDir("/var/tmp", "osbuild-worker-*")
	if err != nil {
		return nil, fmt.Errorf("error creating temporary output directory: %v", err)
	}
	defer func() {
		err := os.RemoveAll(outputDirectory)
		if err != nil {
			log.Printf("Error removing temporary output directory (%s): %v", outputDirectory, err)
		}
	}()

	result, err := RunOSBuild(job.Manifest, store, outputDirectory, os.Stderr)
	if err != nil {
		return nil, err
	}

	// r collects the per-target failures; one failing target must not keep
	// the others from being served.
	var r []error

	for _, t := range job.Targets {
		switch options := t.Options.(type) {
		case *target.LocalTargetOptions:
			var f *os.File
			imagePath := path.Join(outputDirectory, options.Filename)
			if options.StreamOptimized {
				f, err = openAsStreamOptimizedVmdk(imagePath)
			} else {
				f, err = os.Open(imagePath)
			}
			if err != nil {
				r = append(r, err)
				continue
			}

			err = uploadFunc(job.Id, options.Filename, f)
			// Close right here instead of deferring: a deferred Close would
			// keep every image open until RunJob returns. The error is
			// dropped on purpose, since the handle is read-only and
			// uploadFunc may already have closed it.
			_ = f.Close()
			if err != nil {
				r = append(r, err)
				continue
			}

		case *target.AWSTargetOptions:
			a, err := awsupload.New(options.Region, options.AccessKeyID, options.SecretAccessKey)
			if err != nil {
				r = append(r, err)
				continue
			}

			// Fall back to the job ID when no object key was requested.
			if options.Key == "" {
				options.Key = job.Id.String()
			}

			_, err = a.Upload(path.Join(outputDirectory, options.Filename), options.Bucket, options.Key)
			if err != nil {
				r = append(r, err)
				continue
			}

			/* TODO: communicate back the AMI */
			_, err = a.Register(t.ImageName, options.Bucket, options.Key)
			if err != nil {
				r = append(r, err)
				continue
			}

		case *target.AzureTargetOptions:
			credentials := azure.Credentials{
				StorageAccount:   options.StorageAccount,
				StorageAccessKey: options.StorageAccessKey,
			}
			metadata := azure.ImageMetadata{
				ContainerName: options.Container,
				ImageName:     t.ImageName,
			}

			const azureMaxUploadGoroutines = 4
			err := azure.UploadImage(
				credentials,
				metadata,
				path.Join(outputDirectory, options.Filename),
				azureMaxUploadGoroutines,
			)
			if err != nil {
				r = append(r, err)
				continue
			}

		default:
			r = append(r, fmt.Errorf("invalid target type"))
		}
	}

	// The output directory is removed by the deferred cleanup above.

	if len(r) > 0 {
		return result, &TargetsError{r}
	}

	return result, nil
}
// WatchJob regularly asks osbuild-composer whether the job we are currently
// working on was canceled and exits the process (with status 0) if it was.
// The first check happens 15 seconds after the call, every further one 15
// seconds after the previous check completed. WatchJob returns once ctx is
// done.
//
// It would be cleaner to kill the osbuild process (using `exec.CommandContext`
// or similar), but osbuild does not currently support this. Exiting here will
// make systemd clean up the whole cgroup and restart this service.
func WatchJob(ctx context.Context, client *worker.Client, job *worker.Job) {
	const pollInterval = 15 * time.Second

	// One timer is reused for all polls and stopped on return, so that no
	// pending timer is left behind when the job finishes.
	timer := time.NewTimer(pollInterval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			if client.JobCanceled(job) {
				log.Println("Job was canceled. Exiting.")
				os.Exit(0)
			}
			// The timer has fired and its channel was drained by the receive
			// above, so it can be re-armed directly.
			timer.Reset(pollInterval)
		case <-ctx.Done():
			return
		}
	}
}
// main is the entry point of the worker. It connects to osbuild-composer at
// the address given as the first positional argument and then loops forever:
// fetch a job, run it with RunJob while WatchJob checks for cancellation, and
// report the outcome back with UpdateJob.
//
// With -unix the address is a path to a unix domain socket; otherwise it is a
// network address and the connection uses TLS with the certificates below
// /etc/osbuild-composer. The osbuild store is kept in the directory named by
// the CACHE_DIRECTORY environment variable.
//
// A missing address is a usage error and exits with status 2. A missing
// CACHE_DIRECTORY, a failing TLS setup, or a failure to fetch a job or to
// report its result terminates the process through log.Fatal.
func main() {
	var unix bool
	flag.BoolVar(&unix, "unix", false, "Interpret 'address' as a path to a unix domain socket instead of a network address")

	// Usage only prints. The flag package calls it for invalid flags and then
	// exits with a failure status itself; exiting with 0 in here would report
	// such errors as success.
	flag.Usage = func() {
		fmt.Fprintf(flag.CommandLine.Output(), "Usage: %s [-unix] address\n", os.Args[0])
		flag.PrintDefaults()
	}

	flag.Parse()

	address := flag.Arg(0)
	if address == "" {
		flag.Usage()
		// Same status the flag package uses for command line errors.
		os.Exit(2)
	}

	cacheDirectory, ok := os.LookupEnv("CACHE_DIRECTORY")
	if !ok {
		log.Fatal("CACHE_DIRECTORY is not set. Is the service file missing CacheDirectory=?")
	}
	store := path.Join(cacheDirectory, "osbuild-store")

	var client *worker.Client
	if unix {
		client = worker.NewClientUnix(address)
	} else {
		conf, err := createTLSConfig(&connectionConfig{
			CACertFile:     "/etc/osbuild-composer/ca-crt.pem",
			ClientKeyFile:  "/etc/osbuild-composer/worker-key.pem",
			ClientCertFile: "/etc/osbuild-composer/worker-crt.pem",
		})
		if err != nil {
			log.Fatalf("Error creating TLS config: %v", err)
		}

		client = worker.NewClient(address, conf)
	}

	for {
		fmt.Println("Waiting for a new job...")
		job, err := client.AddJob()
		if err != nil {
			log.Fatal(err)
		}

		fmt.Printf("Running job %s\n", job.Id)

		ctx, cancel := context.WithCancel(context.Background())
		go WatchJob(ctx, client, job)

		var status common.ImageBuildState
		result, err := RunJob(job, store, client.UploadImage)
		if err != nil {
			log.Printf(" Job failed: %v", err)
			status = common.IBFailed

			// If the error comes from osbuild, retrieve the result
			if osbuildError, ok := err.(*OSBuildError); ok {
				result = osbuildError.Result
			}

			// Ensure we always have a non-nil result, composer doesn't like nils.
			// This can happen in cases when OSBuild crashes and doesn't produce
			// a meaningful output. E.g. when the machine runs out of disk space.
			if result == nil {
				result = &common.ComposeResult{
					Success: false,
				}
			}

			// Set the success to false on every error. This is hacky but composer
			// currently relies only on this flag to decide whether a compose was
			// successful. There's no other way to inform composer that e.g. an
			// upload failed. Therefore, this line reuses the osbuild success
			// flag to indicate all error kinds.
			result.Success = false
		} else {
			log.Printf(" 🎉 Job completed successfully: %s", job.Id)
			status = common.IBFinished
		}

		// signal to WatchJob() that it can stop watching
		cancel()

		err = client.UpdateJob(job, status, result)
		if err != nil {
			log.Fatalf("Error reporting job result: %v", err)
		}
	}
}