Blob Blame History Raw
// Package testjobqueue implements jobqueue interface. It is meant for testing,
// and as such doesn't implement two invariants of jobqueue: it is not safe for
// concurrent access and `Dequeue()` doesn't wait for new jobs to appear.
package testjobqueue

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
	"github.com/osbuild/osbuild-composer/internal/jobqueue"
)

type testJobQueue struct {
	jobs map[uuid.UUID]*job

	pending map[string][]uuid.UUID

	// Maps job ids to the jobs that depend on it
	dependants map[uuid.UUID][]uuid.UUID
}

type job struct {
	Id           uuid.UUID
	Type         string
	Args         json.RawMessage
	Dependencies []uuid.UUID
	Result       json.RawMessage
	QueuedAt     time.Time
	StartedAt    time.Time
	FinishedAt   time.Time
	Canceled     bool
}

func New() *testJobQueue {
	return &testJobQueue{
		jobs:       make(map[uuid.UUID]*job),
		pending:    make(map[string][]uuid.UUID),
		dependants: make(map[uuid.UUID][]uuid.UUID),
	}
}

func (q *testJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
	var j = job{
		Id:           uuid.New(),
		Type:         jobType,
		Dependencies: dependencies,
		QueuedAt:     time.Now(),
	}

	var err error
	j.Args, err = json.Marshal(args)
	if err != nil {
		return uuid.Nil, err
	}

	q.jobs[j.Id] = &j

	// Verify dependencies and check how many of them are already finished.
	finished, err := q.countFinishedJobs(j.Dependencies)
	if err != nil {
		return uuid.Nil, err
	}

	// If all dependencies have finished, or there are none, queue the job.
	// Otherwise, update dependants so that this check is done again when
	// FinishJob() is called for a dependency.
	if finished == len(j.Dependencies) {
		q.pending[j.Type] = append(q.pending[j.Type], j.Id)
	} else {
		for _, id := range j.Dependencies {
			q.dependants[id] = append(q.dependants[id], j.Id)
		}
	}

	return j.Id, nil
}

func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
	for _, t := range jobTypes {
		if len(q.pending[t]) == 0 {
			continue
		}

		id := q.pending[t][0]
		q.pending[t] = q.pending[t][1:]

		j := q.jobs[id]

		j.StartedAt = time.Now()
		return j.Id, j.Dependencies, j.Type, j.Args, nil
	}

	return uuid.Nil, nil, "", nil, errors.New("no job available")
}

func (q *testJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
	j, exists := q.jobs[id]
	if !exists {
		return jobqueue.ErrNotExist
	}

	if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
		return jobqueue.ErrNotRunning
	}

	var err error
	j.Result, err = json.Marshal(result)
	if err != nil {
		return fmt.Errorf("error marshaling result: %v", err)
	}

	j.FinishedAt = time.Now()

	for _, depid := range q.dependants[id] {
		dep := q.jobs[depid]
		n, err := q.countFinishedJobs(dep.Dependencies)
		if err != nil {
			return err
		}
		if n == len(dep.Dependencies) {
			q.pending[dep.Type] = append(q.pending[dep.Type], dep.Id)
		}
	}
	delete(q.dependants, id)

	return nil
}

func (q *testJobQueue) CancelJob(id uuid.UUID) error {
	j, exists := q.jobs[id]
	if !exists {
		return jobqueue.ErrNotExist
	}

	j.Canceled = true

	return nil
}

func (q *testJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
	j, exists := q.jobs[id]
	if !exists {
		err = jobqueue.ErrNotExist
		return
	}

	result = j.Result
	queued = j.QueuedAt
	started = j.StartedAt
	finished = j.FinishedAt
	canceled = j.Canceled
	deps = j.Dependencies

	return
}

// Returns the number of finished jobs in `ids`.
func (q *testJobQueue) countFinishedJobs(ids []uuid.UUID) (int, error) {
	n := 0
	for _, id := range ids {
		j, exists := q.jobs[id]
		if !exists {
			return 0, jobqueue.ErrNotExist
		}
		if j.Result != nil {
			n += 1
		}
	}

	return n, nil
}