|
Packit |
63bb0d |
package worker
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
import (
|
|
Packit |
63bb0d |
"encoding/json"
|
|
Packit |
63bb0d |
"fmt"
|
|
Packit |
63bb0d |
"io"
|
|
Packit |
63bb0d |
"io/ioutil"
|
|
Packit |
63bb0d |
"log"
|
|
Packit |
63bb0d |
"net"
|
|
Packit |
63bb0d |
"net/http"
|
|
Packit |
63bb0d |
"os"
|
|
Packit |
63bb0d |
"path"
|
|
Packit |
63bb0d |
"time"
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
"github.com/google/uuid"
|
|
Packit |
63bb0d |
"github.com/julienschmidt/httprouter"
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
"github.com/osbuild/osbuild-composer/internal/common"
|
|
Packit |
63bb0d |
"github.com/osbuild/osbuild-composer/internal/distro"
|
|
Packit |
63bb0d |
"github.com/osbuild/osbuild-composer/internal/jobqueue"
|
|
Packit |
63bb0d |
"github.com/osbuild/osbuild-composer/internal/target"
|
|
Packit |
63bb0d |
)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
type Server struct {
|
|
Packit |
63bb0d |
logger *log.Logger
|
|
Packit |
63bb0d |
jobs jobqueue.JobQueue
|
|
Packit |
63bb0d |
router *httprouter.Router
|
|
Packit |
63bb0d |
artifactsDir string
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
type JobStatus struct {
|
|
Packit |
63bb0d |
State common.ComposeState
|
|
Packit |
63bb0d |
Queued time.Time
|
|
Packit |
63bb0d |
Started time.Time
|
|
Packit |
63bb0d |
Finished time.Time
|
|
Packit |
63bb0d |
Canceled bool
|
|
Packit |
63bb0d |
Result OSBuildJobResult
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string) *Server {
|
|
Packit |
63bb0d |
s := &Server{
|
|
Packit |
63bb0d |
logger: logger,
|
|
Packit |
63bb0d |
jobs: jobs,
|
|
Packit |
63bb0d |
artifactsDir: artifactsDir,
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
s.router = httprouter.New()
|
|
Packit |
63bb0d |
s.router.RedirectTrailingSlash = false
|
|
Packit |
63bb0d |
s.router.RedirectFixedPath = false
|
|
Packit |
63bb0d |
s.router.MethodNotAllowed = http.HandlerFunc(methodNotAllowedHandler)
|
|
Packit |
63bb0d |
s.router.NotFound = http.HandlerFunc(notFoundHandler)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Add a basic status handler for checking if osbuild-composer is alive.
|
|
Packit |
63bb0d |
s.router.GET("/status", s.statusHandler)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Add handlers for managing jobs.
|
|
Packit |
63bb0d |
s.router.POST("/job-queue/v1/jobs", s.addJobHandler)
|
|
Packit |
63bb0d |
s.router.GET("/job-queue/v1/jobs/:job_id", s.jobHandler)
|
|
Packit |
63bb0d |
s.router.PATCH("/job-queue/v1/jobs/:job_id", s.updateJobHandler)
|
|
Packit |
63bb0d |
s.router.POST("/job-queue/v1/jobs/:job_id/artifacts/:name", s.addJobImageHandler)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return s
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (s *Server) Serve(listener net.Listener) error {
|
|
Packit |
63bb0d |
server := http.Server{Handler: s}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
err := server.Serve(listener)
|
|
Packit |
63bb0d |
if err != nil && err != http.ErrServerClosed {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
|
|
Packit |
63bb0d |
if s.logger != nil {
|
|
Packit |
63bb0d |
log.Println(request.Method, request.URL.Path)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
Packit |
63bb0d |
s.router.ServeHTTP(writer, request)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (s *Server) Enqueue(manifest distro.Manifest, targets []*target.Target) (uuid.UUID, error) {
|
|
Packit |
63bb0d |
job := OSBuildJob{
|
|
Packit |
63bb0d |
Manifest: manifest,
|
|
Packit |
63bb0d |
Targets: targets,
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return s.jobs.Enqueue("osbuild", job, nil)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (s *Server) JobStatus(id uuid.UUID) (*JobStatus, error) {
|
|
Packit |
63bb0d |
var canceled bool
|
|
Packit |
63bb0d |
var result OSBuildJobResult
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
queued, started, finished, canceled, err := s.jobs.JobStatus(id, &result)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return nil, err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
state := common.CWaiting
|
|
Packit |
63bb0d |
if canceled {
|
|
Packit |
63bb0d |
state = common.CFailed
|
|
Packit |
63bb0d |
} else if !finished.IsZero() {
|
|
Packit |
63bb0d |
if result.OSBuildOutput != nil && result.OSBuildOutput.Success {
|
|
Packit |
63bb0d |
state = common.CFinished
|
|
Packit |
63bb0d |
} else {
|
|
Packit |
63bb0d |
state = common.CFailed
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
} else if !started.IsZero() {
|
|
Packit |
63bb0d |
state = common.CRunning
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return &JobStatus{
|
|
Packit |
63bb0d |
State: state,
|
|
Packit |
63bb0d |
Queued: queued,
|
|
Packit |
63bb0d |
Started: started,
|
|
Packit |
63bb0d |
Finished: finished,
|
|
Packit |
63bb0d |
Canceled: canceled,
|
|
Packit |
63bb0d |
Result: result,
|
|
Packit |
63bb0d |
}, nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (s *Server) Cancel(id uuid.UUID) error {
|
|
Packit |
63bb0d |
return s.jobs.CancelJob(id)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Provides access to artifacts of a job. Returns an io.Reader for the artifact
|
|
Packit |
63bb0d |
// and the artifact's size.
|
|
Packit |
63bb0d |
func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) {
|
|
Packit |
63bb0d |
status, err := s.JobStatus(id)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return nil, 0, err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if status.Finished.IsZero() {
|
|
Packit |
63bb0d |
return nil, 0, fmt.Errorf("Cannot access artifacts before job is finished: %s", id)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
p := path.Join(s.artifactsDir, id.String(), name)
|
|
Packit |
63bb0d |
f, err := os.Open(p)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return nil, 0, fmt.Errorf("Error accessing artifact %s for job %s: %v", name, id, err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
info, err := f.Stat()
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return nil, 0, fmt.Errorf("Error getting size of artifact %s for job %s: %v", name, id, err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return f, info.Size(), nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Deletes all artifacts for job `id`.
|
|
Packit |
63bb0d |
func (s *Server) DeleteArtifacts(id uuid.UUID) error {
|
|
Packit |
63bb0d |
status, err := s.JobStatus(id)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if status.Finished.IsZero() {
|
|
Packit |
63bb0d |
return fmt.Errorf("Cannot delete artifacts before job is finished: %s", id)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return os.RemoveAll(path.Join(s.artifactsDir, id.String()))
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// jsonErrorf() is similar to http.Error(), but returns the message in a json
|
|
Packit |
63bb0d |
// object with a "message" field.
|
|
Packit |
63bb0d |
func jsonErrorf(writer http.ResponseWriter, code int, message string, args ...interface{}) {
|
|
Packit |
63bb0d |
writer.WriteHeader(code)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// ignore error, because we cannot do anything useful with it
|
|
Packit |
63bb0d |
_ = json.NewEncoder(writer).Encode(&errorResponse{
|
|
Packit |
63bb0d |
Message: fmt.Sprintf(message, args...),
|
|
Packit |
63bb0d |
})
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func methodNotAllowedHandler(writer http.ResponseWriter, request *http.Request) {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusMethodNotAllowed, "method not allowed")
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func notFoundHandler(writer http.ResponseWriter, request *http.Request) {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusNotFound, "not found")
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (s *Server) statusHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
|
|
Packit |
63bb0d |
writer.WriteHeader(http.StatusOK)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Send back a status message.
|
|
Packit |
63bb0d |
_ = json.NewEncoder(writer).Encode(&statusResponse{
|
|
Packit |
63bb0d |
Status: "OK",
|
|
Packit |
63bb0d |
})
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (s *Server) jobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
|
|
Packit |
63bb0d |
id, err := uuid.Parse(params.ByName("job_id"))
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusBadRequest, "cannot parse compose id: %v", err)
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
status, err := s.JobStatus(id)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
switch err {
|
|
Packit |
63bb0d |
case jobqueue.ErrNotExist:
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusNotFound, "job does not exist: %s", id)
|
|
Packit |
63bb0d |
default:
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
_ = json.NewEncoder(writer).Encode(jobResponse{
|
|
Packit |
63bb0d |
Id: id,
|
|
Packit |
63bb0d |
Canceled: status.Canceled,
|
|
Packit |
63bb0d |
})
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (s *Server) addJobHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
|
|
Packit |
63bb0d |
contentType := request.Header["Content-Type"]
|
|
Packit |
63bb0d |
if len(contentType) != 1 || contentType[0] != "application/json" {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusUnsupportedMediaType, "request must contain application/json data")
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
var body addJobRequest
|
|
Packit |
63bb0d |
err := json.NewDecoder(request.Body).Decode(&body)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusBadRequest, "%v", err)
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
var job OSBuildJob
|
|
Packit |
63bb0d |
id, err := s.jobs.Dequeue(request.Context(), []string{"osbuild"}, &job)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
writer.WriteHeader(http.StatusCreated)
|
|
Packit |
63bb0d |
// FIXME: handle or comment this possible error
|
|
Packit |
63bb0d |
_ = json.NewEncoder(writer).Encode(addJobResponse{
|
|
Packit |
63bb0d |
Id: id,
|
|
Packit |
63bb0d |
Manifest: job.Manifest,
|
|
Packit |
63bb0d |
Targets: job.Targets,
|
|
Packit |
63bb0d |
})
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (s *Server) updateJobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
|
|
Packit |
63bb0d |
contentType := request.Header["Content-Type"]
|
|
Packit |
63bb0d |
if len(contentType) != 1 || contentType[0] != "application/json" {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusUnsupportedMediaType, "request must contain application/json data")
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
id, err := uuid.Parse(params.ByName("job_id"))
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusBadRequest, "cannot parse compose id: %v", err)
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
var body updateJobRequest
|
|
Packit |
63bb0d |
err = json.NewDecoder(request.Body).Decode(&body)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusBadRequest, "cannot parse request body: %v", err)
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// The jobqueue doesn't support setting the status before a job is
|
|
Packit |
63bb0d |
// finished. This branch should never be hit, because the worker
|
|
Packit |
63bb0d |
// doesn't attempt this. Change the API to remove this awkwardness.
|
|
Packit |
63bb0d |
if body.Status != common.IBFinished && body.Status != common.IBFailed {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusBadRequest, "setting status of a job to waiting or running is not supported")
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
err = s.jobs.FinishJob(id, OSBuildJobResult{OSBuildOutput: body.Result})
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
switch err {
|
|
Packit |
63bb0d |
case jobqueue.ErrNotExist:
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusNotFound, "job does not exist: %s", id)
|
|
Packit |
63bb0d |
case jobqueue.ErrNotRunning:
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusBadRequest, "job is not running: %s", id)
|
|
Packit |
63bb0d |
default:
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
_ = json.NewEncoder(writer).Encode(updateJobResponse{})
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (s *Server) addJobImageHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
|
|
Packit |
63bb0d |
id, err := uuid.Parse(params.ByName("job_id"))
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusBadRequest, "cannot parse compose id: %v", err)
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
name := params.ByName("name")
|
|
Packit |
63bb0d |
if name == "" {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusBadRequest, "invalid artifact name")
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if s.artifactsDir == "" {
|
|
Packit |
63bb0d |
_, err := io.Copy(ioutil.Discard, request.Body)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusInternalServerError, "error discarding artifact: %v", err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
err = os.Mkdir(path.Join(s.artifactsDir, id.String()), 0700)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusInternalServerError, "cannot create artifact directory: %v", err)
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
f, err := os.Create(path.Join(s.artifactsDir, id.String(), name))
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusInternalServerError, "cannot create artifact file: %v", err)
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
_, err = io.Copy(f, request.Body)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
jsonErrorf(writer, http.StatusInternalServerError, "error writing artifact file: %v", err)
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|