|
Packit |
63bb0d |
// Package fsjobqueue implements a filesystem-backed job queue. It implements
|
|
Packit |
63bb0d |
// the interfaces in package jobqueue.
|
|
Packit |
63bb0d |
//
|
|
Packit |
63bb0d |
// Jobs are stored in the file system, using the `jsondb` package. However,
|
|
Packit |
63bb0d |
// this package does not use the file system as a database, but keeps some
|
|
Packit |
63bb0d |
// state in memory. This means that access to a given directory must be
|
|
Packit |
63bb0d |
// exclusive to only one `fsJobQueue` object at a time. A single `fsJobQueue`
|
|
Packit |
63bb0d |
// can be safely accessed from multiple goroutines, though.
|
|
Packit |
63bb0d |
//
|
|
Packit |
63bb0d |
// Data is stored non-redundantly. Any data structure necessary for efficient
|
|
Packit |
63bb0d |
// access (e.g., dependants) is kept in memory.
|
|
Packit |
63bb0d |
package fsjobqueue
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
import (
|
|
Packit |
63bb0d |
"context"
|
|
Packit |
63bb0d |
"encoding/json"
|
|
Packit |
63bb0d |
"errors"
|
|
Packit |
63bb0d |
"fmt"
|
|
Packit |
63bb0d |
"reflect"
|
|
Packit |
63bb0d |
"sync"
|
|
Packit |
63bb0d |
"time"
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
"github.com/google/uuid"
|
|
Packit |
63bb0d |
"github.com/osbuild/osbuild-composer/internal/jobqueue"
|
|
Packit |
63bb0d |
"github.com/osbuild/osbuild-composer/internal/jsondb"
|
|
Packit |
63bb0d |
)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
type fsJobQueue struct {
|
|
Packit |
63bb0d |
// Protects all fields of this struct. In particular, it ensures
|
|
Packit |
63bb0d |
// transactions on `db` are atomic. All public functions except
|
|
Packit |
63bb0d |
// JobStatus hold it while they're running. Dequeue() releases it
|
|
Packit |
63bb0d |
// briefly while waiting on pending channels.
|
|
Packit |
63bb0d |
mu sync.Mutex
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
db *jsondb.JSONDatabase
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Maps job types to channels of job ids for that type.
|
|
Packit |
63bb0d |
pending map[string]chan uuid.UUID
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Maps job ids to the jobs that depend on it, if any of those
|
|
Packit |
63bb0d |
// dependants have not yet finished.
|
|
Packit |
63bb0d |
dependants map[uuid.UUID][]uuid.UUID
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// On-disk job struct. Contains all necessary (but non-redundant) information
|
|
Packit |
63bb0d |
// about a job. These are not held in memory by the job queue, but
|
|
Packit |
63bb0d |
// (de)serialized on each access.
|
|
Packit |
63bb0d |
type job struct {
|
|
Packit |
63bb0d |
Id uuid.UUID `json:"id"`
|
|
Packit |
63bb0d |
Type string `json:"type"`
|
|
Packit |
63bb0d |
Args json.RawMessage `json:"args,omitempty"`
|
|
Packit |
63bb0d |
Dependencies []uuid.UUID `json:"dependencies"`
|
|
Packit |
63bb0d |
Result json.RawMessage `json:"result,omitempty"`
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
QueuedAt time.Time `json:"queued_at,omitempty"`
|
|
Packit |
63bb0d |
StartedAt time.Time `json:"started_at,omitempty"`
|
|
Packit |
63bb0d |
FinishedAt time.Time `json:"finished_at,omitempty"`
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
Canceled bool `json:"canceled,omitempty"`
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
// channelSize is the buffer capacity of every channel stored in
// fsJobQueue.pending. There is one such channel per job type, so this bounds
// the number of ready-but-not-yet-dequeued jobs of a single type that can be
// buffered before a send on that channel blocks.
const channelSize = 100
|
|
Packit Service |
509fd4 |
|
|
Packit |
63bb0d |
// Create a new fsJobQueue object for `dir`. This object must have exclusive
|
|
Packit |
63bb0d |
// access to `dir`. If `dir` contains jobs created from previous runs, they are
|
|
Packit |
63bb0d |
// loaded and rescheduled to run if necessary.
|
|
Packit Service |
509fd4 |
func New(dir string) (*fsJobQueue, error) {
|
|
Packit |
63bb0d |
q := &fsJobQueue{
|
|
Packit |
63bb0d |
db: jsondb.New(dir, 0600),
|
|
Packit |
63bb0d |
pending: make(map[string]chan uuid.UUID),
|
|
Packit |
63bb0d |
dependants: make(map[uuid.UUID][]uuid.UUID),
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Look for jobs that are still pending and build the dependant map.
|
|
Packit |
63bb0d |
ids, err := q.db.List()
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return nil, fmt.Errorf("error listing jobs: %v", err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
for _, id := range ids {
|
|
Packit |
63bb0d |
uuid, err := uuid.Parse(id)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return nil, fmt.Errorf("invalid job '%s' in db: %v", id, err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
j, err := q.readJob(uuid)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return nil, err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
err = q.maybeEnqueue(j, true)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return nil, err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return q, nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
|
|
Packit |
63bb0d |
q.mu.Lock()
|
|
Packit |
63bb0d |
defer q.mu.Unlock()
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
var j = job{
|
|
Packit |
63bb0d |
Id: uuid.New(),
|
|
Packit |
63bb0d |
Type: jobType,
|
|
Packit Service |
509fd4 |
Dependencies: dependencies,
|
|
Packit |
63bb0d |
QueuedAt: time.Now(),
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
var err error
|
|
Packit |
63bb0d |
j.Args, err = json.Marshal(args)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return uuid.Nil, fmt.Errorf("error marshaling job arguments: %v", err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Verify dependendencies early, so that the job doesn't get written
|
|
Packit |
63bb0d |
// when one of them doesn't exist.
|
|
Packit |
63bb0d |
for _, d := range j.Dependencies {
|
|
Packit |
63bb0d |
exists, err := q.db.Read(d.String(), nil)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return uuid.Nil, err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
if !exists {
|
|
Packit |
63bb0d |
return uuid.Nil, jobqueue.ErrNotExist
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Write the job before updating in-memory state, so that the latter
|
|
Packit |
63bb0d |
// doesn't become corrupt when writing fails.
|
|
Packit |
63bb0d |
err = q.db.Write(j.Id.String(), j)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return uuid.Nil, fmt.Errorf("cannot write job: %v:", err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
err = q.maybeEnqueue(&j, true)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return uuid.Nil, err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return j.Id, nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
|
|
Packit |
63bb0d |
q.mu.Lock()
|
|
Packit |
63bb0d |
defer q.mu.Unlock()
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Return early if the context is already canceled.
|
|
Packit |
63bb0d |
if err := ctx.Err(); err != nil {
|
|
Packit Service |
509fd4 |
return uuid.Nil, nil, "", nil, err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Filter q.pending by the `jobTypes`. Ignore those job types that this
|
|
Packit |
63bb0d |
// queue doesn't accept.
|
|
Packit |
63bb0d |
chans := []chan uuid.UUID{}
|
|
Packit |
63bb0d |
for _, jt := range jobTypes {
|
|
Packit Service |
509fd4 |
c, exists := q.pending[jt]
|
|
Packit Service |
509fd4 |
if !exists {
|
|
Packit Service |
509fd4 |
c = make(chan uuid.UUID, channelSize)
|
|
Packit Service |
509fd4 |
q.pending[jt] = c
|
|
Packit |
63bb0d |
}
|
|
Packit Service |
509fd4 |
chans = append(chans, c)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Loop until finding a non-canceled job.
|
|
Packit |
63bb0d |
var j *job
|
|
Packit |
63bb0d |
for {
|
|
Packit |
63bb0d |
// Unlock the mutex while polling channels, so that multiple goroutines
|
|
Packit |
63bb0d |
// can wait at the same time.
|
|
Packit |
63bb0d |
q.mu.Unlock()
|
|
Packit |
63bb0d |
id, err := selectUUIDChannel(ctx, chans)
|
|
Packit |
63bb0d |
q.mu.Lock()
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
// Delete empty channels
|
|
Packit Service |
509fd4 |
for _, jt := range jobTypes {
|
|
Packit Service |
509fd4 |
c, exists := q.pending[jt]
|
|
Packit Service |
509fd4 |
if exists && len(c) == 0 {
|
|
Packit Service |
509fd4 |
close(c)
|
|
Packit Service |
509fd4 |
delete(q.pending, jt)
|
|
Packit Service |
509fd4 |
}
|
|
Packit Service |
509fd4 |
}
|
|
Packit Service |
509fd4 |
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit Service |
509fd4 |
return uuid.Nil, nil, "", nil, err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
j, err = q.readJob(id)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit Service |
509fd4 |
return uuid.Nil, nil, "", nil, err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if !j.Canceled {
|
|
Packit |
63bb0d |
break
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
j.StartedAt = time.Now()
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
err := q.db.Write(j.Id.String(), j)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit Service |
509fd4 |
return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
return j.Id, j.Dependencies, j.Type, j.Args, nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
|
|
Packit |
63bb0d |
q.mu.Lock()
|
|
Packit |
63bb0d |
defer q.mu.Unlock()
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
j, err := q.readJob(id)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if j.Canceled {
|
|
Packit |
63bb0d |
return jobqueue.ErrCanceled
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
|
|
Packit |
63bb0d |
return jobqueue.ErrNotRunning
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
j.FinishedAt = time.Now()
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
j.Result, err = json.Marshal(result)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return fmt.Errorf("error marshaling result: %v", err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Write before notifying dependants, because it will be read again.
|
|
Packit |
63bb0d |
err = q.db.Write(id.String(), j)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return fmt.Errorf("error writing job %s: %v", id, err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
for _, depid := range q.dependants[id] {
|
|
Packit |
63bb0d |
dep, err := q.readJob(depid)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
err = q.maybeEnqueue(dep, false)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
delete(q.dependants, id)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
|
|
Packit |
63bb0d |
q.mu.Lock()
|
|
Packit |
63bb0d |
defer q.mu.Unlock()
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
j, err := q.readJob(id)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if !j.FinishedAt.IsZero() {
|
|
Packit |
63bb0d |
return nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
j.Canceled = true
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
err = q.db.Write(id.String(), j)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return fmt.Errorf("error writing job %s: %v", id, err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
func (q *fsJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
|
|
Packit |
63bb0d |
j, err := q.readJob(id)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit Service |
509fd4 |
result = j.Result
|
|
Packit |
63bb0d |
queued = j.QueuedAt
|
|
Packit |
63bb0d |
started = j.StartedAt
|
|
Packit |
63bb0d |
finished = j.FinishedAt
|
|
Packit |
63bb0d |
canceled = j.Canceled
|
|
Packit Service |
509fd4 |
deps = j.Dependencies
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit Service |
3a6627 |
func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) {
|
|
Packit Service |
3a6627 |
j, err := q.readJob(id)
|
|
Packit Service |
3a6627 |
if err != nil {
|
|
Packit Service |
3a6627 |
return
|
|
Packit Service |
3a6627 |
}
|
|
Packit Service |
3a6627 |
|
|
Packit Service |
3a6627 |
jobType = j.Type
|
|
Packit Service |
3a6627 |
args = j.Args
|
|
Packit Service |
3a6627 |
dependencies = j.Dependencies
|
|
Packit Service |
3a6627 |
|
|
Packit Service |
3a6627 |
return
|
|
Packit Service |
3a6627 |
}
|
|
Packit Service |
3a6627 |
|
|
Packit |
63bb0d |
// Reads job with `id`. This is a thin wrapper around `q.db.Read`, which
|
|
Packit |
63bb0d |
// returns the job directly, or and error if a job with `id` does not exist.
|
|
Packit |
63bb0d |
func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
|
|
Packit |
63bb0d |
var j job
|
|
Packit |
63bb0d |
exists, err := q.db.Read(id.String(), &j)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return nil, fmt.Errorf("error reading job '%s': %v", id, err)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
if !exists {
|
|
Packit |
63bb0d |
// return corrupt database?
|
|
Packit |
63bb0d |
return nil, jobqueue.ErrNotExist
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
return &j, nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Enqueue `job` if it is pending and all its dependencies have finished.
|
|
Packit |
63bb0d |
// Update `q.dependants` if the job was not queued and updateDependants is true
|
|
Packit |
63bb0d |
// (i.e., when this is a new job).
|
|
Packit Service |
509fd4 |
// `q.mu` must be locked when this method is called. The only exception is
|
|
Packit Service |
509fd4 |
// `New()` because no concurrent calls are possible there.
|
|
Packit |
63bb0d |
func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
|
|
Packit |
63bb0d |
if !j.StartedAt.IsZero() {
|
|
Packit |
63bb0d |
return nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
depsFinished := true
|
|
Packit |
63bb0d |
for _, id := range j.Dependencies {
|
|
Packit |
63bb0d |
j, err := q.readJob(id)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
if j.FinishedAt.IsZero() {
|
|
Packit |
63bb0d |
depsFinished = false
|
|
Packit |
63bb0d |
break
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if depsFinished {
|
|
Packit |
63bb0d |
c, exists := q.pending[j.Type]
|
|
Packit |
63bb0d |
if !exists {
|
|
Packit Service |
509fd4 |
c = make(chan uuid.UUID, channelSize)
|
|
Packit Service |
509fd4 |
q.pending[j.Type] = c
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
c <- j.Id
|
|
Packit |
63bb0d |
} else if updateDependants {
|
|
Packit |
63bb0d |
for _, id := range j.Dependencies {
|
|
Packit |
63bb0d |
q.dependants[id] = append(q.dependants[id], j.Id)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Select on a list of `chan uuid.UUID`s. Returns an error if one of the
|
|
Packit |
63bb0d |
// channels is closed.
|
|
Packit |
63bb0d |
//
|
|
Packit |
63bb0d |
// Uses reflect.Select(), because the `select` statement cannot operate on an
|
|
Packit |
63bb0d |
// unknown amount of channels.
|
|
Packit |
63bb0d |
func selectUUIDChannel(ctx context.Context, chans []chan uuid.UUID) (uuid.UUID, error) {
|
|
Packit |
63bb0d |
cases := []reflect.SelectCase{
|
|
Packit |
63bb0d |
{
|
|
Packit |
63bb0d |
Dir: reflect.SelectRecv,
|
|
Packit |
63bb0d |
Chan: reflect.ValueOf(ctx.Done()),
|
|
Packit |
63bb0d |
},
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
for _, c := range chans {
|
|
Packit |
63bb0d |
cases = append(cases, reflect.SelectCase{
|
|
Packit |
63bb0d |
Dir: reflect.SelectRecv,
|
|
Packit |
63bb0d |
Chan: reflect.ValueOf(c),
|
|
Packit |
63bb0d |
})
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
chosen, value, recvOK := reflect.Select(cases)
|
|
Packit |
63bb0d |
if !recvOK {
|
|
Packit |
63bb0d |
if chosen == 0 {
|
|
Packit |
63bb0d |
return uuid.Nil, ctx.Err()
|
|
Packit |
63bb0d |
} else {
|
|
Packit |
63bb0d |
return uuid.Nil, errors.New("channel was closed unexpectedly")
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return value.Interface().(uuid.UUID), nil
|
|
Packit |
63bb0d |
}
|