Blame internal/jobqueue/fsjobqueue/fsjobqueue_test.go

Packit 63bb0d
package fsjobqueue_test
Packit 63bb0d
Packit 63bb0d
import (
Packit 63bb0d
	"context"
Packit 63bb0d
	"encoding/json"
Packit 63bb0d
	"io/ioutil"
Packit 63bb0d
	"os"
Packit 63bb0d
	"testing"
Packit 63bb0d
	"time"
Packit 63bb0d
Packit 63bb0d
	"github.com/google/uuid"
Packit 63bb0d
	"github.com/stretchr/testify/require"
Packit 63bb0d
Packit 63bb0d
	"github.com/osbuild/osbuild-composer/internal/jobqueue"
Packit 63bb0d
	"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
Packit 63bb0d
)
Packit 63bb0d
Packit 63bb0d
type testResult struct {
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func cleanupTempDir(t *testing.T, dir string) {
Packit 63bb0d
	err := os.RemoveAll(dir)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func newTemporaryQueue(t *testing.T, jobTypes []string) (jobqueue.JobQueue, string) {
Packit 63bb0d
	dir, err := ioutil.TempDir("", "jobqueue-test-")
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
Packit 63bb0d
	q, err := fsjobqueue.New(dir, jobTypes)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.NotNil(t, q)
Packit 63bb0d
Packit 63bb0d
	return q, dir
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID {
Packit 63bb0d
	t.Helper()
Packit 63bb0d
	id, err := q.Enqueue(jobType, args, dependencies)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.NotEmpty(t, id)
Packit 63bb0d
	return id
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}) uuid.UUID {
Packit 63bb0d
	id, err := q.Dequeue(context.Background(), []string{jobType}, &json.RawMessage{})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.NotEmpty(t, id)
Packit 63bb0d
Packit 63bb0d
	err = q.FinishJob(id, result)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
Packit 63bb0d
	return id
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestNonExistant(t *testing.T) {
Packit 63bb0d
	q, err := fsjobqueue.New("/non-existant-directory", []string{})
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
	require.Nil(t, q)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestErrors(t *testing.T) {
Packit 63bb0d
	q, dir := newTemporaryQueue(t, []string{"test"})
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	// not serializable to JSON
Packit 63bb0d
	id, err := q.Enqueue("test", make(chan string), nil)
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
	require.Equal(t, uuid.Nil, id)
Packit 63bb0d
Packit 63bb0d
	// invalid dependency
Packit 63bb0d
	id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()})
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
	require.Equal(t, uuid.Nil, id)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestArgs(t *testing.T) {
Packit 63bb0d
	type argument struct {
Packit 63bb0d
		I int
Packit 63bb0d
		S string
Packit 63bb0d
	}
Packit 63bb0d
Packit 63bb0d
	q, dir := newTemporaryQueue(t, []string{"fish", "octopus"})
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	oneargs := argument{7, "🐠"}
Packit 63bb0d
	one := pushTestJob(t, q, "fish", oneargs, nil)
Packit 63bb0d
Packit 63bb0d
	twoargs := argument{42, "🐙"}
Packit 63bb0d
	two := pushTestJob(t, q, "octopus", twoargs, nil)
Packit 63bb0d
Packit 63bb0d
	var args argument
Packit 63bb0d
	id, err := q.Dequeue(context.Background(), []string{"octopus"}, &args)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.Equal(t, two, id)
Packit 63bb0d
	require.Equal(t, twoargs, args)
Packit 63bb0d
Packit 63bb0d
	id, err = q.Dequeue(context.Background(), []string{"fish"}, &args)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.Equal(t, one, id)
Packit 63bb0d
	require.Equal(t, oneargs, args)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestJobTypes(t *testing.T) {
Packit 63bb0d
	q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"})
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	one := pushTestJob(t, q, "octopus", nil, nil)
Packit 63bb0d
	two := pushTestJob(t, q, "clownfish", nil, nil)
Packit 63bb0d
Packit 63bb0d
	require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}))
Packit 63bb0d
	require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}))
Packit 63bb0d
Packit 63bb0d
	ctx, cancel := context.WithCancel(context.Background())
Packit 63bb0d
	cancel()
Packit 63bb0d
	id, err := q.Dequeue(ctx, []string{"zebra"}, nil)
Packit 63bb0d
	require.Equal(t, err, context.Canceled)
Packit 63bb0d
	require.Equal(t, uuid.Nil, id)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestDependencies(t *testing.T) {
Packit 63bb0d
	q, dir := newTemporaryQueue(t, []string{"test"})
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	t.Run("done-before-pushing-dependant", func(t *testing.T) {
Packit 63bb0d
		one := pushTestJob(t, q, "test", nil, nil)
Packit 63bb0d
		two := pushTestJob(t, q, "test", nil, nil)
Packit 63bb0d
Packit 63bb0d
		r := []uuid.UUID{}
Packit 63bb0d
		r = append(r, finishNextTestJob(t, q, "test", testResult{}))
Packit 63bb0d
		r = append(r, finishNextTestJob(t, q, "test", testResult{}))
Packit 63bb0d
		require.ElementsMatch(t, []uuid.UUID{one, two}, r)
Packit 63bb0d
Packit 63bb0d
		j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
Packit 63bb0d
		queued, started, finished, canceled, err := q.JobStatus(j, nil)
Packit 63bb0d
		require.NoError(t, err)
Packit 63bb0d
		require.True(t, !queued.IsZero())
Packit 63bb0d
		require.True(t, started.IsZero())
Packit 63bb0d
		require.True(t, finished.IsZero())
Packit 63bb0d
		require.False(t, canceled)
Packit 63bb0d
Packit 63bb0d
		require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}))
