Blame internal/jobqueue/fsjobqueue/fsjobqueue_test.go

Packit 63bb0d
package fsjobqueue_test
Packit 63bb0d
Packit 63bb0d
import (
Packit 63bb0d
	"context"
Packit 63bb0d
	"encoding/json"
Packit 63bb0d
	"io/ioutil"
Packit 63bb0d
	"os"
Packit 63bb0d
	"testing"
Packit 63bb0d
	"time"
Packit 63bb0d
Packit 63bb0d
	"github.com/google/uuid"
Packit 63bb0d
	"github.com/stretchr/testify/require"
Packit 63bb0d
Packit 63bb0d
	"github.com/osbuild/osbuild-composer/internal/jobqueue"
Packit 63bb0d
	"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
Packit 63bb0d
)
Packit 63bb0d
Packit 63bb0d
// testResult is the placeholder result type used throughout these tests: it
// is what the tests pass as a job result and what they decode stored results
// into. It intentionally has no fields.
type testResult struct{}
Packit 63bb0d
Packit 63bb0d
func cleanupTempDir(t *testing.T, dir string) {
Packit 63bb0d
	err := os.RemoveAll(dir)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
}
Packit 63bb0d
Packit Service 509fd4
func newTemporaryQueue(t *testing.T) (jobqueue.JobQueue, string) {
Packit 63bb0d
	dir, err := ioutil.TempDir("", "jobqueue-test-")
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
Packit Service 509fd4
	q, err := fsjobqueue.New(dir)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.NotNil(t, q)
Packit 63bb0d
Packit 63bb0d
	return q, dir
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID {
Packit 63bb0d
	t.Helper()
Packit 63bb0d
	id, err := q.Enqueue(jobType, args, dependencies)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.NotEmpty(t, id)
Packit 63bb0d
	return id
Packit 63bb0d
}
Packit 63bb0d
Packit Service 509fd4
func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}, deps []uuid.UUID) uuid.UUID {
Packit Service 509fd4
	id, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.NotEmpty(t, id)
Packit Service 509fd4
	require.ElementsMatch(t, deps, d)
Packit Service 509fd4
	require.Equal(t, jobType, typ)
Packit Service 509fd4
	require.NotNil(t, args)
Packit 63bb0d
Packit 63bb0d
	err = q.FinishJob(id, result)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
Packit 63bb0d
	return id
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestNonExistant(t *testing.T) {
Packit Service 509fd4
	q, err := fsjobqueue.New("/non-existant-directory")
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
	require.Nil(t, q)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestErrors(t *testing.T) {
Packit Service 509fd4
	q, dir := newTemporaryQueue(t)
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	// not serializable to JSON
Packit 63bb0d
	id, err := q.Enqueue("test", make(chan string), nil)
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
	require.Equal(t, uuid.Nil, id)
Packit 63bb0d
Packit 63bb0d
	// invalid dependency
Packit 63bb0d
	id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()})
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
	require.Equal(t, uuid.Nil, id)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestArgs(t *testing.T) {
Packit 63bb0d
	type argument struct {
Packit 63bb0d
		I int
Packit 63bb0d
		S string
Packit 63bb0d
	}
Packit 63bb0d
Packit Service 509fd4
	q, dir := newTemporaryQueue(t)
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	oneargs := argument{7, "🐠"}
Packit 63bb0d
	one := pushTestJob(t, q, "fish", oneargs, nil)
Packit 63bb0d
Packit 63bb0d
	twoargs := argument{42, "🐙"}
Packit 63bb0d
	two := pushTestJob(t, q, "octopus", twoargs, nil)
Packit 63bb0d
Packit Service 509fd4
	var parsedArgs argument
Packit Service 509fd4
Packit Service 509fd4
	id, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.Equal(t, two, id)
Packit Service 509fd4
	require.Empty(t, deps)
Packit Service 509fd4
	require.Equal(t, "octopus", typ)
Packit Service 509fd4
	err = json.Unmarshal(args, &parsedArgs)
Packit Service 509fd4
	require.NoError(t, err)
Packit Service 509fd4
	require.Equal(t, twoargs, parsedArgs)
Packit 63bb0d
Packit Service 3a6627
	// Read job params after Dequeue
Packit Service 3a6627
	jtype, jargs, jdeps, err := q.Job(id)
Packit Service 3a6627
	require.NoError(t, err)
Packit Service 3a6627
	require.Equal(t, args, jargs)
Packit Service 3a6627
	require.Equal(t, deps, jdeps)
Packit Service 3a6627
	require.Equal(t, typ, jtype)
Packit Service 3a6627
Packit Service 509fd4
	id, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.Equal(t, one, id)
Packit Service 509fd4
	require.Empty(t, deps)
Packit Service 509fd4
	require.Equal(t, "fish", typ)
Packit Service 509fd4
	err = json.Unmarshal(args, &parsedArgs)
Packit Service 509fd4
	require.NoError(t, err)
Packit Service 509fd4
	require.Equal(t, oneargs, parsedArgs)
Packit Service 3a6627
Packit Service 3a6627
	jtype, jargs, jdeps, err = q.Job(id)
Packit Service 3a6627
	require.NoError(t, err)
Packit Service 3a6627
	require.Equal(t, args, jargs)
Packit Service 3a6627
	require.Equal(t, deps, jdeps)
Packit Service 3a6627
	require.Equal(t, typ, jtype)
Packit Service 3a6627
Packit Service 3a6627
	_, _, _, err = q.Job(uuid.New())
Packit Service 3a6627
	require.Error(t, err)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestJobTypes(t *testing.T) {
Packit Service 509fd4
	q, dir := newTemporaryQueue(t)
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	one := pushTestJob(t, q, "octopus", nil, nil)
Packit 63bb0d
	two := pushTestJob(t, q, "clownfish", nil, nil)
Packit 63bb0d
Packit Service 509fd4
	require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}, nil))
