Blame vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_retry_reader.go

Packit 63bb0d
package azblob
Packit 63bb0d
Packit 63bb0d
import (
Packit 63bb0d
	"context"
Packit 63bb0d
	"io"
Packit 63bb0d
	"net"
Packit 63bb0d
	"net/http"
Packit 63bb0d
	"strings"
Packit 63bb0d
	"sync"
Packit 63bb0d
)
Packit 63bb0d
Packit 63bb0d
// CountToEnd is the HTTPGetterInfo.Count value that means the read is not
// bounded by a byte count. NewRetryReader treats any other Count as a bounded
// read.
const CountToEnd = 0
Packit 63bb0d
Packit 63bb0d
// HTTPGetter is a function type that refers to a method that performs an HTTP GET operation.
Packit 63bb0d
type HTTPGetter func(ctx context.Context, i HTTPGetterInfo) (*http.Response, error)
Packit 63bb0d
Packit 63bb0d
// HTTPGetterInfo is passed to an HTTPGetter function passing it parameters
Packit 63bb0d
// that should be used to make an HTTP GET request.
Packit 63bb0d
type HTTPGetterInfo struct {
Packit 63bb0d
	// Offset specifies the start offset that should be used when
Packit 63bb0d
	// creating the HTTP GET request's Range header
Packit 63bb0d
	Offset int64
Packit 63bb0d
Packit 63bb0d
	// Count specifies the count of bytes that should be used to calculate
Packit 63bb0d
	// the end offset when creating the HTTP GET request's Range header
Packit 63bb0d
	Count int64
Packit 63bb0d
Packit 63bb0d
	// ETag specifies the resource's etag that should be used when creating
Packit 63bb0d
	// the HTTP GET request's If-Match header
Packit 63bb0d
	ETag ETag
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// FailedReadNotifier is the callback type invoked when a read fails.
// retryReader.Read passes it the one-based number of failures so far within the
// current Read call, the error that caused the failure, the reader's current
// offset and count, and whether the read is about to be retried.
type FailedReadNotifier func(failureCount int, lastError error, offset int64, count int64, willRetry bool)
Packit 63bb0d
Packit 63bb0d
// RetryReaderOptions contains properties which can help to decide when to do retry.
Packit 63bb0d
type RetryReaderOptions struct {
Packit 63bb0d
	// MaxRetryRequests specifies the maximum number of HTTP GET requests that will be made
Packit 63bb0d
	// while reading from a RetryReader. A value of zero means that no additional HTTP
Packit 63bb0d
	// GET requests will be made.
Packit 63bb0d
	MaxRetryRequests   int
Packit 63bb0d
	doInjectError      bool
Packit 63bb0d
	doInjectErrorRound int
Packit Service 3a6627
	injectedError      error
Packit 63bb0d
Packit 63bb0d
	// NotifyFailedRead is called, if non-nil, after any failure to read. Expected usage is diagnostic logging.
Packit 63bb0d
	NotifyFailedRead FailedReadNotifier
Packit 63bb0d
Packit 63bb0d
	// TreatEarlyCloseAsError can be set to true to prevent retries after "read on closed response body". By default,
Packit 63bb0d
	// retryReader has the following special behaviour: closing the response body before it is all read is treated as a
Packit 63bb0d
	// retryable error. This is to allow callers to force a retry by closing the body from another goroutine (e.g. if the =
Packit 63bb0d
	// read is too slow, caller may want to force a retry in the hope that the retry will be quicker).  If
Packit 63bb0d
	// TreatEarlyCloseAsError is true, then retryReader's special behaviour is suppressed, and "read on closed body" is instead
Packit 63bb0d
	// treated as a fatal (non-retryable) error.
Packit 63bb0d
	// Note that setting TreatEarlyCloseAsError only guarantees that Closing will produce a fatal error if the Close happens
Packit 63bb0d
	// from the same "thread" (goroutine) as Read.  Concurrent Close calls from other goroutines may instead produce network errors
Packit 63bb0d
	// which will be retried.
Packit 63bb0d
	TreatEarlyCloseAsError bool
Packit Service 3a6627
Packit Service 3a6627
	ClientProvidedKeyOptions ClientProvidedKeyOptions
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// retryReader implements io.ReaderCloser methods.
Packit 63bb0d
// retryReader tries to read from response, and if there is retriable network error
Packit 63bb0d
// returned during reading, it will retry according to retry reader option through executing
Packit 63bb0d
// user defined action with provided data to get a new response, and continue the overall reading process
Packit 63bb0d
// through reading from the new response.
Packit 63bb0d
type retryReader struct {
Packit 63bb0d
	ctx             context.Context
Packit 63bb0d
	info            HTTPGetterInfo
Packit 63bb0d
	countWasBounded bool
Packit 63bb0d
	o               RetryReaderOptions
Packit 63bb0d
	getter          HTTPGetter
Packit 63bb0d
Packit 63bb0d
	// we support Close-ing during Reads (from other goroutines), so we protect the shared state, which is response
Packit 63bb0d
	responseMu *sync.Mutex
Packit 63bb0d
	response   *http.Response
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// NewRetryReader creates a retry reader.
Packit 63bb0d
func NewRetryReader(ctx context.Context, initialResponse *http.Response,
Packit 63bb0d
	info HTTPGetterInfo, o RetryReaderOptions, getter HTTPGetter) io.ReadCloser {
Packit 63bb0d
	return &retryReader{
Packit 63bb0d
		ctx:             ctx,
Packit 63bb0d
		getter:          getter,
Packit 63bb0d
		info:            info,
Packit 63bb0d
		countWasBounded: info.Count != CountToEnd,
Packit 63bb0d
		response:        initialResponse,
Packit 63bb0d
		responseMu:      &sync.Mutex{},
Packit 63bb0d
		o:               o}
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *retryReader) setResponse(r *http.Response) {
Packit 63bb0d
	s.responseMu.Lock()
Packit 63bb0d
	defer s.responseMu.Unlock()
Packit 63bb0d
	s.response = r
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
func (s *retryReader) Read(p []byte) (n int, err error) {
Packit 63bb0d
	for try := 0; ; try++ {
Packit 63bb0d
		//fmt.Println(try)       // Comment out for debugging.
Packit 63bb0d
		if s.countWasBounded && s.info.Count == CountToEnd {
Packit 63bb0d
			// User specified an original count and the remaining bytes are 0, return 0, EOF
Packit 63bb0d
			return 0, io.EOF
Packit 63bb0d
		}
Packit 63bb0d
Packit 63bb0d
		s.responseMu.Lock()
Packit 63bb0d
		resp := s.response
Packit 63bb0d
		s.responseMu.Unlock()
Packit 63bb0d
		if resp == nil { // We don't have a response stream to read from, try to get one.
Packit 63bb0d
			newResponse, err := s.getter(s.ctx, s.info)
Packit 63bb0d
			if err != nil {
Packit 63bb0d
				return 0, err
Packit 63bb0d
			}
Packit 63bb0d
			// Successful GET; this is the network stream we'll read from.
Packit 63bb0d
			s.setResponse(newResponse)
Packit 63bb0d
			resp = newResponse
Packit 63bb0d
		}
Packit 63bb0d
		n, err := resp.Body.Read(p) // Read from the stream (this will return non-nil err if forceRetry is called, from another goroutine, while it is running)
Packit 63bb0d
Packit 63bb0d
		// Injection mechanism for testing.
Packit 63bb0d
		if s.o.doInjectError && try == s.o.doInjectErrorRound {
Packit Service 3a6627
			if s.o.injectedError != nil {
Packit Service 3a6627
				err = s.o.injectedError
Packit Service 3a6627
			} else {
Packit Service 3a6627
				err = &net.DNSError{IsTemporary: true}
Packit Service 3a6627
			}
Packit 63bb0d
		}
Packit 63bb0d
Packit 63bb0d
		// We successfully read data or end EOF.
Packit 63bb0d
		if err == nil || err == io.EOF {
Packit 63bb0d
			s.info.Offset += int64(n) // Increments the start offset in case we need to make a new HTTP request in the future
Packit 63bb0d
			if s.info.Count != CountToEnd {
Packit 63bb0d
				s.info.Count -= int64(n) // Decrement the count in case we need to make a new HTTP request in the future
Packit 63bb0d
			}
Packit 63bb0d
			return n, err // Return the return to the caller
Packit 63bb0d
		}
Packit 63bb0d
		s.Close()          // Error, close stream
Packit 63bb0d
		s.setResponse(nil) // Our stream is no longer good
Packit 63bb0d
Packit 63bb0d
		// Check the retry count and error code, and decide whether to retry.
Packit 63bb0d
		retriesExhausted := try >= s.o.MaxRetryRequests
Packit 63bb0d
		_, isNetError := err.(net.Error)
Packit Service 3a6627
		isUnexpectedEOF := err == io.ErrUnexpectedEOF
Packit Service 3a6627
		willRetry := (isNetError || isUnexpectedEOF || s.wasRetryableEarlyClose(err)) && !retriesExhausted
Packit 63bb0d
Packit 63bb0d
		// Notify, for logging purposes, of any failures
Packit 63bb0d
		if s.o.NotifyFailedRead != nil {
Packit 63bb0d
			failureCount := try + 1 // because try is zero-based
Packit 63bb0d
			s.o.NotifyFailedRead(failureCount, err, s.info.Offset, s.info.Count, willRetry)
Packit 63bb0d
		}
Packit 63bb0d
Packit 63bb0d
		if willRetry {
Packit 63bb0d
			continue
Packit 63bb0d
			// Loop around and try to get and read from new stream.
Packit 63bb0d
		}
Packit 63bb0d
		return n, err // Not retryable, or retries exhausted, so just return
Packit 63bb0d
	}
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// By default, we allow early Closing, from another concurrent goroutine, to be used to force a retry
Packit 63bb0d
// Is this safe, to close early from another goroutine?  Early close ultimately ends up calling
Packit 63bb0d
// net.Conn.Close, and that is documented as "Any blocked Read or Write operations will be unblocked and return errors"
Packit 63bb0d
// which is exactly the behaviour we want.
Packit 63bb0d
// NOTE: that if caller has forced an early Close from a separate goroutine (separate from the Read)
Packit 63bb0d
// then there are two different types of error that may happen - either the one one we check for here,
Packit 63bb0d
// or a net.Error (due to closure of connection). Which one happens depends on timing. We only need this routine
Packit 63bb0d
// to check for one, since the other is a net.Error, which our main Read retry loop is already handing.
Packit 63bb0d
func (s *retryReader) wasRetryableEarlyClose(err error) bool {
Packit 63bb0d
	if s.o.TreatEarlyCloseAsError {
Packit 63bb0d
		return false // user wants all early closes to be errors, and so not retryable
Packit 63bb0d
	}
Packit 63bb0d
	// unfortunately, http.errReadOnClosedResBody is private, so the best we can do here is to check for its text
Packit 63bb0d
	return strings.HasSuffix(err.Error(), ReadOnClosedBodyMessage)
Packit 63bb0d
}
Packit 63bb0d
Packit 63bb0d
// ReadOnClosedBodyMessage is the error-text suffix that wasRetryableEarlyClose
// matches to recognise a read attempted on an already-closed response body.
const ReadOnClosedBodyMessage = "read on closed response body"
Packit 63bb0d
Packit 63bb0d
func (s *retryReader) Close() error {
Packit 63bb0d
	s.responseMu.Lock()
Packit 63bb0d
	defer s.responseMu.Unlock()
Packit 63bb0d
	if s.response != nil && s.response.Body != nil {
Packit 63bb0d
		return s.response.Body.Close()
Packit 63bb0d
	}
Packit 63bb0d
	return nil
Packit 63bb0d
}