Packit 63bb0d
Packit 63bb0d
		queued, started, finished, canceled, err = q.JobStatus(j, &testResult{})
Packit 63bb0d
		require.NoError(t, err)
Packit 63bb0d
		require.True(t, !queued.IsZero())
Packit 63bb0d
		require.True(t, !started.IsZero())
Packit 63bb0d
		require.True(t, !finished.IsZero())
Packit 63bb0d
		require.False(t, canceled)
Packit 63bb0d
	})
Packit 63bb0d
Packit 63bb0d
	t.Run("done-after-pushing-dependant", func(t *testing.T) {
Packit 63bb0d
		one := pushTestJob(t, q, "test", nil, nil)
Packit 63bb0d
		two := pushTestJob(t, q, "test", nil, nil)
Packit 63bb0d
Packit 63bb0d
		j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
Packit 63bb0d
		queued, started, finished, canceled, err := q.JobStatus(j, nil)
Packit 63bb0d
		require.NoError(t, err)
Packit 63bb0d
		require.True(t, !queued.IsZero())
Packit 63bb0d
		require.True(t, started.IsZero())
Packit 63bb0d
		require.True(t, finished.IsZero())
Packit 63bb0d
		require.False(t, canceled)
Packit 63bb0d
Packit 63bb0d
		r := []uuid.UUID{}
Packit 63bb0d
		r = append(r, finishNextTestJob(t, q, "test", testResult{}))
Packit 63bb0d
		r = append(r, finishNextTestJob(t, q, "test", testResult{}))
Packit 63bb0d
		require.ElementsMatch(t, []uuid.UUID{one, two}, r)
Packit 63bb0d
Packit 63bb0d
		require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}))
Packit 63bb0d
Packit 63bb0d
		queued, started, finished, canceled, err = q.JobStatus(j, &testResult{})
Packit 63bb0d
		require.NoError(t, err)
Packit 63bb0d
		require.True(t, !queued.IsZero())
Packit 63bb0d
		require.True(t, !started.IsZero())
Packit 63bb0d
		require.True(t, !finished.IsZero())
Packit 63bb0d
		require.False(t, canceled)
Packit 63bb0d
	})
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// Test that a job queue allows parallel access to multiple workers, mainly to
Packit 63bb0d
// verify the quirky unlocking in Dequeue().
Packit 63bb0d
func TestMultipleWorkers(t *testing.T) {
Packit 63bb0d
	q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"})
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	done := make(chan struct{})
Packit 63bb0d
	go func() {
Packit 63bb0d
		defer close(done)
Packit 63bb0d
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Packit 63bb0d
		defer cancel()
Packit 63bb0d
		id, err := q.Dequeue(ctx, []string{"octopus"}, &json.RawMessage{})
Packit 63bb0d
		require.NoError(t, err)
Packit 63bb0d
		require.NotEmpty(t, id)
Packit 63bb0d
	}()
Packit 63bb0d
Packit 63bb0d
	// Increase the likelihood that the above goroutine was scheduled and
Packit 63bb0d
	// is waiting in Dequeue().
Packit 63bb0d
	time.Sleep(10 * time.Millisecond)
Packit 63bb0d
Packit 63bb0d
	// This call to Dequeue() should not block on the one in the goroutine.
Packit 63bb0d
	id := pushTestJob(t, q, "clownfish", nil, nil)
Packit 63bb0d
	r, err := q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.Equal(t, id, r)
Packit 63bb0d
Packit 63bb0d
	// Now wake up the Dequeue() in the goroutine and wait for it to finish.
Packit 63bb0d
	_ = pushTestJob(t, q, "octopus", nil, nil)
Packit 63bb0d
	<-done
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func TestCancel(t *testing.T) {
Packit 63bb0d
	q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"})
Packit 63bb0d
	defer cleanupTempDir(t, dir)
Packit 63bb0d
Packit 63bb0d
	// Cancel a non-existing job
Packit 63bb0d
	err := q.CancelJob(uuid.New())
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
Packit 63bb0d
	// Cancel a pending job
Packit 63bb0d
	id := pushTestJob(t, q, "clownfish", nil, nil)
Packit 63bb0d
	require.NotEmpty(t, id)
Packit 63bb0d
	err = q.CancelJob(id)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	_, _, _, canceled, err := q.JobStatus(id, &testResult{})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.True(t, canceled)
Packit 63bb0d
	err = q.FinishJob(id, &testResult{})
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
Packit 63bb0d
	// Cancel a running job, which should not dequeue the canceled job from above
Packit 63bb0d
	id = pushTestJob(t, q, "clownfish", nil, nil)
Packit 63bb0d
	require.NotEmpty(t, id)
Packit 63bb0d
	r, err := q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.Equal(t, id, r)
Packit 63bb0d
	err = q.CancelJob(id)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	_, _, _, canceled, err = q.JobStatus(id, &testResult{})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.True(t, canceled)
Packit 63bb0d
	err = q.FinishJob(id, &testResult{})
Packit 63bb0d
	require.Error(t, err)
Packit 63bb0d
Packit 63bb0d
	// Cancel a finished job, which is a no-op
Packit 63bb0d
	id = pushTestJob(t, q, "clownfish", nil, nil)
Packit 63bb0d
	require.NotEmpty(t, id)
Packit 63bb0d
	r, err = q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.Equal(t, id, r)
Packit 63bb0d
	err = q.FinishJob(id, &testResult{})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	err = q.CancelJob(id)
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	_, _, _, canceled, err = q.JobStatus(id, &testResult{})
Packit 63bb0d
	require.NoError(t, err)
Packit 63bb0d
	require.False(t, canceled)
Packit 63bb0d
}