Blame internal/worker/server.go

Packit 63bb0d
package worker
Packit 63bb0d
Packit 63bb0d
import (
Packit 63bb0d
	"encoding/json"
Packit 63bb0d
	"fmt"
Packit 63bb0d
	"io"
Packit 63bb0d
	"io/ioutil"
Packit 63bb0d
	"log"
Packit 63bb0d
	"net"
Packit 63bb0d
	"net/http"
Packit 63bb0d
	"os"
Packit 63bb0d
	"path"
Packit 63bb0d
	"time"
Packit 63bb0d
Packit 63bb0d
	"github.com/google/uuid"
Packit 63bb0d
	"github.com/julienschmidt/httprouter"
Packit 63bb0d
Packit 63bb0d
	"github.com/osbuild/osbuild-composer/internal/common"
Packit 63bb0d
	"github.com/osbuild/osbuild-composer/internal/distro"
Packit 63bb0d
	"github.com/osbuild/osbuild-composer/internal/jobqueue"
Packit 63bb0d
	"github.com/osbuild/osbuild-composer/internal/target"
Packit 63bb0d
)
Packit 63bb0d
Packit 63bb0d
type Server struct {
Packit 63bb0d
	logger       *log.Logger
Packit 63bb0d
	jobs         jobqueue.JobQueue
Packit 63bb0d
	router       *httprouter.Router
Packit 63bb0d
	artifactsDir string
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
type JobStatus struct {
Packit 63bb0d
	State    common.ComposeState
Packit 63bb0d
	Queued   time.Time
Packit 63bb0d
	Started  time.Time
Packit 63bb0d
	Finished time.Time
Packit 63bb0d
	Canceled bool
Packit 63bb0d
	Result   OSBuildJobResult
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string) *Server {
Packit 63bb0d
	s := &Server{
Packit 63bb0d
		logger:       logger,
Packit 63bb0d
		jobs:         jobs,
Packit 63bb0d
		artifactsDir: artifactsDir,
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	s.router = httprouter.New()
Packit 63bb0d
	s.router.RedirectTrailingSlash = false
Packit 63bb0d
	s.router.RedirectFixedPath = false
Packit 63bb0d
	s.router.MethodNotAllowed = http.HandlerFunc(methodNotAllowedHandler)
Packit 63bb0d
	s.router.NotFound = http.HandlerFunc(notFoundHandler)
Packit 63bb0d
Packit 63bb0d
	// Add a basic status handler for checking if osbuild-composer is alive.
Packit 63bb0d
	s.router.GET("/status", s.statusHandler)
Packit 63bb0d
Packit 63bb0d
	// Add handlers for managing jobs.
Packit 63bb0d
	s.router.POST("/job-queue/v1/jobs", s.addJobHandler)
Packit 63bb0d
	s.router.GET("/job-queue/v1/jobs/:job_id", s.jobHandler)
Packit 63bb0d
	s.router.PATCH("/job-queue/v1/jobs/:job_id", s.updateJobHandler)
Packit 63bb0d
	s.router.POST("/job-queue/v1/jobs/:job_id/artifacts/:name", s.addJobImageHandler)
Packit 63bb0d
Packit 63bb0d
	return s
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *Server) Serve(listener net.Listener) error {
Packit 63bb0d
	server := http.Server{Handler: s}
Packit 63bb0d
Packit 63bb0d
	err := server.Serve(listener)
Packit 63bb0d
	if err != nil && err != http.ErrServerClosed {
Packit 63bb0d
		return err
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return nil
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
Packit 63bb0d
	if s.logger != nil {
Packit 63bb0d
		log.Println(request.Method, request.URL.Path)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	writer.Header().Set("Content-Type", "application/json; charset=utf-8")
Packit 63bb0d
	s.router.ServeHTTP(writer, request)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *Server) Enqueue(manifest distro.Manifest, targets []*target.Target) (uuid.UUID, error) {
Packit 63bb0d
	job := OSBuildJob{
Packit 63bb0d
		Manifest: manifest,
Packit 63bb0d
		Targets:  targets,
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return s.jobs.Enqueue("osbuild", job, nil)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *Server) JobStatus(id uuid.UUID) (*JobStatus, error) {
Packit 63bb0d
	var canceled bool
Packit 63bb0d
	var result OSBuildJobResult
Packit 63bb0d
Packit 63bb0d
	queued, started, finished, canceled, err := s.jobs.JobStatus(id, &result)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return nil, err
Packit 63bb0d
	}
Packit 63bb0d
	state := common.CWaiting
Packit 63bb0d
	if canceled {
Packit 63bb0d
		state = common.CFailed
Packit 63bb0d
	} else if !finished.IsZero() {
Packit 63bb0d
		if result.OSBuildOutput != nil && result.OSBuildOutput.Success {
Packit 63bb0d
			state = common.CFinished
Packit 63bb0d
		} else {
Packit 63bb0d
			state = common.CFailed
Packit 63bb0d
		}
Packit 63bb0d
	} else if !started.IsZero() {
Packit 63bb0d
		state = common.CRunning
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return &JobStatus{
Packit 63bb0d
		State:    state,
Packit 63bb0d
		Queued:   queued,
Packit 63bb0d
		Started:  started,
Packit 63bb0d
		Finished: finished,
Packit 63bb0d
		Canceled: canceled,
Packit 63bb0d
		Result:   result,
Packit 63bb0d
	}, nil
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *Server) Cancel(id uuid.UUID) error {
Packit 63bb0d
	return s.jobs.CancelJob(id)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// Provides access to artifacts of a job. Returns an io.Reader for the artifact
Packit 63bb0d
// and the artifact's size.
Packit 63bb0d
func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) {
Packit 63bb0d
	status, err := s.JobStatus(id)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return nil, 0, err
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	if status.Finished.IsZero() {
Packit 63bb0d
		return nil, 0, fmt.Errorf("Cannot access artifacts before job is finished: %s", id)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	p := path.Join(s.artifactsDir, id.String(), name)
Packit 63bb0d
	f, err := os.Open(p)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return nil, 0, fmt.Errorf("Error accessing artifact %s for job %s: %v", name, id, err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	info, err := f.Stat()
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return nil, 0, fmt.Errorf("Error getting size of artifact %s for job %s: %v", name, id, err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return f, info.Size(), nil
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// Deletes all artifacts for job `id`.
Packit 63bb0d
func (s *Server) DeleteArtifacts(id uuid.UUID) error {
Packit 63bb0d
	status, err := s.JobStatus(id)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return err
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	if status.Finished.IsZero() {
Packit 63bb0d
		return fmt.Errorf("Cannot delete artifacts before job is finished: %s", id)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return os.RemoveAll(path.Join(s.artifactsDir, id.String()))
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// jsonErrorf() is similar to http.Error(), but returns the message in a json
Packit 63bb0d
// object with a "message" field.
Packit 63bb0d
func jsonErrorf(writer http.ResponseWriter, code int, message string, args ...interface{}) {
Packit 63bb0d
	writer.WriteHeader(code)
Packit 63bb0d
Packit 63bb0d
	// ignore error, because we cannot do anything useful with it
Packit 63bb0d
	_ = json.NewEncoder(writer).Encode(&errorResponse{
Packit 63bb0d
		Message: fmt.Sprintf(message, args...),
Packit 63bb0d
	})
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func methodNotAllowedHandler(writer http.ResponseWriter, request *http.Request) {
Packit 63bb0d
	jsonErrorf(writer, http.StatusMethodNotAllowed, "method not allowed")
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func notFoundHandler(writer http.ResponseWriter, request *http.Request) {
Packit 63bb0d
	jsonErrorf(writer, http.StatusNotFound, "not found")
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *Server) statusHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
Packit 63bb0d
	writer.WriteHeader(http.StatusOK)
Packit 63bb0d
Packit 63bb0d
	// Send back a status message.
Packit 63bb0d
	_ = json.NewEncoder(writer).Encode(&statusResponse{
Packit 63bb0d
		Status: "OK",
Packit 63bb0d
	})
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *Server) jobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
Packit 63bb0d
	id, err := uuid.Parse(params.ByName("job_id"))
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		jsonErrorf(writer, http.StatusBadRequest, "cannot parse compose id: %v", err)
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	status, err := s.JobStatus(id)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		switch err {
Packit 63bb0d
		case jobqueue.ErrNotExist:
Packit 63bb0d
			jsonErrorf(writer, http.StatusNotFound, "job does not exist: %s", id)
Packit 63bb0d
		default:
Packit 63bb0d
			jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
Packit 63bb0d
		}
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	_ = json.NewEncoder(writer).Encode(jobResponse{
Packit 63bb0d
		Id:       id,
Packit 63bb0d
		Canceled: status.Canceled,
Packit 63bb0d
	})
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *Server) addJobHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
Packit 63bb0d
	contentType := request.Header["Content-Type"]
Packit 63bb0d
	if len(contentType) != 1 || contentType[0] != "application/json" {
Packit 63bb0d
		jsonErrorf(writer, http.StatusUnsupportedMediaType, "request must contain application/json data")
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	var body addJobRequest
Packit 63bb0d
	err := json.NewDecoder(request.Body).Decode(&body)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		jsonErrorf(writer, http.StatusBadRequest, "%v", err)
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	var job OSBuildJob
Packit 63bb0d
	id, err := s.jobs.Dequeue(request.Context(), []string{"osbuild"}, &job)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	writer.WriteHeader(http.StatusCreated)
Packit 63bb0d
	// FIXME: handle or comment this possible error
Packit 63bb0d
	_ = json.NewEncoder(writer).Encode(addJobResponse{
Packit 63bb0d
		Id:       id,
Packit 63bb0d
		Manifest: job.Manifest,
Packit 63bb0d
		Targets:  job.Targets,
Packit 63bb0d
	})
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *Server) updateJobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
Packit 63bb0d
	contentType := request.Header["Content-Type"]
Packit 63bb0d
	if len(contentType) != 1 || contentType[0] != "application/json" {
Packit 63bb0d
		jsonErrorf(writer, http.StatusUnsupportedMediaType, "request must contain application/json data")
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	id, err := uuid.Parse(params.ByName("job_id"))
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		jsonErrorf(writer, http.StatusBadRequest, "cannot parse compose id: %v", err)
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	var body updateJobRequest
Packit 63bb0d
	err = json.NewDecoder(request.Body).Decode(&body)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		jsonErrorf(writer, http.StatusBadRequest, "cannot parse request body: %v", err)
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// The jobqueue doesn't support setting the status before a job is
Packit 63bb0d
	// finished. This branch should never be hit, because the worker
Packit 63bb0d
	// doesn't attempt this. Change the API to remove this awkwardness.
Packit 63bb0d
	if body.Status != common.IBFinished && body.Status != common.IBFailed {
Packit 63bb0d
		jsonErrorf(writer, http.StatusBadRequest, "setting status of a job to waiting or running is not supported")
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	err = s.jobs.FinishJob(id, OSBuildJobResult{OSBuildOutput: body.Result})
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		switch err {
Packit 63bb0d
		case jobqueue.ErrNotExist:
Packit 63bb0d
			jsonErrorf(writer, http.StatusNotFound, "job does not exist: %s", id)
Packit 63bb0d
		case jobqueue.ErrNotRunning:
Packit 63bb0d
			jsonErrorf(writer, http.StatusBadRequest, "job is not running: %s", id)
Packit 63bb0d
		default:
Packit 63bb0d
			jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
Packit 63bb0d
		}
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	_ = json.NewEncoder(writer).Encode(updateJobResponse{})
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *Server) addJobImageHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
Packit 63bb0d
	id, err := uuid.Parse(params.ByName("job_id"))
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		jsonErrorf(writer, http.StatusBadRequest, "cannot parse compose id: %v", err)
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	name := params.ByName("name")
Packit 63bb0d
	if name == "" {
Packit 63bb0d
		jsonErrorf(writer, http.StatusBadRequest, "invalid artifact name")
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	if s.artifactsDir == "" {
Packit 63bb0d
		_, err := io.Copy(ioutil.Discard, request.Body)
Packit 63bb0d
		if err != nil {
Packit 63bb0d
			jsonErrorf(writer, http.StatusInternalServerError, "error discarding artifact: %v", err)
Packit 63bb0d
		}
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	err = os.Mkdir(path.Join(s.artifactsDir, id.String()), 0700)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		jsonErrorf(writer, http.StatusInternalServerError, "cannot create artifact directory: %v", err)
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	f, err := os.Create(path.Join(s.artifactsDir, id.String(), name))
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		jsonErrorf(writer, http.StatusInternalServerError, "cannot create artifact file: %v", err)
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	_, err = io.Copy(f, request.Body)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		jsonErrorf(writer, http.StatusInternalServerError, "error writing artifact file: %v", err)
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
}