Blame internal/jobqueue/testjobqueue/testjobqueue.go

Packit Service 4d2de5
// Package testjobqueue implements the jobqueue interface. It is meant for
// testing, and as such doesn't uphold two invariants of jobqueue: it is not
// safe for concurrent access and `Dequeue()` doesn't wait for new jobs to appear.
Packit Service 4d2de5
package testjobqueue
Packit Service 4d2de5
Packit Service 4d2de5
import (
Packit Service 4d2de5
	"context"
Packit Service 4d2de5
	"encoding/json"
Packit Service 4d2de5
	"errors"
Packit Service 4d2de5
	"fmt"
Packit Service 4d2de5
	"sort"
Packit Service 4d2de5
	"time"
Packit Service 4d2de5
Packit Service 4d2de5
	"github.com/google/uuid"
Packit Service 4d2de5
	"github.com/osbuild/osbuild-composer/internal/jobqueue"
Packit Service 4d2de5
)
Packit Service 4d2de5
Packit Service 4d2de5
type testJobQueue struct {
Packit Service 4d2de5
	jobs map[uuid.UUID]*job
Packit Service 4d2de5
Packit Service 4d2de5
	pending map[string][]uuid.UUID
Packit Service 4d2de5
Packit Service 4d2de5
	// Maps job ids to the jobs that depend on it
Packit Service 4d2de5
	dependants map[uuid.UUID][]uuid.UUID
Packit Service 4d2de5
}
Packit Service 4d2de5
Packit Service 4d2de5
type job struct {
Packit Service 4d2de5
	Id           uuid.UUID
Packit Service 4d2de5
	Type         string
Packit Service 4d2de5
	Args         json.RawMessage
Packit Service 4d2de5
	Dependencies []uuid.UUID
Packit Service 4d2de5
	Result       json.RawMessage
Packit Service 4d2de5
	QueuedAt     time.Time
Packit Service 4d2de5
	StartedAt    time.Time
Packit Service 4d2de5
	FinishedAt   time.Time
Packit Service 4d2de5
	Canceled     bool
Packit Service 4d2de5
}
Packit Service 4d2de5
Packit Service 4d2de5
func New() *testJobQueue {
Packit Service 4d2de5
	return &testJobQueue{
Packit Service 4d2de5
		jobs:    make(map[uuid.UUID]*job),
Packit Service 4d2de5
		pending: make(map[string][]uuid.UUID),
Packit Service 4d2de5
	}
Packit Service 4d2de5
}
Packit Service 4d2de5
Packit Service 4d2de5
func (q *testJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
Packit Service 4d2de5
	var j = job{
Packit Service 4d2de5
		Id:           uuid.New(),
Packit Service 4d2de5
		Type:         jobType,
Packit Service 4d2de5
		Dependencies: uniqueUUIDList(dependencies),
Packit Service 4d2de5
		QueuedAt:     time.Now(),
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	var err error
Packit Service 4d2de5
	j.Args, err = json.Marshal(args)
Packit Service 4d2de5
	if err != nil {
Packit Service 4d2de5
		return uuid.Nil, err
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	q.jobs[j.Id] = &j
Packit Service 4d2de5
Packit Service 4d2de5
	// Verify dependencies and check how many of them are already finished.
Packit Service 4d2de5
	finished, err := q.countFinishedJobs(j.Dependencies)
Packit Service 4d2de5
	if err != nil {
Packit Service 4d2de5
		return uuid.Nil, err
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	// If all dependencies have finished, or there are none, queue the job.
Packit Service 4d2de5
	// Otherwise, update dependants so that this check is done again when
Packit Service 4d2de5
	// FinishJob() is called for a dependency.
Packit Service 4d2de5
	if finished == len(j.Dependencies) {
Packit Service 4d2de5
		q.pending[j.Type] = append(q.pending[j.Type], j.Id)
Packit Service 4d2de5
	} else {
Packit Service 4d2de5
		for _, id := range j.Dependencies {
Packit Service 4d2de5
			q.dependants[id] = append(q.dependants[id], j.Id)
Packit Service 4d2de5
		}
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	return j.Id, nil
Packit Service 4d2de5
}
Packit Service 4d2de5
Packit Service 4d2de5
func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interface{}) (uuid.UUID, error) {
Packit Service 4d2de5
	for _, t := range jobTypes {
Packit Service 4d2de5
		if len(q.pending[t]) == 0 {
Packit Service 4d2de5
			continue
Packit Service 4d2de5
		}
Packit Service 4d2de5
Packit Service 4d2de5
		id := q.pending[t][0]
Packit Service 4d2de5
		q.pending[t] = q.pending[t][1:]
Packit Service 4d2de5
Packit Service 4d2de5
		j := q.jobs[id]
Packit Service 4d2de5
Packit Service 4d2de5
		err := json.Unmarshal(j.Args, args)
Packit Service 4d2de5
		if err != nil {
Packit Service 4d2de5
			return uuid.Nil, err
Packit Service 4d2de5
		}
Packit Service 4d2de5
Packit Service 4d2de5
		j.StartedAt = time.Now()
Packit Service 4d2de5
		return j.Id, nil
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	return uuid.Nil, errors.New("no job available")
Packit Service 4d2de5
}
Packit Service 4d2de5
Packit Service 4d2de5
func (q *testJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
Packit Service 4d2de5
	j, exists := q.jobs[id]
Packit Service 4d2de5
	if !exists {
Packit Service 4d2de5
		return jobqueue.ErrNotExist
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
Packit Service 4d2de5
		return jobqueue.ErrNotRunning
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	var err error
Packit Service 4d2de5
	j.Result, err = json.Marshal(result)
Packit Service 4d2de5
	if err != nil {
Packit Service 4d2de5
		return fmt.Errorf("error marshaling result: %v", err)
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	j.FinishedAt = time.Now()
Packit Service 4d2de5
Packit Service 4d2de5
	for _, depid := range q.dependants[id] {
Packit Service 4d2de5
		dep := q.jobs[depid]
Packit Service 4d2de5
		n, err := q.countFinishedJobs(dep.Dependencies)
Packit Service 4d2de5
		if err != nil {
Packit Service 4d2de5
			return err
Packit Service 4d2de5
		}
Packit Service 4d2de5
		if n == len(dep.Dependencies) {
Packit Service 4d2de5
			q.pending[dep.Type] = append(q.pending[dep.Type], dep.Id)
Packit Service 4d2de5
		}
Packit Service 4d2de5
	}
Packit Service 4d2de5
	delete(q.dependants, id)
Packit Service 4d2de5
Packit Service 4d2de5
	return nil
Packit Service 4d2de5
}
Packit Service 4d2de5
Packit Service 4d2de5
func (q *testJobQueue) CancelJob(id uuid.UUID) error {
Packit Service 4d2de5
	j, exists := q.jobs[id]
Packit Service 4d2de5
	if !exists {
Packit Service 4d2de5
		return jobqueue.ErrNotExist
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	j.Canceled = true
Packit Service 4d2de5
Packit Service 4d2de5
	return nil
Packit Service 4d2de5
}
Packit Service 4d2de5
Packit Service 4d2de5
func (q *testJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error) {
Packit Service 4d2de5
	j, exists := q.jobs[id]
Packit Service 4d2de5
	if !exists {
Packit Service 4d2de5
		err = jobqueue.ErrNotExist
Packit Service 4d2de5
		return
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	if !j.FinishedAt.IsZero() {
Packit Service 4d2de5
		err = json.Unmarshal(j.Result, result)
Packit Service 4d2de5
		if err != nil {
Packit Service 4d2de5
			return
Packit Service 4d2de5
		}
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	queued = j.QueuedAt
Packit Service 4d2de5
	started = j.StartedAt
Packit Service 4d2de5
	finished = j.FinishedAt
Packit Service 4d2de5
	canceled = j.Canceled
Packit Service 4d2de5
Packit Service 4d2de5
	return
Packit Service 4d2de5
}
Packit Service 4d2de5
Packit Service 4d2de5
// Returns the number of finished jobs in `ids`.
Packit Service 4d2de5
func (q *testJobQueue) countFinishedJobs(ids []uuid.UUID) (int, error) {
Packit Service 4d2de5
	n := 0
Packit Service 4d2de5
	for _, id := range ids {
Packit Service 4d2de5
		j, exists := q.jobs[id]
Packit Service 4d2de5
		if !exists {
Packit Service 4d2de5
			return 0, jobqueue.ErrNotExist
Packit Service 4d2de5
		}
Packit Service 4d2de5
		if j.Result != nil {
Packit Service 4d2de5
			n += 1
Packit Service 4d2de5
		}
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	return n, nil
Packit Service 4d2de5
}
Packit Service 4d2de5
Packit Service 4d2de5
// Sorts and removes duplicates from `ids`.
Packit Service 4d2de5
// Copied from fsjobqueue, which also contains a test.
Packit Service 4d2de5
func uniqueUUIDList(ids []uuid.UUID) []uuid.UUID {
Packit Service 4d2de5
	s := map[uuid.UUID]bool{}
Packit Service 4d2de5
	for _, id := range ids {
Packit Service 4d2de5
		s[id] = true
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	l := []uuid.UUID{}
Packit Service 4d2de5
	for id := range s {
Packit Service 4d2de5
		l = append(l, id)
Packit Service 4d2de5
	}
Packit Service 4d2de5
Packit Service 4d2de5
	sort.Slice(l, func(i, j int) bool {
Packit Service 4d2de5
		for b := 0; b < 16; b++ {
Packit Service 4d2de5
			if l[i][b] < l[j][b] {
Packit Service 4d2de5
				return true
Packit Service 4d2de5
			}
Packit Service 4d2de5
		}
Packit Service 4d2de5
		return false
Packit Service 4d2de5
	})
Packit Service 4d2de5
Packit Service 4d2de5
	return l
Packit Service 4d2de5
}