Blame internal/jobqueue/fsjobqueue/fsjobqueue.go

Packit 63bb0d
// Package fsjobqueue implements a filesystem-backed job queue. It implements
Packit 63bb0d
// the interfaces in package jobqueue.
Packit 63bb0d
//
Packit 63bb0d
// Jobs are stored in the file system, using the `jsondb` package. However,
Packit 63bb0d
// this package does not use the file system as a database, but keeps some
Packit 63bb0d
// state in memory. This means that access to a given directory must be
Packit 63bb0d
// exclusive to only one `fsJobQueue` object at a time. A single `fsJobQueue`
Packit 63bb0d
// can be safely accessed from multiple goroutines, though.
Packit 63bb0d
//
Packit 63bb0d
// Data is stored non-redundantly. Any data structures necessary for efficient
Packit 63bb0d
// access (e.g., dependants) are kept in memory.
Packit 63bb0d
package fsjobqueue
Packit 63bb0d
Packit 63bb0d
import (
Packit 63bb0d
	"context"
Packit 63bb0d
	"encoding/json"
Packit 63bb0d
	"errors"
Packit 63bb0d
	"fmt"
Packit 63bb0d
	"reflect"
Packit 63bb0d
	"sync"
Packit 63bb0d
	"time"
Packit 63bb0d
Packit 63bb0d
	"github.com/google/uuid"
Packit 63bb0d
	"github.com/osbuild/osbuild-composer/internal/jobqueue"
Packit 63bb0d
	"github.com/osbuild/osbuild-composer/internal/jsondb"
Packit 63bb0d
)
Packit 63bb0d
Packit 63bb0d
type fsJobQueue struct {
Packit 63bb0d
	// Protects all fields of this struct. In particular, it ensures
Packit 63bb0d
	// transactions on `db` are atomic. All public functions except
Packit 63bb0d
	// JobStatus hold it while they're running. Dequeue() releases it
Packit 63bb0d
	// briefly while waiting on pending channels.
Packit 63bb0d
	mu sync.Mutex
Packit 63bb0d
Packit 63bb0d
	db *jsondb.JSONDatabase
Packit 63bb0d
Packit 63bb0d
	// Maps job types to channels of job ids for that type.
Packit 63bb0d
	pending map[string]chan uuid.UUID
Packit 63bb0d
Packit 63bb0d
	// Maps job ids to the jobs that depend on it, if any of those
Packit 63bb0d
	// dependants have not yet finished.
Packit 63bb0d
	dependants map[uuid.UUID][]uuid.UUID
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// On-disk job struct. Contains all necessary (but non-redundant) information
Packit 63bb0d
// about a job. These are not held in memory by the job queue, but
Packit 63bb0d
// (de)serialized on each access.
Packit 63bb0d
type job struct {
Packit 63bb0d
	Id           uuid.UUID       `json:"id"`
Packit 63bb0d
	Type         string          `json:"type"`
Packit 63bb0d
	Args         json.RawMessage `json:"args,omitempty"`
Packit 63bb0d
	Dependencies []uuid.UUID     `json:"dependencies"`
Packit 63bb0d
	Result       json.RawMessage `json:"result,omitempty"`
Packit 63bb0d
Packit 63bb0d
	QueuedAt   time.Time `json:"queued_at,omitempty"`
Packit 63bb0d
	StartedAt  time.Time `json:"started_at,omitempty"`
Packit 63bb0d
	FinishedAt time.Time `json:"finished_at,omitempty"`
Packit 63bb0d
Packit 63bb0d
	Canceled bool `json:"canceled,omitempty"`
Packit 63bb0d
}
Packit 63bb0d
Packit Service 509fd4
// channelSize is the buffer capacity of every channel fsJobQueue uses to queue
// job ids. Each job type has its own channel, so the limit applies per type.
const channelSize = 100
Packit Service 509fd4
Packit 63bb0d
// Create a new fsJobQueue object for `dir`. This object must have exclusive
Packit 63bb0d
// access to `dir`. If `dir` contains jobs created from previous runs, they are
Packit 63bb0d
// loaded and rescheduled to run if necessary.
Packit Service 509fd4
func New(dir string) (*fsJobQueue, error) {
Packit 63bb0d
	q := &fsJobQueue{
Packit 63bb0d
		db:         jsondb.New(dir, 0600),
Packit 63bb0d
		pending:    make(map[string]chan uuid.UUID),
Packit 63bb0d
		dependants: make(map[uuid.UUID][]uuid.UUID),
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// Look for jobs that are still pending and build the dependant map.
Packit 63bb0d
	ids, err := q.db.List()
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return nil, fmt.Errorf("error listing jobs: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
	for _, id := range ids {
Packit 63bb0d
		uuid, err := uuid.Parse(id)
Packit 63bb0d
		if err != nil {
Packit 63bb0d
			return nil, fmt.Errorf("invalid job '%s' in db: %v", id, err)
Packit 63bb0d
		}
Packit 63bb0d
		j, err := q.readJob(uuid)
Packit 63bb0d
		if err != nil {
Packit 63bb0d
			return nil, err
Packit 63bb0d
		}
Packit 63bb0d
		err = q.maybeEnqueue(j, true)
Packit 63bb0d
		if err != nil {
Packit 63bb0d
			return nil, err
Packit 63bb0d
		}
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return q, nil
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
Packit 63bb0d
	q.mu.Lock()
Packit 63bb0d
	defer q.mu.Unlock()
Packit 63bb0d
Packit 63bb0d
	var j = job{
Packit 63bb0d
		Id:           uuid.New(),
Packit 63bb0d
		Type:         jobType,
Packit Service 509fd4
		Dependencies: dependencies,
Packit 63bb0d
		QueuedAt:     time.Now(),
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	var err error
Packit 63bb0d
	j.Args, err = json.Marshal(args)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return uuid.Nil, fmt.Errorf("error marshaling job arguments: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// Verify dependendencies early, so that the job doesn't get written
Packit 63bb0d
	// when one of them doesn't exist.
Packit 63bb0d
	for _, d := range j.Dependencies {
Packit 63bb0d
		exists, err := q.db.Read(d.String(), nil)
Packit 63bb0d
		if err != nil {
Packit 63bb0d
			return uuid.Nil, err
Packit 63bb0d
		}
Packit 63bb0d
		if !exists {
Packit 63bb0d
			return uuid.Nil, jobqueue.ErrNotExist
Packit 63bb0d
		}
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// Write the job before updating in-memory state, so that the latter
Packit 63bb0d
	// doesn't become corrupt when writing fails.
Packit 63bb0d
	err = q.db.Write(j.Id.String(), j)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return uuid.Nil, fmt.Errorf("cannot write job: %v:", err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	err = q.maybeEnqueue(&j, true)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return uuid.Nil, err
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return j.Id, nil
Packit 63bb0d
}
Packit 63bb0d
Packit Service 509fd4
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
Packit 63bb0d
	q.mu.Lock()
Packit 63bb0d
	defer q.mu.Unlock()
Packit 63bb0d
Packit 63bb0d
	// Return early if the context is already canceled.
Packit 63bb0d
	if err := ctx.Err(); err != nil {
Packit Service 509fd4
		return uuid.Nil, nil, "", nil, err
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// Filter q.pending by the `jobTypes`. Ignore those job types that this
Packit 63bb0d
	// queue doesn't accept.
Packit 63bb0d
	chans := []chan uuid.UUID{}
Packit 63bb0d
	for _, jt := range jobTypes {
Packit Service 509fd4
		c, exists := q.pending[jt]
Packit Service 509fd4
		if !exists {
Packit Service 509fd4
			c = make(chan uuid.UUID, channelSize)
Packit Service 509fd4
			q.pending[jt] = c
Packit 63bb0d
		}
Packit Service 509fd4
		chans = append(chans, c)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// Loop until finding a non-canceled job.
Packit 63bb0d
	var j *job
Packit 63bb0d
	for {
Packit 63bb0d
		// Unlock the mutex while polling channels, so that multiple goroutines
Packit 63bb0d
		// can wait at the same time.
Packit 63bb0d
		q.mu.Unlock()
Packit 63bb0d
		id, err := selectUUIDChannel(ctx, chans)
Packit 63bb0d
		q.mu.Lock()
Packit 63bb0d
Packit Service 509fd4
		// Delete empty channels
Packit Service 509fd4
		for _, jt := range jobTypes {
Packit Service 509fd4
			c, exists := q.pending[jt]
Packit Service 509fd4
			if exists && len(c) == 0 {
Packit Service 509fd4
				close(c)
Packit Service 509fd4
				delete(q.pending, jt)
Packit Service 509fd4
			}
Packit Service 509fd4
		}
Packit Service 509fd4
Packit 63bb0d
		if err != nil {
Packit Service 509fd4
			return uuid.Nil, nil, "", nil, err
Packit 63bb0d
		}
Packit 63bb0d
Packit 63bb0d
		j, err = q.readJob(id)
Packit 63bb0d
		if err != nil {
Packit Service 509fd4
			return uuid.Nil, nil, "", nil, err
Packit 63bb0d
		}
Packit 63bb0d
Packit 63bb0d
		if !j.Canceled {
Packit 63bb0d
			break
Packit 63bb0d
		}
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	j.StartedAt = time.Now()
Packit 63bb0d
Packit Service 509fd4
	err := q.db.Write(j.Id.String(), j)
Packit 63bb0d
	if err != nil {
Packit Service 509fd4
		return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
Packit 63bb0d
	}
Packit 63bb0d
Packit Service 509fd4
	return j.Id, j.Dependencies, j.Type, j.Args, nil
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
Packit 63bb0d
	q.mu.Lock()
Packit 63bb0d
	defer q.mu.Unlock()
Packit 63bb0d
Packit 63bb0d
	j, err := q.readJob(id)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return err
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	if j.Canceled {
Packit 63bb0d
		return jobqueue.ErrCanceled
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
Packit 63bb0d
		return jobqueue.ErrNotRunning
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	j.FinishedAt = time.Now()
Packit 63bb0d
Packit 63bb0d
	j.Result, err = json.Marshal(result)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return fmt.Errorf("error marshaling result: %v", err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	// Write before notifying dependants, because it will be read again.
Packit 63bb0d
	err = q.db.Write(id.String(), j)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return fmt.Errorf("error writing job %s: %v", id, err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	for _, depid := range q.dependants[id] {
Packit 63bb0d
		dep, err := q.readJob(depid)
Packit 63bb0d
		if err != nil {
Packit 63bb0d
			return err
Packit 63bb0d
		}
Packit 63bb0d
		err = q.maybeEnqueue(dep, false)
Packit 63bb0d
		if err != nil {
Packit 63bb0d
			return err
Packit 63bb0d
		}
Packit 63bb0d
	}
Packit 63bb0d
	delete(q.dependants, id)
Packit 63bb0d
Packit 63bb0d
	return nil
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
Packit 63bb0d
	q.mu.Lock()
Packit 63bb0d
	defer q.mu.Unlock()
Packit 63bb0d
Packit 63bb0d
	j, err := q.readJob(id)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return err
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	if !j.FinishedAt.IsZero() {
Packit 63bb0d
		return nil
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	j.Canceled = true
Packit 63bb0d
Packit 63bb0d
	err = q.db.Write(id.String(), j)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return fmt.Errorf("error writing job %s: %v", id, err)
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return nil
Packit 63bb0d
}
Packit 63bb0d
Packit Service 509fd4
func (q *fsJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
Packit 63bb0d
	j, err := q.readJob(id)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return
Packit 63bb0d
	}
Packit 63bb0d
Packit Service 509fd4
	result = j.Result
Packit 63bb0d
	queued = j.QueuedAt
Packit 63bb0d
	started = j.StartedAt
Packit 63bb0d
	finished = j.FinishedAt
Packit 63bb0d
	canceled = j.Canceled
Packit Service 509fd4
	deps = j.Dependencies
Packit 63bb0d
Packit 63bb0d
	return
Packit 63bb0d
}
Packit 63bb0d
Packit Service 3a6627
func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) {
Packit Service 3a6627
	j, err := q.readJob(id)
Packit Service 3a6627
	if err != nil {
Packit Service 3a6627
		return
Packit Service 3a6627
	}
Packit Service 3a6627
Packit Service 3a6627
	jobType = j.Type
Packit Service 3a6627
	args = j.Args
Packit Service 3a6627
	dependencies = j.Dependencies
Packit Service 3a6627
Packit Service 3a6627
	return
Packit Service 3a6627
}
Packit Service 3a6627
Packit 63bb0d
// Reads job with `id`. This is a thin wrapper around `q.db.Read`, which
Packit 63bb0d
// returns the job directly, or and error if a job with `id` does not exist.
Packit 63bb0d
func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
Packit 63bb0d
	var j job
Packit 63bb0d
	exists, err := q.db.Read(id.String(), &j)
Packit 63bb0d
	if err != nil {
Packit 63bb0d
		return nil, fmt.Errorf("error reading job '%s': %v", id, err)
Packit 63bb0d
	}
Packit 63bb0d
	if !exists {
Packit 63bb0d
		// return corrupt database?
Packit 63bb0d
		return nil, jobqueue.ErrNotExist
Packit 63bb0d
	}
Packit 63bb0d
	return &j, nil
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// Enqueue `job` if it is pending and all its dependencies have finished.
Packit 63bb0d
// Update `q.dependants` if the job was not queued and updateDependants is true
Packit 63bb0d
// (i.e., when this is a new job).
Packit Service 509fd4
// `q.mu` must be locked when this method is called. The only exception is
Packit Service 509fd4
// `New()` because no concurrent calls are possible there.
Packit 63bb0d
func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
Packit 63bb0d
	if !j.StartedAt.IsZero() {
Packit 63bb0d
		return nil
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	depsFinished := true
Packit 63bb0d
	for _, id := range j.Dependencies {
Packit 63bb0d
		j, err := q.readJob(id)
Packit 63bb0d
		if err != nil {
Packit 63bb0d
			return err
Packit 63bb0d
		}
Packit 63bb0d
		if j.FinishedAt.IsZero() {
Packit 63bb0d
			depsFinished = false
Packit 63bb0d
			break
Packit 63bb0d
		}
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	if depsFinished {
Packit 63bb0d
		c, exists := q.pending[j.Type]
Packit 63bb0d
		if !exists {
Packit Service 509fd4
			c = make(chan uuid.UUID, channelSize)
Packit Service 509fd4
			q.pending[j.Type] = c
Packit 63bb0d
		}
Packit 63bb0d
		c <- j.Id
Packit 63bb0d
	} else if updateDependants {
Packit 63bb0d
		for _, id := range j.Dependencies {
Packit 63bb0d
			q.dependants[id] = append(q.dependants[id], j.Id)
Packit 63bb0d
		}
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return nil
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// Select on a list of `chan uuid.UUID`s. Returns an error if one of the
Packit 63bb0d
// channels is closed.
Packit 63bb0d
//
Packit 63bb0d
// Uses reflect.Select(), because the `select` statement cannot operate on an
Packit 63bb0d
// unknown amount of channels.
Packit 63bb0d
func selectUUIDChannel(ctx context.Context, chans []chan uuid.UUID) (uuid.UUID, error) {
Packit 63bb0d
	cases := []reflect.SelectCase{
Packit 63bb0d
		{
Packit 63bb0d
			Dir:  reflect.SelectRecv,
Packit 63bb0d
			Chan: reflect.ValueOf(ctx.Done()),
Packit 63bb0d
		},
Packit 63bb0d
	}
Packit 63bb0d
	for _, c := range chans {
Packit 63bb0d
		cases = append(cases, reflect.SelectCase{
Packit 63bb0d
			Dir:  reflect.SelectRecv,
Packit 63bb0d
			Chan: reflect.ValueOf(c),
Packit 63bb0d
		})
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	chosen, value, recvOK := reflect.Select(cases)
Packit 63bb0d
	if !recvOK {
Packit 63bb0d
		if chosen == 0 {
Packit 63bb0d
			return uuid.Nil, ctx.Err()
Packit 63bb0d
		} else {
Packit 63bb0d
			return uuid.Nil, errors.New("channel was closed unexpectedly")
Packit 63bb0d
		}
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	return value.Interface().(uuid.UUID), nil
Packit 63bb0d
}