diff --git a/vendor.mod b/vendor.mod index 4ff13ebd61..cd04219c02 100644 --- a/vendor.mod +++ b/vendor.mod @@ -43,7 +43,7 @@ require ( github.com/hashicorp/go-immutable-radix v1.3.1 github.com/hashicorp/go-memdb v1.3.2 github.com/hashicorp/memberlist v0.2.4 - github.com/hashicorp/serf v0.8.2 + github.com/hashicorp/serf v0.8.5 github.com/imdario/mergo v0.3.12 github.com/ishidawataru/sctp v0.0.0-20210707070123-9a39160e9062 github.com/klauspost/compress v1.15.1 diff --git a/vendor.sum b/vendor.sum index 6e4ad9f996..3c552e20a7 100644 --- a/vendor.sum +++ b/vendor.sum @@ -640,8 +640,9 @@ github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0m github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/memberlist v0.2.4 h1:OOhYzSvFnkFQXm1ysE8RjXTHsqSRDyP4emusC9K7DYg= github.com/hashicorp/memberlist v0.2.4/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= +github.com/hashicorp/serf v0.8.5 h1:ZynDUIQiA8usmRgPdGPHFdPnb1wgGI9tK3mO9hcAJjc= +github.com/hashicorp/serf v0.8.5/go.mod h1:UpNcs7fFbpKIyZaUuSW6EPiH+eZC7OuyFD+wc1oal+k= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hugelgupf/socketpair v0.0.0-20190730060125-05d35a94e714/go.mod h1:2Goc3h8EklBH5mspfHFxBnEoURQCGzQQH1ga9Myjvis= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= diff --git a/vendor/github.com/hashicorp/serf/serf/config.go b/vendor/github.com/hashicorp/serf/serf/config.go index 79f36f57c7..0de4247c5b 100644 --- a/vendor/github.com/hashicorp/serf/serf/config.go +++ b/vendor/github.com/hashicorp/serf/serf/config.go @@ -242,6 +242,10 @@ type Config struct { // Merge can be optionally provided to intercept a cluster merge // and conditionally abort the merge. Merge MergeDelegate + + // UserEventSizeLimit is maximum byte size limit of user event `name` + `payload` in bytes. + // It's optimal to be relatively small, since it's going to be gossiped through the cluster. + UserEventSizeLimit int } // Init allocates the subdata structures @@ -282,5 +286,6 @@ func DefaultConfig() *Config { QuerySizeLimit: 1024, EnableNameConflictResolution: true, DisableCoordinates: false, + UserEventSizeLimit: 512, } } diff --git a/vendor/github.com/hashicorp/serf/serf/messages.go b/vendor/github.com/hashicorp/serf/serf/messages.go index 20df5b8e83..138817c50e 100644 --- a/vendor/github.com/hashicorp/serf/serf/messages.go +++ b/vendor/github.com/hashicorp/serf/serf/messages.go @@ -55,6 +55,7 @@ type messageJoin struct { type messageLeave struct { LTime LamportTime Node string + Prune bool } // messagePushPullType is used when doing a state exchange. This diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index bb6c22fe7b..9fe4cc3a71 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -223,8 +223,8 @@ type queries struct { } const ( - UserEventSizeLimit = 512 // Maximum byte size for event name and payload snapshotSizeLimit = 128 * 1024 // Maximum 128 KB snapshot + UserEventSizeLimit = 9 * 1024 // Maximum 9KB for event name and payload ) // Create creates a new Serf instance, starting all the background tasks @@ -242,6 +242,10 @@ func Create(conf *Config) (*Serf, error) { conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax) } + if conf.UserEventSizeLimit > UserEventSizeLimit { + return nil, fmt.Errorf("user event size limit exceeds limit of %d bytes", UserEventSizeLimit) + } + logger := conf.Logger if logger == nil { logOutput := conf.LogOutput @@ -437,14 +441,25 @@ func (s *Serf) KeyManager() *KeyManager { } // UserEvent is used to broadcast a custom user event with a given -// name and payload. The events must be fairly small, and if the -// size limit is exceeded and error will be returned. If coalesce is enabled, -// nodes are allowed to coalesce this event. Coalescing is only available -// starting in v0.2 +// name and payload. If the configured size limit is exceeded and error will be returned. +// If coalesce is enabled, nodes are allowed to coalesce this event. +// Coalescing is only available starting in v0.2 func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error { - // Check the size limit - if len(name)+len(payload) > UserEventSizeLimit { - return fmt.Errorf("user event exceeds limit of %d bytes", UserEventSizeLimit) + payloadSizeBeforeEncoding := len(name) + len(payload) + + // Check size before encoding to prevent needless encoding and return early if it's over the specified limit. + if payloadSizeBeforeEncoding > s.config.UserEventSizeLimit { + return fmt.Errorf( + "user event exceeds configured limit of %d bytes before encoding", + s.config.UserEventSizeLimit, + ) + } + + if payloadSizeBeforeEncoding > UserEventSizeLimit { + return fmt.Errorf( + "user event exceeds sane limit of %d bytes before encoding", + UserEventSizeLimit, + ) } // Create a message @@ -454,16 +469,34 @@ func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error { Payload: payload, CC: coalesce, } - s.eventClock.Increment() - - // Process update locally - s.handleUserEvent(&msg) // Start broadcasting the event raw, err := encodeMessage(messageUserEventType, &msg) if err != nil { return err } + + // Check the size after encoding to be sure again that + // we're not attempting to send over the specified size limit. + if len(raw) > s.config.UserEventSizeLimit { + return fmt.Errorf( + "encoded user event exceeds configured limit of %d bytes after encoding", + s.config.UserEventSizeLimit, + ) + } + + if len(raw) > UserEventSizeLimit { + return fmt.Errorf( + "encoded user event exceeds sane limit of %d bytes before encoding", + UserEventSizeLimit, + ) + } + + s.eventClock.Increment() + + // Process update locally + s.handleUserEvent(&msg) + s.eventBroadcasts.QueueBroadcast(&broadcast{ msg: raw, }) @@ -748,15 +781,26 @@ func (s *Serf) Members() []Member { return members } -// RemoveFailedNode forcibly removes a failed node from the cluster +// RemoveFailedNode is a backwards compatible form +// of forceleave +func (s *Serf) RemoveFailedNode(node string) error { + return s.forceLeave(node, false) +} + +func (s *Serf) RemoveFailedNodePrune(node string) error { + return s.forceLeave(node, true) +} + +// ForceLeave forcibly removes a failed node from the cluster // immediately, instead of waiting for the reaper to eventually reclaim it. // This also has the effect that Serf will no longer attempt to reconnect // to this node. -func (s *Serf) RemoveFailedNode(node string) error { +func (s *Serf) forceLeave(node string, prune bool) error { // Construct the message to broadcast msg := messageLeave{ LTime: s.clock.Time(), Node: node, + Prune: prune, } s.clock.Increment() @@ -1027,6 +1071,7 @@ func (s *Serf) handleNodeUpdate(n *memberlist.Node) { // handleNodeLeaveIntent is called when an intent to leave is received. func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { + // Witness a potentially newer time s.clock.Witness(leaveMsg.LTime) @@ -1057,6 +1102,10 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { case StatusAlive: member.Status = StatusLeaving member.statusLTime = leaveMsg.LTime + + if leaveMsg.Prune { + s.handlePrune(member) + } return true case StatusFailed: member.Status = StatusLeft @@ -1065,6 +1114,7 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { // Remove from the failed list and add to the left list. We add // to the left list so that when we do a sync, other nodes will // remove it from their failed list. + s.failedMembers = removeOldMember(s.failedMembers, member.Name) s.leftMembers = append(s.leftMembers, member) @@ -1079,12 +1129,40 @@ func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool { Members: []Member{member.Member}, } } + + if leaveMsg.Prune { + s.handlePrune(member) + } + + return true + + case StatusLeaving, StatusLeft: + if leaveMsg.Prune { + s.handlePrune(member) + } return true default: return false } } +// handlePrune waits for nodes that are leaving and then forcibly +// erases a member from the list of members +func (s *Serf) handlePrune(member *memberState) { + if member.Status == StatusLeaving { + time.Sleep(s.config.BroadcastTimeout + s.config.LeavePropagateDelay) + } + + s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr) + + //If we are leaving or left we may be in that list of members + if member.Status == StatusLeaving || member.Status == StatusLeft { + s.leftMembers = removeOldMember(s.leftMembers, member.Name) + } + s.eraseNode(member) + +} + // handleNodeJoinIntent is called when a node broadcasts a // join message to set the lamport time of its join func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool { @@ -1405,6 +1483,30 @@ func (s *Serf) resolveNodeConflict() { } } +//eraseNode takes a node completely out of the member list +func (s *Serf) eraseNode(m *memberState) { + // Delete from members + delete(s.members, m.Name) + + // Tell the coordinate client the node has gone away and delete + // its cached coordinates. + if !s.config.DisableCoordinates { + s.coordClient.ForgetNode(m.Name) + + s.coordCacheLock.Lock() + delete(s.coordCache, m.Name) + s.coordCacheLock.Unlock() + } + + // Send an event along + if s.config.EventCh != nil { + s.config.EventCh <- MemberEvent{ + Type: EventMemberReap, + Members: []Member{m.Member}, + } + } +} + // handleReap periodically reaps the list of failed and left members, as well // as old buffered intents. func (s *Serf) handleReap() { @@ -1455,27 +1557,10 @@ func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) [] n-- i-- - // Delete from members - delete(s.members, m.Name) - - // Tell the coordinate client the node has gone away and delete - // its cached coordinates. - if !s.config.DisableCoordinates { - s.coordClient.ForgetNode(m.Name) - - s.coordCacheLock.Lock() - delete(s.coordCache, m.Name) - s.coordCacheLock.Unlock() - } - - // Send an event along + // Delete from members and send out event s.logger.Printf("[INFO] serf: EventMemberReap: %s", m.Name) - if s.config.EventCh != nil { - s.config.EventCh <- MemberEvent{ - Type: EventMemberReap, - Members: []Member{m.Member}, - } - } + s.eraseNode(m) + } return old diff --git a/vendor/modules.txt b/vendor/modules.txt index 65986f6534..48864d0a76 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -432,7 +432,7 @@ github.com/hashicorp/golang-lru/simplelru # github.com/hashicorp/memberlist v0.2.4 ## explicit; go 1.12 github.com/hashicorp/memberlist -# github.com/hashicorp/serf v0.8.2 +# github.com/hashicorp/serf v0.8.5 ## explicit github.com/hashicorp/serf/coordinate github.com/hashicorp/serf/serf