|
Packit Service |
4d2de5 |
// Package testjobqueue implements the jobqueue interface. It is meant for
// testing, and as such doesn't uphold two invariants of jobqueue: it is not
// safe for concurrent access and `Dequeue()` doesn't wait for new jobs to
// appear.
|
|
Packit Service |
4d2de5 |
package testjobqueue
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
import (
|
|
Packit Service |
4d2de5 |
"context"
|
|
Packit Service |
4d2de5 |
"encoding/json"
|
|
Packit Service |
4d2de5 |
"errors"
|
|
Packit Service |
4d2de5 |
"fmt"
|
|
Packit Service |
4d2de5 |
"sort"
|
|
Packit Service |
4d2de5 |
"time"
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
"github.com/google/uuid"
|
|
Packit Service |
4d2de5 |
"github.com/osbuild/osbuild-composer/internal/jobqueue"
|
|
Packit Service |
4d2de5 |
)
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
type testJobQueue struct {
|
|
Packit Service |
4d2de5 |
jobs map[uuid.UUID]*job
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
pending map[string][]uuid.UUID
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
// Maps job ids to the jobs that depend on it
|
|
Packit Service |
4d2de5 |
dependants map[uuid.UUID][]uuid.UUID
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
type job struct {
|
|
Packit Service |
4d2de5 |
Id uuid.UUID
|
|
Packit Service |
4d2de5 |
Type string
|
|
Packit Service |
4d2de5 |
Args json.RawMessage
|
|
Packit Service |
4d2de5 |
Dependencies []uuid.UUID
|
|
Packit Service |
4d2de5 |
Result json.RawMessage
|
|
Packit Service |
4d2de5 |
QueuedAt time.Time
|
|
Packit Service |
4d2de5 |
StartedAt time.Time
|
|
Packit Service |
4d2de5 |
FinishedAt time.Time
|
|
Packit Service |
4d2de5 |
Canceled bool
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
func New() *testJobQueue {
|
|
Packit Service |
4d2de5 |
return &testJobQueue{
|
|
Packit Service |
4d2de5 |
jobs: make(map[uuid.UUID]*job),
|
|
Packit Service |
4d2de5 |
pending: make(map[string][]uuid.UUID),
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
func (q *testJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
|
|
Packit Service |
4d2de5 |
var j = job{
|
|
Packit Service |
4d2de5 |
Id: uuid.New(),
|
|
Packit Service |
4d2de5 |
Type: jobType,
|
|
Packit Service |
4d2de5 |
Dependencies: uniqueUUIDList(dependencies),
|
|
Packit Service |
4d2de5 |
QueuedAt: time.Now(),
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
var err error
|
|
Packit Service |
4d2de5 |
j.Args, err = json.Marshal(args)
|
|
Packit Service |
4d2de5 |
if err != nil {
|
|
Packit Service |
4d2de5 |
return uuid.Nil, err
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
q.jobs[j.Id] = &j
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
// Verify dependencies and check how many of them are already finished.
|
|
Packit Service |
4d2de5 |
finished, err := q.countFinishedJobs(j.Dependencies)
|
|
Packit Service |
4d2de5 |
if err != nil {
|
|
Packit Service |
4d2de5 |
return uuid.Nil, err
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
// If all dependencies have finished, or there are none, queue the job.
|
|
Packit Service |
4d2de5 |
// Otherwise, update dependants so that this check is done again when
|
|
Packit Service |
4d2de5 |
// FinishJob() is called for a dependency.
|
|
Packit Service |
4d2de5 |
if finished == len(j.Dependencies) {
|
|
Packit Service |
4d2de5 |
q.pending[j.Type] = append(q.pending[j.Type], j.Id)
|
|
Packit Service |
4d2de5 |
} else {
|
|
Packit Service |
4d2de5 |
for _, id := range j.Dependencies {
|
|
Packit Service |
4d2de5 |
q.dependants[id] = append(q.dependants[id], j.Id)
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
return j.Id, nil
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interface{}) (uuid.UUID, error) {
|
|
Packit Service |
4d2de5 |
for _, t := range jobTypes {
|
|
Packit Service |
4d2de5 |
if len(q.pending[t]) == 0 {
|
|
Packit Service |
4d2de5 |
continue
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
id := q.pending[t][0]
|
|
Packit Service |
4d2de5 |
q.pending[t] = q.pending[t][1:]
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
j := q.jobs[id]
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
err := json.Unmarshal(j.Args, args)
|
|
Packit Service |
4d2de5 |
if err != nil {
|
|
Packit Service |
4d2de5 |
return uuid.Nil, err
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
j.StartedAt = time.Now()
|
|
Packit Service |
4d2de5 |
return j.Id, nil
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
return uuid.Nil, errors.New("no job available")
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
func (q *testJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
|
|
Packit Service |
4d2de5 |
j, exists := q.jobs[id]
|
|
Packit Service |
4d2de5 |
if !exists {
|
|
Packit Service |
4d2de5 |
return jobqueue.ErrNotExist
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
|
|
Packit Service |
4d2de5 |
return jobqueue.ErrNotRunning
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
var err error
|
|
Packit Service |
4d2de5 |
j.Result, err = json.Marshal(result)
|
|
Packit Service |
4d2de5 |
if err != nil {
|
|
Packit Service |
4d2de5 |
return fmt.Errorf("error marshaling result: %v", err)
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
j.FinishedAt = time.Now()
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
for _, depid := range q.dependants[id] {
|
|
Packit Service |
4d2de5 |
dep := q.jobs[depid]
|
|
Packit Service |
4d2de5 |
n, err := q.countFinishedJobs(dep.Dependencies)
|
|
Packit Service |
4d2de5 |
if err != nil {
|
|
Packit Service |
4d2de5 |
return err
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
if n == len(dep.Dependencies) {
|
|
Packit Service |
4d2de5 |
q.pending[dep.Type] = append(q.pending[dep.Type], dep.Id)
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
delete(q.dependants, id)
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
return nil
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
func (q *testJobQueue) CancelJob(id uuid.UUID) error {
|
|
Packit Service |
4d2de5 |
j, exists := q.jobs[id]
|
|
Packit Service |
4d2de5 |
if !exists {
|
|
Packit Service |
4d2de5 |
return jobqueue.ErrNotExist
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
j.Canceled = true
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
return nil
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
func (q *testJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error) {
|
|
Packit Service |
4d2de5 |
j, exists := q.jobs[id]
|
|
Packit Service |
4d2de5 |
if !exists {
|
|
Packit Service |
4d2de5 |
err = jobqueue.ErrNotExist
|
|
Packit Service |
4d2de5 |
return
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
if !j.FinishedAt.IsZero() {
|
|
Packit Service |
4d2de5 |
err = json.Unmarshal(j.Result, result)
|
|
Packit Service |
4d2de5 |
if err != nil {
|
|
Packit Service |
4d2de5 |
return
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
queued = j.QueuedAt
|
|
Packit Service |
4d2de5 |
started = j.StartedAt
|
|
Packit Service |
4d2de5 |
finished = j.FinishedAt
|
|
Packit Service |
4d2de5 |
canceled = j.Canceled
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
return
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
// Returns the number of finished jobs in `ids`.
|
|
Packit Service |
4d2de5 |
func (q *testJobQueue) countFinishedJobs(ids []uuid.UUID) (int, error) {
|
|
Packit Service |
4d2de5 |
n := 0
|
|
Packit Service |
4d2de5 |
for _, id := range ids {
|
|
Packit Service |
4d2de5 |
j, exists := q.jobs[id]
|
|
Packit Service |
4d2de5 |
if !exists {
|
|
Packit Service |
4d2de5 |
return 0, jobqueue.ErrNotExist
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
if j.Result != nil {
|
|
Packit Service |
4d2de5 |
n += 1
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
return n, nil
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
// Sorts and removes duplicates from `ids`.
|
|
Packit Service |
4d2de5 |
// Copied from fsjobqueue, which also contains a test.
|
|
Packit Service |
4d2de5 |
func uniqueUUIDList(ids []uuid.UUID) []uuid.UUID {
|
|
Packit Service |
4d2de5 |
s := map[uuid.UUID]bool{}
|
|
Packit Service |
4d2de5 |
for _, id := range ids {
|
|
Packit Service |
4d2de5 |
s[id] = true
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
l := []uuid.UUID{}
|
|
Packit Service |
4d2de5 |
for id := range s {
|
|
Packit Service |
4d2de5 |
l = append(l, id)
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
sort.Slice(l, func(i, j int) bool {
|
|
Packit Service |
4d2de5 |
for b := 0; b < 16; b++ {
|
|
Packit Service |
4d2de5 |
if l[i][b] < l[j][b] {
|
|
Packit Service |
4d2de5 |
return true
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
}
|
|
Packit Service |
4d2de5 |
return false
|
|
Packit Service |
4d2de5 |
})
|
|
Packit Service |
4d2de5 |
|
|
Packit Service |
4d2de5 |
return l
|
|
Packit Service |
4d2de5 |
}
|