package eventstreamapi import ( "fmt" "io" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/private/protocol" "github.com/aws/aws-sdk-go/private/protocol/eventstream" ) // Unmarshaler provides the interface for unmarshaling a EventStream // message into a SDK type. type Unmarshaler interface { UnmarshalEvent(protocol.PayloadUnmarshaler, eventstream.Message) error } // EventStream headers with specific meaning to async API functionality. const ( MessageTypeHeader = `:message-type` // Identifies type of message. EventMessageType = `event` ErrorMessageType = `error` ExceptionMessageType = `exception` // Message Events EventTypeHeader = `:event-type` // Identifies message event type e.g. "Stats". // Message Error ErrorCodeHeader = `:error-code` ErrorMessageHeader = `:error-message` // Message Exception ExceptionTypeHeader = `:exception-type` ) // EventReader provides reading from the EventStream of an reader. type EventReader struct { reader io.ReadCloser decoder *eventstream.Decoder unmarshalerForEventType func(string) (Unmarshaler, error) payloadUnmarshaler protocol.PayloadUnmarshaler payloadBuf []byte } // NewEventReader returns a EventReader built from the reader and unmarshaler // provided. Use ReadStream method to start reading from the EventStream. func NewEventReader( reader io.ReadCloser, payloadUnmarshaler protocol.PayloadUnmarshaler, unmarshalerForEventType func(string) (Unmarshaler, error), ) *EventReader { return &EventReader{ reader: reader, decoder: eventstream.NewDecoder(reader), payloadUnmarshaler: payloadUnmarshaler, unmarshalerForEventType: unmarshalerForEventType, payloadBuf: make([]byte, 10*1024), } } // UseLogger instructs the EventReader to use the logger and log level // specified. func (r *EventReader) UseLogger(logger aws.Logger, logLevel aws.LogLevelType) { if logger != nil && logLevel.Matches(aws.LogDebugWithEventStreamBody) { r.decoder.UseLogger(logger) } } // ReadEvent attempts to read a message from the EventStream and return the // unmarshaled event value that the message is for. // // For EventStream API errors check if the returned error satisfies the // awserr.Error interface to get the error's Code and Message components. // // EventUnmarshalers called with EventStream messages must take copies of the // message's Payload. The payload will is reused between events read. func (r *EventReader) ReadEvent() (event interface{}, err error) { msg, err := r.decoder.Decode(r.payloadBuf) if err != nil { return nil, err } defer func() { // Reclaim payload buffer for next message read. r.payloadBuf = msg.Payload[0:0] }() typ, err := GetHeaderString(msg, MessageTypeHeader) if err != nil { return nil, err } switch typ { case EventMessageType: return r.unmarshalEventMessage(msg) case ExceptionMessageType: err = r.unmarshalEventException(msg) return nil, err case ErrorMessageType: return nil, r.unmarshalErrorMessage(msg) default: return nil, fmt.Errorf("unknown eventstream message type, %v", typ) } } func (r *EventReader) unmarshalEventMessage( msg eventstream.Message, ) (event interface{}, err error) { eventType, err := GetHeaderString(msg, EventTypeHeader) if err != nil { return nil, err } ev, err := r.unmarshalerForEventType(eventType) if err != nil { return nil, err } err = ev.UnmarshalEvent(r.payloadUnmarshaler, msg) if err != nil { return nil, err } return ev, nil } func (r *EventReader) unmarshalEventException( msg eventstream.Message, ) (err error) { eventType, err := GetHeaderString(msg, ExceptionTypeHeader) if err != nil { return err } ev, err := r.unmarshalerForEventType(eventType) if err != nil { return err } err = ev.UnmarshalEvent(r.payloadUnmarshaler, msg) if err != nil { return err } var ok bool err, ok = ev.(error) if !ok { err = messageError{ code: "SerializationError", msg: fmt.Sprintf( "event stream exception %s mapped to non-error %T, %v", eventType, ev, ev, ), } } return err } func (r *EventReader) unmarshalErrorMessage(msg eventstream.Message) (err error) { var msgErr messageError msgErr.code, err = GetHeaderString(msg, ErrorCodeHeader) if err != nil { return err } msgErr.msg, err = GetHeaderString(msg, ErrorMessageHeader) if err != nil { return err } return msgErr } // Close closes the EventReader's EventStream reader. func (r *EventReader) Close() error { return r.reader.Close() } // GetHeaderString returns the value of the header as a string. If the header // is not set or the value is not a string an error will be returned. func GetHeaderString(msg eventstream.Message, headerName string) (string, error) { headerVal := msg.Headers.Get(headerName) if headerVal == nil { return "", fmt.Errorf("error header %s not present", headerName) } v, ok := headerVal.Get().(string) if !ok { return "", fmt.Errorf("error header value is not a string, %T", headerVal) } return v, nil }