Blob Blame History Raw
package fsjobqueue_test

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"os"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/stretchr/testify/require"

	"github.com/osbuild/osbuild-composer/internal/jobqueue"
	"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
)

type testResult struct {
}

func cleanupTempDir(t *testing.T, dir string) {
	err := os.RemoveAll(dir)
	require.NoError(t, err)
}

func newTemporaryQueue(t *testing.T) (jobqueue.JobQueue, string) {
	dir, err := ioutil.TempDir("", "jobqueue-test-")
	require.NoError(t, err)

	q, err := fsjobqueue.New(dir)
	require.NoError(t, err)
	require.NotNil(t, q)

	return q, dir
}

func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID {
	t.Helper()
	id, err := q.Enqueue(jobType, args, dependencies)
	require.NoError(t, err)
	require.NotEmpty(t, id)
	return id
}

func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}, deps []uuid.UUID) uuid.UUID {
	id, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType})
	require.NoError(t, err)
	require.NotEmpty(t, id)
	require.ElementsMatch(t, deps, d)
	require.Equal(t, jobType, typ)
	require.NotNil(t, args)

	err = q.FinishJob(id, result)
	require.NoError(t, err)

	return id
}

func TestNonExistant(t *testing.T) {
	q, err := fsjobqueue.New("/non-existant-directory")
	require.Error(t, err)
	require.Nil(t, q)
}

func TestErrors(t *testing.T) {
	q, dir := newTemporaryQueue(t)
	defer cleanupTempDir(t, dir)

	// not serializable to JSON
	id, err := q.Enqueue("test", make(chan string), nil)
	require.Error(t, err)
	require.Equal(t, uuid.Nil, id)

	// invalid dependency
	id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()})
	require.Error(t, err)
	require.Equal(t, uuid.Nil, id)
}

func TestArgs(t *testing.T) {
	type argument struct {
		I int
		S string
	}

	q, dir := newTemporaryQueue(t)
	defer cleanupTempDir(t, dir)

	oneargs := argument{7, "🐠"}
	one := pushTestJob(t, q, "fish", oneargs, nil)

	twoargs := argument{42, "🐙"}
	two := pushTestJob(t, q, "octopus", twoargs, nil)

	var parsedArgs argument

	id, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"})
	require.NoError(t, err)
	require.Equal(t, two, id)
	require.Empty(t, deps)
	require.Equal(t, "octopus", typ)
	err = json.Unmarshal(args, &parsedArgs)
	require.NoError(t, err)
	require.Equal(t, twoargs, parsedArgs)

	// Read job params after Dequeue
	jtype, jargs, jdeps, err := q.Job(id)
	require.NoError(t, err)
	require.Equal(t, args, jargs)
	require.Equal(t, deps, jdeps)
	require.Equal(t, typ, jtype)

	id, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"})
	require.NoError(t, err)
	require.Equal(t, one, id)
	require.Empty(t, deps)
	require.Equal(t, "fish", typ)
	err = json.Unmarshal(args, &parsedArgs)
	require.NoError(t, err)
	require.Equal(t, oneargs, parsedArgs)

	jtype, jargs, jdeps, err = q.Job(id)
	require.NoError(t, err)
	require.Equal(t, args, jargs)
	require.Equal(t, deps, jdeps)
	require.Equal(t, typ, jtype)

	_, _, _, err = q.Job(uuid.New())
	require.Error(t, err)
}

func TestJobTypes(t *testing.T) {
	q, dir := newTemporaryQueue(t)
	defer cleanupTempDir(t, dir)

	one := pushTestJob(t, q, "octopus", nil, nil)
	two := pushTestJob(t, q, "clownfish", nil, nil)

	require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}, nil))
	require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil))

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	id, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"})
	require.Equal(t, err, context.Canceled)
	require.Equal(t, uuid.Nil, id)
	require.Empty(t, deps)
	require.Equal(t, "", typ)
	require.Nil(t, args)
}

