Blob Blame History Raw
package eventstream

import (
	"encoding/base64"
	"encoding/binary"
	"fmt"
	"io"
	"strconv"
	"time"
)

const maxHeaderValueLen = 1<<15 - 1 // 2^15-1 or 32KB - 1

// valueType is the EventStream header value type.
type valueType uint8

// Header value types
const (
	trueValueType valueType = iota
	falseValueType
	int8ValueType  // Byte
	int16ValueType // Short
	int32ValueType // Integer
	int64ValueType // Long
	bytesValueType
	stringValueType
	timestampValueType
	uuidValueType
)

func (t valueType) String() string {
	switch t {
	case trueValueType:
		return "bool"
	case falseValueType:
		return "bool"
	case int8ValueType:
		return "int8"
	case int16ValueType:
		return "int16"
	case int32ValueType:
		return "int32"
	case int64ValueType:
		return "int64"
	case bytesValueType:
		return "byte_array"
	case stringValueType:
		return "string"
	case timestampValueType:
		return "timestamp"
	case uuidValueType:
		return "uuid"
	default:
		return fmt.Sprintf("unknown value type %d", uint8(t))
	}
}

type rawValue struct {
	Type  valueType
	Len   uint16 // Only set for variable length slices
	Value []byte // byte representation of value, BigEndian encoding.
}

func (r rawValue) encodeScalar(w io.Writer, v interface{}) error {
	return binaryWriteFields(w, binary.BigEndian,
		r.Type,
		v,
	)
}

func (r rawValue) encodeFixedSlice(w io.Writer, v []byte) error {
	binary.Write(w, binary.BigEndian, r.Type)

	_, err := w.Write(v)
	return err
}

func (r rawValue) encodeBytes(w io.Writer, v []byte) error {
	if len(v) > maxHeaderValueLen {
		return LengthError{
			Part: "header value",
			Want: maxHeaderValueLen, Have: len(v),
			Value: v,
		}
	}
	r.Len = uint16(len(v))

	err := binaryWriteFields(w, binary.BigEndian,
		r.Type,
		r.Len,
	)
	if err != nil {
		return err
	}

	_, err = w.Write(v)
	return err
}

func (r rawValue) encodeString(w io.Writer, v string) error {
	if len(v) > maxHeaderValueLen {
		return LengthError{
			Part: "header value",
			Want: maxHeaderValueLen, Have: len(v),
			Value: v,
		}
	}
	r.Len = uint16(len(v))

	type stringWriter interface {
		WriteString(string) (int, error)
	}

	err := binaryWriteFields(w, binary.BigEndian,
		r.Type,
		r.Len,
	)
	if err != nil {
		return err
	}

	if sw, ok := w.(stringWriter); ok {
		_, err = sw.WriteString(v)
	} else {
		_, err = w.Write([]byte(v))
	}

	return err
}

func decodeFixedBytesValue(r io.Reader, buf []byte) error {
	_, err := io.ReadFull(r, buf)
	return err
}

func decodeBytesValue(r io.Reader) ([]byte, error) {
	var raw rawValue
	var err error
	raw.Len, err = decodeUint16(r)
	if err != nil {
		return nil, err
	}

	buf := make([]byte, raw.Len)
	_, err = io.ReadFull(r, buf)
	if err != nil {
		return nil, err
	}

	return buf, nil
}

func decodeStringValue(r io.Reader) (string, error) {
	v, err := decodeBytesValue(r)
	return string(v), err
}

// Value represents the abstract header value.
type Value interface {
	Get() interface{}
	String() string
	valueType() valueType
	encode(io.Writer) error
}

// An BoolValue provides eventstream encoding, and representation
// of a Go bool value.
type BoolValue bool

// Get returns the underlying type
func (v BoolValue) Get() interface{} {
	return bool(v)
}

// valueType returns the EventStream header value type value.
func (v BoolValue) valueType() valueType {
	if v {
		return trueValueType
	}
	return falseValueType
}

func (v BoolValue) String() string {
	return strconv.FormatBool(bool(v))
}

// encode encodes the BoolValue into an eventstream binary value
// representation.
func (v BoolValue) encode(w io.Writer) error {
	return binary.Write(w, binary.BigEndian, v.valueType())
}

// An Int8Value provides eventstream encoding, and representation of a Go
// int8 value.
type Int8Value int8

// Get returns the underlying value.
func (v Int8Value) Get() interface{} {
	return int8(v)
}

// valueType returns the EventStream header value type value.
func (Int8Value) valueType() valueType {
	return int8ValueType
}

func (v Int8Value) String() string {
	return fmt.Sprintf("0x%02x", int8(v))
}

// encode encodes the Int8Value into an eventstream binary value
// representation.
func (v Int8Value) encode(w io.Writer) error {
	raw := rawValue{
		Type: v.valueType(),
	}

	return raw.encodeScalar(w, v)
}

func (v *Int8Value) decode(r io.Reader) error {
	n, err := decodeUint8(r)
	if err != nil {
		return err
	}

	*v = Int8Value(n)
	return nil
}

// An Int16Value provides eventstream encoding, and representation of a Go
// int16 value.
type Int16Value int16

// Get returns the underlying value.
func (v Int16Value) Get() interface{} {
	return int16(v)
}

// valueType returns the EventStream header value type value.
func (Int16Value) valueType() valueType {
	return int16ValueType
}

func (v Int16Value) String() string {
	return fmt.Sprintf("0x%04x", int16(v))
}

// encode encodes the Int16Value into an eventstream binary value
// representation.
func (v Int16Value) encode(w io.Writer) error {
	raw := rawValue{
		Type: v.valueType(),
	}
	return raw.encodeScalar(w, v)
}

