|
Packit |
63bb0d |
package fsjobqueue_test
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
import (
|
|
Packit |
63bb0d |
"context"
|
|
Packit |
63bb0d |
"encoding/json"
|
|
Packit |
63bb0d |
"io/ioutil"
|
|
Packit |
63bb0d |
"os"
|
|
Packit |
63bb0d |
"testing"
|
|
Packit |
63bb0d |
"time"
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
"github.com/google/uuid"
|
|
Packit |
63bb0d |
"github.com/stretchr/testify/require"
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
"github.com/osbuild/osbuild-composer/internal/jobqueue"
|
|
Packit |
63bb0d |
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
|
|
Packit |
63bb0d |
)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
type testResult struct {
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func cleanupTempDir(t *testing.T, dir string) {
|
|
Packit |
63bb0d |
err := os.RemoveAll(dir)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
func newTemporaryQueue(t *testing.T) (jobqueue.JobQueue, string) {
|
|
Packit |
63bb0d |
dir, err := ioutil.TempDir("", "jobqueue-test-")
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
q, err := fsjobqueue.New(dir)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.NotNil(t, q)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return q, dir
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID {
|
|
Packit |
63bb0d |
t.Helper()
|
|
Packit |
63bb0d |
id, err := q.Enqueue(jobType, args, dependencies)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.NotEmpty(t, id)
|
|
Packit |
63bb0d |
return id
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}, deps []uuid.UUID) uuid.UUID {
|
|
Packit Service |
509fd4 |
id, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType})
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.NotEmpty(t, id)
|
|
Packit Service |
509fd4 |
require.ElementsMatch(t, deps, d)
|
|
Packit Service |
509fd4 |
require.Equal(t, jobType, typ)
|
|
Packit Service |
509fd4 |
require.NotNil(t, args)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
err = q.FinishJob(id, result)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return id
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func TestNonExistant(t *testing.T) {
|
|
Packit Service |
509fd4 |
q, err := fsjobqueue.New("/non-existant-directory")
|
|
Packit |
63bb0d |
require.Error(t, err)
|
|
Packit |
63bb0d |
require.Nil(t, q)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func TestErrors(t *testing.T) {
|
|
Packit Service |
509fd4 |
q, dir := newTemporaryQueue(t)
|
|
Packit |
63bb0d |
defer cleanupTempDir(t, dir)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// not serializable to JSON
|
|
Packit |
63bb0d |
id, err := q.Enqueue("test", make(chan string), nil)
|
|
Packit |
63bb0d |
require.Error(t, err)
|
|
Packit |
63bb0d |
require.Equal(t, uuid.Nil, id)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// invalid dependency
|
|
Packit |
63bb0d |
id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()})
|
|
Packit |
63bb0d |
require.Error(t, err)
|
|
Packit |
63bb0d |
require.Equal(t, uuid.Nil, id)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func TestArgs(t *testing.T) {
|
|
Packit |
63bb0d |
type argument struct {
|
|
Packit |
63bb0d |
I int
|
|
Packit |
63bb0d |
S string
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
q, dir := newTemporaryQueue(t)
|
|
Packit |
63bb0d |
defer cleanupTempDir(t, dir)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
oneargs := argument{7, "🐠"}
|
|
Packit |
63bb0d |
one := pushTestJob(t, q, "fish", oneargs, nil)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
twoargs := argument{42, "🐙"}
|
|
Packit |
63bb0d |
two := pushTestJob(t, q, "octopus", twoargs, nil)
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
var parsedArgs argument
|
|
Packit Service |
509fd4 |
|
|
Packit Service |
509fd4 |
id, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"})
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.Equal(t, two, id)
|
|
Packit Service |
509fd4 |
require.Empty(t, deps)
|
|
Packit Service |
509fd4 |
require.Equal(t, "octopus", typ)
|
|
Packit Service |
509fd4 |
err = json.Unmarshal(args, &parsedArgs)
|
|
Packit Service |
509fd4 |
require.NoError(t, err)
|
|
Packit Service |
509fd4 |
require.Equal(t, twoargs, parsedArgs)
|
|
Packit |
63bb0d |
|
|
Packit Service |
3a6627 |
// Read job params after Dequeue
|
|
Packit Service |
3a6627 |
jtype, jargs, jdeps, err := q.Job(id)
|
|
Packit Service |
3a6627 |
require.NoError(t, err)
|
|
Packit Service |
3a6627 |
require.Equal(t, args, jargs)
|
|
Packit Service |
3a6627 |
require.Equal(t, deps, jdeps)
|
|
Packit Service |
3a6627 |
require.Equal(t, typ, jtype)
|
|
Packit Service |
3a6627 |
|
|
Packit Service |
509fd4 |
id, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"})
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.Equal(t, one, id)
|
|
Packit Service |
509fd4 |
require.Empty(t, deps)
|
|
Packit Service |
509fd4 |
require.Equal(t, "fish", typ)
|
|
Packit Service |
509fd4 |
err = json.Unmarshal(args, &parsedArgs)
|
|
Packit Service |
509fd4 |
require.NoError(t, err)
|
|
Packit Service |
509fd4 |
require.Equal(t, oneargs, parsedArgs)
|
|
Packit Service |
3a6627 |
|
|
Packit Service |
3a6627 |
jtype, jargs, jdeps, err = q.Job(id)
|
|
Packit Service |
3a6627 |
require.NoError(t, err)
|
|
Packit Service |
3a6627 |
require.Equal(t, args, jargs)
|
|
Packit Service |
3a6627 |
require.Equal(t, deps, jdeps)
|
|
Packit Service |
3a6627 |
require.Equal(t, typ, jtype)
|
|
Packit Service |
3a6627 |
|
|
Packit Service |
3a6627 |
_, _, _, err = q.Job(uuid.New())
|
|
Packit Service |
3a6627 |
require.Error(t, err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func TestJobTypes(t *testing.T) {
|
|
Packit Service |
509fd4 |
q, dir := newTemporaryQueue(t)
|
|
Packit |
63bb0d |
defer cleanupTempDir(t, dir)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
one := pushTestJob(t, q, "octopus", nil, nil)
|
|
Packit |
63bb0d |
two := pushTestJob(t, q, "clownfish", nil, nil)
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}, nil))
|
|
Packit Service |
509fd4 |
require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil))
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
ctx, cancel := context.WithCancel(context.Background())
|
|
Packit |
63bb0d |
cancel()
|
|
Packit Service |
509fd4 |
id, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"})
|
|
Packit |
63bb0d |
require.Equal(t, err, context.Canceled)
|
|
Packit |
63bb0d |
require.Equal(t, uuid.Nil, id)
|
|
Packit Service |
509fd4 |
require.Empty(t, deps)
|
|
Packit Service |
509fd4 |
require.Equal(t, "", typ)
|
|
Packit Service |
509fd4 |
require.Nil(t, args)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func TestDependencies(t *testing.T) {
|
|
Packit Service |
509fd4 |
q, dir := newTemporaryQueue(t)
|
|
Packit |
63bb0d |
defer cleanupTempDir(t, dir)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
t.Run("done-before-pushing-dependant", func(t *testing.T) {
|
|
Packit |
63bb0d |
one := pushTestJob(t, q, "test", nil, nil)
|
|
Packit |
63bb0d |
two := pushTestJob(t, q, "test", nil, nil)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
r := []uuid.UUID{}
|
|
Packit Service |
509fd4 |
r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
|
|
Packit Service |
509fd4 |
r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
|
|
Packit |
63bb0d |
require.ElementsMatch(t, []uuid.UUID{one, two}, r)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
|
|
Packit Service |
509fd4 |
_, queued, started, finished, canceled, deps, err := q.JobStatus(j)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.True(t, !queued.IsZero())
|
|
Packit |
63bb0d |
require.True(t, started.IsZero())
|
|
Packit |
63bb0d |
require.True(t, finished.IsZero())
|
|
Packit |
63bb0d |
require.False(t, canceled)
|
|
Packit Service |
509fd4 |
require.ElementsMatch(t, deps, []uuid.UUID{one, two})
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.True(t, !queued.IsZero())
|
|
Packit |
63bb0d |
require.True(t, !started.IsZero())
|
|
Packit |
63bb0d |
require.True(t, !finished.IsZero())
|
|
Packit |
63bb0d |
require.False(t, canceled)
|
|
Packit Service |
509fd4 |
require.ElementsMatch(t, deps, []uuid.UUID{one, two})
|
|
Packit Service |
509fd4 |
|
|
Packit Service |
509fd4 |
err = json.Unmarshal(result, &testResult{})
|
|
Packit Service |
509fd4 |
require.NoError(t, err)
|
|
Packit |
63bb0d |
})
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
t.Run("done-after-pushing-dependant", func(t *testing.T) {
|
|
Packit |
63bb0d |
one := pushTestJob(t, q, "test", nil, nil)
|
|
Packit |
63bb0d |
two := pushTestJob(t, q, "test", nil, nil)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
|
|
Packit Service |
509fd4 |
_, queued, started, finished, canceled, deps, err := q.JobStatus(j)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.True(t, !queued.IsZero())
|
|
Packit |
63bb0d |
require.True(t, started.IsZero())
|
|
Packit |
63bb0d |
require.True(t, finished.IsZero())
|
|
Packit |
63bb0d |
require.False(t, canceled)
|
|
Packit Service |
509fd4 |
require.ElementsMatch(t, deps, []uuid.UUID{one, two})
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
r := []uuid.UUID{}
|
|
Packit Service |
509fd4 |
r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
|
|
Packit Service |
509fd4 |
r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
|
|
Packit |
63bb0d |
require.ElementsMatch(t, []uuid.UUID{one, two}, r)
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.True(t, !queued.IsZero())
|
|
Packit |
63bb0d |
require.True(t, !started.IsZero())
|
|
Packit |
63bb0d |
require.True(t, !finished.IsZero())
|
|
Packit |
63bb0d |
require.False(t, canceled)
|
|
Packit Service |
509fd4 |
require.ElementsMatch(t, deps, []uuid.UUID{one, two})
|
|
Packit Service |
509fd4 |
|
|
Packit Service |
509fd4 |
err = json.Unmarshal(result, &testResult{})
|
|
Packit Service |
509fd4 |
require.NoError(t, err)
|
|
Packit |
63bb0d |
})
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Test that a job queue allows parallel access to multiple workers, mainly to
|
|
Packit |
63bb0d |
// verify the quirky unlocking in Dequeue().
|
|
Packit |
63bb0d |
func TestMultipleWorkers(t *testing.T) {
|
|
Packit Service |
509fd4 |
q, dir := newTemporaryQueue(t)
|
|
Packit |
63bb0d |
defer cleanupTempDir(t, dir)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
done := make(chan struct{})
|
|
Packit |
63bb0d |
go func() {
|
|
Packit |
63bb0d |
defer close(done)
|
|
Packit |
63bb0d |
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
Packit |
63bb0d |
defer cancel()
|
|
Packit Service |
509fd4 |
id, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"})
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.NotEmpty(t, id)
|
|
Packit Service |
509fd4 |
require.Empty(t, deps)
|
|
Packit Service |
509fd4 |
require.Equal(t, "octopus", typ)
|
|
Packit Service |
509fd4 |
require.Equal(t, json.RawMessage("null"), args)
|
|
Packit |
63bb0d |
}()
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Increase the likelihood that the above goroutine was scheduled and
|
|
Packit |
63bb0d |
// is waiting in Dequeue().
|
|
Packit |
63bb0d |
time.Sleep(10 * time.Millisecond)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// This call to Dequeue() should not block on the one in the goroutine.
|
|
Packit |
63bb0d |
id := pushTestJob(t, q, "clownfish", nil, nil)
|
|
Packit Service |
509fd4 |
r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.Equal(t, id, r)
|
|
Packit Service |
509fd4 |
require.Empty(t, deps)
|
|
Packit Service |
509fd4 |
require.Equal(t, "clownfish", typ)
|
|
Packit Service |
509fd4 |
require.Equal(t, json.RawMessage("null"), args)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Now wake up the Dequeue() in the goroutine and wait for it to finish.
|
|
Packit |
63bb0d |
_ = pushTestJob(t, q, "octopus", nil, nil)
|
|
Packit |
63bb0d |
<-done
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func TestCancel(t *testing.T) {
|
|
Packit Service |
509fd4 |
q, dir := newTemporaryQueue(t)
|
|
Packit |
63bb0d |
defer cleanupTempDir(t, dir)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Cancel a non-existing job
|
|
Packit |
63bb0d |
err := q.CancelJob(uuid.New())
|
|
Packit |
63bb0d |
require.Error(t, err)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Cancel a pending job
|
|
Packit |
63bb0d |
id := pushTestJob(t, q, "clownfish", nil, nil)
|
|
Packit |
63bb0d |
require.NotEmpty(t, id)
|
|
Packit |
63bb0d |
err = q.CancelJob(id)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit Service |
509fd4 |
result, _, _, _, canceled, _, err := q.JobStatus(id)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.True(t, canceled)
|
|
Packit Service |
509fd4 |
require.Nil(t, result)
|
|
Packit |
63bb0d |
err = q.FinishJob(id, &testResult{})
|
|
Packit |
63bb0d |
require.Error(t, err)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Cancel a running job, which should not dequeue the canceled job from above
|
|
Packit |
63bb0d |
id = pushTestJob(t, q, "clownfish", nil, nil)
|
|
Packit |
63bb0d |
require.NotEmpty(t, id)
|
|
Packit Service |
509fd4 |
r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.Equal(t, id, r)
|
|
Packit Service |
509fd4 |
require.Empty(t, deps)
|
|
Packit Service |
509fd4 |
require.Equal(t, "clownfish", typ)
|
|
Packit Service |
509fd4 |
require.Equal(t, json.RawMessage("null"), args)
|
|
Packit |
63bb0d |
err = q.CancelJob(id)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit Service |
509fd4 |
result, _, _, _, canceled, _, err = q.JobStatus(id)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.True(t, canceled)
|
|
Packit Service |
509fd4 |
require.Nil(t, result)
|
|
Packit |
63bb0d |
err = q.FinishJob(id, &testResult{})
|
|
Packit |
63bb0d |
require.Error(t, err)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Cancel a finished job, which is a no-op
|
|
Packit |
63bb0d |
id = pushTestJob(t, q, "clownfish", nil, nil)
|
|
Packit |
63bb0d |
require.NotEmpty(t, id)
|
|
Packit Service |
509fd4 |
r, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"})
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.Equal(t, id, r)
|
|
Packit Service |
509fd4 |
require.Empty(t, deps)
|
|
Packit Service |
509fd4 |
require.Equal(t, "clownfish", typ)
|
|
Packit Service |
509fd4 |
require.Equal(t, json.RawMessage("null"), args)
|
|
Packit |
63bb0d |
err = q.FinishJob(id, &testResult{})
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
err = q.CancelJob(id)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit Service |
509fd4 |
result, _, _, _, canceled, _, err = q.JobStatus(id)
|
|
Packit |
63bb0d |
require.NoError(t, err)
|
|
Packit |
63bb0d |
require.False(t, canceled)
|
|
Packit Service |
509fd4 |
err = json.Unmarshal(result, &testResult{})
|
|
Packit Service |
509fd4 |
require.NoError(t, err)
|
|
Packit |
63bb0d |
}
|