|
Packit |
63bb0d |
package eventstream
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash"
	"hash/crc32"
	"io"
)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Encoder provides EventStream message encoding.
|
|
Packit |
63bb0d |
type Encoder struct {
|
|
Packit |
63bb0d |
w io.Writer
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
headersBuf *bytes.Buffer
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// NewEncoder initializes and returns an Encoder to encode Event Stream
|
|
Packit |
63bb0d |
// messages to an io.Writer.
|
|
Packit |
63bb0d |
func NewEncoder(w io.Writer) *Encoder {
|
|
Packit |
63bb0d |
return &Encoder{
|
|
Packit |
63bb0d |
w: w,
|
|
Packit |
63bb0d |
headersBuf: bytes.NewBuffer(nil),
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
// Encode encodes a single EventStream message to the io.Writer the Encoder
|
|
Packit |
63bb0d |
// was created with. An error is returned if writing the message fails.
|
|
Packit |
63bb0d |
func (e *Encoder) Encode(msg Message) error {
|
|
Packit |
63bb0d |
e.headersBuf.Reset()
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
err := encodeHeaders(e.headersBuf, msg.Headers)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
crc := crc32.New(crc32IEEETable)
|
|
Packit |
63bb0d |
hashWriter := io.MultiWriter(e.w, crc)
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
headersLen := uint32(e.headersBuf.Len())
|
|
Packit |
63bb0d |
payloadLen := uint32(len(msg.Payload))
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if err := encodePrelude(hashWriter, crc, headersLen, payloadLen); err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if headersLen > 0 {
|
|
Packit |
63bb0d |
if _, err := io.Copy(hashWriter, e.headersBuf); err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if payloadLen > 0 {
|
|
Packit |
63bb0d |
if _, err := hashWriter.Write(msg.Payload); err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
msgCRC := crc.Sum32()
|
|
Packit |
63bb0d |
return binary.Write(e.w, binary.BigEndian, msgCRC)
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func encodePrelude(w io.Writer, crc hash.Hash32, headersLen, payloadLen uint32) error {
|
|
Packit |
63bb0d |
p := messagePrelude{
|
|
Packit |
63bb0d |
Length: minMsgLen + headersLen + payloadLen,
|
|
Packit |
63bb0d |
HeadersLen: headersLen,
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
if err := p.ValidateLens(); err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
err := binaryWriteFields(w, binary.BigEndian,
|
|
Packit |
63bb0d |
p.Length,
|
|
Packit |
63bb0d |
p.HeadersLen,
|
|
Packit |
63bb0d |
)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
p.PreludeCRC = crc.Sum32()
|
|
Packit |
63bb0d |
err = binary.Write(w, binary.BigEndian, p.PreludeCRC)
|
|
Packit |
63bb0d |
if err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func encodeHeaders(w io.Writer, headers Headers) error {
|
|
Packit |
63bb0d |
for _, h := range headers {
|
|
Packit |
63bb0d |
hn := headerName{
|
|
Packit |
63bb0d |
Len: uint8(len(h.Name)),
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
copy(hn.Name[:hn.Len], h.Name)
|
|
Packit |
63bb0d |
if err := hn.encode(w); err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
if err := h.Value.encode(w); err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
return nil
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
|
|
Packit |
63bb0d |
func binaryWriteFields(w io.Writer, order binary.ByteOrder, vs ...interface{}) error {
|
|
Packit |
63bb0d |
for _, v := range vs {
|
|
Packit |
63bb0d |
if err := binary.Write(w, order, v); err != nil {
|
|
Packit |
63bb0d |
return err
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
}
|
|
Packit |
63bb0d |
return nil
|
|
Packit |
63bb0d |
}
|