func TestDependencies(t *testing.T) {
	q, dir := newTemporaryQueue(t)
	defer cleanupTempDir(t, dir)

	t.Run("done-before-pushing-dependant", func(t *testing.T) {
		one := pushTestJob(t, q, "test", nil, nil)
		two := pushTestJob(t, q, "test", nil, nil)

		r := []uuid.UUID{}
		r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
		r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
		require.ElementsMatch(t, []uuid.UUID{one, two}, r)

		j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
		_, queued, started, finished, canceled, deps, err := q.JobStatus(j)
		require.NoError(t, err)
		require.True(t, !queued.IsZero())
		require.True(t, started.IsZero())
		require.True(t, finished.IsZero())
		require.False(t, canceled)
		require.ElementsMatch(t, deps, []uuid.UUID{one, two})

		require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))

		result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
		require.NoError(t, err)
		require.True(t, !queued.IsZero())
		require.True(t, !started.IsZero())
		require.True(t, !finished.IsZero())
		require.False(t, canceled)
		require.ElementsMatch(t, deps, []uuid.UUID{one, two})

		err = json.Unmarshal(result, &testResult{})
		require.NoError(t, err)
	})

	t.Run("done-after-pushing-dependant", func(t *testing.T) {
		one := pushTestJob(t, q, "test", nil, nil)
		two := pushTestJob(t, q, "test", nil, nil)

		j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
		_, queued, started, finished, canceled, deps, err := q.JobStatus(j)
		require.NoError(t, err)
		require.True(t, !queued.IsZero())
		require.True(t, started.IsZero())
		require.True(t, finished.IsZero())
		require.False(t, canceled)
		require.ElementsMatch(t, deps, []uuid.UUID{one, two})

		r := []uuid.UUID{}
		r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
		r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
		require.ElementsMatch(t, []uuid.UUID{one, two}, r)

		require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))

		result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
		require.NoError(t, err)
		require.True(t, !queued.IsZero())
		require.True(t, !started.IsZero())
		require.True(t, !finished.IsZero())
		require.False(t, canceled)
		require.ElementsMatch(t, deps, []uuid.UUID{one, two})

		err = json.Unmarshal(result, &testResult{})
		require.NoError(t, err)
	})
}

// Test that a job queue allows parallel access to multiple workers, mainly to
// verify the quirky unlocking in Dequeue().
func TestMultipleWorkers(t *testing.T) {
	q, dir := newTemporaryQueue(t)
	defer cleanupTempDir(t, dir)

	done := make(chan struct{})
	go func() {
		defer close(done)
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		id, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"})
		require.NoError(t, err)
		require.NotEmpty(t, id)
		require.Empty(t, deps)
		require.Equal(t, "octopus", typ)
		require.Equal(t, json.RawMessage("null"), args)
	}()

	// Increase the likelihood that the above goroutine was scheduled and
	// is waiting in Dequeue().
	time.Sleep(10 * time.Millisecond)

	// This call to Dequeue() should not block on the one in the goroutine.
	id := pushTestJob(t, q, "clownfish", nil, nil)
	r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
	require.NoError(t, err)
	require.Equal(t, id, r)
	require.Empty(t, deps)
	require.Equal(t, "clownfish", typ)
	require.Equal(t, json.RawMessage("null"), args)

	// Now wake up the Dequeue() in the goroutine and wait for it to finish.
	_ = pushTestJob(t, q, "octopus", nil, nil)
	<-done
}

func TestCancel(t *testing.T) {
	q, dir := newTemporaryQueue(t)
	defer cleanupTempDir(t, dir)

	// Cancel a non-existing job
	err := q.CancelJob(uuid.New())
	require.Error(t, err)

	// Cancel a pending job
	id := pushTestJob(t, q, "clownfish", nil, nil)
	require.NotEmpty(t, id)
	err = q.CancelJob(id)
	require.NoError(t, err)
	result, _, _, _, canceled, _, err := q.JobStatus(id)
	require.NoError(t, err)
	require.True(t, canceled)
	require.Nil(t, result)
	err = q.FinishJob(id, &testResult{})
	require.Error(t, err)

	// Cancel a running job, which should not dequeue the canceled job from above
	id = pushTestJob(t, q, "clownfish", nil, nil)
	require.NotEmpty(t, id)
	r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
	require.NoError(t, err)
	require.Equal(t, id, r)
	require.Empty(t, deps)
	require.Equal(t, "clownfish", typ)
	require.Equal(t, json.RawMessage("null"), args)
	err = q.CancelJob(id)
	require.NoError(t, err)
	result, _, _, _, canceled, _, err = q.JobStatus(id)
	require.NoError(t, err)
	require.True(t, canceled)
	require.Nil(t, result)
	err = q.FinishJob(id, &testResult{})
	require.Error(t, err)

	// Cancel a finished job, which is a no-op
	id = pushTestJob(t, q, "clownfish", nil, nil)
	require.NotEmpty(t, id)
	r, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"})
	require.NoError(t, err)
	require.Equal(t, id, r)
	require.Empty(t, deps)
	require.Equal(t, "clownfish", typ)
	require.Equal(t, json.RawMessage("null"), args)
	err = q.FinishJob(id, &testResult{})
	require.NoError(t, err)
	err = q.CancelJob(id)
	require.NoError(t, err)
	result, _, _, _, canceled, _, err = q.JobStatus(id)
	require.NoError(t, err)
	require.False(t, canceled)
	err = json.Unmarshal(result, &testResult{})
	require.NoError(t, err)
}