func (v *Int16Value) decode(r io.Reader) error {
	n, err := decodeUint16(r)
	if err != nil {
		return err
	}

	*v = Int16Value(n)
	return nil
}

// An Int32Value provides eventstream encoding, and representation of a Go
// int32 value.
type Int32Value int32

// Get returns the underlying value.
func (v Int32Value) Get() interface{} {
	return int32(v)
}

// valueType returns the EventStream header value type value.
func (Int32Value) valueType() valueType {
	return int32ValueType
}

func (v Int32Value) String() string {
	return fmt.Sprintf("0x%08x", int32(v))
}

// encode encodes the Int32Value into an eventstream binary value
// representation.
func (v Int32Value) encode(w io.Writer) error {
	raw := rawValue{
		Type: v.valueType(),
	}
	return raw.encodeScalar(w, v)
}

func (v *Int32Value) decode(r io.Reader) error {
	n, err := decodeUint32(r)
	if err != nil {
		return err
	}

	*v = Int32Value(n)
	return nil
}

// An Int64Value provides eventstream encoding, and representation of a Go
// int64 value.
type Int64Value int64

// Get returns the underlying value.
func (v Int64Value) Get() interface{} {
	return int64(v)
}

// valueType returns the EventStream header value type value.
func (Int64Value) valueType() valueType {
	return int64ValueType
}

func (v Int64Value) String() string {
	return fmt.Sprintf("0x%016x", int64(v))
}

// encode encodes the Int64Value into an eventstream binary value
// representation.
func (v Int64Value) encode(w io.Writer) error {
	raw := rawValue{
		Type: v.valueType(),
	}
	return raw.encodeScalar(w, v)
}

func (v *Int64Value) decode(r io.Reader) error {
	n, err := decodeUint64(r)
	if err != nil {
		return err
	}

	*v = Int64Value(n)
	return nil
}

// An BytesValue provides eventstream encoding, and representation of a Go
// byte slice.
type BytesValue []byte

// Get returns the underlying value.
func (v BytesValue) Get() interface{} {
	return []byte(v)
}

// valueType returns the EventStream header value type value.
func (BytesValue) valueType() valueType {
	return bytesValueType
}

func (v BytesValue) String() string {
	return base64.StdEncoding.EncodeToString([]byte(v))
}

// encode encodes the BytesValue into an eventstream binary value
// representation.
func (v BytesValue) encode(w io.Writer) error {
	raw := rawValue{
		Type: v.valueType(),
	}

	return raw.encodeBytes(w, []byte(v))
}

func (v *BytesValue) decode(r io.Reader) error {
	buf, err := decodeBytesValue(r)
	if err != nil {
		return err
	}

	*v = BytesValue(buf)
	return nil
}

// An StringValue provides eventstream encoding, and representation of a Go
// string.
type StringValue string

// Get returns the underlying value.
func (v StringValue) Get() interface{} {
	return string(v)
}

// valueType returns the EventStream header value type value.
func (StringValue) valueType() valueType {
	return stringValueType
}

func (v StringValue) String() string {
	return string(v)
}

// encode encodes the StringValue into an eventstream binary value
// representation.
func (v StringValue) encode(w io.Writer) error {
	raw := rawValue{
		Type: v.valueType(),
	}

	return raw.encodeString(w, string(v))
}

func (v *StringValue) decode(r io.Reader) error {
	s, err := decodeStringValue(r)
	if err != nil {
		return err
	}

	*v = StringValue(s)
	return nil
}

// An TimestampValue provides eventstream encoding, and representation of a Go
// timestamp.
type TimestampValue time.Time

// Get returns the underlying value.
func (v TimestampValue) Get() interface{} {
	return time.Time(v)
}

// valueType returns the EventStream header value type value.
func (TimestampValue) valueType() valueType {
	return timestampValueType
}

func (v TimestampValue) epochMilli() int64 {
	nano := time.Time(v).UnixNano()
	msec := nano / int64(time.Millisecond)
	return msec
}

func (v TimestampValue) String() string {
	msec := v.epochMilli()
	return strconv.FormatInt(msec, 10)
}

// encode encodes the TimestampValue into an eventstream binary value
// representation.
func (v TimestampValue) encode(w io.Writer) error {
	raw := rawValue{
		Type: v.valueType(),
	}

	msec := v.epochMilli()
	return raw.encodeScalar(w, msec)
}

func (v *TimestampValue) decode(r io.Reader) error {
	n, err := decodeUint64(r)
	if err != nil {
		return err
	}

	*v = TimestampValue(timeFromEpochMilli(int64(n)))
	return nil
}

func timeFromEpochMilli(t int64) time.Time {
	secs := t / 1e3
	msec := t % 1e3
	return time.Unix(secs, msec*int64(time.Millisecond)).UTC()
}

// An UUIDValue provides eventstream encoding, and representation of a UUID
// value.
type UUIDValue [16]byte

// Get returns the underlying value.
func (v UUIDValue) Get() interface{} {
	return v[:]
}

// valueType returns the EventStream header value type value.
func (UUIDValue) valueType() valueType {
	return uuidValueType
}

func (v UUIDValue) String() string {
	return fmt.Sprintf(`%X-%X-%X-%X-%X`, v[0:4], v[4:6], v[6:8], v[8:10], v[10:])
}

// encode encodes the UUIDValue into an eventstream binary value
// representation.
func (v UUIDValue) encode(w io.Writer) error {
	raw := rawValue{
		Type: v.valueType(),
	}

	return raw.encodeFixedSlice(w, v[:])
}

func (v *UUIDValue) decode(r io.Reader) error {
	tv := (*v)[:]
	return decodeFixedBytesValue(r, tv)
}