Packit Service 509fd4
	require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil))
Packit 63bb0d
Packit 63bb0d
	ctx, cancel := context.WithCancel(context.Background())
Packit 63bb0d
	cancel()
Packit Service 509fd4
	id, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"})
Packit 63bb0d
	require.Equal(t, err, context.Canceled)
Packit 63bb0d
	require.Equal(t, uuid.Nil, id)
Packit Service 509fd4
	require.Empty(t, deps)
Packit Service 509fd4
	require.Equal(t, "", typ)
Packit Service 509fd4
	require.Nil(t, args)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestDependencies(t *testing.T) {
Packit Service 509fd4
	q, dir := newTemporaryQueue(t)
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	t.Run("done-before-pushing-dependant", func(t *testing.T) {
Packit 63bb0d
		one := pushTestJob(t, q, "test", nil, nil)
Packit 63bb0d
		two := pushTestJob(t, q, "test", nil, nil)
Packit 63bb0d
Packit 63bb0d
		r := []uuid.UUID{}
Packit Service 509fd4
		r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
Packit Service 509fd4
		r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
Packit 63bb0d
		require.ElementsMatch(t, []uuid.UUID{one, two}, r)
Packit 63bb0d
Packit 63bb0d
		j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
Packit Service 509fd4
		_, queued, started, finished, canceled, deps, err := q.JobStatus(j)
Packit 63bb0d
		require.NoError(t, err)
Packit 63bb0d
		require.True(t, !queued.IsZero())
Packit 63bb0d
		require.True(t, started.IsZero())
Packit 63bb0d
		require.True(t, finished.IsZero())
Packit 63bb0d
		require.False(t, canceled)
Packit Service 509fd4
		require.ElementsMatch(t, deps, []uuid.UUID{one, two})
Packit 63bb0d
Packit Service 509fd4
		require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
Packit 63bb0d
Packit Service 509fd4
		result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
Packit 63bb0d
		require.NoError(t, err)
Packit 63bb0d
		require.True(t, !queued.IsZero())
Packit 63bb0d
		require.True(t, !started.IsZero())
Packit 63bb0d
		require.True(t, !finished.IsZero())
Packit 63bb0d
		require.False(t, canceled)
Packit Service 509fd4
		require.ElementsMatch(t, deps, []uuid.UUID{one, two})
Packit Service 509fd4
Packit Service 509fd4
		err = json.Unmarshal(result, &testResult{})
Packit Service 509fd4
		require.NoError(t, err)
Packit 63bb0d
	})
Packit 63bb0d
Packit 63bb0d
	t.Run("done-after-pushing-dependant", func(t *testing.T) {
Packit 63bb0d
		one := pushTestJob(t, q, "test", nil, nil)
Packit 63bb0d
		two := pushTestJob(t, q, "test", nil, nil)
Packit 63bb0d
Packit 63bb0d
		j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
Packit Service 509fd4
		_, queued, started, finished, canceled, deps, err := q.JobStatus(j)
Packit 63bb0d
		require.NoError(t, err)
Packit 63bb0d
		require.True(t, !queued.IsZero())
Packit 63bb0d
		require.True(t, started.IsZero())
Packit 63bb0d
		require.True(t, finished.IsZero())
Packit 63bb0d
		require.False(t, canceled)
Packit Service 509fd4
		require.ElementsMatch(t, deps, []uuid.UUID{one, two})
Packit 63bb0d
Packit 63bb0d
		r := []uuid.UUID{}
Packit Service 509fd4
		r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
Packit Service 509fd4
		r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
Packit 63bb0d
		require.ElementsMatch(t, []uuid.UUID{one, two}, r)
Packit 63bb0d
Packit Service 509fd4
		require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
Packit 63bb0d
Packit Service 509fd4
		result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
Packit 63bb0d
		require.NoError(t, err)
Packit 63bb0d
		require.True(t, !queued.IsZero())
Packit 63bb0d
		require.True(t, !started.IsZero())
Packit 63bb0d
		require.True(t, !finished.IsZero())
Packit 63bb0d
		require.False(t, canceled)
Packit Service 509fd4
		require.ElementsMatch(t, deps, []uuid.UUID{one, two})
Packit Service 509fd4
Packit Service 509fd4
		err = json.Unmarshal(result, &testResult{})
Packit Service 509fd4
		require.NoError(t, err)
Packit 63bb0d
	})
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// Test that a job queue allows parallel access to multiple workers, mainly to
Packit 63bb0d
// verify the quirky unlocking in Dequeue().
Packit 63bb0d
func TestMultipleWorkers(t *testing.T) {
Packit Service 509fd4
	q, dir := newTemporaryQueue(t)
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	done := make(chan struct{})
Packit 63bb0d
	go func() {
Packit 63bb0d
		defer close(done)
Packit 63bb0d
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Packit 63bb0d
		defer cancel()
Packit Service 509fd4
		id, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"})
Packit 63bb0d
		require.NoError(t, err)
Packit 63bb0d
		require.NotEmpty(t, id)
Packit Service 509fd4
		require.Empty(t, deps)
Packit Service 509fd4
		require.Equal(t, "octopus", typ)
Packit Service 509fd4
		require.Equal(t, json.RawMessage("null"), args)
Packit 63bb0d
	}()
Packit 63bb0d
Packit 63bb0d
	// Increase the likelihood that the above goroutine was scheduled and
Packit 63bb0d
	// is waiting in Dequeue().
Packit 63bb0d
	time.Sleep(10 * time.Millisecond)
Packit 63bb0d
Packit 63bb0d
	// This call to Dequeue() should not block on the one in the goroutine.
Packit 63bb0d
	id := pushTestJob(t, q, "clownfish", nil, nil)
Packit Service 509fd4
	r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.Equal(t, id, r)
Packit Service 509fd4
	require.Empty(t, deps)
Packit Service 509fd4
	require.Equal(t, "clownfish", typ)
Packit Service 509fd4
	require.Equal(t, json.RawMessage("null"), args)
Packit 63bb0d
Packit 63bb0d
	// Now wake up the Dequeue() in the goroutine and wait for it to finish.
Packit 63bb0d
	_ = pushTestJob(t, q, "octopus", nil, nil)
Packit 63bb0d
	<-done
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestCancel(t *testing.T) {
Packit Service 509fd4
	q, dir := newTemporaryQueue(t)
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	// Cancel a non-existing job
Packit 63bb0d
	err := q.CancelJob(uuid.New())
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
Packit 63bb0d
	// Cancel a pending job
Packit 63bb0d
	id := pushTestJob(t, q, "clownfish", nil, nil)
Packit 63bb0d
	require.NotEmpty(t, id)
Packit 63bb0d
	err = q.CancelJob(id)
Packit 63bb0d
	require.NoError(t, err)
Packit Service 509fd4
	result, _, _, _, canceled, _, err := q.JobStatus(id)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.True(t, canceled)
Packit Service 509fd4
	require.Nil(t, result)
Packit 63bb0d
	err = q.FinishJob(id, &testResult{})
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
Packit 63bb0d
	// Cancel a running job, which should not dequeue the canceled job from above
Packit 63bb0d
	id = pushTestJob(t, q, "clownfish", nil, nil)
Packit 63bb0d
	require.NotEmpty(t, id)
Packit Service 509fd4
	r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.Equal(t, id, r)
Packit Service 509fd4
	require.Empty(t, deps)
Packit Service 509fd4
	require.Equal(t, "clownfish", typ)
Packit Service 509fd4
	require.Equal(t, json.RawMessage("null"), args)
Packit 63bb0d
	err = q.CancelJob(id)
Packit 63bb0d
	require.NoError(t, err)
Packit Service 509fd4
	result, _, _, _, canceled, _, err = q.JobStatus(id)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.True(t, canceled)
Packit Service 509fd4
	require.Nil(t, result)
Packit 63bb0d
	err = q.FinishJob(id, &testResult{})
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
Packit 63bb0d
	// Cancel a finished job, which is a no-op
Packit 63bb0d
	id = pushTestJob(t, q, "clownfish", nil, nil)
Packit 63bb0d
	require.NotEmpty(t, id)
Packit Service 509fd4
	r, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.Equal(t, id, r)
Packit Service 509fd4
	require.Empty(t, deps)
Packit Service 509fd4
	require.Equal(t, "clownfish", typ)
Packit Service 509fd4
	require.Equal(t, json.RawMessage("null"), args)
Packit 63bb0d
	err = q.FinishJob(id, &testResult{})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	err = q.CancelJob(id)
Packit 63bb0d
	require.NoError(t, err)
Packit Service 509fd4
	result, _, _, _, canceled, _, err = q.JobStatus(id)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.False(t, canceled)
Packit Service 509fd4
	err = json.Unmarshal(result, &testResult{})
Packit Service 509fd4
	require.NoError(t, err)
Packit 63bb0d
}